In [10]:
from pandas import DataFrame
import pandas as pd
import json

In [11]:
#Load a json or csv file to DataFrame
def read_file(file_path: str) -> DataFrame:
    """Load a JSON-lines or CSV file into a DataFrame.

    * ``.json``: the file is read as one JSON document per line (blank lines
      are skipped), flattened with ``pd.json_normalize``, and the columns
      ``day``, ``event_data.position`` and ``event_data.value_prop`` are
      renamed to ``date``, ``position`` and ``value_prop``.
    * ``.csv``: the file is read with ``pd.read_csv`` and ``total`` is summed
      per (``pay_date``, ``user_id``, ``value_prop``); the grouping keys are
      returned as regular columns.

    Parameters
    ----------
    file_path : str
        Path of the file to load; the extension selects the reader.

    Returns
    -------
    DataFrame
        The loaded data. On any failure (missing file, parse error,
        unsupported extension) a message is printed and ``None`` is returned.
    """
    if file_path.endswith('.json'):
        try:
            # The context manager closes the file; no explicit close() needed.
            with open(file_path, 'r', encoding='utf-8') as f:
                records = [json.loads(line) for line in f if line.strip()]
            df = pd.json_normalize(records)
            return df.rename(columns={
                'day': 'date',
                'event_data.position': 'position',
                'event_data.value_prop': 'value_prop',
            })
        except FileNotFoundError as fnf_error:
            print(fnf_error)
        except Exception as error:
            # Best-effort loader: report the cause instead of hiding it.
            print(f"Something went wrong: {error}")
    elif file_path.endswith('.csv'):
        try:
            # Read the file the caller asked for, not a hard-coded name.
            df = pd.read_csv(file_path)
            # reset_index() turns the grouped Series back into a DataFrame so
            # the result matches the annotated return type.
            return (
                df.groupby(['pay_date', 'user_id', 'value_prop'])['total']
                .sum()
                .reset_index()
            )
        except FileNotFoundError as fnf_error:
            print(fnf_error)
        except Exception as error:
            print(f"Something went wrong: {error}")
    else:
        print("Wrong extension file, only reading json or csv files")
    return None

In [12]:
def validate_key_fields(df: DataFrame) -> DataFrame:
    """Ensure (``date``, ``user_id``, ``position``) identifies at most one row.

    Parameters
    ----------
    df : DataFrame
        Frame containing the columns ``date``, ``user_id`` and ``position``.

    Returns
    -------
    DataFrame
        ``df`` itself when the key is already unique. Otherwise a new frame
        that keeps only the last row of every key, with a fresh 0..n-1 index
        and the same columns as the input.
    """
    key_fields = ['date', 'user_id', 'position']
    if not df.duplicated(subset=key_fields, keep=False).any():
        return df
    # De-duplicate on the key itself (not on whole rows), otherwise rows that
    # share a key but differ elsewhere survive and multiply rows in later joins.
    # drop=True keeps the old index from leaking in as an extra 'index' column.
    return df.drop_duplicates(subset=key_fields, keep='last').reset_index(drop=True)

In [13]:
def create_join_table(prints: DataFrame, taps: DataFrame, pays: DataFrame) -> DataFrame:
    """Join prints with their taps and payments and add week bookkeeping columns.

    Parameters
    ----------
    prints, taps : DataFrame
        Frames with ``date``, ``user_id``, ``position`` and ``value_prop``;
        both are passed through ``validate_key_fields`` before joining.
    pays : DataFrame
        Payments addressable by ``pay_date``, ``user_id`` and ``value_prop``
        and carrying the payment columns to attach (e.g. ``total``).

    Returns
    -------
    DataFrame
        One row per print (left joins), with ``value_prop_print`` /
        ``value_prop_tap``, the payment columns, and:

        * ``click_flag``   - 1 when a tap matched the print, else 0.
        * ``date``         - converted to datetime.
        * ``year_week_id`` - ISO year * 100 + ISO week number.
        * ``row_num_week`` - dense rank of ``year_week_id`` (1 = earliest week
          present in the data; weeks without rows do not consume a rank).
    """
    prints = validate_key_fields(prints)
    taps = validate_key_fields(taps)

    prints_taps = pd.merge(
        prints, taps,
        on=['date', 'user_id', 'position'],
        suffixes=('_print', '_tap'),
        how='left',
    )
    joined_data = pd.merge(
        prints_taps, pays,
        left_on=['date', 'user_id', 'value_prop_print'],
        right_on=['pay_date', 'user_id', 'value_prop'],
        how='left',
    )

    # A print was clicked exactly when the left join found a matching tap.
    joined_data['click_flag'] = joined_data['value_prop_tap'].notnull().astype('int64')
    joined_data['date'] = pd.to_datetime(joined_data['date'])

    # Pair the ISO week with the ISO year: around New Year the calendar year
    # and the ISO year differ (2021-01-01 belongs to ISO week 53 of 2020), and
    # mixing them yields ids such as 202153 that sort after all of 2021.
    iso_calendar = joined_data['date'].dt.isocalendar()
    joined_data['year_week_id'] = (
        iso_calendar['year'].astype('int32') * 100
        + iso_calendar['week'].astype('int32')
    )
    joined_data['row_num_week'] = (
        joined_data['year_week_id']
        .rank(method='dense', ascending=True)
        .astype('int32')
    )
    return joined_data


In [20]:
def create_ouput_dataset(df: DataFrame, wk: int, schema: dict) -> DataFrame:
    """Build the output rows for the latest week with history from earlier weeks.

    Parameters
    ----------
    df : DataFrame
        Joined table with the columns ``date``, ``user_id``, ``position``,
        ``value_prop_print``, ``click_flag``, ``total`` and ``row_num_week``.
    wk : int
        Number of weeks (in ``row_num_week`` units) immediately before the
        latest week to summarise.
    schema : dict
        Column -> dtype mapping handed to ``DataFrame.astype``. Its keys must
        match the output column names, which embed ``wk``
        (e.g. ``views_last3_weeks`` for ``wk=3``).

    Returns
    -------
    DataFrame
        One row per row of the latest week, with per
        (``user_id``, ``value_prop_print``) aggregates over the previous
        ``wk`` weeks: row count, sum of ``click_flag``, count of non-null
        ``total`` and sum of ``total``. Missing values are filled with 0
        before the cast to ``schema``.
    """
    group_keys = ['user_id', 'value_prop_print']
    views_col = f'views_last{wk}_weeks'
    clicks_col = f'clicks_last{wk}_weeks'
    pays_col = f'pays_last{wk}_weeks'
    amount_col = f'import_pay_last{wk}_weeks'
    output_cols = ['date', 'user_id', 'position', 'value_prop_print', 'click_flag',
                   views_col, clicks_col, pays_col, amount_col]

    # With no rows there is no "latest week"; return an empty, typed frame
    # instead of failing on a NaN week number.
    if df.empty:
        return pd.DataFrame(columns=output_cols).astype(schema)

    week = df['row_num_week']
    max_week = week.max()

    # History window: the wk weeks right before the latest one.
    early_weeks = df[(week >= max_week - wk) & (week < max_week)]
    # Named aggregation yields flat, explicitly named columns, so no
    # positional renaming of a flattened MultiIndex is needed.
    agg_early_weeks = (
        early_weeks.groupby(group_keys)
        .agg(**{
            views_col: ('value_prop_print', 'count'),
            clicks_col: ('click_flag', 'sum'),
            pays_col: ('total', 'count'),
            amount_col: ('total', 'sum'),
        })
        .reset_index()
    )

    last_week = df[week == max_week]
    merged = pd.merge(last_week, agg_early_weeks, on=group_keys, how='left')
    # Pairs with no history get NaN from the left join -> 0.
    output = merged[output_cols].fillna(0)
    return output.astype(schema)

In [22]:
# Number of weeks before the latest week that are summarised for each print.
N_WEEKS = 3

# Folder holding prints.json, taps.json and pays.csv. Change this single line
# to run the notebook against another copy of the data.
DATA_DIR = '/Users/seba/Documents/Data Engineering Development/MELI DE'

# Target dtypes of the output dataset. The aggregate column names embed the
# window size, so they are derived from N_WEEKS to stay in sync with the call
# to create_ouput_dataset below.
schema = {
    'date': 'datetime64[ns]',
    'user_id': 'int32',
    'position': 'int32',
    'value_prop_print': 'str',
    'click_flag': 'int32',
    f'views_last{N_WEEKS}_weeks': 'int32',
    f'clicks_last{N_WEEKS}_weeks': 'int32',
    f'pays_last{N_WEEKS}_weeks': 'int32',
    f'import_pay_last{N_WEEKS}_weeks': 'float64'
    }

prints = read_file(f'{DATA_DIR}/prints.json')
taps = read_file(f'{DATA_DIR}/taps.json')
pays = read_file(f'{DATA_DIR}/pays.csv')

# Keep the joined table under its own name so `test` only ever refers to the
# final output dataset.
joined = create_join_table(prints, taps, pays)
test = create_ouput_dataset(joined, N_WEEKS, schema)
test


Unnamed: 0,date,user_id,position,value_prop_print,click_flag,views_last3_weeks,clicks_last3_weeks,pays_last3_weeks,import_pay_last3_weeks
0,2020-11-30,59706,0,send_money,0,0,0,0,0.0
1,2020-11-30,32191,0,link_cobro,0,1,0,0,0.0
2,2020-11-30,32191,1,transport,0,0,0,0,0.0
3,2020-11-30,32191,2,send_money,0,0,0,0,0.0
4,2020-11-30,53960,0,prepaid,0,0,0,0,0.0
...,...,...,...,...,...,...,...,...,...
14662,2020-11-30,50807,0,send_money,0,0,0,0,0.0
14663,2020-11-30,50807,1,prepaid,0,1,0,0,0.0
14664,2020-11-30,50807,2,credits_consumer,0,2,0,0,0.0
14665,2020-11-30,1487,0,point,0,0,0,0,0.0
