# Notebook de création d'un csv jobs_events des données fractionnées

Ce notebook génère 2 csv :

- raw_merge_job_events_dataset.csv, qui fusionne les données du dataset brut (une ligne par job id)

- raw_concat_job_events_dataset.csv, qui concatène les données du dataset brut (une ligne par tag)

Étapes :

- fractionnement de la colonne payload

- fractionnement des sous-colonnes

- fusion des sous-colonnes entre elles (chaque job a une seule ligne, qui regroupe les données des 3 tags)

- concaténation des sous-colonnes (chaque job a plusieurs lignes : job_start, job_end)


# A) Imports

## Librairies

In [None]:
import os, json, ast
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

## Fonctions

In [None]:
def payload_dataframe_by_tag(input_df, tag):
    """Return the rows of ``input_df`` for one tag with their payload expanded.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Frame with at least a ``tag`` column and a ``payload`` column whose
        values are decoded with ``json.loads``.
    tag : str
        Value of the ``tag`` column to select.

    Returns
    -------
    pandas.DataFrame
        The selected rows, re-indexed from 0, with one extra column per
        top-level payload key and without the ``payload`` and ``tag`` columns.
        When no row carries ``tag`` the result is an empty frame holding the
        remaining input columns.
    """
    tag_rows = input_df.loc[input_df['tag'] == tag]
    if tag_rows.empty:
        # Applying pd.Series over an empty selection does not produce a frame,
        # which made the merge/drop below fail; return the empty shell instead.
        return tag_rows.drop(columns=['payload', 'tag']).reset_index(drop=True)
    # one column per payload key, aligned on the original row index
    payload_df = tag_rows['payload'].apply(json.loads).apply(pd.Series)
    tag_df = tag_rows.merge(payload_df, left_index=True, right_index=True)
    # the raw payload and the (now constant) tag are no longer needed
    return tag_df.drop(columns=['payload', 'tag']).reset_index(drop=True)

In [None]:
def _cell_records(cell):
    """Return the dict records held by one cell of the column being split.

    A string is decoded as JSON first. A non-empty dict counts as one record,
    a list contributes its elements, and anything else (missing value, number,
    undecodable text, empty dict) contributes nothing.
    """
    if isinstance(cell, str):
        try:
            cell = json.loads(cell)
        except ValueError:
            return []
    if isinstance(cell, dict):
        return [cell] if cell else []
    if isinstance(cell, list):
        return cell
    return []


def convert_col_to_df(col: str, df: pd.DataFrame, data: dict = None):
    """Split a column of dicts / lists of dicts into a new dataframe.

    Every dict found in ``df[col]`` becomes one output row; a list of dicts
    therefore yields several rows for the same source row. Each dict key ``k``
    is stored in a column named ``k + '_' + col``.

    Parameters
    ----------
    col : str
        Name of the column to split.
    df : pandas.DataFrame
        Source frame. It is not modified.
    data : dict, optional
        Mapping ``{column name: list}`` naming the columns of ``df`` to carry
        over to the result (typically ``{'id': []}``). The dict and its lists
        are copied, not modified.

    Returns
    -------
    pandas.DataFrame
        The carried-over columns followed by the split columns, in order of
        first appearance of the keys. A key missing from a record is stored as
        ``None``. When the column holds no record at all, the frame only has
        the carried-over columns and no row.
    """
    # Work on private copies so repeated calls never share or grow caller state.
    data = {} if data is None else {key: list(values) for key, values in data.items()}
    data_keys = list(data)

    # Decode every cell once, by position, so any index type is supported.
    per_row = [_cell_records(cell) for cell in df[col]]

    # Union of the record keys, kept in order of first appearance.
    col_keys = list(dict.fromkeys(
        key for records in per_row for record in records for key in record.keys()
    ))
    for ck in col_keys:
        data[ck + '_' + col] = []

    for pos, records in enumerate(per_row):
        for record in records:
            # every split column receives a value so all lists stay aligned
            for ck in col_keys:
                data[ck + '_' + col].append(record.get(ck))
            for dk in data_keys:
                data[dk].append(df[dk].iloc[pos])

    return pd.DataFrame(data)

## Data

In [None]:
# where the raw job events export is read from (folder, then file name)
path = '../data/raw/'
filename = 'job_events.csv'
# where the two generated datasets are written
save_csv_concat = '../data/jobs/raw_concat_job_events_dataset.csv'
save_csv_merge = '../data/jobs/raw_merge_job_events_dataset.csv'

In [None]:
# fetch the csv file from the blob storage into the local data folder
# (download_blob_file comes from the project-local azure_blob module)
from azure_blob import download_blob_file
download_blob_file(file_name=filename, local_path=path)
# full local path of the csv, used below to build the dataframe
job_events = os.path.join(path, filename)

# B) Dataframe

## a) Création

In [None]:
# load the csv, then order the rows by their 'received_at' value
job_events_df = pd.read_csv(job_events)
job_events_df = job_events_df.sort_values('received_at')
job_events_df.info()

In [None]:
# renumber the rows from 0 after the sort, discarding the previous index
job_events_df = job_events_df.reset_index(drop=True)
job_events_df.head(5)

In [None]:
# True when at least one value of the 'id' column appears more than once
bool(job_events_df['id'].duplicated().any())

## b) Fractionnement du payload

Le contenu du payload diffère selon le tag ; on subdivise donc le dataset par tag avant de fractionner le payload.

In [None]:
# liste des tag
job_events_df.tag.unique().tolist()

In [None]:
job_events_df

In [None]:
# keep only the columns needed for the payload split, on an independent copy
job_event_payload = job_events_df.drop(columns=['machine_id', 'received_at']).copy()
# one expanded-payload dataframe per tag, built in the same order as before
job_started_df, job_preview_df, job_ended_df = (
    payload_dataframe_by_tag(input_df=job_event_payload, tag=tag_name)
    for tag_name in ('job-started', 'job-preview-ready', 'job-ended')
)

In [None]:
# for each tag frame: is any jobId duplicated, and how many distinct jobId are there
for label, tag_frame in (
    ('job-started tag :', job_started_df),
    ('job-preview tag :', job_preview_df),
    ('job-ended tag :', job_ended_df),
):
    print(label, any(tag_frame.jobId.duplicated()), tag_frame.jobId.nunique())

### a. Dataframe de tag job start

In [None]:
# visualisation des valeurs
job_started_df.head(3)

In [None]:
# remove the two columns that hold no value
job_started_df = job_started_df.drop(columns=['memjet', 'octopus'])

In [None]:
# columns to split: those whose value at index label 0 is a list or a dict
job_started_col_to_split = [
    col
    for col in job_started_df.columns
    if isinstance(job_started_df[col].loc[0], (list, dict))
]

job_started_col_to_split

In [None]:
col_to_drop = []

#### 1) Fractionnement colonne iper

In [None]:
job_started_df.iper.loc[0]

In [None]:
# on fractionne une colonne
iper = convert_col_to_df('iper', job_started_df, {'id':[]})
iper.head(3)

In [None]:
# suppression de colonne
#iper = iper.drop(['bars_iper'], axis=1)

In [None]:
# print the number of distinct values per column ('bars_iper' is left out of
# the scan) and remember the columns holding at most one distinct value
iper_col_to_drop = []
for col in iper.columns.drop('bars_iper'):
    distinct = iper[col].nunique()
    print(col, distinct)
    if distinct <= 1:
        iper_col_to_drop.append(col)
col_to_drop.append(iper_col_to_drop)
iper_col_to_drop

#### 2) Fractionnement colonne user

In [None]:
# on visualise les valeurs
job_started_df.user.loc[0]

In [None]:
# on fractionne une colonne
user = convert_col_to_df('user', job_started_df, {'id':[]})
user.head(2)

#### 3) Fractionnement colonne ifoil

In [None]:
# on visualise les valeurs
job_started_df.ifoil.loc[0]

In [None]:
# on fractionne une colonne
ifoil = convert_col_to_df('ifoil', job_started_df, {'id':[]})
ifoil.head(2)

In [None]:
# print the number of distinct values per column ('stampAreas_ifoil' is left
# out of the scan) and remember the columns holding at most one distinct value
ifoil_col_to_drop = []
for col in ifoil.columns.drop('stampAreas_ifoil'):
    distinct = ifoil[col].nunique()
    print(col, distinct)
    if distinct > 1:
        continue
    ifoil_col_to_drop.append(col)
col_to_drop.append(ifoil_col_to_drop)
ifoil_col_to_drop

#### 4) Fractionnement colonne layout

In [None]:
# on visualise les valeurs
job_started_df.layout.loc[0]

In [None]:
# on fractionne une colonne
layout_df = convert_col_to_df('layout', job_started_df, {'id':[]})
layout_df.head(3)

In [None]:
# on fractionne une colonne
imageLayout_layout = convert_col_to_df('imageLayout_layout', layout_df, {'id':[]})
imageLayout_layout.head(2)

In [None]:
# on fractionne une colonne
paperFormat_layout = convert_col_to_df('paperFormat_layout', layout_df, {'id':[]})
paperFormat_layout.head(2)

In [None]:
# join the two split sub-frames, then the layout frame itself, on 'id'
merge_imageLayout_paperFormat = imageLayout_layout.merge(paperFormat_layout, how='outer', on='id')
merge_layout = merge_imageLayout_paperFormat.merge(layout_df, how='outer', on='id')
# the nested columns are now redundant with their split versions
merge_layout = merge_layout.drop(columns=['imageLayout_layout', 'paperFormat_layout'])
merge_layout.head(3)

In [None]:
# print the number of distinct values per column and remember the columns
# holding at most one distinct value
layout_col_to_drop = []
for col in merge_layout:  # iterating a dataframe yields its column labels
    n_values = merge_layout[col].nunique()
    print(col, n_values)
    if n_values <= 1:
        layout_col_to_drop.append(col)
col_to_drop.append(layout_col_to_drop)
layout_col_to_drop

#### 5) Fractionnement colonne irDryers

In [None]:
job_started_df.irDryers.loc[0]

In [None]:
# on fractionne une colonne
irDryers = convert_col_to_df('irDryers', job_started_df, {'id':[]})
irDryers.head(3)

In [None]:
# print the number of distinct values per column and remember the columns
# holding at most one distinct value
irDryers_col_to_drop = []
for col in irDryers.columns:
    n_values = irDryers[col].nunique()
    print(col, n_values)
    if not n_values > 1:
        irDryers_col_to_drop.append(col)
col_to_drop.append(irDryers_col_to_drop)
irDryers_col_to_drop

#### 6) Fractionnement colonne uvDryers

In [None]:
job_started_df.uvDryers.loc[0]

In [None]:
# on fractionne une colonne
uvDryers = convert_col_to_df('uvDryers', job_started_df, {'id':[]})
uvDryers.head(3)

In [None]:
# print the number of distinct values per column and remember the columns
# holding at most one distinct value
uvDryers_col_to_drop = []
for col in uvDryers.columns:
    distinct = uvDryers[col].nunique()
    print(col, distinct)
    if distinct > 1:
        continue
    uvDryers_col_to_drop.append(col)
col_to_drop.append(uvDryers_col_to_drop)
uvDryers_col_to_drop

#### 7) Fractionnement colonne remoteScannerRegistration

In [None]:
job_started_df.remoteScannerRegistration.loc[0]

In [None]:
# on fractionne la colonne
remoteScannerRegistration = convert_col_to_df('remoteScannerRegistration', job_started_df, {'id':[]})
remoteScannerRegistration.head(3)

In [None]:
# sub-columns to split: those whose value at index label 0 is a list or a dict
remoteScannerRegistration_col_to_split = [
    col
    for col in remoteScannerRegistration.columns
    if isinstance(remoteScannerRegistration[col].loc[0], (list, dict))
]

remoteScannerRegistration_col_to_split

##### remoteScannerRegistration > gridMode

In [None]:
# on fractionne une colonne
gridMode = convert_col_to_df('gridMode_remoteScannerRegistration', remoteScannerRegistration, {'id':[]})
gridMode.head(3)

In [None]:
# on fractionne une colonne
descriptor = convert_col_to_df('descriptor_gridMode_remoteScannerRegistration', gridMode, {'id':[]})
descriptor.head(3)

In [None]:
# attach the split descriptor columns to gridMode, then remove the nested column
merge_gridMode = gridMode.merge(descriptor, how='outer', on='id')
merge_gridMode = merge_gridMode.drop(columns=['descriptor_gridMode_remoteScannerRegistration'])
merge_gridMode.head(2)

##### remoteScannerRegistration > registration

In [None]:
# on fractionne une colonne
registration = convert_col_to_df('registration_remoteScannerRegistration', remoteScannerRegistration, {'id':[]})
registration.head(3)

##### remoteScannerRegistration > cropmarksMode

In [None]:
# on fractionne une colonne
cropmarksMode = convert_col_to_df('cropmarksMode_remoteScannerRegistration', remoteScannerRegistration, {'id':[]})
cropmarksMode.head(3)

In [None]:
# split the two nested cropmark columns, each keeping the 'id' column
cropmarksMode_1 = convert_col_to_df(
    'cropmark1_cropmarksMode_remoteScannerRegistration', cropmarksMode, {'id': []})
cropmarksMode_2 = convert_col_to_df(
    'cropmark2_cropmarksMode_remoteScannerRegistration', cropmarksMode, {'id': []})
# join both splits together, then onto cropmarksMode, always on 'id'
merge_cropmarksModes = cropmarksMode_1.merge(cropmarksMode_2, how='outer', on='id')
merge_cropmarksMode = cropmarksMode.merge(merge_cropmarksModes, how='outer', on='id')
# the nested columns are now redundant with their split versions
merge_cropmarksMode = merge_cropmarksMode.drop(columns=[
    'cropmark1_cropmarksMode_remoteScannerRegistration',
    'cropmark2_cropmarksMode_remoteScannerRegistration',
])
merge_cropmarksMode.head(2)

##### remoteScannerRegistration > manualLighting

In [None]:
# on fractionne une colonne
manualLighting = convert_col_to_df('manualLighting_remoteScannerRegistration', remoteScannerRegistration, {'id':[]})
manualLighting.head(3)

In [None]:
# split the two nested point columns, each keeping the 'id' column
platePoint = convert_col_to_df(
    'platePoint_manualLighting_remoteScannerRegistration', manualLighting, {'id': []})
substratePoint = convert_col_to_df(
    'substratePoint_manualLighting_remoteScannerRegistration', manualLighting, {'id': []})
# join both splits together, then onto manualLighting, always on 'id'
merge_plate_substrate = platePoint.merge(substratePoint, how='outer', on='id')
merge_manualLighting = manualLighting.merge(merge_plate_substrate, how='outer', on='id')
# the nested columns are now redundant with their split versions
merge_manualLighting = merge_manualLighting.drop(columns=[
    'platePoint_manualLighting_remoteScannerRegistration',
    'substratePoint_manualLighting_remoteScannerRegistration',
])
merge_manualLighting.head(2)

##### remoteScannerRegistration > fullScannerMode

In [None]:
# on fractionne une colonne
fullScannerMode = convert_col_to_df('fullScannerMode_remoteScannerRegistration', remoteScannerRegistration, {'id':[]})
fullScannerMode.head(3)

##### remoteScannerRegistration >  specialSubstrate

In [None]:
# on fractionne une colonne
specialSubstrate = convert_col_to_df('specialSubstrate_remoteScannerRegistration', remoteScannerRegistration, {'id':[]})
specialSubstrate.head(3)

##### fusion des sous-colonnes remoteScannerRegistration

In [None]:
# chain of outer joins on 'id': each step adds one split sub-frame, and the last
# step adds the remoteScannerRegistration frame itself
merge_registration_gridMode_df = registration.merge(merge_gridMode, how='outer', on='id')
merge_cropmarksMode_df = merge_registration_gridMode_df.merge(merge_cropmarksMode, how='outer', on='id')
merge_manualLighting_df = merge_cropmarksMode_df.merge(merge_manualLighting, how='outer', on='id')
merge_fullScannerMode_df = merge_manualLighting_df.merge(fullScannerMode, how='outer', on='id')
merge_specialSubstrate_df = merge_fullScannerMode_df.merge(specialSubstrate, how='outer', on='id')
merge_remoteScannerRegistration = merge_specialSubstrate_df.merge(remoteScannerRegistration, how='outer', on='id')


In [None]:
# remove the nested sub-columns, now replaced by their split versions
merge_remoteScannerRegistration = merge_remoteScannerRegistration.drop(
    columns=remoteScannerRegistration_col_to_split)
merge_remoteScannerRegistration.head(2)

##### suppression des colonnes contenant des valeurs nulles ou une valeur unique

In [None]:
# columns holding at most one distinct value (nunique ignores missing values)
remoteScannerRegistration_df_col_to_drop = [
    col
    for col in merge_remoteScannerRegistration.columns
    if merge_remoteScannerRegistration[col].nunique() <= 1
]

print('nombre total de colonnes :', merge_remoteScannerRegistration.shape[1])
print('nombre de colonnes à supprimer :', len(remoteScannerRegistration_df_col_to_drop))
remoteScannerRegistration_df_col_to_drop

In [None]:
# suppression des colonnes
merge_remoteScannerRegistration = merge_remoteScannerRegistration.drop(remoteScannerRegistration_df_col_to_drop, axis=1)

#### 8) Fusion des colonnes fractionnées de job_started_df

In [None]:
# chain of outer joins on 'id' gathering every split column of job_started_df
merge_iper_user = iper.merge(user, how='outer', on='id')
merge_ifoil_df = merge_iper_user.merge(ifoil, how='outer', on='id')
merge_layout_df = merge_ifoil_df.merge(merge_layout, how='outer', on='id')
merge_irDryers_df = merge_layout_df.merge(irDryers, how='outer', on='id')
merge_uvDryers_df = merge_irDryers_df.merge(uvDryers, how='outer', on='id')
merge_remoteScannerRegistration_df = merge_uvDryers_df.merge(
    merge_remoteScannerRegistration, how='outer', on='id')
merge_remoteScannerRegistration_df.head(2)

In [None]:
# Drop every column flagged as holding at most one distinct value.
# The flagged lists are flattened and de-duplicated, so a flagging cell that was
# run twice (and appended its list twice) no longer makes the drop fail, and the
# frame is rebuilt once instead of once per column. The 'id' column is always
# kept because the following merge with job_started_df is done on it.
flagged_cols = dict.fromkeys(
    col for cols in col_to_drop for col in cols if col != 'id'
)
merge_remoteScannerRegistration_df = merge_remoteScannerRegistration_df.drop(
    columns=list(flagged_cols))
merge_remoteScannerRegistration_df.head(2)

In [None]:
# attach all the split columns to job_started_df on 'id', then remove the
# original nested columns
merge_job_started_df = job_started_df.merge(
    merge_remoteScannerRegistration_df, how='outer', on='id')
merge_job_started_df = merge_job_started_df.drop(columns=job_started_col_to_split)
merge_job_started_df.head(3)

### b. Dataframe de tag job preview

In [None]:
# on visualise les données
job_preview_df.head(3)

In [None]:
# # on visualise l'image
# import PIL.Image as Image
# import io, base64
# byte_data = job_preview_df.image.loc[0]
# b = base64.b64decode(byte_data)
# img = Image.open(io.BytesIO(b))
# img.show()
# img_name = job_preview_df.path.loc[0].split('/')[-1]
# img.save(img_name)

In [None]:
# on supprime la colonne machineId
job_preview_df = job_preview_df.drop(['machineId'], axis=1)

Ces données sont utiles pour afficher les images des jobs, mais ne sont pas pertinentes pour l'exploration ou la prédiction.

### c. Dataframe de tag job end

In [None]:
# on visualise les données
job_ended_df.head(3)

In [None]:
# on supprime la colonne machineId
job_ended_df = job_ended_df.drop(['machineId'], axis=1)

La colonne varnishConsumption est fractionnable

#### 1. Fractionnement colonne 'varnishConsumption'

In [None]:
job_ended_df.varnishConsumption.loc[0]

In [None]:
varnishConsumption = convert_col_to_df('varnishConsumption', job_ended_df, {'id':[]})
varnishConsumption.head(2)

##### Fractionnement colonne 'operatorSideTanks_varnishConsumption'

In [None]:
operatorSideTanks = convert_col_to_df('operatorSideTanks_varnishConsumption', varnishConsumption, {'id':[]})
operatorSideTanks.head(3)

##### Suppression colonne 'technicalSideTanks_varnishConsumption'

In [None]:
# la colonne ne contient aucune valeur
varnishConsumption = varnishConsumption.drop(['technicalSideTanks_varnishConsumption'], axis=1)

##### Fusion des colonnes varnishConsumption

In [None]:
# attach the split operator-side tank columns on 'id', then remove the nested column
merge_varnishConsumption = varnishConsumption.merge(operatorSideTanks, how='outer', on='id')
merge_varnishConsumption = merge_varnishConsumption.drop(
    columns=['operatorSideTanks_varnishConsumption'])
merge_varnishConsumption.head(3)

#### 2. Fusion des sous-colonnes job_ended

In [None]:
# attach the split varnishConsumption columns on 'id', then remove the nested column
merge_job_ended_df = job_ended_df.merge(merge_varnishConsumption, how='outer', on='id')
merge_job_ended_df = merge_job_ended_df.drop(columns=['varnishConsumption'])
merge_job_ended_df.head(3)

In [None]:
# columns holding at most one distinct value (nunique ignores missing values)
merge_job_ended_df_col_to_drop = [
    col for col in merge_job_ended_df.columns
    if not merge_job_ended_df[col].nunique() > 1
]

print('nombre total de colonnes :', merge_job_ended_df.shape[1])
print('nombre de colonnes à supprimer :', len(merge_job_ended_df_col_to_drop))
merge_job_ended_df_col_to_drop

In [None]:
# suppression des colonnes à valeur unique
merge_job_ended_df = merge_job_ended_df.drop(merge_job_ended_df_col_to_drop, axis=1)

## c) Creation du dataframe final

### 1. Vérification

In [None]:
# (rows, columns) of the three per-tag frames before they are combined
for label, tag_frame in (
    ('job_started shape :', merge_job_started_df),
    ('job_preview shape :', job_preview_df),
    ('job_ended shape :', merge_job_ended_df),
):
    print(label, tag_frame.shape)

In [None]:
# Integrity check on jobId before the tag frames are merged on it.
# The membership test compares each merged frame with the frame it was built
# from, so it cannot reveal rows multiplied by the outer merges (a sub-column
# holding a list with several entries yields several rows for the same 'id').
# The number of repeated jobId values is therefore reported as well.
print(merge_job_started_df['jobId'].isin(job_started_df['jobId']).value_counts())
print(merge_job_ended_df['jobId'].isin(job_ended_df['jobId']).value_counts())
print('jobId dupliqués (job_started) :', merge_job_started_df['jobId'].duplicated().sum())
print('jobId dupliqués (job_ended) :', merge_job_ended_df['jobId'].duplicated().sum())

Il y a toujours autant de jobId dans les df fusionnés que dans les df de départ, et ces derniers ne comportaient aucun doublon.

On peut donc les fusionner sur la valeur de jobId.

### 2. Par fusion

In [None]:
# merge the start and end frames on jobId; their own 'id' columns are dropped
# first, and columns present on both sides get a '_start' / '_end' suffix
started_without_id = merge_job_started_df.drop(columns=['id'])
ended_without_id = merge_job_ended_df.drop(columns=['id'])
merge_start_end_df = started_without_id.merge(
    ended_without_id,
    how='outer',
    on='jobId',
    suffixes=('_start', '_end'),
)

In [None]:
# add the preview frame (without its own 'id' column) to the start/end merge
merge_payload_df = merge_start_end_df.merge(
    job_preview_df.drop(columns=['id']),
    how='outer',
    on='jobId',
)

In [None]:
merge_payload_df.head(2)

In [None]:
merge_payload_df.info()

#### Output csv

In [None]:
# to_csv does not create missing folders, so make sure the target one exists
merge_csv_dir = os.path.dirname(save_csv_merge)
if merge_csv_dir:
    os.makedirs(merge_csv_dir, exist_ok=True)
merge_payload_df.to_csv(save_csv_merge)

### 3. Par concaténation

Si l'on souhaite conserver un dataset avec un tag par ligne, on effectue une concaténation des datasets des tags start et end.

In [None]:
# stack the start and end frames and renumber the rows from 0
# NOTE(review): neither frame carries a 'tag' column at this point, so the rows
# of the two tags are not labelled in the result - confirm this is intended
concat_job_events_df = pd.concat(
    [merge_job_started_df, merge_job_ended_df],
    ignore_index=True,
)

In [None]:
# DataFrame.info() prints its report itself and returns None, so it is called
# directly: wrapping it in print() added a stray "None" line to the output
concat_job_events_df.info()
concat_job_events_df.head(3)

#### Output csv

In [None]:
# to_csv does not create missing folders, so make sure the target one exists
concat_csv_dir = os.path.dirname(save_csv_concat)
if concat_csv_dir:
    os.makedirs(concat_csv_dir, exist_ok=True)
concat_job_events_df.to_csv(save_csv_concat)