In [1]:
!pip install PyAthena[SQLAlchemy]
!aws configure get region
from sqlalchemy import create_engine

!pip install CurrencyConverter
import tr_dash_util as util
import numpy as np 
import html 
import json
import re
import pandas as pd
from datetime import datetime, timedelta
from currency_converter import CurrencyConverter, ECB_URL
from tr_dash_util import upload_dataframe_to_s3, extract_eReg_orders, clean_TR_dataframe, fetch_data_from_s3
from functools import reduce
import pytz

import boto3
# Set time and environment
today = datetime.now() - timedelta(days=1)
prod = True

# S3 path for Athena
s3_staging_dir = "s3://ets-aws-plalab-dii-prod-analyticsbucket-1ktrlhzbrcbkb/athena_query_results/"

# Athena connection string
connection_string = f"awsathena+rest://:@athena.us-east-1.amazonaws.com:443/labsprodeventsdatabase-x806vjuzpbrd?s3_staging_dir={s3_staging_dir}"
engine = create_engine(connection_string)

def set_date_range(start_date='2024-02-01', end_date='2024-05-27'):
    """
    Returns the start and end dates as a tuple.
    """
    return start_date, end_date

def pull_parquet_files_pandas(file_name, bucket_name='ets-aws-plalab-dii-prod-analyticsbucket-1ktrlhzbrcbkb'):
    """
    Pulls Parquet files from a specified S3 bucket and concatenates them into a single DataFrame.
    """
    s3_client = boto3.client('s3')
    prefix = f'sagemaker/test_ready/intermediate_files/file_name={file_name}/'

    def list_parquet_files(bucket, prefix):
        """List all Parquet files within the structured partitioning scheme in S3."""
        file_paths = []
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for content in page.get('Contents', []):
                key = content.get('Key')
                if key.endswith('.parquet'):  # Ensure we're only capturing Parquet files
                    file_paths.append(f"s3://{bucket}/{key}")
        return file_paths

    file_paths = list_parquet_files(bucket_name, prefix)
    df = pd.DataFrame()
    for file_path in file_paths:
        temp_df = pd.read_parquet(file_path, engine='pyarrow')
        df = pd.concat([df, temp_df], ignore_index=True)
    return df

def pull_parquet_files_pandas_Target(file_name, bucket_name='ets-dii-testready-ds-analytics'):
    """
    Pulls Parquet files from a specified S3 bucket and concatenates them into a single DataFrame.
    """
    s3_client = boto3.client('s3')
    prefix = f'Eben/Features/file_name={file_name}/'

    def list_parquet_files(bucket, prefix):
        """List all Parquet files within the structured partitioning scheme in S3."""
        file_paths = []
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for content in page.get('Contents', []):
                key = content.get('Key')
                if key.endswith('.parquet'):  # Ensure we're only capturing Parquet files
                    file_paths.append(f"s3://{bucket}/{key}")
        return file_paths

    file_paths = list_parquet_files(bucket_name, prefix)
    df = pd.DataFrame()
    for file_path in file_paths:
        temp_df = pd.read_parquet(file_path, engine='pyarrow')
        df = pd.concat([df, temp_df], ignore_index=True)
    return df

def pull_parquet_files_pandas_features(file_name, bucket_name='ets-dii-testready-ds-analytics'):
    """
    Pulls Parquet files from a specified S3 bucket and concatenates them into a single DataFrame.

    Args:
        file_name (str): The name of the file (or file identifier) to search for in the S3 bucket.
        bucket_name (str, optional): The name of the S3 bucket to search in. Defaults to 'ets-dii-testready-ds-analytics'.

    Returns:
        pandas.DataFrame: A DataFrame containing the concatenated data from all found Parquet files.
    """
    s3_client = boto3.client('s3')
    prefix = f'features/file_name={file_name}/'

    def list_parquet_files(bucket, prefix):
        """List all Parquet files within the structured partitioning scheme in S3."""
        file_paths = []
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for content in page.get('Contents', []):
                key = content.get('Key')
                if key.endswith('.parquet'):  # Ensure we're only capturing Parquet files
                    file_paths.append(f"s3://{bucket}/{key}")
        return file_paths

    file_paths = list_parquet_files(bucket_name, prefix)

    # Initialize an empty DataFrame
    df = pd.DataFrame()

    for file_path in file_paths:
        # Read each Parquet file into a DataFrame
        temp_df = pd.read_parquet(file_path, engine='pyarrow')
        # Concatenate to the main DataFrame
        df = pd.concat([df, temp_df], ignore_index=True)
    return df

def get_first_purchase_dates(orders_df):
    """
    Extracts the first purchase date for each user.
    Args:
        orders_df (pd.DataFrame): DataFrame containing order data with 'user_id' and 'order_sumitted'.
    Returns:
        pd.DataFrame: DataFrame with 'user_id' and 'first_purchase_date'.
    """
    # Convert order_sumitted to datetime
    orders_df['order_sumitted'] = pd.to_datetime(orders_df['order_sumitted'], unit='s', errors='coerce')
    
    # Filter out invalid dates (NaT values)
    #orders_df = orders_df.dropna(subset=['order_sumitted'])
    
    # Get the first purchase date for each user
    first_purchase_dates = orders_df.groupby('user_id')['order_sumitted'].min().reset_index()
    first_purchase_dates.rename(columns={'order_sumitted': 'first_purchase_date'}, inplace=True)
    
    return first_purchase_dates

def filter_events_before_first_purchase(events_df, first_purchase_dates):
    """
    Filters events to include only those that occurred before the user's first purchase.
    Args:
        events_df (pd.DataFrame): DataFrame containing event data with 'user_id' and 'date'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
    Returns:
        pd.DataFrame: Filtered DataFrame.
    """
    events_df['date'] = pd.to_datetime(events_df['date'])
    merged_df = events_df.merge(first_purchase_dates, on='user_id', how='left')
    filtered_df = merged_df[merged_df['date'] < merged_df['first_purchase_date']].copy()
    return filtered_df

def aggregate_study_plans(data, first_purchase_dates, date_range):
    """
    Aggregates study plan data at the user level within a specified date range, excluding events after first purchase.
    Args:
        data (pd.DataFrame): The original DataFrame containing study plan data with columns 'date', 'user_id', 'country', 'plan_num'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
        date_range (tuple): Tuple containing the start and end dates.
    Returns:
        pd.DataFrame: A DataFrame aggregated at the user level with columns 'user_id', 'first_date', 'country', and 'number_of_study_plans'.
    """
    start_date, end_date = date_range
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[(data['date'] >= start_date) & (data['date'] <= end_date)]
    filtered_data = filter_events_before_first_purchase(filtered_data, first_purchase_dates)
    user_level_data = filtered_data.groupby('user_id').agg({
        'date': 'min',
        'country': 'first',
        'plan_num': 'count'
    }).reset_index()
    user_level_data.rename(columns={'date': 'first_date', 'plan_num': 'number_of_study_plans'}, inplace=True)
    return user_level_data

def max_streak_within_date_range(data, first_purchase_dates, date_range):
    """
    Calculates the maximum streak for each user within a specified date range, excluding events after first purchase.
    Args:
        data (pd.DataFrame): The original DataFrame containing streak data with columns 'date', 'user_id', 'max_days_in_a_row', 'country_code'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
        date_range (tuple): Tuple containing the start and end dates.
    Returns:
        pd.DataFrame: A DataFrame aggregated at the user level with columns 'user_id', 'country_code', and 'max_streak'.
    """
    start_date, end_date = date_range
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[(data['date'] >= start_date) & (data['date'] <= end_date)]
    filtered_data = filter_events_before_first_purchase(filtered_data, first_purchase_dates)
    filtered_data.loc[:, 'country_code'] = filtered_data['country_code'].fillna('Unknown')
    user_level_data = filtered_data.groupby('user_id').agg({
        'max_days_in_a_row': 'max',
        'country_code': 'first'
    }).reset_index()
    user_level_data.rename(columns={'max_days_in_a_row': 'max_streak'}, inplace=True)
    return user_level_data

def count_free_test_usage(data, first_purchase_dates, date_range):
    """
    Counts the number of times each user has used the 'Free' test type within a specified date range, excluding events after first purchase.
    Args:
        data (pd.DataFrame): The original DataFrame containing prep data with columns 'date', 'user_id', 'country_code', 'prep_type', 'prep_time'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
        date_range (tuple): Tuple containing the start and end dates.
    Returns:
        pd.DataFrame: A DataFrame aggregated at the user level with columns 'user_id', 'country_code', and 'free_test_count'.
    """
    start_date, end_date = date_range
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[(data['date'] >= start_date) & (data['date'] <= end_date)]
    filtered_data = filter_events_before_first_purchase(filtered_data, first_purchase_dates)
    free_test_data = filtered_data[filtered_data['prep_type'] == 'Free']
    free_test_count = free_test_data.groupby('user_id').agg({
        'prep_type': 'count',
        'country_code': 'first'
    }).reset_index()
    free_test_count.rename(columns={'prep_type': 'free_test_count'}, inplace=True)
    return free_test_count

def aggregate_free_test_section(data, first_purchase_dates, date_range):
    """
    Aggregates free test section data at the user level within a specified date range, excluding events after first purchase.
    Args:
        data (pd.DataFrame): The original DataFrame containing free test section data with columns 'date', 'user_id', 'event_name', 'skill', 'country_code', 'backend_timestamp', 'session'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
        date_range (tuple): Tuple containing the start and end dates.
    Returns:
        pd.DataFrame: A DataFrame aggregated at the user level with columns 'user_id', 'country_code', and 'free_test_section_count'.
    """
    start_date, end_date = date_range
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[(data['date'] >= start_date) & (data['date'] <= end_date)]
    filtered_data = filter_events_before_first_purchase(filtered_data, first_purchase_dates)
    user_level_data = filtered_data.groupby('user_id').agg({
        'country_code': 'first',
        'event_name': 'count'
    }).reset_index()
    user_level_data.rename(columns={'event_name': 'free_test_section_count'}, inplace=True)
    return user_level_data

def aggregate_page_view(data, first_purchase_dates, date_range):
    """
    Aggregates page view data at the user level within a specified date range, excluding events after first purchase.
    Args:
        data (pd.DataFrame): The original DataFrame containing page view data with columns 'date', 'user_id', 'event_name', 'screen', 'country_code', 'backend_timestamp', 'session'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
        date_range (tuple): Tuple containing the start and end dates.
    Returns:
        pd.DataFrame: A DataFrame aggregated at the user level with columns 'user_id', 'country_code', and 'page_view_count'.
    """
    start_date, end_date = date_range
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[(data['date'] >= start_date) & (data['date'] <= end_date)]
    filtered_data = filter_events_before_first_purchase(filtered_data, first_purchase_dates)
    user_level_data = filtered_data.groupby('user_id').agg({
        'country_code': 'first',
        'event_name': 'count'
    }).reset_index()
    user_level_data.rename(columns={'event_name': 'page_view_count'}, inplace=True)
    return user_level_data

def aggregate_paid_prep(data, first_purchase_dates, date_range):
    """
    Aggregates paid prep data at the user level within a specified date range, excluding events after first purchase.
    Args:
        data (pd.DataFrame): The original DataFrame containing paid prep data with columns 'date', 'user_id', 'event_name', 'prep_type', 'au_num', 'country_code', 'backend_timestamp', 'session'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
        date_range (tuple): Tuple containing the start and end dates.
    Returns:
        pd.DataFrame: A DataFrame aggregated at the user level with columns 'user_id', 'country_code', and 'paid_prep_count'.
    """
    start_date, end_date = date_range
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[(data['date'] >= start_date) & (data['date'] <= end_date)]
    filtered_data = filter_events_before_first_purchase(filtered_data, first_purchase_dates)
    user_level_data = filtered_data.groupby('user_id').agg({
        'country_code': 'first',
        'event_name': 'count'
    }).reset_index()
    user_level_data.rename(columns={'event_name': 'paid_prep_count'}, inplace=True)
    return user_level_data

def aggregate_view_feedback(data, first_purchase_dates, date_range):
    """
    Aggregates view feedback data at the user level within a specified date range, excluding events after first purchase.
    Args:
        data (pd.DataFrame): The original DataFrame containing view feedback data with columns 'date', 'user_id', 'event_name', 'prep_type', 'skill', 'country_code', 'backend_timestamp', 'session'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
        date_range (tuple): Tuple containing the start and end dates.
    Returns:
        pd.DataFrame: A DataFrame aggregated at the user level with columns 'user_id', 'country_code', and 'view_feedback_count'.
    """
    start_date, end_date = date_range
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[(data['date'] >= start_date) & (data['date'] <= end_date)]
    filtered_data = filter_events_before_first_purchase(filtered_data, first_purchase_dates)
    user_level_data = filtered_data.groupby('user_id').agg({
        'country_code': 'first',
        'event_name': 'count'
    }).reset_index()
    user_level_data.rename(columns={'event_name': 'view_feedback_count'}, inplace=True)
    return user_level_data

def aggregate_view_plan(data, first_purchase_dates, date_range):
    """
    Aggregates view plan data at the user level within a specified date range, excluding events after first purchase.
    Args:
        data (pd.DataFrame): The original DataFrame containing view plan data with columns 'date', 'user_id', 'event_name', 'country_code', 'backend_timestamp', 'session'.
        first_purchase_dates (pd.DataFrame): DataFrame with 'user_id' and 'first_purchase_date'.
        date_range (tuple): Tuple containing the start and end dates.
    Returns:
        pd.DataFrame: A DataFrame aggregated at the user level with columns 'user_id', 'country_code', and 'view_plan_count'.
    """
    start_date, end_date = date_range
    data['date'] = pd.to_datetime(data['date'])
    filtered_data = data[(data['date'] >= start_date) & (data['date'] <= end_date)]
    filtered_data = filter_events_before_first_purchase(filtered_data, first_purchase_dates)
    user_level_data = filtered_data.groupby('user_id').agg({
        'country_code': 'first',
        'event_name': 'count'
    }).reset_index()
    user_level_data.rename(columns={'event_name': 'view_plan_count'}, inplace=True)
    return user_level_data

def process_data():
    """
    Processes data by pulling Parquet files from S3, aggregating study plan data, calculating maximum streaks,
    counting free test usage, and aggregating free test section, page view, paid prep, view feedback, and view plan data for users within a specified date range.
    Saves the results as Parquet files.
    """
    date_range = set_date_range()
    
    # Pull data from S3
    study_plan_data = pull_parquet_files_pandas(file_name='personalized_plan')
    max_streak_data = pull_parquet_files_pandas(file_name='user_max_streak')
    free_test_data = pull_parquet_files_pandas(file_name='prep_time')
    free_test_section_data = pull_parquet_files_pandas_features(file_name='free_test_section')
    page_view_data = pull_parquet_files_pandas_features(file_name='page_view')
    paid_prep_data = pull_parquet_files_pandas_features(file_name='paid_prep')
    view_feedback_data = pull_parquet_files_pandas_features(file_name='view_feedback')
    view_plan_data = pull_parquet_files_pandas_features(file_name='view_plan')
    target_data = pull_parquet_files_pandas_Target(file_name='Target')

    # Get first purchase dates
    first_purchase_dates = get_first_purchase_dates(target_data)

    # Process data
    modified_aggregated_study_plan_data = aggregate_study_plans(study_plan_data, first_purchase_dates, date_range)
    modified_max_streak_user_data = max_streak_within_date_range(max_streak_data, first_purchase_dates, date_range)
    modified_free_test_usage_data = count_free_test_usage(free_test_data, first_purchase_dates, date_range)
    aggregated_free_test_section_data = aggregate_free_test_section(free_test_section_data, first_purchase_dates, date_range)
    aggregated_page_view_data = aggregate_page_view(page_view_data, first_purchase_dates, date_range)
    aggregated_paid_prep_data = aggregate_paid_prep(paid_prep_data, first_purchase_dates, date_range)
    aggregated_view_feedback_data = aggregate_view_feedback(view_feedback_data, first_purchase_dates, date_range)
    aggregated_view_plan_data = aggregate_view_plan(view_plan_data, first_purchase_dates, date_range)

    # Save results to Parquet files
    modified_aggregated_study_plan_data.to_parquet('modified_aggregated_study_plan_data.parquet', index=False)
    modified_max_streak_user_data.to_parquet('modified_max_streak_user_data.parquet', index=False)
    modified_free_test_usage_data.to_parquet('modified_free_test_usage_data.parquet', index=False)
    aggregated_free_test_section_data.to_parquet('aggregated_free_test_section_data.parquet', index=False)
    aggregated_page_view_data.to_parquet('aggregated_page_view_data.parquet', index=False)
    aggregated_paid_prep_data.to_parquet('aggregated_paid_prep_data.parquet', index=False)
    aggregated_view_feedback_data.to_parquet('aggregated_view_feedback_data.parquet', index=False)
    aggregated_view_plan_data.to_parquet('aggregated_view_plan_data.parquet', index=False)

    # Output results
    print("Modified Aggregated Study Plan Data:")
    print(modified_aggregated_study_plan_data)
    print("\nModified Max Streak Data:")
    print(modified_max_streak_user_data)
    print("\nModified Free Test Usage Data:")
    print(modified_free_test_usage_data)
    print("\nAggregated Free Test Section Data:")
    print(aggregated_free_test_section_data)
    print("\nAggregated Page View Data:")
    print(aggregated_page_view_data)
    print("\nAggregated Paid Prep Data:")
    print(aggregated_paid_prep_data)
    print("\nAggregated View Feedback Data:")
    print(aggregated_view_feedback_data)
    print("\nAggregated View Plan Data:")
    print(aggregated_view_plan_data)

# Call the process_data function to execute the workflow
process_data()

us-east-1


  engine = create_engine(connection_string)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  events_df['date'] = pd.to_datetime(events_df['date'])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  events_df['date'] = pd.to_datetime(events_df['date'])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  events_df['date'] = pd.to_datetime(events_df['date'])

Modified Aggregated Study Plan Data:
       user_id                 first_date country  number_of_study_plans
0     13F987F6 2024-02-01 12:05:38.203954     CHN                      3
1     13F98A7A 2024-02-01 12:05:38.203954     CAN                      1
2     13F98AHC 2024-02-11 09:10:24.676880     USA                      1
3     13F98B34 2024-02-01 12:05:38.203954     PRT                      1
4     13F98E1G 2024-02-02 12:15:02.136571     DJI                      8
...        ...                        ...     ...                    ...
4331  9PFD1E4D 2024-04-05 08:15:03.533333     USA                      2
4332  9QFE51C5 2024-05-20 08:15:02.720665     NPL                      1
4333  9QG43163 2024-02-05 08:15:02.332097     USA                     12
4334  9TG483G9 2024-03-09 19:26:23.757586     JPN                      3
4335  9YG30GB1 2024-03-27 08:15:03.101694     USA                      5

[4336 rows x 4 columns]

Modified Max Streak Data:
       user_id  max_streak country_