In [1]:
import os
import pandas as pd
from tqdm import tqdm
import awswrangler as wr
import math
import numpy as np
from datetime import datetime, timedelta


In [2]:
#DSW workspace config settings
DMDG_WORKSPACE_BUCKET = f'sanofi-datalake-dmdg-workspaces-prod'
WORKSPACE_PREFIX = 'projet-rncp-20230705155503681'
TABLE_PATH = f's3://sanofi-datalake-dmdg-workspaces-prod/projet-rncp-20230705155503681/__TABLES__/'
DATABASE_NAME ='prod-dmdg-workspace-projet-rncp-20230705155503681'

In [3]:
def read_query(query):
    # Execute a query, return the data
    df = wr.athena.read_sql_query(
        sql=query,
        database=DATABASE_NAME,
        ctas_approach=False
    )
    
    return df

In [4]:
import warnings

def custom_convert_to_float(value):
    try:
        return float(value)
    except (ValueError, TypeError):
        # Gérer les cas spécifiques de conversion pour les valeurs non numériques
        if value.endswith('p7'):
            # Exemple de traitement pour la valeur '22p7'
            value = value.replace('p7', '.7')
            return float(value)
        else:
            # Autres cas de traitement ou renvoyer NaN pour les valeurs non convertibles
            return float('nan')
        
        
def push_table_by_batch(table_name,data,batch_size=1000000):
    
    # Ignorer le warning SettingWithCopyWarning
    warnings.filterwarnings("ignore", category=pd.core.common.SettingWithCopyWarning)
    
    # Push 1 table, take name of the table and data to push
    path = f'{TABLE_PATH}{table_name}'
    
    num_batches = math.ceil(len(data)/batch_size)
    total_rows_pushed = 0  # Variable pour stocker le nombre total de lignes poussées
    
    for i in tqdm(range(num_batches),desc="Processing batches"):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, len(data))
        batch = data.iloc[start_idx:end_idx]
        #print(batch)
        
        # Convertir les colonnes appropriées en type numérique
        numeric_cols = ['tempsarret','rs_sv_arret_p']  # Remplacez par les noms des colonnes à convertir
        for col in numeric_cols:
            batch[col] = batch[col].apply(custom_convert_to_float)

        #batch[numeric_cols] = batch[numeric_cols].apply(pd.to_numeric, errors='coerce')
        
        #Load table
        wr.s3.to_parquet(
            df=batch,
            path=path,
            database=DATABASE_NAME,
            dataset=True,
            table=table_name,
            mode='append',
            index=False,
            compression=None,
        )
        
        total_rows_pushed += len(batch)  # Ajouter le nombre de lignes du paquet au total

    print(f"Nombre total de lignes poussées dans la table AWS : {total_rows_pushed}")
    
    return True

In [5]:
%%time

fusion_df = pd.read_pickle('fusion_df.pkl')
fusion_df

CPU times: user 21.9 s, sys: 11.1 s, total: 33 s
Wall time: 19min 47s


Unnamed: 0,gid,hor_theo,hor_app,etat,type,source,rs_sv_arret_p,rs_sv_cours_a,mdate,hor_real,tempsarret
0,168488116,2023-01-08T09:53:38+01:00,2023-01-08T09:53:38+01:00,NON_REALISE,REGULIER,SAEIV_TRAM,127940,2519295,2023-01-08T03:31:30+01:00,,
1,168488117,2023-01-08T09:54:52+01:00,2023-01-08T09:54:52+01:00,NON_REALISE,REGULIER,SAEIV_TRAM,127938,2519295,2023-01-08T03:31:30+01:00,,
2,168492575,2023-01-08T16:15:39+01:00,2023-01-08T16:15:39+01:00,NON_REALISE,REGULIER,SAEIV_TRAM,128069,2518974,2023-01-08T03:31:30+01:00,,
3,168492576,2023-01-08T16:17:06+01:00,2023-01-08T16:17:06+01:00,NON_REALISE,REGULIER,SAEIV_TRAM,128071,2518974,2023-01-08T03:31:30+01:00,,
4,168492577,2023-01-08T16:19:16+01:00,2023-01-08T16:19:16+01:00,NON_REALISE,REGULIER,SAEIV_TRAM,128076,2518974,2023-01-08T03:31:30+01:00,,
...,...,...,...,...,...,...,...,...,...,...,...
70587949,268486129,2023-04-11T21:17:27+02:00,2023-04-11T21:17:27+02:00,REALISE,REGULIER,SAEIV_BUS,125269,2685669,2023-04-11T22:14:16+02:00,2023-04-11T21:17:37+02:00,3356.0
70587950,268486130,2023-04-11T21:18:09+02:00,2023-04-11T21:18:09+02:00,REALISE,REGULIER,SAEIV_BUS,126617,2685669,2023-04-11T22:15:10+02:00,2023-04-11T22:14:15+02:00,0.0
70587951,268486131,2023-04-11T21:19:00+02:00,2023-04-11T21:19:00+02:00,REALISE,REGULIER,SAEIV_BUS,125592,2685669,2023-04-11T22:15:38+02:00,2023-04-11T22:15:06+02:00,0.0
70587952,268486132,2023-04-11T21:21:00+02:00,2023-04-11T21:21:00+02:00,REALISE,REGULIER,SAEIV_BUS,128348,2685669,2023-04-11T22:17:39+02:00,2023-04-11T22:17:06+02:00,0.0


In [6]:
# def transform_dataframe(fusion_df):
#     print("Étape 1 : Filtrer les colonnes avec état == 'REALISE'")
#     fusion_df = fusion_df[fusion_df["etat"] == "REALISE"].copy()
    
#     print("Étape 2 : Supprimer les colonnes 'gid' et 'hor_app'")
#     fusion_df.drop(["gid", "hor_app"], axis=1, inplace=True)
    
#     print("Étape 3 : Convertir les colonnes en datetime")
#     fusion_df["hor_theo"] = pd.to_datetime(fusion_df["hor_theo"])
#     fusion_df["hor_real"] = pd.to_datetime(fusion_df["hor_real"])
#     fusion_df["mdate"] = pd.to_datetime(fusion_df["mdate"])
    
#     print("Étape 4 : Calculer le retard en minutes")
#     fusion_df["retard"] = (fusion_df["hor_real"] - fusion_df["hor_theo"]).dt.total_seconds() / 60.0
    
#     print("Étape 5 : Remplacer les retards supérieurs à 15 minutes ou inférieurs à -15 minutes par la moyenne totale du retard")
#     mean_retard = fusion_df["retard"].mean()
#     fusion_df.loc[(fusion_df["retard"] > 15) | (fusion_df["retard"] < -15), "retard"] = mean_retard
    
#     print("Étape 6 : Ajouter les colonnes année, mois et jour de semaine")
#     fusion_df["annee"] = fusion_df["mdate"].dt.year
#     fusion_df["mois"] = fusion_df["mdate"].dt.month
#     fusion_df["jour_semaine"] = fusion_df["mdate"].dt.strftime("%A")
    
#     print("Transformation terminée !")
#     return fusion_df



In [13]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from tqdm import tqdm
import warnings
from pandas.core.common import SettingWithCopyWarning

def custom_convert_to_float(value):
    try:
        return float(value)
    except (ValueError, TypeError):
        # Gérer les cas spécifiques de conversion pour les valeurs non numériques
        if value.endswith('p7'):
            # Exemple de traitement pour la valeur '22p7'
            value = value.replace('p7', '.7')
            return float(value)
        else:
            # Autres cas de traitement ou renvoyer NaN pour les valeurs non convertibles
            return float('nan')

def transform_dataframe(fusion_df,table_name):
    batch_size = 10000  # Nombre de lignes à traiter par lot
    num_batches = len(fusion_df) // batch_size + 1
    dataframes = []  # Liste pour stocker les dataframes de chaque lot
    path = f'{TABLE_PATH}{table_name}'
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=SettingWithCopyWarning)
        warnings.filterwarnings("ignore", category=UserWarning, module='pandas')

        for i in tqdm(range(num_batches), desc="Traitement des lots"):
            start_idx = i * batch_size
            end_idx = start_idx + batch_size
            batch_df = fusion_df.loc[start_idx:end_idx].copy()

            # Étape 1 : Filtrer les colonnes avec état == 'REALISE'
            batch_df = batch_df.query('etat == "REALISE"')
            # Étape 2 : Supprimer les colonnes 'gid' et 'hor_app'
            batch_df.drop(["gid", "hor_app"], axis=1, inplace=True)
            
            # Supprimer les valeurs non-finies dans la colonne "hor_theo" et "hor_real"
            batch_df.dropna(subset=["hor_theo"], inplace=True)
            batch_df.dropna(subset=["hor_real"], inplace=True)
            #Supprimer les valeurs non-finies dans la colonne "mdate"
            batch_df.dropna(subset=["mdate"], inplace=True)
            
            
            if batch_df.empty:
                continue  # Ignorer les lots vides
            # Étape 3 : Convertir les colonnes en datetime
            batch_df["hor_theo"] = pd.to_datetime(batch_df["hor_theo"], errors='coerce',utc=True)
            batch_df["hor_real"] = pd.to_datetime(batch_df["hor_real"], errors='coerce',utc=True)
            batch_df["mdate"] = pd.to_datetime(batch_df["mdate"], errors='coerce',utc=True)

            # Étape 4 : Ajouter les colonnes année, mois et jour de semaine
            batch_df["annee"] = -1
            batch_df["mois"] = -1
            batch_df["jour_semaine"] = ""
            
            #date = pd.to_datetime(batch_df["mdate"])
            date = batch_df["mdate"]
            try:
                batch_df["annee"] = date.dt.year
                batch_df["mois"] = date.dt.month
                batch_df["jour_semaine"] = date.dt.strftime("%A")
                # Étape 5 : Calculer le retard en minutes
                batch_df["retard"] = (batch_df["hor_real"] - batch_df["hor_theo"]).dt.total_seconds().astype(int) // 60
                # Étape 6 : Remplacer les retards supérieurs à 15 minutes ou inférieurs à -15 minutes par la moyenne totale du retard
                mean_retard = batch_df["retard"].mean()
                batch_df["retard"].fillna(mean_retard, inplace=True)
                batch_df.loc[(batch_df["retard"] > 15) | (batch_df["retard"] < -15), "retard"] = mean_retard
                # Convertir les colonnes appropriées en type numérique
                numeric_cols = ['tempsarret','rs_sv_arret_p']  # Remplacez par les noms des colonnes à convertir
                for col in numeric_cols:
                    batch_df[col] = batch_df[col].apply(custom_convert_to_float)

                #batch[numeric_cols] = batch[numeric_cols].apply(pd.to_numeric, errors='coerce')
                
                #Load table
                wr.s3.to_parquet(
                    df=batch_df,
                    path=path,
                    database=DATABASE_NAME,
                    dataset=True,
                    table=table_name,
                    mode='append',
                    index=False,
                    compression=None,
                )
            except (AttributeError,ValueError) as e:
                print(e)
                continue
                
            #print(batch_df)

            #dataframes.append(batch_df)

    #transformed_df = pd.concat(dataframes)  # Concaténer tous les dataframes en un seul

    return True


In [15]:
%%time
result_df = transform_dataframe(fusion_df,"DWH_SV_HORAI_A")

Traitement des lots:  96%|█████████▌| 6769/7059 [3:00:52<05:52,  1.22s/it]  

Cannot convert non-finite values (NA or inf) to integer


Traitement des lots: 100%|██████████| 7059/7059 [3:08:43<00:00,  1.60s/it]

CPU times: user 36min 4s, sys: 1min 59s, total: 38min 4s
Wall time: 3h 8min 43s





In [23]:
result_df

Unnamed: 0,hor_theo,etat,type,source,rs_sv_arret_p,rs_sv_cours_a,mdate,hor_real,tempsarret,retard,annee,mois,jour_semaine
760,2023-01-08 03:56:00+01:00,REALISE,REGULIER,SAEIV_TRAM,127916,2519775,2023-01-08T04:35:47+01:00,2023-01-08 04:31:14+01:00,20.0,-0.115363,2023,1,Sunday
761,2023-01-08 03:57:25+01:00,REALISE,REGULIER,SAEIV_TRAM,127918,2519775,2023-01-08T04:35:47+01:00,2023-01-08 04:32:39+01:00,20.0,-0.115363,2023,1,Sunday
762,2023-01-08 03:59:02+01:00,REALISE,REGULIER,SAEIV_TRAM,127920,2519775,2023-01-08T04:35:47+01:00,2023-01-08 04:34:14+01:00,22.0,-0.115363,2023,1,Sunday
763,2023-01-08 04:01:22+01:00,REALISE,REGULIER,SAEIV_TRAM,127922,2519775,2023-01-08T04:41:44+01:00,2023-01-08 04:36:34+01:00,20.0,-0.115363,2023,1,Sunday
764,2023-01-08 04:02:22+01:00,REALISE,REGULIER,SAEIV_TRAM,127922,2519772,2023-01-08T04:41:44+01:00,2023-01-08 04:37:14+01:00,20.0,-0.115363,2023,1,Sunday
...,...,...,...,...,...,...,...,...,...,...,...,...,...
989996,2023-01-29 19:55:00+01:00,REALISE,REGULIER,SAEIV_BUS,126006.0,2559387,2023-01-29T19:59:11+01:00,2023-01-29 19:58:19+01:00,8.0,3.000000,2023,1,Sunday
989997,2023-01-29 20:02:00+01:00,REALISE,REGULIER,SAEIV_BUS,129169.0,2563070,2023-01-29T19:59:11+01:00,2023-01-29 19:58:31+01:00,4.0,-4.000000,2023,1,Sunday
989998,2023-01-29 19:54:00+01:00,REALISE,REGULIER,SAEIV_BUS,126044.0,2558686,2023-01-29T19:59:11+01:00,2023-01-29 19:58:44+01:00,25.0,4.000000,2023,1,Sunday
989999,2023-01-29 19:54:00+01:00,REALISE,REGULIER,SAEIV_BUS,125170.0,2560541,2023-01-29T19:59:11+01:00,2023-01-29 19:58:17+01:00,22.0,4.000000,2023,1,Sunday


In [46]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from tqdm import tqdm
import warnings
from pandas.core.common import SettingWithCopyWarning
from multiprocessing import Pool, Manager

def process_batch(batch_df,output_list):
    warnings.filterwarnings("ignore", category=SettingWithCopyWarning)
    warnings.filterwarnings("ignore", category=UserWarning, module='pandas')

    # Étape 1 : Filtrer les colonnes avec état == 'REALISE'
    batch_df = batch_df.query('etat == "REALISE"')

    # Étape 2 : Supprimer les colonnes 'gid' et 'hor_app'
    batch_df.drop(["gid", "hor_app"], axis=1, inplace=True)
    
    # Supprimer les valeurs non-finies dans la colonne "hor_theo" et "hor_real"
    batch_df.dropna(subset=["hor_theo"], inplace=True)
    batch_df.dropna(subset=["hor_real"], inplace=True)

    # Étape 3 : Convertir les colonnes en datetime
    batch_df["hor_theo"] = pd.to_datetime(batch_df["hor_theo"], errors='coerce')
    batch_df["hor_real"] = pd.to_datetime(batch_df["hor_real"], errors='coerce')
    batch_df["mdate"] = pd.to_datetime(batch_df["mdate"], errors='coerce')

    # Étape 4 : Calculer le retard en minutes
    batch_df["retard"] = (batch_df["hor_real"] - batch_df["hor_theo"]).dt.total_seconds().astype(int) // 60

    # Étape 5 : Remplacer les retards supérieurs à 15 minutes ou inférieurs à -15 minutes par la moyenne totale du retard
    mean_retard = batch_df["retard"].mean()
    batch_df["retard"].fillna(mean_retard, inplace=True)
    batch_df.loc[(batch_df["retard"] > 15) | (batch_df["retard"] < -15), "retard"] = mean_retard

    # Étape 6 : Ajouter les colonnes année, mois et jour de semaine
    batch_df["annee"] = -1
    batch_df["mois"] = -1
    batch_df["jour_semaine"] = ""
            
    # Supprimer les valeurs non-finies dans la colonne "mdate"
    batch_df.dropna(subset=["mdate"], inplace=True)
            
    for index, row in batch_df.iterrows():
        try:
            date = pd.to_datetime(row["mdate"])
            batch_df.at[index, "annee"] = date.year
            batch_df.at[index, "mois"] = date.month
            batch_df.at[index, "jour_semaine"] = date.strftime("%A")
        except AttributeError:
            continue

    output_list.append(batch_df)  # Ajouter le batch_df à la liste partagée



def transform_dataframe(fusion_df):
    batch_size = 10000  # Nombre de lignes à traiter par lot
    num_batches = len(fusion_df) // batch_size + 1

    with Pool() as pool, Manager() as manager:
        output_list = manager.list()  # Liste partagée pour stocker les dataframes transformés

        for i in tqdm(range(num_batches), desc="Traitement des lots"):
            start_idx = i * batch_size
            end_idx = start_idx + batch_size
            batch_df = fusion_df.loc[start_idx:end_idx].copy()

            pool.apply_async(process_batch, (batch_df, output_list))

        pool.close()
        pool.join()

        transformed_df = pd.concat(output_list, ignore_index=True)  # Concaténer les dataframes de la liste

    return transformed_df


In [None]:
%%time
fusion_df = transform_dataframe(fusion_df)

Traitement des lots: 100%|██████████| 7059/7059 [00:22<00:00, 319.51it/s]


In [44]:
# Transformations à faire : 
# Drop gid/hor_app
# Ajouter colonne : retard
# Transformations de format de date
# Supprimer toutes les lignes en état NON_REALISE ou ANNULE
# mdate to datetime
# Ajouter les colonnes année/mois/jour de semaine (en se basant sur mdate)
# Dans la colonne retard quand c'est 