## Imports

In [1]:
import json
import os

import numpy as np
import pandas as pd

#Imports Bigquery
from google.cloud import bigquery
from google.oauth2 import service_account


## GCP config

In [2]:
# GCP project and BigQuery dataset that receive the uploaded tables.
# TODO: replace the project and dataset names with your own.
project = "marbotic"
dataset = "marbotic_dataset"

In [3]:
# Service-account credentials handed to every BigQuery client in this notebook.
# The key file location is taken from the GOOGLE_APPLICATION_CREDENTIALS
# environment variable when it is set, so the notebook can run on another
# machine without editing this cell; the original developer path is kept as
# the fallback.
# TODO: replace the fallback path with your own key file if you do not set
# the environment variable.
key_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/Users/antonin/code/AntoninAnq/gcp/marbotic-7d02fac30bd8.json",
)

credentials = service_account.Credentials.from_service_account_file(
    key_path,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

## Preprocessing du JSON

In [4]:
# Convert the JSON array export to newline-delimited JSON (JSON Lines) so that pd.read_json can read it with a chunksize (number of rows processed at a time).
#!cat ../raw_data/export_23-5_minified.json | jq -c '.[]' > ../raw_data/export_converted_2.json

## GCP upload table events

In [5]:
# Newline-delimited JSON export read by the event uploaders.
# TODO: replace with the location of your own source data.
FILE_PATH = "../raw_data/export_converted_2.json"

In [11]:
def upload_events_table(path,credentials, project,dataset,table):
    """Append the flat event columns of a JSON Lines export to a BigQuery table.

    The export is read in chunks of ``CHUNKSIZE`` rows. For each chunk the
    nested ``user_properties`` and ``event_properties`` columns are removed,
    ``user_id`` is coerced to an integer (``0`` when it is ``None``, a string
    or NaN) and a 1-based running ``id`` is added, then the chunk is loaded
    into ``{dataset}.{table}``.

    Args:
        path: Location of the JSON Lines file, passed to ``pd.read_json``.
        credentials: Credentials object handed to ``bigquery.Client``.
        project: GCP project handed to ``bigquery.Client``.
        dataset: Dataset part of the destination table id.
        table: Table part of the destination table id.

    Returns:
        None. One progress line is printed after every chunk.
    """
    CHUNKSIZE = 100000
    table_id = f"{dataset}.{table}"
    # Chunk size chosen so that the export is split into 14 chunks.
    chunk_reader = pd.read_json(path, lines=True, chunksize=CHUNKSIZE)

    client = bigquery.Client(project,credentials)

    def _user_id_as_int(raw_id):
        # Missing and textual ids all collapse to 0.
        if raw_id is None or type(raw_id) == str or np.isnan(raw_id):
            return 0
        return int(raw_id)

    for chunk_no, chunk in enumerate(chunk_reader):
        # 1..len(chunk); offset below so ids keep increasing across chunks.
        row_numbers = np.arange(1, len(chunk) + 1, dtype='int32')

        events = chunk.reset_index()
        events = events.drop(['user_properties','event_properties'], axis=1)
        events['user_id'] = events['user_id'].apply(_user_id_as_int)
        events['id'] = (chunk_no * CHUNKSIZE) + row_numbers
        events = events.drop(['index'], axis=1)

        load_job = client.load_table_from_dataframe(events, table_id)
        load_job.result()  # Block until the load job has finished.
        destination = client.get_table(table_id)  # One API request.
        print(
            "Loaded {} rows and {} columns to {}".format(
                destination.num_rows, len(destination.schema), table_id
            )
        )

In [12]:
#TODO : Uncomment this line if you need to upload again events table
#upload_events_table(FILE_PATH,credentials, project,dataset,"events_f")

Loaded 100000 rows and 15 columns to marbotic_dataset.events_f


KeyboardInterrupt: 

## GCP upload table user_id

In [13]:
# Semicolon-separated users export read by upload_user_id_table.
# TODO: replace with the location of your own source data.
FILE_PATH_USER_ID = "../raw_data/users_info_202206071551.csv"

In [16]:
def _purchase_codes(value):
    """Return the set of purchase codes held in one ``Purchases`` cell.

    A cell may be a real list or the text form of one, e.g. ``"['A', 'B']"``,
    ``"['A' 'B']"`` or ``'["A","B"]'``. Anything else (NaN, None, numbers)
    yields an empty set.
    """
    if isinstance(value, (list, tuple, set)):
        text = ", ".join(str(item) for item in value)
    elif isinstance(value, str):
        text = value
    else:
        return set()
    # Items may be separated by commas, newlines or quote-space-quote.
    for separator in ("' '", '" "', "\n"):
        text = text.replace(separator, ",")
    codes = {piece.strip("[]'\" \t\r") for piece in text.split(",")}
    return codes - {"", "nan"}


def upload_user_id_table(path,credentials, project,dataset,table):
    """Upload the users CSV to BigQuery with one 0/1 column per purchase code.

    The semicolon-separated file is read twice. The first pass collects every
    purchase code present in the ``Purchases`` column so that all chunks are
    uploaded with the same ``Purchases_<code>`` columns (0 where a user does
    not own the code). The second pass drops the unused descriptive columns,
    renames ``User id`` to ``User_id``, replaces ``Purchases`` by the
    indicator columns and appends each chunk to ``{dataset}.{table}``.

    A code is matched exactly against the items of a cell, so a code that is
    a substring of another code is no longer flagged by mistake, and list
    brackets or quotes around an item do not influence the match.

    Args:
        path: Location of the CSV file (``;`` separated).
        credentials: Credentials object handed to ``bigquery.Client``.
        project: GCP project handed to ``bigquery.Client``.
        dataset: Dataset part of the destination table id.
        table: Table part of the destination table id.

    Returns:
        None. One progress line is printed after every chunk.

    Raises:
        KeyError: If one of the expected CSV columns is missing.
    """
    CHUNKSIZE = 5000
    UNUSED_COLUMNS = ['Country','Language','News letter','Type','Games','Products','Pieces']

    table_id = f"{dataset}.{table}"

    # Pass 1: gather all purchase codes so the schema is identical for every chunk.
    all_codes = set()
    for chunk in pd.read_csv(path,chunksize=CHUNKSIZE,sep=";"):
        for cell in chunk['Purchases']:
            all_codes |= _purchase_codes(cell)

    # Codes that sanitise to the same column name share one indicator column;
    # sorting makes the column order reproducible from run to run.
    codes_by_column = {}
    for code in sorted(all_codes):
        column = 'Purchases_' + code.replace(' ', '_')
        codes_by_column.setdefault(column, set()).add(code)

    client = bigquery.Client(project,credentials)

    # Pass 2: transform and upload chunk by chunk.
    for c in pd.read_csv(path,chunksize=CHUNKSIZE,sep=";"):
        c = c.reset_index(drop=True)
        owned = c['Purchases'].map(_purchase_codes)

        ep = c.drop(UNUSED_COLUMNS + ['Purchases'], axis=1)
        ep = ep.rename(columns={'User id':'User_id'})
        for column, codes in codes_by_column.items():
            ep[column] = owned.map(
                lambda cell, codes=codes: int(not codes.isdisjoint(cell)))

        # Same append configuration as the other uploaders of this notebook.
        job_config = bigquery.LoadJobConfig()
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        job_config.schema_update_options = [
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        ]

        job = client.load_table_from_dataframe(ep, table_id, job_config=job_config)
        job.result()  # Wait for the job to complete.
        loaded_table = client.get_table(table_id)
        print(
            "Loaded {} rows and {} columns to {}".format(
                loaded_table.num_rows, len(loaded_table.schema), table_id
            )
        )

In [15]:
#TODO : Uncomment this line if you need to upload again user_id table
#upload_user_id_table(FILE_PATH_USER_ID,credentials, project,dataset,"user_id_f")

Loaded 5000 rows and 13 columns to marbotic_dataset.user_id_f
Loaded 10000 rows and 13 columns to marbotic_dataset.user_id_f
Loaded 13679 rows and 13 columns to marbotic_dataset.user_id_f


**Création de la colonne purchases**

On crée une colonne "purchases" pour spécifier si l'utilisateur a déjà acheté au moins un produit.  
On le fait via la console GCP avec les commandes SQL ci-dessous.  

Dans un premier temps, on doit créer la colonne "purchases" de type booléen, qui sera vide.  
Puis on initialise toutes les valeurs à False avec la requête ci-dessous.

```   
UPDATE `marbotic.marbotic_dataset.user_id_f`  
SET purchases = False  
WHERE TRUE  
```


Enfin, on met à jour les profils utilisateurs ayant effectué au moins un achat avec la requête ci-dessous.  

```
UPDATE `marbotic.marbotic_dataset.user_id_f`   
SET purchases = True  
WHERE Purchases_PY1Y = 1  
OR  
Purchases_PLY = 1  
OR   
Purchases_MEGR = 1  
OR  
Purchases_PLM = 1  
OR   
Purchases_ME1 = 1  
OR   
Purchases_EY3M = 1  
OR  
Purchases_tier_upgrade_0_to_1 = 1  
OR  
Purchases_PM3M = 1  
OR  
Purchases_MPGR = 1  
OR  
Purchases_EY1Y = 1  
OR  
Purchases_PY3M = 1  
```

## GCP upload user_properties

In [32]:
def _property_items(value):
    """Return the set of item names held in one multi-valued property cell.

    A cell may be a list of names or the text form of one; anything else
    (NaN, None, numbers) yields an empty set. Names are split on commas,
    newlines and quote-space-quote, then stripped of surrounding brackets,
    quotes and whitespace.
    """
    if isinstance(value, (list, tuple, set)):
        text = ", ".join(str(item) for item in value)
    elif isinstance(value, str):
        text = value
    else:
        return set()
    for separator in ("' '", '" "', "\n"):
        text = text.replace(separator, ",")
    items = {piece.strip("[]'\" \t\r") for piece in text.split(",")}
    return items - {"", "nan"}


def upload_user_properties_table(path,credentials, project,dataset,table):
    """Upload the ``user_properties`` of every event to a BigQuery table.

    The JSON Lines export is read in chunks of ``CHUNKSIZE`` rows. For each
    chunk the ``user_properties`` dictionaries are expanded into columns, the
    multi-valued ``Products``, ``Pieces`` and ``Games`` properties are replaced
    by 0/1 indicator columns named ``<property>_<item>`` (spaces become
    underscores), a few event columns and a 1-based running ``id`` are added,
    and the chunk is appended to ``{dataset}.{table}``.

    An item is matched exactly against the items of a cell, so a name that is
    a substring of another name is not flagged by mistake. A chunk in which
    one of the three properties never appears is uploaded without indicator
    columns for it instead of raising ``KeyError``.

    Known limitation: indicator columns are derived per chunk, so an item that
    does not occur in a chunk produces no column for that chunk; the load job
    is configured to allow field additions for that reason.

    Args:
        path: Location of the JSON Lines file, passed to ``pd.read_json``.
        credentials: Credentials object handed to ``bigquery.Client``.
        project: GCP project handed to ``bigquery.Client``.
        dataset: Dataset part of the destination table id.
        table: Table part of the destination table id.

    Returns:
        None. One progress line is printed after every chunk.
    """
    CHUNKSIZE = 100000
    MULTI_VALUE_COLUMNS = ['Products', 'Pieces', 'Games']
    table_id = f"{dataset}.{table}"
    # Chunk size chosen so that the export is split into 14 chunks.
    df = pd.read_json(path, lines=True, chunksize=CHUNKSIZE)

    client = bigquery.Client(project,credentials)

    def _user_id_as_int(raw_id):
        # Missing and textual ids all collapse to 0.
        if raw_id is None or type(raw_id) == str or np.isnan(raw_id):
            return 0
        return int(raw_id)

    for i,c in enumerate(df):
        # Positional index so the expanded properties line up with the chunk.
        c = c.reset_index(drop=True)
        ep = pd.DataFrame(c["user_properties"].to_list())

        for col_concat in MULTI_VALUE_COLUMNS:
            if col_concat not in ep.columns:
                continue
            owned = ep[col_concat].map(_property_items)

            # Items that sanitise to the same column name share one column.
            items_by_column = {}
            for cell in owned:
                for item in cell:
                    column = col_concat + '_' + item.replace(' ', '_')
                    items_by_column.setdefault(column, set()).add(item)

            # Sorted so the column order is reproducible from run to run.
            for column in sorted(items_by_column):
                wanted = items_by_column[column]
                ep[column] = owned.map(
                    lambda cell, wanted=wanted: int(not wanted.isdisjoint(cell)))

        ep = ep.drop(columns=MULTI_VALUE_COLUMNS, errors='ignore')
        for passthrough in ['event_id', 'client_event_time', 'user_creation_time']:
            ep[passthrough] = c[passthrough]
        ep['user_id'] = c['user_id'].apply(_user_id_as_int)
        ep['id'] = (i*CHUNKSIZE) + np.arange(1, len(c) + 1, dtype='int32')

        job_config = bigquery.LoadJobConfig()
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        job_config.schema_update_options = [
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        ]

        job = client.load_table_from_dataframe(ep, table_id, job_config=job_config)
        job.result()  # Wait for the job to complete.
        loaded_table = client.get_table(table_id)
        print(
            "Loaded {} rows and {} columns to {}".format(
                loaded_table.num_rows, len(loaded_table.schema), table_id
            )
        )

In [33]:
#TODO : Uncomment this line if you need to upload again user_properties table
#upload_user_properties_table(FILE_PATH,credentials, project,dataset,"user_properties_f")

Loaded 100000 rows and 39 columns to marbotic_dataset.user_properties_f
Loaded 200000 rows and 39 columns to marbotic_dataset.user_properties_f
Loaded 300000 rows and 39 columns to marbotic_dataset.user_properties_f
Loaded 400000 rows and 39 columns to marbotic_dataset.user_properties_f
Loaded 500000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 600000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 700000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 800000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 900000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 1000000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 1100000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 1200000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 1300000 rows and 40 columns to marbotic_dataset.user_properties_f
Loaded 1320229 rows and 40 columns to marbotic_dataset.user_

## GCP upload event_properties

In [34]:
# Column names accepted by BigQuery for the raw event property keys:
# square brackets and parentheses are removed and spaces become underscores.
columns_rename = {
    raw_name: raw_name.translate(str.maketrans(" ", "_", "[]()"))
    for raw_name in (
        '[Game] Code',
        '[Profile] Age (days)',
        '[Scene] Name',
        '[Game] Language',
        '[Scene] Previous',
        '[Time] Spent',
        '[Scene] Next',
        '[Action] Element Type',
        '[Scene] Section',
        '[Action] Element Name',
        '[Activity] Name',
        '[Scaffolding] Level',
        '[Game] Piece Code',
        '[Activity] Nb Wrong Answer',
        '[Activity] Solved',
        '[Activity] Piece Stamped',
        '[Activity] Piece Asked',
        '[Activity] Modality',
        '[Time] Slot',
        '[Error] Type',
        '[Activation] Game Code',
        '[Activation] Product Code',
        '[Activation] Piece Code',
        '[Popup] Name',
        '[Renewal] Type',
        '[Key] Type',
        '[Toast] Name',
    )
}

In [35]:
def upload_event_properties_table(path,credentials, project,dataset,table):
    """Upload the ``event_properties`` of every event to a BigQuery table.

    The JSON Lines export is read in chunks of ``CHUNKSIZE`` rows. For each
    chunk the ``event_properties`` dictionaries are expanded into columns,
    ``event_id``, ``event_type``, ``session_id`` and a 1-based running ``id``
    are added, the columns are renamed through the notebook-level
    ``columns_rename`` mapping, and the chunk is appended to
    ``{dataset}.{table}`` with field additions allowed.

    Args:
        path: Location of the JSON Lines file, passed to ``pd.read_json``.
        credentials: Credentials object handed to ``bigquery.Client``.
        project: GCP project handed to ``bigquery.Client``.
        dataset: Dataset part of the destination table id.
        table: Table part of the destination table id.

    Returns:
        None. One progress line is printed after every chunk.
    """
    CHUNKSIZE = 100000

    table_id = f"{dataset}.{table}"
    # Chunk size chosen so that the export is split into 14 chunks.
    chunk_reader = pd.read_json(path, lines=True, chunksize=CHUNKSIZE)

    client = bigquery.Client(project,credentials)

    for chunk_no, chunk in enumerate(chunk_reader):
        # 1..len(chunk); offset below so ids keep increasing across chunks.
        row_numbers = np.arange(1, len(chunk) + 1, dtype='int32')
        # Positional index so the expanded properties line up with the chunk.
        rows = chunk.reset_index()

        props = pd.DataFrame(rows["event_properties"].to_list())
        for passthrough in ("event_id", "event_type", "session_id"):
            props[passthrough] = rows[passthrough]
        props['id'] = (chunk_no * CHUNKSIZE) + row_numbers
        props = props.rename(columns=columns_rename)

        # Built per chunk, exactly as before, so no state is shared between loads.
        append_config = bigquery.LoadJobConfig()
        append_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        append_config.schema_update_options = [
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        ]

        load_job = client.load_table_from_dataframe(
            props, table_id, job_config=append_config
        )
        load_job.result()  # Block until the load job has finished.
        destination = client.get_table(table_id)
        print(
            "Loaded {} rows and {} columns to {}".format(
                destination.num_rows, len(destination.schema), table_id
            )
        )

In [36]:
#TODO : Uncomment this line if you need to upload again event_properties table
#upload_event_properties_table(FILE_PATH,credentials, project,dataset,"event_properties_f")

Loaded 100000 rows and 28 columns to marbotic_dataset.event_properties_f
Loaded 200000 rows and 28 columns to marbotic_dataset.event_properties_f
Loaded 300000 rows and 28 columns to marbotic_dataset.event_properties_f
Loaded 400000 rows and 28 columns to marbotic_dataset.event_properties_f
Loaded 500000 rows and 28 columns to marbotic_dataset.event_properties_f
Loaded 600000 rows and 28 columns to marbotic_dataset.event_properties_f
Loaded 700000 rows and 29 columns to marbotic_dataset.event_properties_f
Loaded 800000 rows and 30 columns to marbotic_dataset.event_properties_f
Loaded 900000 rows and 30 columns to marbotic_dataset.event_properties_f
Loaded 1000000 rows and 30 columns to marbotic_dataset.event_properties_f
Loaded 1100000 rows and 31 columns to marbotic_dataset.event_properties_f
Loaded 1200000 rows and 31 columns to marbotic_dataset.event_properties_f
Loaded 1300000 rows and 31 columns to marbotic_dataset.event_properties_f
Loaded 1320229 rows and 31 columns to marbotic_