<a href="https://colab.research.google.com/github/Waldemar77/Pipeline_MeLi_test/blob/origin/MeLi_TestPipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [82]:
!pip install orjson
import pandas as pd
import orjson
import os
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Reading prints.json and taps.json files using pandas and orjson file for nested json lines file. Furthermore, creating a final dataframe to show last week prints.

In [83]:
# >>> Input path for prints.json
json_base_path = "/content/drive/MyDrive/MeLi test/nested_json_files"

# Initialize an empty list to store processed rows
rows = []
df_whole = pd.DataFrame()
df_prints = pd.DataFrame()
df_taps = pd.DataFrame()

# Iterate over each JSON file in the base path
for json_file in os.listdir(json_base_path):

  # Construct the full path to the JSON file
  json_path = os.path.join(json_base_path, json_file)

  # Read and process the JSON lines file iteratively
  try:
    with open(json_path, 'r') as file:
      for line in file:
        # Parse each line as JSON using orjson
        record = orjson.loads(line)
        # Flatten the nested JSON structure
        flattened_record = {
            'day': record['day'],
            'user_id': record['user_id'],
            'event_data.position': record['event_data']['position'],
            'event_data.value_prop': record['event_data']['value_prop']
        }
        # Append the flattened record to the list
        rows.append(flattened_record)

      # Creating dataframe with rows list
      temp_df = pd.DataFrame(rows)

      # Adding a new column to identify json file
      temp_df['event_type'] = f'{json_file}'

      # casting to date format
      temp_df['day'] = pd.to_datetime(temp_df['day'])

      # casting string values to integer format
      #temp_df['user_id'] = temp_df['user_id'].astype(int)
      #temp_df['event_data.position'] = temp_df['event_data.position'].astype(int)

      # Concatenating the temporary DataFrame with the main DataFrame
      df_whole = pd.concat([df_whole, temp_df], ignore_index=True)
  except Exception as e:
    print(f"Error processing {json_file}: {e}")

# >>> Transforming and creating aux dataframes
try:
  # Saving taps records
  df_taps = df_whole[df_whole['event_type'] == 'taps.json']

  # Saving prints records
  df_prints = df_whole[df_whole['event_type'] == 'prints.json']

  # calculating the first 3 weeks for df_taps and df_prints, calculated with the las day
  df_taps = df_taps[df_taps['day'] <= df_taps['day'].max() - pd.Timedelta(days=7)]
  df_prints = df_prints[df_prints['day'] <= df_prints['day'].max() - pd.Timedelta(days=7)]

  # creating new dataframe with only last week prints, calculated with the last day
  df_prints_lastW = df_prints[df_prints['day'] >= df_prints['day'].max() - pd.Timedelta(days=7)]

  # dropping column event_data.position on df_prints_lastW
  df_prints_lastW = df_prints_lastW.drop('event_data.position', axis=1)

  # renaming event_data.value_prop
  df_prints_lastW = df_prints_lastW.rename(columns={'event_data.value_prop': 'value_prop'})
  df_taps = df_taps.rename(columns={'event_data.value_prop': 'value_prop'})
  df_prints = df_prints.rename(columns={'event_data.value_prop': 'value_prop'})

  # Calculating for each user_id and value_prop in df_prints_lastW
  #how many user_id and value_prop are found in df_taps
  df_taps_count = df_taps.groupby(['user_id', 'value_prop']).size().reset_index(name='taps_count')

  # Calculating quantity of prints first 3 weeks
  df_prints_count = df_prints.groupby(['user_id', 'value_prop']).size().reset_index(name='prints_count')
except Exception as e:
  print(f"Error processing ETL: {e}")

In [84]:
# Importin csv file
csv_base_path = '/content/drive/MyDrive/MeLi test/csv_files/'

df_payment = pd.DataFrame()

# Reading csv file
try:
  # Iterate over each CSV file in the base path
  for csv_file in os.listdir(csv_base_path):

    # Construct the full path to the CSV file
    if csv_file.endswith('.csv'):
      csv_path = os.path.join(csv_base_path, csv_file)
      df_payment = pd.read_csv(csv_path, sep=',', encoding='utf-8', header=0)

      # Casting date field
      df_payment['pay_date'] = pd.to_datetime(df_payment['pay_date'])

  # dataframe for quantity of payments by user_id and value_prop
  df_payment_qty = df_payment.groupby(['user_id', 'value_prop']).count().reset_index()
  df_payment_qty = df_payment_qty[['user_id', 'value_prop', 'pay_date']]
  df_payment_qty.rename(columns={'pay_date': 'payment_qty'}, inplace=True)

  # dataframe with amount of payments by user_id and value_prop, sumarize by 'total'
  df_payment = df_payment.drop('pay_date', axis=1)
  df_payment_sum = df_payment.groupby(['user_id', 'value_prop']).sum().reset_index()
  df_payment_sum.rename(columns={'total': 'payment_sum'}, inplace=True)
except Exception as e:
  print(f"Error processing csv loading and transformation: {e}")

#print(df_payment.info())
#print(df_payment_qty.head())
#print(df_payment_qty.query('user_id == 25'))
#print(df_payment_sum.query('user_id == 25'))


In [85]:
try:
  # Joining df_prints_lastW with df_prints_count by user_id and value_prop
  df_prints_lastW = df_prints_lastW.merge(df_prints_count, on=['user_id', 'value_prop'], how='left')

  # Joining df_prints_lastW with df_taps_count by user_id and value_prop
  df_prints_lastW = df_prints_lastW.merge(df_taps_count, on=['user_id', 'value_prop'], how='left')

  # Joining df_prints_lastW with df_payment_qty by user_id and value_prop
  df_prints_lastW = df_prints_lastW.merge(df_payment_qty, on=['user_id', 'value_prop'], how='left')

  # Joining df_prints_lastW with df_payment_sum by user_id and event_data.value_prop
  df_prints_lastW = df_prints_lastW.merge(df_payment_sum, on=['user_id', 'value_prop'], how='left')

  # Filling the missing values with 0
  df_prints_lastW.fillna(0, inplace=True)

  # Creating a new column with True if taps_count > 0, otherwise, False
  df_prints_lastW['has_taps'] = df_prints_lastW['taps_count'] > 0

  # Deleting duplicates on value_prop
  df_prints_lastW = df_prints_lastW.drop_duplicates(subset=['user_id','value_prop'])
  df_prints_lastW = df_prints_lastW.drop('event_type', axis=1)
except Exception as e:
  print(f"Error processing joins on definitive dataframe 'df_prints_lastW' and aux dataframes: {e}")

In [86]:
#print(df_prints_lastW.query("taps_count > 1"))
#print(df_prints_lastW.query("has_taps == True"))
print(df_prints_lastW.query("user_id == 25"))

              day  user_id          value_prop  prints_count  taps_count  \
5131   2020-11-18       25    credits_consumer             2         1.0   
5132   2020-11-18       25               point             2         1.0   
64558  2020-11-18       25  cellphone_recharge             2         0.0   
64560  2020-11-18       25             prepaid             2         0.0   
149779 2020-11-23       25          send_money             1         0.0   

        payment_qty  payment_sum  has_taps  
5131            0.0         0.00      True  
5132            2.0       176.72      True  
64558           2.0        54.67     False  
64560           1.0        27.69     False  
149779          2.0        24.36     False  
