# Install pandas

In [3]:
!pip install pandas

Defaulting to user installation because normal site-packages is not writeable


# Main Code

In [46]:
import logging
import json
import pandas as pd
from datetime import datetime

DTYPE_PRINTS = {
    'day': 'str',
    'user_id': 'int',
    'position': 'int',
    'value_prop': 'str'
}

DTYPE_TAPS = {
    'day': 'str',
    'user_id': 'int',
    'position': 'int',
    'value_prop': 'str'
}

DTYPE_PAYS = {
    'pay_date': 'str',
    'total': 'float',
    'user_id': 'int',
    'value_prop': 'str'
}

DTYPE_EXPECTED_RESULTS = {
    'user_id': 'int',
    'day': 'datetime64[ns]',
    'value_prop': 'str',
    'clicked': 'bool',
    'views_3w': 'int',
    'clicks_3w': 'int',
    'payments_3w': 'int',
    'total_spent_3w': 'float'
}

PRINTS_PATH = 'prints.json'
TAPS_PATH = 'taps.json'
PAYS_PATH = 'pays.csv'
EXPECTED_RESULT_PATH = "expected_result.csv"

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Libraries calls ok")
START_TIME = datetime.now()
logging.info(f"Start Time : {START_TIME.strftime('%Y%m%d_%H%M%S')} ")

def read_json_lines_to_dataframe(json_lines_path, dtypes):
    """
    Builds a pandas DataFrame from JSON lines with specified column names and data types.
    
    Parameters:
    json_lines_path (str): Path to a file containing JSON lines.
    dtypes (dict): Dictionary mapping column names to their expected data types.
    
    Returns:
    pd.DataFrame: DataFrame containing the specified columns and data types.
    """
    try:
        with open(json_lines_path, 'r') as file:
            json_lines_str = file.read()

        json_lines = [json.loads(line) for line in json_lines_str.strip().split('\n')]
        df = pd.json_normalize(json_lines)

        columns = list(dtypes.keys())

        column_mapping = {
            'day': columns[0],
            'user_id': columns[1],
            'event_data.position': columns[2],
            'event_data.value_prop': columns[3]
        }
        df = df.rename(columns=column_mapping)
        df = df[columns]
        
        for col, dtype in dtypes.items():
            df[col] = df[col].astype(dtype)
            
        return df
    
    except FileNotFoundError:
        logging.info(f"Error: The file at {json_lines_path} was not found.")
    except ValueError as e:
        logging.info(f"Error: Could not parse JSON. {e}")
    except Exception as e:
        logging.info(f"An unexpected error occurred: {e}")

def read_csv_to_dataframe(file_path, dtypes):
    """
    Reads a CSV file into a DataFrame with specified data types for columns.
    
    Parameters:
    file_path (str): Path to the CSV file.
    dtypes (dict): Dictionary mapping column names to their data types.
    
    Returns:
    pd.DataFrame: DataFrame containing the CSV data with specified data types.
    """
    try:
        df = pd.read_csv(file_path)
        columns = list(dtypes.keys())

        column_mapping = {
            'pay_date': columns[0],
            'total': columns[1],
            'user_id': columns[2],
            'value_prop': columns[3]
        }
        df = df.rename(columns=column_mapping)
        df = df[columns]
        
        for col, dtype in dtypes.items():
            df[col] = df[col].astype(dtype)
            
        return df
    except Exception as e:
        logging.info(f"An error occurred while reading the CSV file: {e}")

def process_and_save_dataframe(df_prints, df_taps, df_pays, output_csv_path):
    """Process the input DataFrames to calculate various metrics for user interactions and save the processed data to a CSV file.

    Args:
        df_prints (pandas.DataFrame): DataFrame containing print data.
        df_taps (pandas.DataFrame): DataFrame containing tap data.
        df_pays (pandas.DataFrame): DataFrame containing pay data.
        output_csv_path (str): Path to save the processed data CSV file.

    Returns:
        pandas.DataFrame: Processed DataFrame containing calculated metrics.

    The function performs the following steps:
    1. Strips whitespace from 'value_prop' columns in all input DataFrames.
    2. Converts date columns to datetime and normalizes them.
    3. Calculates the last print date and the start date of the last week.
    4. Filters prints for the last week based on the last week start date.
    5. Merges taps data with prints to add a 'clicked' column.
    6. Merges taps and pays data with prints.
    7. Filters prints, taps, and pays data for the last three weeks.
    8. Aggregates views, clicks, payments, and total spent in the last three weeks.
    9. Merges aggregated data with prints.
    10. Selects required columns and fills NA values with 0.
    11. Converts 'user_id' to int, 'value_prop' to str, 'clicked' to bool, 'views_3w' and 'clicks_3w' to int,
        'payments_3w' to int, and 'total_spent_3w' to float.
    12. Rounds 'total_spent_3w' to 4 decimal places.
    13. Saves the processed DataFrame to the specified CSV file.
    """
    try:
        logging.info("Starting data processing")

        logging.info("Stripping whitespace from 'value_prop' columns")
        df_prints['value_prop'] = df_prints['value_prop'].str.strip()
        df_taps['value_prop'] = df_taps['value_prop'].str.strip()
        df_pays['value_prop'] = df_pays['value_prop'].str.strip()

        logging.info("Converting date columns to datetime and normalizing")
        df_prints['day'] = pd.to_datetime(df_prints['day']).dt.normalize()
        df_taps['day'] = pd.to_datetime(df_taps['day']).dt.normalize()
        df_pays['pay_date'] = pd.to_datetime(df_pays['pay_date']).dt.normalize()

        logging.info("Calculating the last print date and last week start date")
        last_print_date = df_prints['day'].max()
        last_week_start = last_print_date - pd.DateOffset(weeks=1)

        logging.info("Filtering prints for the last week")
        df_prints_last_week = df_prints[df_prints['day'] >= last_week_start]

        logging.info("Merging taps data with prints to add clicked column")
        df_prints_last_week = df_prints_last_week.merge(
            df_taps[['user_id', 'value_prop']].drop_duplicates(),
            on=['user_id', 'value_prop'],
            how='left',
            indicator=True
        )
        df_prints_last_week['clicked'] = df_prints_last_week['_merge'] == 'both'
        df_prints_last_week = df_prints_last_week.dropna(how='any')
        
        logging.info("Adding columns print_date to df_prints_last_week")
        df_prints_last_week['print_date'] = df_prints_last_week['day']

        logging.info("number of times the user viewed each value prop in the 3 weeks prior to each print.")
        df_prints_merged = df_prints.merge(df_prints_last_week[['user_id', 'value_prop', 'print_date']],
                                        on=['user_id', 'value_prop'], 
                                        how='left')
        df_prints_merged = df_prints_merged.dropna(how='any')
        df_prints_filtered = df_prints_merged[(df_prints_merged['day'] >= df_prints_merged['print_date'] - pd.DateOffset(weeks=3)) &
                                             (df_prints_merged['day'] < df_prints_merged['print_date'])][['user_id','day','print_date','value_prop']]
        logging.info("Aggregating views in the last three weeks")
        user_value_prop_views = df_prints_filtered.groupby(
            ['user_id', 'value_prop','print_date']).size().reset_index(name='views_3w')

        logging.info("number of times the user clicked each value prop in the 3 weeks prior to each print.")
        df_taps_merged = df_taps.merge(df_prints_last_week[['user_id', 'value_prop', 'print_date']],
                                        on=['user_id', 'value_prop'], 
                                        how='left')
        df_taps_merged = df_taps_merged.dropna(how='any')
        df_taps_filtered = df_taps_merged[(df_taps_merged['day'] >= df_taps_merged['print_date'] - pd.DateOffset(weeks=3)) &
                                           (df_taps_merged['day'] < df_taps_merged['print_date'])][['user_id','print_date','value_prop']]
        logging.info("Aggregating clicks in the last three weeks")
        user_value_prop_clicks = df_taps_filtered.groupby(
            ['user_id', 'value_prop','print_date']).size().reset_index(name='clicks_3w')

        logging.info("number of payments that the user made for each value prop in the 3 weeks prior to each print.")
        df_pays_merged = df_pays.merge(df_prints_last_week[['user_id', 'value_prop', 'print_date']], 
                                        on=['user_id', 'value_prop'], 
                                        how='left')
        df_pays_merged = df_pays_merged.dropna(how='any')
        df_pays_filtered = df_pays_merged[(df_pays_merged['pay_date'] >= df_pays_merged['print_date'] - pd.DateOffset(weeks=3)) &
                                       (df_pays_merged['pay_date'] < df_pays_merged['print_date'])]
        logging.info("Aggregating payments and total spent in the last three weeks")
        user_value_prop_pays = df_pays_filtered.groupby(['user_id', 'value_prop','print_date']).agg(
            payments_3w=('total', 'size'), total_spent_3w=('total', 'sum')).reset_index()

        logging.info("Merging aggregated data with prints")
        df_prints_last_week = df_prints_last_week.merge(user_value_prop_views, how='left', on=['user_id', 'value_prop','print_date'])
        df_prints_last_week = df_prints_last_week.merge(user_value_prop_clicks, how='left', on=['user_id', 'value_prop','print_date'])
        df_prints_last_week = df_prints_last_week.merge(user_value_prop_pays, how='left', on=['user_id', 'value_prop','print_date'])

        logging.info("Selecting required columns and filling NAs with 0")
        df_prints_last_week = df_prints_last_week[[
            'user_id', 'day', 'value_prop', 'clicked', 'views_3w', 'clicks_3w', 'payments_3w', 'total_spent_3w']]
        df_prints_last_week.fillna(0, inplace=True)
        df_prints_last_week['total_spent_3w'] = df_prints_last_week['total_spent_3w'].round(4)

        logging.info(f"Saving processed DataFrame to {output_csv_path}")
        df_prints_last_week = df_prints_last_week.sort_values(by=['user_id', 'value_prop'])
        dtype_mapping = {
            'user_id': int,
            'day': 'datetime64[ns]',
            'value_prop': str,
            'clicked': bool,
            'views_3w': int,
            'clicks_3w': int,
            'payments_3w': int,
            'total_spent_3w': float
        }

        for col, dtype in DTYPE_EXPECTED_RESULTS.items():
            df_prints_last_week[col] = df_prints_last_week[col].astype(dtype)
            
        df_prints_last_week.to_csv(output_csv_path, index=False)
        logging.info("Processed DataFrame saved")

        return df_prints_last_week

    except Exception as e:
        logging.error(f"Error occurred: {str(e)}")
        raise

def main():
    try:
        logging.info("Reading prints.json")
        df_prints = read_json_lines_to_dataframe(PRINTS_PATH, DTYPE_PRINTS)
        
        logging.info("Reading taps.json")
        df_taps = read_json_lines_to_dataframe(TAPS_PATH, DTYPE_TAPS)
        
        logging.info("Reading pays.csv")
        df_pays = read_csv_to_dataframe(PAYS_PATH, DTYPE_PAYS)

        logging.info("Transform data and saving output dataset in output3.csv")
        df_expected_result = process_and_save_dataframe(df_prints, df_taps, df_pays, expected_result_path)

    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")

if __name__ == '__main__':
    main()
    END_TIME = datetime.now()
    logging.info(f"End Time : {END_TIME.strftime('%Y%m%d_%H%M%S')} ")
    logging.info(f"Duration: {END_TIME - START_TIME}")
    logging.info('PROCESS_EXECUTED_SUCCESSFULLY')

2024-06-10 19:32:34,470 - INFO - Libraries calls ok
2024-06-10 19:32:34,471 - INFO - Start Time : 20240610_193234 
2024-06-10 19:32:34,475 - INFO - Reading prints.json
2024-06-10 19:32:39,950 - INFO - Reading taps.json
2024-06-10 19:32:40,475 - INFO - Reading pays.csv
2024-06-10 19:32:40,792 - INFO - Transform data and saving output dataset in output3.csv
2024-06-10 19:32:40,793 - INFO - Starting data processing
2024-06-10 19:32:40,794 - INFO - Stripping whitespace from 'value_prop' columns
2024-06-10 19:32:41,131 - INFO - Converting date columns to datetime and normalizing
2024-06-10 19:32:41,440 - INFO - Calculating the last print date and last week start date
2024-06-10 19:32:41,443 - INFO - Filtering prints for the last week
2024-06-10 19:32:41,457 - INFO - Merging taps data with prints to add clicked column
2024-06-10 19:32:41,577 - INFO - Adding columns print_date to df_prints_last_week
2024-06-10 19:32:41,579 - INFO - number of times the user viewed each value prop in the 3 week

####  Manual testing to verify per user that expected_result.csv is ok , using the original files prints.json, taps.json, and pays.csv

In [47]:
import logging
import json
import pandas as pd
from datetime import datetime

DTYPE_PRINTS = {
    'day': 'str',
    'user_id': 'int',
    'position': 'int',
    'value_prop': 'str'
}

DTYPE_TAPS = {
    'day': 'str',
    'user_id': 'int',
    'position': 'int',
    'value_prop': 'str'
}

DTYPE_PAYS = {
    'pay_date': 'str',
    'total': 'float',
    'user_id': 'int',
    'value_prop': 'str'
}

PRINTS_PATH = 'prints.json'
TAPS_PATH = 'taps.json'
PAYS_PATH = 'pays.csv'

USER_ID_TO_TEST = 1


logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Libraries calls ok")
START_TIME = datetime.now()
logging.info(f"Start Time : {START_TIME.strftime('%Y%m%d_%H%M%S')} ")

def read_json_lines_to_dataframe(json_lines_path, dtypes):
    """
    Builds a pandas DataFrame from JSON lines with specified column names and data types.
    
    Parameters:
    json_lines_path (str): Path to a file containing JSON lines.
    dtypes (dict): Dictionary mapping column names to their expected data types.
    
    Returns:
    pd.DataFrame: DataFrame containing the specified columns and data types.
    """
    try:
        with open(json_lines_path, 'r') as file:
            json_lines_str = file.read()

        json_lines = [json.loads(line) for line in json_lines_str.strip().split('\n')]
        df = pd.json_normalize(json_lines)

        columns = list(dtypes.keys())

        column_mapping = {
            'day': columns[0],
            'user_id': columns[1],
            'event_data.position': columns[2],
            'event_data.value_prop': columns[3]
        }
        df = df.rename(columns=column_mapping)
        df = df[columns]
        
        for col, dtype in dtypes.items():
            df[col] = df[col].astype(dtype)
            
        return df
    
    except FileNotFoundError:
        logging.info(f"Error: The file at {json_lines_path} was not found.")
    except ValueError as e:
        logging.info(f"Error: Could not parse JSON. {e}")
    except Exception as e:
        logging.info(f"An unexpected error occurred: {e}")

def read_csv_to_dataframe(file_path, dtypes):
    """
    Reads a CSV file into a DataFrame with specified data types for columns.
    
    Parameters:
    file_path (str): Path to the CSV file.
    dtypes (dict): Dictionary mapping column names to their data types.
    
    Returns:
    pd.DataFrame: DataFrame containing the CSV data with specified data types.
    """
    try:
        df = pd.read_csv(file_path)
        columns = list(dtypes.keys())

        column_mapping = {
            'pay_date': columns[0],
            'total': columns[1],
            'user_id': columns[2],
            'value_prop': columns[3]
        }
        df = df.rename(columns=column_mapping)
        df = df[columns]
        
        for col, dtype in dtypes.items():
            df[col] = df[col].astype(dtype)
            
        return df
    except Exception as e:
        logging.info(f"An error occurred while reading the CSV file: {e}")

def main():
    try:
        df_prints = read_json_lines_to_dataframe(PRINTS_PATH, DTYPE_PRINTS)
        df_taps = read_json_lines_to_dataframe(TAPS_PATH, DTYPE_TAPS)
        df_pays = read_csv_to_dataframe(PAYS_PATH, DTYPE_PAYS)

        df_prints['day'] = pd.to_datetime(df_prints['day']).dt.normalize()

        df_prints_last_week = df_prints[df_prints['day'] >= df_prints['day'].max() - pd.DateOffset(weeks=1)]

        logging.info(f"Prints of last week for user_id = {USER_ID_TO_TEST} \n"
             f"{df_prints_last_week[df_prints_last_week['user_id'] == USER_ID_TO_TEST].sort_values(by='value_prop').to_string(index=False)} \n")

        logging.info(f"All prints for user_id = {USER_ID_TO_TEST} \n"
             f"{df_prints[df_prints['user_id'] == USER_ID_TO_TEST].sort_values(by='value_prop').to_string(index=False)} \n")

        logging.info(f"All clicks for user_id = {USER_ID_TO_TEST} \n"
             f"{df_taps[df_taps['user_id'] == USER_ID_TO_TEST].sort_values(by='value_prop').to_string(index=False)} \n")

        logging.info(f"All payments for user_id = {USER_ID_TO_TEST} \n"
             f"{df_pays[df_pays['user_id'] == USER_ID_TO_TEST].sort_values(by='value_prop').to_string(index=False)} \n")
        
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")


if __name__ == '__main__':
    main()
    END_TIME = datetime.now()
    logging.info(f"End Time : {END_TIME.strftime('%Y%m%d_%H%M%S')} ")
    logging.info(f"Duration: {END_TIME - START_TIME}")
    logging.info('PROCESS_EXECUTED_SUCCESSFULLY')

2024-06-10 19:32:48,381 - INFO - Libraries calls ok
2024-06-10 19:32:48,383 - INFO - Start Time : 20240610_193248 
2024-06-10 19:32:55,167 - INFO - Prints of last week for user_id = 1 
       day  user_id  position         value_prop
2020-11-23        1         2 cellphone_recharge
2020-11-30        1         2 cellphone_recharge
2020-11-30        1         3         link_cobro
2020-11-30        1         1              point
2020-11-23        1         0         send_money
2020-11-23        1         1          transport
2020-11-30        1         0          transport 

2024-06-10 19:32:55,172 - INFO - All prints for user_id = 1 
       day  user_id  position         value_prop
2020-11-30        1         2 cellphone_recharge
2020-11-23        1         2 cellphone_recharge
2020-11-03        1         3   credits_consumer
2020-11-14        1         2   credits_consumer
2020-11-30        1         3         link_cobro
2020-11-03        1         1         link_cobro
2020-11-12       