# Martech - Query Data

## Initialise Connection

In [12]:
from astra_db import AstraSession
from concurrent.futures import ThreadPoolExecutor

# Create the shared AstraSession used by every query cell in this notebook and
# call prepare_statements() on it before any query runs.
# NOTE(review): presumably prepare_statements() is what populates the
# astra.get_*_cap_stmt attributes used below — confirm in astra_db.
astra = AstraSession()
astra.prepare_statements()

## Define Entities

In [16]:
# Define Entities
# Sample values for the query parameters. The query cells visible in this
# notebook only index customer_id_array[0] and otherwise pass string literals.

# customer_id
# NOTE(review): 'bcabe5dd-8675-4a88-94ed-a9faa9473355' appears twice (8th and
# last entries) — confirm whether the duplicate is intentional.
customer_id_array = [
    '0b6ec30f-8b1a-406f-82f6-89222a017449',
    '8435a9b9-9e88-495c-a297-dfd25d44428b',
    '8e12d570-e47d-43b9-9435-b2020899860b',
    '9e53cd14-0316-46ff-b913-30228fcc5b73',
    'a6e198e0-674d-4a27-ba01-595a78fda2ef',
    'af9e65bd-4474-40c9-8d16-7b82035d3e0f',
    'b191cef3-4624-4e88-9a93-73de506e4be7',
    'bcabe5dd-8675-4a88-94ed-a9faa9473355',
    'cf3a5d68-e66f-4ee7-a2d1-2666c5366540',
    'f68e8bc9-86b7-4c8e-ba74-87d31d27a531',
    'bcabe5dd-8675-4a88-94ed-a9faa9473355',
]

# channel
channel_array = ['email', 'sms', 'push', 'in-app', 'direct']

# category_group
category_group_array = ['ctgy_grp_1', 'ctgy_grp_2', 'ctgy_grp_3', 'ctgy_grp_4', 'ctgy_grp_5']

# category
category_array = ['ctgy_1', 'ctgy_2', 'ctgy_3', 'ctgy_4', 'ctgy_5']

# activity_name
activity_name_array = ['credit_card', 'mortgage', 'business', 'retail', 'investment']

## Query Data

In [17]:
# Generate the calendar-valid date (or date-hour) buckets in [start, end).
def iterate_hours(start, end):
    """Yield every calendar-valid integer bucket in ``range(start, end)``.

    Buckets are integers shaped like ``yyyymmdd`` (8 digits) or ``yyyymmddhh``
    (10 digits). Counting through the integers also visits values that are not
    real dates or hours (for example 20250132, 20250200, 20250230 or
    2025010124); those are skipped so no query is issued for them.

    Args:
        start: First bucket to consider (inclusive).
        end: Bucket at which to stop (exclusive, as with ``range``).

    Yields:
        Each bucket, in ascending order, whose digits form a real date
        (and, for 10-digit buckets, a real hour 00-23).

    Raises:
        ValueError: While iterating, if a bucket is not 8 or 10 digits wide.
    """
    # Local import so the cell works without touching the notebook's imports.
    from datetime import datetime

    for bucket in range(start, end):
        digits = str(bucket)
        width = len(digits)
        if width not in (8, 10):
            raise ValueError(
                f"bucket {bucket} must be yyyymmdd or yyyymmddhh, got {width} characters"
            )

        hour = int(digits[8:10]) if width == 10 else 0
        try:
            # The constructor rejects month 00/13+, day 00, days past the end
            # of the month, and hours above 23.
            datetime(int(digits[:4]), int(digits[4:6]), int(digits[6:8]), hour)
        except ValueError:
            continue

        yield bucket

In [36]:
# Bucket range handed to iterate_hours(start_date, end_date) by the query cells.
# iterate_hours walks range(start, end), so end_date itself is excluded: these
# values cover 20250101 through 20250107.
start_date = 20250101
end_date = 20250108

### PROFILECAP

#### Serial Execution

In [None]:
# comm_date_bucket - yyyymmdd - 20241201
# Query PROFILECAP one bucket at a time and tally every row returned.
row_count = 0
for bucket in iterate_hours(start_date, end_date):
    result_set = astra.session.execute(
        astra.get_profile_cap_stmt,
        [customer_id_array[0], 'email', bucket],
    )
    for _ in result_set:
        row_count += 1

print(f"Count: {row_count}")

#### Parallel Execution

In [None]:
def execute_statement(bucket):
    """Count the PROFILECAP rows returned for a single bucket.

    Executes ``astra.get_profile_cap_stmt`` bound to the first entry of
    ``customer_id_array``, the literal ``'email'`` and ``bucket``, then
    iterates the result to count its rows.
    """
    bound_values = [customer_id_array[0], 'email', bucket]
    matched = 0
    for _ in astra.session.execute(astra.get_profile_cap_stmt, bound_values):
        matched += 1
    return matched

# Submit one query task per bucket to a 20-worker thread pool, then add up the
# per-bucket row counts in submission order.
row_count = 0
with ThreadPoolExecutor(max_workers=20) as executor:
    pending = [
        executor.submit(execute_statement, bucket)
        for bucket in iterate_hours(start_date, end_date)
    ]
    for task in pending:
        row_count += task.result()

print(f"Count: {row_count}")

### ACTYCAP

#### Serial Execution

In [None]:
# comm_date_bucket - yyyymmddhh - 2024120100
# NOTE(review): start_date/end_date in this notebook are 8-digit yyyymmdd
# values — confirm which bucket format this statement expects.
# Query ACTYCAP one bucket at a time and tally every row returned.
row_count = 0
for bucket in iterate_hours(start_date, end_date):
    result_set = astra.session.execute(
        astra.get_acty_cap_stmt,
        ['credit_card', bucket],
    )
    for _ in result_set:
        row_count += 1

print(f"Count: {row_count}")

#### Parallel Execution

In [None]:
def execute_statement(bucket):
    """Count the ACTYCAP rows returned for a single bucket.

    Executes ``astra.get_acty_cap_stmt`` bound to the literal
    ``'credit_card'`` and ``bucket``, then iterates the result to count its
    rows.
    """
    bound_values = ['credit_card', bucket]
    matched = 0
    for _ in astra.session.execute(astra.get_acty_cap_stmt, bound_values):
        matched += 1
    return matched

# Submit one query task per bucket to a 20-worker thread pool, then add up the
# per-bucket row counts in submission order.
row_count = 0
with ThreadPoolExecutor(max_workers=20) as executor:
    pending = [
        executor.submit(execute_statement, bucket)
        for bucket in iterate_hours(start_date, end_date)
    ]
    for task in pending:
        row_count += task.result()

print(f"Count: {row_count}")

### PRTYCAP

#### Serial Execution

In [None]:
# comm_date_bucket - yyyymmddhh - 2024120100
# NOTE(review): start_date/end_date in this notebook are 8-digit yyyymmdd
# values — confirm which bucket format this statement expects.
# Query PRTYCAP one bucket at a time and tally every row returned.
row_count = 0
for bucket in iterate_hours(start_date, end_date):
    result_set = astra.session.execute(
        astra.get_prty_cap_stmt,
        ['ctgy_grp_1', 'ctgy_1', bucket],
    )
    for _ in result_set:
        row_count += 1

print(f"Count: {row_count}")

#### Parallel Execution

In [None]:
def execute_statement(bucket):
    """Count the PRTYCAP rows returned for a single bucket.

    Executes ``astra.get_prty_cap_stmt`` bound to the literals
    ``'ctgy_grp_1'`` and ``'ctgy_1'`` plus ``bucket``, then iterates the
    result to count its rows.
    """
    bound_values = ['ctgy_grp_1', 'ctgy_1', bucket]
    matched = 0
    for _ in astra.session.execute(astra.get_prty_cap_stmt, bound_values):
        matched += 1
    return matched

# Submit one query task per bucket to a 20-worker thread pool, then add up the
# per-bucket row counts in submission order.
row_count = 0
with ThreadPoolExecutor(max_workers=20) as executor:
    pending = [
        executor.submit(execute_statement, bucket)
        for bucket in iterate_hours(start_date, end_date)
    ]
    for task in pending:
        row_count += task.result()

print(f"Count: {row_count}")

### CHANNELCAP

#### Serial Execution

In [None]:
# comm_date_bucket - yyyymmddhh - 2024120100
# NOTE(review): start_date/end_date in this notebook are 8-digit yyyymmdd
# values — confirm which bucket format this statement expects.
# Query CHANNELCAP one bucket at a time and tally every row returned.
row_count = 0
for bucket in iterate_hours(start_date, end_date):
    result_set = astra.session.execute(
        astra.get_channel_cap_stmt,
        ['email', bucket],
    )
    for _ in result_set:
        row_count += 1

print(f"Count: {row_count}")

#### Parallel Execution

In [None]:
def execute_prepare_statement(bucket):
    """Count the CHANNELCAP rows returned for a single bucket.

    Executes ``astra.get_channel_cap_stmt`` bound to the literal ``'email'``
    and ``bucket``, then iterates the result to count its rows.
    """
    bound_values = ['email', bucket]
    matched = 0
    for _ in astra.session.execute(astra.get_channel_cap_stmt, bound_values):
        matched += 1
    return matched

# Submit one query task per bucket to a 20-worker thread pool, then add up the
# per-bucket row counts in submission order.
row_count = 0
with ThreadPoolExecutor(max_workers=20) as executor:
    pending = [
        executor.submit(execute_prepare_statement, bucket)
        for bucket in iterate_hours(start_date, end_date)
    ]
    for task in pending:
        row_count += task.result()

print(f"Count: {row_count}")

## Cleanup

In [3]:
# Final cell: call shutdown() on the shared AstraSession once querying is done.
# NOTE(review): presumably this closes the underlying driver session/cluster —
# confirm in astra_db; query cells cannot be re-run after this without
# re-running the connection cell.
astra.shutdown()