In [194]:
import pandas as pd
import pickle
import json
import numpy as n
import os
import dask.dataframe as dd
import csv

In [2]:
def get_siretdf_from_original_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Utilisation d'un dataframe intermediaire pour traiter les Siret unique

    Retour:
        - pd.DataFrame
    """

    dfSIRET = pd.DataFrame.copy(df[['idTitulaires', 'typeIdentifiant', 'denominationSociale']])
    dfSIRET = dfSIRET.drop_duplicates(subset=['idTitulaires'], keep='first')
    dfSIRET.reset_index(inplace=True, drop=True)
    dfSIRET.idTitulaires = dfSIRET.idTitulaires.astype(str)

    dfSIRET["idTitulaires"] = np.where(~dfSIRET["idTitulaires"].str.isdigit(), '00000000000000', dfSIRET.idTitulaires)

    dfSIRET.reset_index(inplace=True, drop=True)

    dfSIRET.rename(columns={
        "idTitulaires": "siret",
        "typeIdentifiant": "siren"}, inplace=True)
    dfSIRET.siren = dfSIRET.siret.str[:9] # 9 = taille du Siren
    dfSIRET.denominationSociale = dfSIRET.denominationSociale.astype(str)

    return dfSIRET
def getArchiveErrorSIRET() -> pd.DataFrame:
    """
    Récupération des siret erronés

    Retour:
        - pd.DataFrame
    """
    archiveErrorSIRET = pd.DataFrame(columns=['siret', 'siren', 'denominationSociale'])
    return archiveErrorSIRET
def get_enrichissement_insee(dfSIRET: pd.DataFrame, path_to_data: str) -> list:
    """
    Ajout des informations Adresse/Activité des entreprises via la base siren Insee

    Retour:
        - list:
            - list[0]: pd.DataFrame -- données principales
            - list[1]: pd.DataFrame -- données ou le SIRET n'est pas renseigné
    """
    # dans StockEtablissement_utf8, il y a principalement : siren, siret, nom établissement, adresse, activité principale
    path = os.path.join(path_to_data, conf_data["base_sirene_insee"])
    columns = [
        'siren',
        'nic',
        'siret',
        'typeVoieEtablissement',
        'libelleVoieEtablissement',
        'codePostalEtablissement',
        'libelleCommuneEtablissement',
        'codeCommuneEtablissement',
        'activitePrincipaleEtablissement',
        'nomenclatureActivitePrincipaleEtablissement']  # Colonne à utiliser dans la base Siren
    dtypes = {
        'siret': 'string',
        'typeVoieEtablissement': 'string',
        'libelleVoieEtablissement': 'string',
        'codePostalEtablissement': 'string',
        'libelleCommuneEtablissement': 'string',
    }

    result = pd.DataFrame(columns=columns)
    chunksize = 1000000
    for gm_chunk in pd.read_csv(path, chunksize=chunksize, sep=',', encoding='utf-8', usecols=columns, dtype=dtypes):
        resultTemp = pd.merge(dfSIRET['siret'], gm_chunk, on=['siret'], copy=False)
        result = pd.concat([result, resultTemp], axis=0, copy=False)
        del resultTemp
    result = result.drop_duplicates(subset=['siret'], keep='first')

    enrichissement_insee_siret = pd.merge(dfSIRET, result, how='outer', on=['siret'], copy=False)
    enrichissement_insee_siret.rename(columns={"siren_x": "siren"}, inplace=True)
    enrichissement_insee_siret.drop(columns=["siren_y"], axis=1, inplace=True)
    nanSiret = enrichissement_insee_siret[enrichissement_insee_siret.activitePrincipaleEtablissement.isnull()]
    enrichissement_insee_siret = enrichissement_insee_siret[
        enrichissement_insee_siret.activitePrincipaleEtablissement.notnull()]
    nanSiret = nanSiret.loc[:, ["siret", "siren", "denominationSociale"]]

    # Concaténation des deux resultats
    enrichissementInsee = enrichissement_insee_siret

    temp_df = pd.merge(nanSiret, result, indicator=True, how="outer", on='siren', copy=False)
    del result
    nanSiret = temp_df[temp_df['activitePrincipaleEtablissement'].isnull()]
    nanSiret = nanSiret.iloc[:, :3]
    nanSiret.reset_index(inplace=True, drop=True)

    return [enrichissementInsee, nanSiret]


In [3]:
with open('df_nettoye', 'rb') as df_nettoye:
    df = pickle.load(df_nettoye)

In [4]:
dfSIRET = get_siretdf_from_original_data(df)
archiveErrorSIRET = getArchiveErrorSIRET()

### Vestige de code avec DaskDataFrames, que j'avais reussi a faire marcher sur l'autre PC avant que ça crash. 

In [60]:
columns = [
    'siren',
    'nic',
    'siret',
    'typeVoieEtablissement',
    'libelleVoieEtablissement',
    'codePostalEtablissement',
    'libelleCommuneEtablissement',
    'codeCommuneEtablissement',
    'activitePrincipaleEtablissement',
    'nomenclatureActivitePrincipaleEtablissement']  # Colonne à utiliser dans la base Siren
dtypes = {
    'siret': 'string',
    'typeVoieEtablissement': 'string',
    'libelleVoieEtablissement': 'string',
    'codePostalEtablissement': 'string',
    'libelleCommuneEtablissement': 'string',
    'codeCommuneEtablissement': 'object',
}
ddcache = dd.read_csv(r"/home/gaspard/Documents/decp-augmente/data/StockEtablissement_utf8.csv", sep=',', encoding='utf-8', usecols=columns, dtype=dtypes)
ddtest = ddcache.map_partitions( find_missing_siret, dfSIRET, dfcache=pd.DataFrame(data={"siret": ["empty_cache"]}))

ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.. If you don't want the partitions to be aligned, and are calling `map_partitions` directly, pass `align_dataframes=False`.

### FIn du vestige

Est ce append ou concat est le plsu rapide ? <br>
Append semble être un chouilla plus rapide <br>
Même si ça va à l'inverse de ce qui est dit sur internet, mais notre cas d'utilisation est un peu différent. <br>
Mais c'est vraiment négligeable ici

In [132]:
dfcachee = loading_cache(r"/home/gaspard/Documents/decp-augmente/cache/cache_StockEtablissement_utf8.csv")

In [126]:
dfcache = actualiser_cache(dfSIRET, r"/home/gaspard/Documents/decp-augmente/data/StockEtablissement_utf8.csv", dfcache=pd.DataFrame())

  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())
  dfcache = dfcache.append(gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.siret.tolist())].copy())


In [273]:
def split_on_column_match(dfSIRET: pd.DataFrame, dfcache: pd.DataFrame, column: str):
    """
    Arguments
    -------------
    Two dataframes
    
    
    Returns
    -----------
    Two series, the second one is the object in column are matching in both dataframes (so the elements in cache), the first one is the one not matching n column.
    """
    boolean_mask = dfSIRET.loc[:, str(column)].isin(dfcache.loc[:, str(column)].tolist())
    return dfSIRET.loc[~boolean_mask, str(column)].copy(), dfSIRET.loc[boolean_mask, str(column)].copy()

In [282]:
def loading_cache(path_to_cache):
    with open(path_to_cache, 'rb') as df_cache:
        df = pickle.load(df_cache)
    return df

def actualiser_cache(dfSiret_to_add, path_to_db, dfcache, chunksize=1000000):
    """
    Arguments
    ----------
    dfSiret_to_add (Series) avec une unique colonne

    Returns
    --------------
    

    La fonction parcourt la bdd Insee par chunk. Pour chaque chunk on regarde si il y a des correspondances de siret entre dfSiret_to_add et la bdd insee.
    Si il y a un match, on ajoute alors les lignes de la bdd insee au dataframe cache. Sinon c'est que les siret sont à la fois valide, mais non présent dans le cache.
    On les sépare (on retire ceux trouvé de dfSIRET_to_add) pour pouvoir les mettre dans un second cache.
    """

    for gm_chunk in pd.read_csv(path_to_db, chunksize=chunksize, sep=',', encoding='utf-8', usecols=columns, dtype=dtypes):
        # Ajouter à df cache les infos qu'il faut
        matching = gm_chunk.loc[gm_chunk.siret.isin(dfSiret_to_add.tolist())].copy() # La copie du dataframe qui match parmis le chunk en cours
        dfSiret_to_add = dfSiret_to_add[~dfSiret_to_add.isin(matching.siret.tolist())] 
        dfcache = dfcache.append(matching)
    return dfcache, dfSiret_to_add
def write_cache(dfcache, path_to_cache):
    with open(path_to_cache, 'wb') as pathcache:
        pickle.dump(dfcache, pathcache)
        print('cache ecrit')
    return None

In [283]:
def is_luhn_valid(x: int) -> bool:
    """
    Application de la formule de Luhn à un nombre
    Permet la verification du numero SIREN et Siret d'un acheteur/etablissement

    Retour:
        - bool
    """
    try:
        luhn_corr = [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]
        list_number_in_x = [int(i) for i in list(str(x))]
        l2 = [luhn_corr[i] if (index + 1) % 2 == 0 else i for index, i in enumerate(list_number_in_x[::-1])]
        if sum(l2) % 10 == 0:
            return True
        elif str(x)[:9] == "356000000":  # SIREN de la Poste
            if sum(list_number_in_x) % 5 == 0:
                return True
        return False
    except:
        return False

In [465]:
def enrichissement_insee_cache(dfSIRET: pd.DataFrame, path_to_data: str, path_to_cache_bdd: str, path_to_cache_not_inbdd: str) -> list:
    """
    Ajout des informations Adresse/Activité des entreprises via la base siren Insee par un système de double cache.
    Pour bien comprendre la fonction il y a plusieurs cas possibles concernant un SIRET. Il y a le cas où le siret est valide et match avec la bdd insee (on gère ça avec le premier cache)
    Le cas où le siret est invalide ou OOOOOOOO (siret artificiel inscrit en amont dans enrichissement.py) il n'y a aucune chance de trouver ça dans la bdd insee donc.
    Le dernier cas où un siret est valide mais pas présent en bdd, pour ceux-ci on créé un second cache.
    Dans un cache (dfcache) sont stockés les informations en rpovenance de la bdD insee que l'on gère 
    Dans le second cache (list_siret_not_found) sont stockés les siret valides que l'on doit gérer mais qui ne sotn pas dans la bdd insee

    Arguments
    -----------------
    dfSIRET 
    path_to_data chemin vers le fichier csv de la BdD Insee Etablissement_utf8
    path_to_cache_bdd chemin vers le cache du fichier csv de la BdD Insee 'Etablissement_utf8'
    path_to_cache_not_inbdd:  chemin vers le cache des siret non trouvé dans le fichier csv de la BdD Insee Etablissement_utf8


    Returns
    --------------
        - list:
            - list[0]: pd.DataFrame -- données principales
            - list[1]: pd.DataFrame -- données où le SIRET n'est pas renseigné

    """
    columns = [
        'siren',
        'nic',
        'siret',
        'typeVoieEtablissement',
        'libelleVoieEtablissement',
        'codePostalEtablissement',
        'libelleCommuneEtablissement',
        'codeCommuneEtablissement',
        'activitePrincipaleEtablissement',
        'nomenclatureActivitePrincipaleEtablissement']  # Colonne à utiliser dans la base Siren
    dtypes = {
        'siret': 'string',
        'typeVoieEtablissement': 'string',
        'libelleVoieEtablissement': 'string',
        'codePostalEtablissement': 'string',
        'libelleCommuneEtablissement': 'string',
        'codeCommuneEtablissement': 'object',
    }
    # Traitement pour le cache. Si le siret n'est pas valide ou non renseigné, on va aller chercher dans le cache. Or on veut pas ça, donc on le gère en amont du cache.
    # Ceux qui ont un siret non valide on les vire de df SIRET, on les récupèrera plus tard.
    mask_siret_nonvalide = (~dfSIRET.siret.apply(is_luhn_valid)) | (dfSIRET.siret == '00000000000000')
    dfSIRET_siret_nonvalide = dfSIRET[mask_siret_nonvalide]
    dfSIRET = dfSIRET[~mask_siret_nonvalide]
    
    # Traitons les caches maintenant

    #Le second cache des siret valide not found est traité en premier.
    cache_siret_not_found_exist = os.path.isfile(path_to_cache_not_inbdd)
    if cache_siret_not_found_exist:
        list_siret_not_found = loading_cache(path_to_cache_not_inbdd)
    else:
        list_siret_not_found = []
    

    mask_siret_valide_notfound = dfSIRET.siret.isin(list_siret_not_found)
    dfSIRET_valide_notfound = dfSIRET[mask_siret_valide_notfound]
    # On retire les siret valides mais non trouvés lors des précédents passages du df.
    dfSIRET = dfSIRET[~mask_siret_valide_notfound]
    
    cache_exist = os.path.isfile(path_to_cache_bdd)
    if cache_exist:
        print('loading cache')
        dfcache = loading_cache(path_to_cache_bdd)
        # regarder les siret dans le cache, ceux pas dans le cache on va passer à travers la bdd pour les trouver. Ceux qui ne sont pas dans la BdD sont sauvés dans un 2e cache.
        seriesSIRETnotincache, seriesSIRETincache = split_on_column_match(dfSIRET, dfcache, column="siret")
        besoin_actualiser_cache = not(seriesSIRETnotincache.empty)

        if besoin_actualiser_cache:
            print('actualiser_cache')
            # Ceux pas dans le cache, ajouter au cache leur correspondant bddinsee
            dfcache, seriessiret_valide_mais_not_found_in_bdd = actualiser_cache(seriesSIRETnotincache, path_to_data, dfcache)
            dfcache = dfcache.drop_duplicates(subset=['siret'], keep='first')
            #Update cache de la lsite des sirets valides mais non trouvés
            list_siret_not_found += seriessiret_valide_mais_not_found_in_bdd.tolist()
            
            # Actualise les caches
            write_cache(dfcache, path_to_cache_bdd)
            write_cache(list_siret_not_found, path_to_cache_not_inbdd)
            
    else:
        print('création cache')
        #dfSIRET_to_add = split_on_column_match(dfSIRET, pd.DataFrame(data={"siret": ["empty_cache"]}), column="siret")
        # crécupérer le dataframe correspondant au cache
        dfcache, seriessiret_valide_mais_not_found_in_bdd = actualiser_cache(dfSIRET.siret, path_to_data, dfcache=pd.DataFrame())
        dfcache = dfcache.drop_duplicates(subset=['siret'], keep='first')
        # Créer les cache
        write_cache(dfcache, path_to_cache_bdd)
        write_cache(seriessiret_valide_mais_not_found_in_bdd.tolist(), path_to_cache_not_inbdd)
    enrichissement_insee_siret = pd.merge(dfSIRET, dfcache, how='outer', on=['siret'], copy=False)
    enrichissement_insee_siret.rename(columns={"siren_x": "siren"}, inplace=True)
    enrichissement_insee_siret.drop(columns=["siren_y"], axis=1, inplace=True)
    nanSiret = pd.concat([enrichissement_insee_siret[enrichissement_insee_siret.activitePrincipaleEtablissement.isnull()], dfSIRET_siret_nonvalide, dfSIRET_valide_notfound])
    enrichissement_insee_siret = enrichissement_insee_siret[
        enrichissement_insee_siret.activitePrincipaleEtablissement.notnull()]
    nanSiret = nanSiret.loc[:, ["siret", "siren", "denominationSociale"]]

    # Concaténation des deux resultats
    enrichissementInsee = enrichissement_insee_siret
    nanSiret = nanSiret.iloc[:, :3]
    nanSiret.reset_index(inplace=True, drop=True)

    return [enrichissementInsee, nanSiret]
    

In [424]:
def get_enrichissement_insee(dfSIRET: pd.DataFrame, path_to_data: str) -> list:
    """
    Ajout des informations Adresse/Activité des entreprises via la base siren Insee

    Retour:
        - list:
            - list[0]: pd.DataFrame -- données principales
            - list[1]: pd.DataFrame -- données ou le SIRET n'est pas renseigné
    """
    # dans StockEtablissement_utf8, il y a principalement : siren, siret, nom établissement, adresse, activité principale
    path = path_to_data
    columns = [
        'siren',
        'nic',
        'siret',
        'typeVoieEtablissement',
        'libelleVoieEtablissement',
        'codePostalEtablissement',
        'libelleCommuneEtablissement',
        'codeCommuneEtablissement',
        'activitePrincipaleEtablissement',
        'nomenclatureActivitePrincipaleEtablissement']  # Colonne à utiliser dans la base Siren
    dtypes = {
        'siret': 'string',
        'typeVoieEtablissement': 'string',
        'libelleVoieEtablissement': 'string',
        'codePostalEtablissement': 'string',
        'libelleCommuneEtablissement': 'string',
    }

    result = pd.DataFrame(columns=columns)
    chunksize = 1000000
    for gm_chunk in pd.read_csv(path, chunksize=chunksize, sep=',', encoding='utf-8', usecols=columns, dtype=dtypes):
        resultTemp = pd.merge(dfSIRET['siret'], gm_chunk, on=['siret'], copy=False)
        result = pd.concat([result, resultTemp], axis=0, copy=False)
        del resultTemp
    result = result.drop_duplicates(subset=['siret'], keep='first')

    enrichissement_insee_siret = pd.merge(dfSIRET, result, how='outer', on=['siret'], copy=False)
    enrichissement_insee_siret.rename(columns={"siren_x": "siren"}, inplace=True)
    enrichissement_insee_siret.drop(columns=["siren_y"], axis=1, inplace=True)
    nanSiret = enrichissement_insee_siret[enrichissement_insee_siret.activitePrincipaleEtablissement.isnull()]
    print('Premier nanSiret \n', nanSiret)
    enrichissement_insee_siret = enrichissement_insee_siret[
        enrichissement_insee_siret.activitePrincipaleEtablissement.notnull()]
    nanSiret = nanSiret.loc[:, ["siret", "siren", "denominationSociale"]]

    # Concaténation des deux resultats
    enrichissementInsee = enrichissement_insee_siret
    print(enrichissement_insee_siret)
    temp_df = pd.merge(nanSiret, result, indicator=True, how="outer", on='siren', copy=False)
    del result
    nanSiret = temp_df[temp_df['activitePrincipaleEtablissement'].isnull()]
    nanSiret = nanSiret.iloc[:, :3]
    nanSiret.reset_index(inplace=True, drop=True).drop_duplicates(subset=['siret'], keep='first')

    return [enrichissementInsee, nanSiret]

In [414]:
unpack_no_cache_l = get_enrichissement_insee(dfSIRET,r'/home/gaspard/Documents/decp-augmente/data/StockEtablissement_utf8.csv')

  for gm_chunk in pd.read_csv(path, chunksize=chunksize, sep=',', encoding='utf-8', usecols=columns, dtype=dtypes):


KeyboardInterrupt: 

In [466]:
unpack_l = enrichissement_insee_cache(dfSIRET, r'/home/gaspard/Documents/decp-augmente/data/StockEtablissement_utf8.csv', r"/home/gaspard/Documents/decp-augmente/cache/cache_StockEtablissement_utf8.csv", r"/home/gaspard/Documents/decp-augmente/cache/cache_NOTIN_StockEtablissement_utf8.csv")

loading cache
                siret      siren denominationSociale
15144  95651314700001  956513147             DECITRE
                siret      siren denominationSociale  nic  \
15144  95651314700001  956513147             DECITRE  NaN   

      typeVoieEtablissement libelleVoieEtablissement codePostalEtablissement  \
15144                  <NA>                     <NA>                    <NA>   

      libelleCommuneEtablissement codeCommuneEtablissement  \
15144                        <NA>                      NaN   

      activitePrincipaleEtablissement  \
15144                             NaN   

      nomenclatureActivitePrincipaleEtablissement  
15144                                         NaN  


# Opti enrichissement_acheteur