# Imports

In [1]:
import os, shutil
import time
import sqlite3
import csv
import json
import logging
from psutil import virtual_memory
import pandas as pd
import requests
from pprint import pprint
from elasticsearch_dsl import Index, connections
from elasticsearch import helpers
import logging

In [2]:
# Raise the root logger to INFO so the logging.info(...) messages below are displayed.
logger = logging.getLogger()
logger.setLevel("INFO")

In [None]:
def mem():
    """Print the RAM currently in use, in gigabytes rounded to one decimal."""
    gigabyte = 1024 * 1024 * 1024
    used_bytes = virtual_memory()[3]  # the "used" entry of psutil's virtual_memory() result
    tenths = round(used_bytes / gigabyte * 10)
    print(f'used memory : {tenths / 10}Go')

In [None]:
def stats():
    """Print the seconds elapsed since the global `start_time`, then the RAM in use."""
    elapsed = time.time() - start_time
    print("--- %s seconds ---" % elapsed)
    mem()

In [None]:
def global_stats():
    """Print the seconds elapsed since the global `global_start_time`, then the RAM in use."""
    elapsed = time.time() - global_start_time
    print("--- %s seconds ---" % elapsed)
    mem()

In [None]:
def unique_list(l):
    """Return the items of `l` without duplicates, keeping first-seen order.

    Returns None when `l` is None. Hashable items are de-duplicated in linear
    time with dict.fromkeys. Unhashable items (e.g. lists) fall back to a scan
    over the items kept so far.
    """
    if l is None:
        return None
    items = list(l)  # materialise once so an iterator can be scanned twice if needed
    try:
        return list(dict.fromkeys(items))
    except TypeError:
        ulist = []
        for x in items:
            if x not in ulist:
                ulist.append(x)
        return ulist


def unique_string(a):
    """De-duplicate the comma-separated values of `a` and join them with spaces.

    Only the whole string is stripped, not each value. Returns None for None.
    """
    if a is None:
        return a
    return ' '.join(unique_list(a.strip().split(','))).strip()


def unique_seperated_string(a):
    """De-duplicate the comma-separated values of `a` and join them with ", ".

    Returns None for None.
    """
    if a is None:
        return a
    return ', '.join(unique_list(a.strip().split(','))).strip()

In [None]:
def get_string(string):
    """Return `string` unchanged, or an empty string when it is None."""
    return "" if string is None else string

In [None]:
# Print the RAM used before any processing starts.
mem()

In [None]:
# Reference time for global_stats(): total elapsed time of the whole pipeline.
global_start_time = time.time()

In [None]:
# Print the percentage of RAM in use (index 2 of psutil's virtual_memory() result).
print('RAM memory % used:', virtual_memory()[2])

# Path set-up

In [3]:
# Use the default data directory unless DATA_DIR was already defined
# (e.g. injected as a notebook parameter before this cell).
if "DATA_DIR" not in locals():
    DATA_DIR = "./data/"
else:
    print(DATA_DIR)

# Later cells build file paths as DATA_DIR + "<name>", so make sure the directory
# ends with a separator. os.path.join(path, "") appends one only when it is missing.
DATA_DIR = os.path.join(DATA_DIR, "")

# Start from an empty directory: remove whatever a previous run left behind.
if os.path.isdir(DATA_DIR):
    shutil.rmtree(DATA_DIR)
# Create the directory itself. os.path.dirname("custom_dir") would be "", and
# makedirs("") raises.
os.makedirs(DATA_DIR, exist_ok=True)

In [4]:
# Keep the DATA_DIR chosen (and created) in the path set-up cell. Only fall back to
# the default when it is missing, instead of silently overriding a custom value.
if "DATA_DIR" not in locals():
    DATA_DIR = "./data/"

In [5]:
# Use the default output directory unless OUTPUT_DATA_FOLDER was already defined
# (e.g. injected as a notebook parameter before this cell).
if "OUTPUT_DATA_FOLDER" not in locals():
    OUTPUT_DATA_FOLDER = "./output/"
else:
    print(OUTPUT_DATA_FOLDER)

# Guarantee a trailing separator so OUTPUT_DATA_FOLDER + "<name>" is a path inside it.
OUTPUT_DATA_FOLDER = os.path.join(OUTPUT_DATA_FOLDER, "")

# Start from an empty directory: remove whatever a previous run left behind.
if os.path.isdir(OUTPUT_DATA_FOLDER):
    shutil.rmtree(OUTPUT_DATA_FOLDER)
# Create the directory itself (dirname() of a path without a trailing slash is its parent).
os.makedirs(OUTPUT_DATA_FOLDER, exist_ok=True)

# Prepare Data Enrichment 

## Nom complet

In [None]:
def create_nom_complet(nature_juridique_unite_legale=None, nom=None, nom_usage=None, nom_raison_sociale=None, sigle=None, prenom=None):
    """Build the lower-cased display name ("nom complet") of a unité légale.

    For nature juridique "1000" (entrepreneur individuel) the name is built from
    the person's names:
      * "prenom nom (sigle)" when there is no usage name;
      * "prenom nom_usage (nom, sigle)" when a usage name exists.
    For every other legal form it is "nom_raison_sociale (sigle)".

    Missing parts are left out, so the result never contains "None", an empty
    "()" or a dangling ", ". When only the parenthesised part is known, it is
    returned on its own, without parentheses.

    Returns:
        The lower-cased name, or None when none of the relevant name fields is set.
    """
    is_auto_entrepreneur = nature_juridique_unite_legale == "1000"

    if is_auto_entrepreneur:
        if prenom is None and nom is None:
            return None
        if nom_usage:
            # The usage name is displayed; the birth name moves inside the parentheses.
            label_parts = [prenom, nom_usage]
            detail_parts = [nom, sigle]
        else:
            label_parts = [prenom, nom]
            detail_parts = [sigle]
    else:
        if nom_raison_sociale is None and sigle is None:
            return None
        label_parts = [nom_raison_sociale]
        detail_parts = [sigle]

    label = " ".join(part for part in label_parts if part)
    details = ", ".join(part for part in detail_parts if part)
    if label and details:
        nom_complet = f"{label} ({details})"
    else:
        nom_complet = label or details
    return nom_complet.lower()

## Entrepreneur Individuel

In [None]:
def create_entrepreneur_individuel(nature_juridique_unite_legale):
    """Return 'true' when the legal-category code is "1", "10" or "1000", else 'false'.

    The result is a string, not a bool.
    """
    individual_codes = ("1", "10", "1000")
    is_individual = nature_juridique_unite_legale in individual_codes
    return 'true' if is_individual else 'false'

## Section activité principale

In [None]:
# NAF section letter for each two-digit NAF division, generated from inclusive
# division ranges. Divisions 04, 34, 40, 44, 48, 54, 57, 67, 76, 83 and 89 have no entry.
sections_NAF = {
    f"{division:02d}": section
    for section, first_division, last_division in (
        ("A", 1, 3),
        ("B", 5, 9),
        ("C", 10, 33),
        ("D", 35, 35),
        ("E", 36, 39),
        ("F", 41, 43),
        ("G", 45, 47),
        ("H", 49, 53),
        ("I", 55, 56),
        ("J", 58, 63),
        ("K", 64, 66),
        ("L", 68, 68),
        ("M", 69, 75),
        ("N", 77, 82),
        ("O", 84, 84),
        ("P", 85, 85),
        ("Q", 86, 88),
        ("R", 90, 93),
        ("S", 94, 96),
        ("T", 97, 98),
        ("U", 99, 99),
    )
    for division in range(first_division, last_division + 1)
}

In [None]:
def create_section(activite_principale_unite_legale):
    """Return the NAF section letter of an activity code.

    The section is looked up from the code's first two characters (its NAF
    division). Returns None for a missing code or an unknown division.
    """
    if activite_principale_unite_legale is None:
        return None
    division = activite_principale_unite_legale[:2]
    return sections_NAF.get(division)

## Adresse complète

In [None]:
def create_adresse_complete(complement_adresse, numero_voie, indice_repetition, type_voie, libelle_voie, libelle_commune, libelle_cedex, distribution_speciale, commune, cedex, libelle_commune_etranger, libelle_pays_etranger):
    """Concatenate the address fields of an établissement into a single string.

    Parts, in order:
      1. complement, street number, repetition index, street type, street name,
         special distribution;
      2. "cedex libelle_cedex" when a cedex code is set, otherwise
         "commune libelle_commune" when a commune code is set;
      3. foreign commune and foreign country.

    Empty strings and None are both treated as missing and skipped, so parts are
    separated by exactly one space. (Previously a None value reset the whole
    address to "", due to the precedence of the conditional expression.)
    """
    street = [complement_adresse, numero_voie, indice_repetition, type_voie, libelle_voie, distribution_speciale]
    if cedex:
        locality = [cedex, libelle_cedex]
    elif commune:
        locality = [commune, libelle_commune]
    else:
        locality = []
    foreign = [libelle_commune_etranger, libelle_pays_etranger]
    return " ".join(part for part in street + locality + foreign if part).strip()

## Département

In [None]:
def create_departement(commune):
    """Return the département code of a commune code.

    The code is the first three characters for codes starting with "97", and the
    first two otherwise. Returns None when `commune` is None.
    """
    if commune is None:
        return None
    code = str(commune)
    return code[:3] if code.startswith("97") else code[:2]

## Coordonnées

In [None]:
def create_coordonnees(longitude, latitude):
    """Return a "latitude,longitude" string (latitude first), or None if either value is missing."""
    if longitude is None or latitude is None:
        return None
    return f"{latitude},{longitude}"

## Liste enseigne

In [None]:
def create_list_enseignes(enseigne_1=None, enseigne_2=None, enseigne_3=None, nom_commercial=None):
    """Return the distinct, non-empty enseignes and nom commercial, in argument order.

    dict.fromkeys de-duplicates while keeping first-seen order. The previous
    set()-based version returned names in an order that could change between
    interpreter runs (string hash randomization).
    """
    candidates = (enseigne_1, enseigne_2, enseigne_3, nom_commercial)
    return [name for name in dict.fromkeys(candidates) if name]

# SQLITE


## Set-up

## TODO: do this set-up in the Airflow workflow instead

In [None]:
# Start from a fresh SQLite database file on every run.
if os.path.exists(DATA_DIR+'sirene.db'):
    os.remove(DATA_DIR+'sirene.db')

In [None]:
# Open (creating it if needed) the working SQLite database and the shared cursor
# used by the cells below.
connection = sqlite3.connect(DATA_DIR+'sirene.db')
cursor = connection.cursor()

## Unité Légale

In [None]:
# (Re)create the unite_legale table. Columns are declared without types, so
# values are stored as inserted (no type affinity).
cursor.execute(f'''DROP TABLE IF EXISTS unite_legale''')
cursor.execute('''
    CREATE TABLE IF NOT EXISTS unite_legale
    (
        siren,
        date_creation_unite_legale,
        sigle,
        prenom,
        identifiant_association_unite_legale,
        tranche_effectif_salarie_unite_legale,
        date_mise_a_jour_unite_legale,
        categorie_entreprise,
        etat_administratif_unite_legale,
        nom,
        nom_usage,
        nom_raison_sociale,
        nature_juridique_unite_legale,
        activite_principale_unite_legale,
        economie_sociale_solidaire_unite_legale
    )
''')

In [None]:
# One row per siren: the unique index rejects duplicate siren values and speeds
# up lookups by siren.
cursor.execute('''
                CREATE UNIQUE INDEX index_siren
                ON unite_legale (siren);
                ''')

In [None]:
connection.commit()

In [None]:
start_time = time.time()
url = 'https://files.data.gouv.fr/insee-sirene/StockUniteLegale_utf8.zip'
zip_path = DATA_DIR + 'StockUniteLegale_utf8.zip'
# Download the archive:
# - stream it to disk in 1 MiB chunks instead of holding it entirely in memory;
# - fail loudly on an HTTP error rather than unpacking an error page;
# - close both the response and the file.
with requests.get(url, allow_redirects=True, stream=True) as r:
    r.raise_for_status()
    with open(zip_path, 'wb') as zip_file:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            zip_file.write(chunk)
shutil.unpack_archive(zip_path, DATA_DIR)
stats()

In [None]:
# Lazily read the unités légales stock file as strings, 100000 rows per chunk.
df_iterator = pd.read_csv(DATA_DIR + 'StockUniteLegale_utf8.csv',
                          dtype=str,
                          chunksize=100000)

In [None]:
start_time = time.time()

# INSEE stock column -> unite_legale table column. The dict order is the column
# order written to SQLite; "siren" keeps its name.
unite_legale_columns = {
    "siren": "siren",
    "dateCreationUniteLegale": "date_creation_unite_legale",
    "sigleUniteLegale": "sigle",
    "prenom1UniteLegale": "prenom",
    "identifiantAssociationUniteLegale": "identifiant_association_unite_legale",
    "trancheEffectifsUniteLegale": "tranche_effectif_salarie_unite_legale",
    "dateDernierTraitementUniteLegale": "date_mise_a_jour_unite_legale",
    "categorieEntreprise": "categorie_entreprise",
    "etatAdministratifUniteLegale": "etat_administratif_unite_legale",
    "nomUniteLegale": "nom",
    "nomUsageUniteLegale": "nom_usage",
    "denominationUniteLegale": "nom_raison_sociale",
    "categorieJuridiqueUniteLegale": "nature_juridique_unite_legale",
    "activitePrincipaleUniteLegale": "activite_principale_unite_legale",
    "economieSocialeSolidaireUniteLegale": "economie_sociale_solidaire_unite_legale",
}

for df_unite_legale in df_iterator:
    # Keep only the mapped columns, then rename them: one mapping drives both steps.
    df_unite_legale = df_unite_legale[list(unite_legale_columns)].rename(columns=unite_legale_columns)
    df_unite_legale.to_sql("unite_legale", connection, if_exists='append', index=False)

stats()
# Free the last chunk. Guarded because the loop body does not run once df_iterator
# is exhausted (e.g. when this cell is re-run).
if "df_unite_legale" in globals():
    del df_unite_legale

In [None]:
# Commit the unite_legale rows inserted chunk by chunk.
connection.commit()

In [None]:
# Sanity checks: row count, table schema, one sample siren, and one row printed
# column by column.
for row in cursor.execute("select count() from unite_legale"):
    print(row)

In [None]:
for item in cursor.execute('PRAGMA table_info(unite_legale)'):
    print (item)

In [None]:
for row in cursor.execute("SELECT * FROM unite_legale WHERE siren='965706690' LIMIT 10;"):
    print(row)

In [None]:
cursor.execute('SELECT * FROM unite_legale LIMIT 1')
# cursor.description has one tuple per result column; its first item is the column name.
names = [description[0] for description in cursor.description]
rows = cursor.fetchall()
for row in rows:
    for name,val in zip(names,row):
        print(f"{name}: {val}")

## Établissements

In [None]:
# Codes used to name the geo-sirene files (geo_siret_<code>.csv.gz):
# - departements 01-95, with Corsica as 2A/2B and without "20";
# - Paris listed by arrondissement (75101-75120) instead of "75";
# - the overseas departements;
# - a final empty code.
# The previous string-splitting construction dropped the first element of each
# range. That was harmless for the 0-based ranges, but it silently left out 75110.
all_deps = [
    *(f"{x:02d}" for x in range(1, 20)),
    "2A",
    "2B",
    *(str(x) for x in range(21, 96) if x != 75),
    *(f"751{x:02d}" for x in range(1, 21)),
    "971", "972", "973", "974", "976",
    "",
]

In [None]:
# (Re)create the siret table, one row per établissement.
# `id INTEGER PRIMARY KEY` aliases SQLite's rowid, so rows inserted without an id
# get one assigned automatically. The other columns are untyped.
cursor.execute(f'''DROP TABLE IF EXISTS siret''')
cursor.execute(f'''CREATE TABLE IF NOT EXISTS siret
        (
        id INTEGER NOT NULL PRIMARY KEY,
        siren,
        siret,
        date_creation,
        tranche_effectif_salarie,
        activite_principale_registre_metier,
        is_siege,
        numero_voie,
        type_voie,
        libelle_voie,
        code_postal,
        libelle_cedex,
        libelle_commune,
        commune,
        complement_adresse,
        complement_adresse_2,
        numero_voie_2,
        indice_repetition_2,
        type_voie_2,
        libelle_voie_2,
        commune_2,
        libelle_commune_2,
        cedex_2,
        libelle_cedex_2,
        cedex,
        date_debut_activite,
        distribution_speciale,
        distribution_speciale_2,
        etat_administratif_etablissement,
        enseigne_1,
        enseigne_2,
        enseigne_3,
        activite_principale,
        indice_repetition,
        nom_commercial,
        libelle_commune_etranger,
        code_pays_etranger,
        libelle_pays_etranger,
        libelle_commune_etranger_2,
        code_pays_etranger_2,
        libelle_pays_etranger_2,
        longitude,
        latitude,
        geo_adresse,
        geo_id)
        ''')

In [None]:
%%time
# Upload geo data by departement.
# geo-sirene CSV column -> siret table column. Columns that keep their name map to
# themselves, so the keys double as the `usecols` list and one dict drives both the
# column selection and the renaming. (The former separate usecols list named
# enseigne1Etablissement twice.)
siret_columns = {
    "siren": "siren",
    "siret": "siret",
    "dateCreationEtablissement": "date_creation",
    "trancheEffectifsEtablissement": "tranche_effectif_salarie",
    "activitePrincipaleRegistreMetiersEtablissement": "activite_principale_registre_metier",
    "etablissementSiege": "is_siege",
    "numeroVoieEtablissement": "numero_voie",
    "typeVoieEtablissement": "type_voie",
    "libelleVoieEtablissement": "libelle_voie",
    "codePostalEtablissement": "code_postal",
    "libelleCedexEtablissement": "libelle_cedex",
    "libelleCommuneEtablissement": "libelle_commune",
    "codeCommuneEtablissement": "commune",
    "complementAdresseEtablissement": "complement_adresse",
    "complementAdresse2Etablissement": "complement_adresse_2",
    "numeroVoie2Etablissement": "numero_voie_2",
    "indiceRepetition2Etablissement": "indice_repetition_2",
    "typeVoie2Etablissement": "type_voie_2",
    "libelleVoie2Etablissement": "libelle_voie_2",
    "codeCommune2Etablissement": "commune_2",
    "libelleCommune2Etablissement": "libelle_commune_2",
    "codeCedex2Etablissement": "cedex_2",
    "libelleCedex2Etablissement": "libelle_cedex_2",
    "codeCedexEtablissement": "cedex",
    "dateDebut": "date_debut_activite",
    "distributionSpecialeEtablissement": "distribution_speciale",
    "distributionSpeciale2Etablissement": "distribution_speciale_2",
    "etatAdministratifEtablissement": "etat_administratif_etablissement",
    "enseigne1Etablissement": "enseigne_1",
    "enseigne2Etablissement": "enseigne_2",
    "enseigne3Etablissement": "enseigne_3",
    "activitePrincipaleEtablissement": "activite_principale",
    "indiceRepetitionEtablissement": "indice_repetition",
    "denominationUsuelleEtablissement": "nom_commercial",
    "libelleCommuneEtrangerEtablissement": "libelle_commune_etranger",
    "codePaysEtrangerEtablissement": "code_pays_etranger",
    "libellePaysEtrangerEtablissement": "libelle_pays_etranger",
    "libelleCommuneEtranger2Etablissement": "libelle_commune_etranger_2",
    "codePaysEtranger2Etablissement": "code_pays_etranger_2",
    "libellePaysEtranger2Etablissement": "libelle_pays_etranger_2",
    "longitude": "longitude",
    "latitude": "latitude",
    "geo_adresse": "geo_adresse",
    "geo_id": "geo_id",
}

for dep in all_deps:
    start_time = time.time()
    url = "https://files.data.gouv.fr/geo-sirene/last/dep/geo_siret_" + dep + ".csv.gz"
    print(url)
    # Read only the mapped columns, all as strings.
    df_dep = pd.read_csv(
        url,
        compression="gzip",
        dtype=str,
        usecols=list(siret_columns),
    )
    df_dep = df_dep.rename(columns=siret_columns)
    stats()
    start_time = time.time()
    df_dep.to_sql("siret", connection, if_exists='append', index=False)
    connection.commit()
    stats()

# Free the last departement's frame. Guarded so an empty all_deps does not raise NameError.
if "df_dep" in globals():
    del df_dep

In [None]:
# Sanity checks on the loaded siret table: row count, first rows, and one row
# printed column by column.
for row in cursor.execute("select count() from siret"):
    print(row)

In [None]:
for row in cursor.execute('SELECT * FROM siret LIMIT 2;'):
    print(row)

In [None]:
cursor.execute('SELECT * FROM siret LIMIT 1')
# cursor.description has one tuple per result column; its first item is the column name.
names = [description[0] for description in cursor.description]
rows = cursor.fetchall()
for row in rows:
    for name,val in zip(names,row):
        print(f"{name}: {val}")

In [None]:
connection.commit()

### Nombre d'établissements

In [None]:
# create a count table: for each siren, its number of établissements (siret rows)
cursor.execute(f'''DROP TABLE IF EXISTS count_etab''')
cursor.execute('''CREATE TABLE count_etab (siren VARCHAR(10), count INTEGER)''')
# create index (unique: exactly one count row per siren)
cursor.execute('''
                CREATE UNIQUE INDEX index_count_siren
                ON count_etab (siren);
                ''')
connection.commit()

In [None]:
start_time = time.time()
# Fill count_etab with the number of siret rows per siren, whatever their administrative state
cursor.execute('''INSERT INTO count_etab (siren, count) SELECT siren, count(*) as count FROM siret GROUP BY siren;''')
stats()

In [None]:
# Sanity checks: number of sirens counted and a few sample rows.
for row in cursor.execute("select count() from count_etab"):
    print(row)

In [None]:
for row in cursor.execute('SELECT * FROM count_etab LIMIT 5;'):
    print(row)

### Nombre d'établissements ouverts

In [None]:
# Per-siren count of open établissements, in its own table with a unique index on siren.
cursor.execute(f'''DROP TABLE IF EXISTS count_etab_ouvert''')
cursor.execute('''CREATE TABLE count_etab_ouvert (siren VARCHAR(10), count INTEGER)''')
cursor.execute('''
                CREATE UNIQUE INDEX index_count_ouvert_siren
                ON count_etab_ouvert (siren);
                ''')
connection.commit()

In [None]:
start_time = time.time()
# Fill count_etab_ouvert: siret rows per siren whose etat_administratif_etablissement
# is 'A' (the state counted as "ouvert" in this section)
cursor.execute('''INSERT INTO count_etab_ouvert (siren, count) SELECT siren, count(*) as count FROM siret WHERE etat_administratif_etablissement = 'A' GROUP BY siren;''')
stats()

In [None]:
# Sanity checks: number of sirens counted and a few sample rows.
for row in cursor.execute("select count() from count_etab_ouvert"):
    print(row)

In [None]:
for row in cursor.execute('SELECT * FROM count_etab_ouvert LIMIT 5;'):
    print(row)

### Liste d'enseignes

In [None]:
# Add a virtual generated column `enseignes` to siret: the établissement's
# enseigne_1..3 and nom_commercial, separated by single spaces.
# `x || ' '` is NULL when x is NULL, so COALESCE(x || ' ', '') yields "x " for a
# present value and nothing for a missing one; TRIM removes the final space.
# (The previous expression glued adjacent names with no separator and padded
# missing ones with spaces instead.)
add_enseigne = '''ALTER TABLE siret ADD COLUMN enseignes GENERATED ALWAYS AS
               (TRIM(COALESCE(enseigne_1 || ' ', '') || COALESCE(enseigne_2 || ' ', '') || COALESCE(enseigne_3 || ' ', '') || COALESCE(nom_commercial, '')))
               '''
cursor.execute(add_enseigne)

In [None]:
# Create enseignes table with grouped lists of enseignes per siren (one row per siren)
cursor.execute(f'''DROP TABLE IF EXISTS enseignes''')
cursor.execute('''CREATE TABLE enseignes (siren VARCHAR(10), liste_enseignes)''')
cursor.execute('''
                CREATE UNIQUE INDEX index_liste_enseignes
                ON enseignes (siren);
                ''')
connection.commit()

In [None]:
start_time = time.time()
# Add liste enseignes: comma-join the `enseignes` value of every siret row of a siren.
# GROUP_CONCAT skips NULLs and does not de-duplicate.
cursor.execute('''INSERT INTO enseignes (siren, liste_enseignes) SELECT siren, GROUP_CONCAT(enseignes, ',') as liste_enseignes FROM siret GROUP BY siren;''')
stats()

In [None]:
# Spot checks on one siren.
# NOTE(review): "345130488" is double-quoted, which SQL reads as an identifier. SQLite
# only falls back to a string literal because no column has that name; single quotes
# would be safer.
for row in cursor.execute('SELECT liste_enseignes FROM enseignes WHERE siren="345130488" LIMIT 2;'):
    print(row)

In [None]:
for row in cursor.execute('SELECT enseignes FROM siret WHERE siren="345130488" LIMIT 5;'):
    print(row)

### Liste adresses

In [None]:
# Create SQLite function: expose create_adresse_complete to SQL as add_adresse_complete (12 arguments)
connection.create_function("add_adresse_complete", 12, create_adresse_complete)
# Add adresse_complete column for each établissement
cursor.execute('ALTER TABLE siret ADD COLUMN adresse_complete;')

In [None]:
# Insert addresses into column: fill adresse_complete for every siret row.
# COALESCE turns NULLs into '' before the Python function is called. The argument
# order must match the create_adresse_complete signature.
start_time = time.time()
cursor.execute(f'''UPDATE siret
                    SET adresse_complete = (
                                            SELECT add_adresse_complete(COALESCE(complement_adresse,''),COALESCE(numero_voie,''),
                                                                    COALESCE(indice_repetition,''), COALESCE(type_voie,''),
                                                                    COALESCE(libelle_voie,''), COALESCE(libelle_commune,''),
                                                                    COALESCE(libelle_cedex,''), COALESCE(distribution_speciale,''),
                                                                    COALESCE(commune,''), COALESCE(cedex,''), COALESCE(libelle_commune_etranger,''),
                                                                    COALESCE(libelle_pays_etranger,'')))''')
stats()

In [None]:
# Spot check on one siren (double-quoted value: see the SQLite identifier/string fallback).
for row in cursor.execute('SELECT adresse_complete FROM siret WHERE siren="005520135" LIMIT 10;'):
    print(row)

In [None]:
# Create adresses table with grouped addresses for each siren (one row per siren)
cursor.execute(f'''DROP TABLE IF EXISTS adresses''')
cursor.execute('''CREATE TABLE adresses (siren VARCHAR(10), liste_adresses)''')
cursor.execute('''
                CREATE UNIQUE INDEX index_liste_adresses
                ON adresses(siren);
                ''')
connection.commit()

In [None]:
start_time = time.time()
# Insert addresses into adresses table: comma-join the adresse_complete values of each siren.
# NOTE(review): addresses themselves may contain commas, which makes this list
# ambiguous to split later — confirm consumers do not split on ','.
cursor.execute('''INSERT INTO adresses (siren, liste_adresses) SELECT siren, GROUP_CONCAT(adresse_complete, ',') as liste_adresses FROM siret GROUP BY siren;''')
stats()

In [None]:
# Spot check on one siren, then one row printed column by column.
for row in cursor.execute('SELECT * FROM adresses WHERE siren="005520135" LIMIT 10;'):
    print(row)

In [None]:
cursor.execute('SELECT * FROM adresses LIMIT 1')
names = [description[0] for description in cursor.description]
rows = cursor.fetchall()
for row in rows:
    for name,val in zip(names,row):
        print(f"{name}: {val}")

## Siège only

In [None]:
# Build siretsiege: a copy of the siret rows whose is_siege is 'true'.
# Every siret column is copied except `id` (renumbered by this table's own INTEGER
# PRIMARY KEY) and the generated `enseignes` column; adresse_complete is included.
# NOTE(review): no commit follows this INSERT in the visible cells — confirm the
# transaction is committed later.
cursor.execute(f'''DROP TABLE IF EXISTS siretsiege''')
cursor.execute(f'''CREATE TABLE IF NOT EXISTS siretsiege
        (
        id INTEGER NOT NULL PRIMARY KEY,
        siren,
        siret,
        date_creation,
        tranche_effectif_salarie,
        activite_principale_registre_metier,
        is_siege,
        numero_voie,
        type_voie,
        libelle_voie,
        code_postal,
        libelle_cedex,
        libelle_commune,
        commune,
        complement_adresse,
        complement_adresse_2,
        numero_voie_2,
        indice_repetition_2,
        type_voie_2,
        libelle_voie_2,
        commune_2,
        libelle_commune_2,
        cedex_2,
        libelle_cedex_2,
        cedex,
        date_debut_activite,
        distribution_speciale,
        distribution_speciale_2,
        etat_administratif_etablissement,
        enseigne_1,
        enseigne_2,
        enseigne_3,
        activite_principale,
        indice_repetition,
        nom_commercial,
        libelle_commune_etranger,
        code_pays_etranger,
        libelle_pays_etranger,
        libelle_commune_etranger_2,
        code_pays_etranger_2,
        libelle_pays_etranger_2,
        longitude,
        latitude,
        geo_adresse,
        geo_id,
        adresse_complete)
''')
# Copy the siège rows; the column list and the SELECT list must stay in the same order.
cursor.execute('''INSERT INTO siretsiege (
        siren,
        siret,
        date_creation,
        tranche_effectif_salarie,
        activite_principale_registre_metier,
        is_siege,
        numero_voie,
        type_voie,
        libelle_voie,
        code_postal,
        libelle_cedex,
        libelle_commune,
        commune,
        complement_adresse,
        complement_adresse_2,
        numero_voie_2,
        indice_repetition_2,
        type_voie_2,
        libelle_voie_2,
        commune_2,
        libelle_commune_2,
        cedex_2,
        libelle_cedex_2,
        cedex,
        date_debut_activite,
        distribution_speciale,
        distribution_speciale_2,
        etat_administratif_etablissement,
        enseigne_1,
        enseigne_2,
        enseigne_3,
        activite_principale,
        indice_repetition,
        nom_commercial,
        libelle_commune_etranger,
        code_pays_etranger,
        libelle_pays_etranger,
        libelle_commune_etranger_2,
        code_pays_etranger_2,
        libelle_pays_etranger_2,
        longitude,
        latitude,
        geo_adresse,
        geo_id,
        adresse_complete) 
    SELECT
        siren,
        siret,
        date_creation,
        tranche_effectif_salarie,
        activite_principale_registre_metier,
        is_siege,
        numero_voie,
        type_voie,
        libelle_voie,
        code_postal,
        libelle_cedex,
        libelle_commune,
        commune,
        complement_adresse,
        complement_adresse_2,
        numero_voie_2,
        indice_repetition_2,
        type_voie_2,
        libelle_voie_2,
        commune_2,
        libelle_commune_2,
        cedex_2,
        libelle_cedex_2,
        cedex,
        date_debut_activite,
        distribution_speciale,
        distribution_speciale_2,
        etat_administratif_etablissement,
        enseigne_1,
        enseigne_2,
        enseigne_3,
        activite_principale,
        indice_repetition,
        nom_commercial,
        libelle_commune_etranger,
        code_pays_etranger,
        libelle_pays_etranger,
        libelle_commune_etranger_2,
        code_pays_etranger_2,
        libelle_pays_etranger_2,
        longitude,
        latitude,
        geo_adresse,
        geo_id,
        adresse_complete
    FROM siret
    WHERE is_siege = 'true';
''')

In [None]:
# Print one siège row of a sample siren, column by column.
# NOTE(review): the double-quoted "965706690" is read as an identifier first. SQLite
# only treats it as a string because no column has that name; single quotes would be safer.
cursor.execute('SELECT * FROM siretsiege WHERE siren="965706690" LIMIT 1')
names = [description[0] for description in cursor.description]
rows = cursor.fetchall()
for row in rows:
    for name,val in zip(names,row):
        print(f"{name}: {val}")

In [None]:
# Non-unique index to speed up lookups of siège rows by siren.
cursor.execute('''
                CREATE INDEX index_siret_siren
                ON siretsiege (siren);
                ''')

## Insert INPI data

In [None]:
# Open a second SQLite database, inpi.db, located in DATA_DIR.
# NOTE(review): the path set-up cell wipes DATA_DIR. Unless inpi.db is written there
# after that point, this creates and connects to an empty database — confirm the
# intended location.
connection_inpi = sqlite3.connect(DATA_DIR +'inpi.db')

In [None]:
cursor_inpi = connection_inpi.cursor()

### Check tables

In [None]:
cursor.execute("select * from SQLite_master")

tables = cursor.fetchall()
print("Listing tables and indices from main database:")

# Each sqlite_master row is (type, name, tbl_name, rootpage, sql); print one labelled
# field per line.
labels = (
    "Type of database object",
    "Name of the database object",
    "Table Name",
    "Root page",
    "SQL statement",
)
for table in tables:
    for label, value in zip(labels, table):
        print(f"{label}: {value}")

# Elasticsearch

In [6]:
# Elasticsearch settings. The connection values can be overridden through
# environment variables; the defaults keep the previous hard-coded behaviour.
# NOTE(review): a password is hard-coded in this notebook — rotate it and remove
# the default once ELASTIC_PASSWORD is provided wherever this runs.
elastic_url = os.environ.get("ELASTIC_URL", 'http://search.sirene.dataeng.etalab.studio:80/')
elastic_index = os.environ.get("ELASTIC_INDEX", 'siren-green')
elastic_user = os.environ.get("ELASTIC_USER", 'elastic')
elastic_password = os.environ.get("ELASTIC_PASSWORD", 'etalab123')
elastic_index_shards = 1
elastic_bulk_size=1500

## Connection

In [7]:
# initiate the default connection to elasticsearch.
# elasticsearch_dsl registers it as the default connection, which Index(...), the
# Document classes and connections.get_connection() below use implicitly.
connections.create_connection(
    hosts=[elastic_url],
    http_auth=(elastic_user, elastic_password),
    # retry a request (on another node) when it times out
    retry_on_timeout=True,
)

<Elasticsearch([{'host': 'search.sirene.dataeng.etalab.studio', 'port': 80}])>

## Mapping

In [9]:
from elasticsearch_dsl import (
    Boolean,
    Date,
    Document,
    GeoPoint,
    Integer,
    Keyword,
    Text,
    analyzer,
    token_filter,
    tokenizer,
    Object,
)




class ElasticsearchSireneIndex(Document):
    """Elasticsearch-DSL document class bound to the `elastic_index` index.

    It declares no fields, so it carries no mapping of its own; it only supplies
    the index name and settings.
    NOTE(review): the field types imported above (Keyword, Text, GeoPoint, ...) are
    unused here — confirm whether field definitions were meant to live on this class.
    """
    
    class Index:
        # Index name and settings applied when the index is created from this class.
        name = elastic_index
        # No replicas; ignore_malformed lets documents with badly-typed values be
        # indexed (the offending field is skipped) instead of rejected.
        settings = {"number_of_shards": elastic_index_shards, "number_of_replicas": 0, 'mapping':{'ignore_malformed':True}}

## Create Index

In [None]:
# Drop any existing index with the same name, then create it from the document
# class (name and settings declared on ElasticsearchSireneIndex.Index).
sirene_index = Index(elastic_index)
if sirene_index.exists():
    logging.info(f"Index  {elastic_index} already exists! Deleting...")
    sirene_index.delete()
    logging.info(f"Index {elastic_index} deleted!")
logging.info(f"Creating {elastic_index} index!")
# The document class defined above is ElasticsearchSireneIndex. `Siren` is not
# defined in this notebook's visible cells, so `Siren.init()` raised NameError.
ElasticsearchSireneIndex.init()

In [11]:
# Query the cluster health through the default connection and keep its status string.
elastic_connection = connections.get_connection()
elastic_health = elastic_connection.cluster.health()
elastic_status = elastic_health["status"]

## Health check

In [12]:
# Abort unless the cluster reports a usable status ("green" or "yellow").
# This runs at notebook top level, so the status is the module variable
# `elastic_status`; `self.elastic_status` raised NameError on the failure path.
if elastic_status not in ("green", "yellow"):
    raise Exception(
        f"Cluster status is {elastic_status}, not green nor yellow!!"
    )
else:
    logging.info(f"Cluster status is functional: {elastic_status}")

INFO:root:Cluster status is functional: green


In [13]:
# Fetch the live mapping of the index from the cluster (a GET <index>/_mapping request).
elastic_mapping = ElasticsearchSireneIndex._index.get_mapping()

INFO:elasticsearch:GET http://search.sirene.dataeng.etalab.studio:80/siren-green/_mapping [status:200 request:0.002s]


In [14]:
# Display the fetched mapping as the cell output.
elastic_mapping

{'siren-green': {'mappings': {'properties': {'activite_principale_registre_metier': {'type': 'keyword'},
    'activite_principale_siege': {'type': 'keyword'},
    'activite_principale_unite_legale': {'type': 'keyword'},
    'adresse_etablissement': {'type': 'text'},
    'adresse_etablissement_2': {'type': 'text'},
    'categorie_entreprise': {'type': 'text'},
    'cedex': {'type': 'keyword'},
    'code_pays_etranger': {'type': 'text'},
    'code_postal': {'type': 'keyword'},
    'commune': {'type': 'keyword'},
    'complement_adresse': {'type': 'text'},
    'concat_enseigne_adresse': {'type': 'text',
     'analyzer': 'annuaire_analyzer'},
    'concat_nom_adr_siren': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword'}},
     'analyzer': 'annuaire_analyzer'},
    'coordonnees': {'type': 'geo_point'},
    'date_creation_siege': {'type': 'date'},
    'date_creation_unite_legale': {'type': 'date'},
    'date_debut_activite_siege': {'type': 'date'},
    'date_mise_a_jour': {'type

In [None]:
from elasticsearch_dsl import query

def search_text(index, offset: int, page_size: int, **kwargs):
    """Build a filtered, paginated search on `index` (the search is not executed).

    Args:
        index: object exposing `.search()`, e.g. an elasticsearch_dsl Document class.
        offset: position of the first hit to return.
        page_size: number of hits to return.
        **kwargs: search parameters. "terms" is required (KeyError otherwise).
            Every other non-None value not listed in the ignore list below becomes
            a filter via filter_search.

    Returns:
        The configured Search object. Previously the built search was discarded
        and the function returned None.
    """
    query_terms = kwargs["terms"]  # TODO: the full-text query on the terms is not implemented yet.
    s = index.search()
    # Use filters to reduce search results
    s = filter_search(s, filters_to_ignore=["terms", "min_date_nais_dirigeant",
                                            "max_date_nais_dirigeant", "nom_dir"],
                      **kwargs)
    # Slicing an elasticsearch_dsl Search sets its from/size, i.e. the requested page.
    s = s[offset:offset + page_size]
    return s

In [None]:
def filter_search(search, filters_to_ignore: list, **kwargs):
    """Use filters to reduce search results.

    For every keyword argument whose value is not None:
      * ``nom_dir`` adds a nested query on ``dirigeants_pp`` requiring the
        term in ``dirigeants_pp.noms``;
      * any key not listed in ``filters_to_ignore`` adds a ``term`` filter
        ``{key: value}``.

    Args:
        search: search object whose ``query``/``filter`` methods return a new
            search (elasticsearch_dsl ``Search`` style).
        filters_to_ignore: keys that must not become ``term`` filters.
        **kwargs: candidate filter values; None means "not set".

    Returns:
        The search with the queries/filters applied.
    """
    for key, value in kwargs.items():
        if value is None:
            # Unset filters must not constrain the search. Previously
            # `nom_dir=None` still produced a nested term query on None.
            continue
        if key == "nom_dir":
            # `Q` is not imported in the visible cells; use `query.Q` like the
            # rest of the notebook (`from elasticsearch_dsl import query`).
            search = search.query(
                "nested",
                path="dirigeants_pp",
                query=query.Q("term", **{"dirigeants_pp.noms": value}),
            )
        if key not in filters_to_ignore:
            search = search.filter("term", **{key: value})
    # TODO: support min/max_date_nais_dirigeant with a nested "range" filter
    # on dirigeants_pp.date_naissance (previously drafted as dead code here).
    return search

In [None]:
# Exploratory: keep documents having a `dirigeants_pp` entry whose `noms`
# field contains the term "jouppe".
search = search.filter(
    "nested",
    path="dirigeants_pp",
    query=query.Q("term", **{"dirigeants_pp.noms": "jouppe"}),
)

In [None]:
# Exploratory: keep documents having a `dirigeants_pp` entry whose `prenoms`
# field contains the exact term "xavier marie".
# NOTE(review): if `prenoms` is an analyzed text field, its tokens have no
# spaces, so this single `term` is unlikely to match -- confirm the mapping.
search = search.filter(
    "nested",
    path="dirigeants_pp",
    query=query.Q("term", **{"dirigeants_pp.prenoms": "xavier marie"}),
)

In [None]:
# Start over from a fresh, unfiltered search on the Sirene index.
search = ElasticsearchSireneIndex.search()

In [None]:
# Exploratory: exact (keyword) term filters on the director's `noms` and
# `prenoms`. Each call adds its own nested filter, so the two conditions may be
# met by different `dirigeants_pp` entries of the same document.
# NOTE(review): "arnal " ends with a space; a keyword `term` only matches that
# exact value -- confirm the trailing space is intentional.
search = search.filter("nested", path="dirigeants_pp", query=query.Q("term",**{'dirigeants_pp.noms.keyword': "arnal "}))
search = search.filter("nested", path="dirigeants_pp", query=query.Q("term",**{'dirigeants_pp.prenoms.keyword': "nicole"}))

In [None]:
# Exploratory: `match` queries on the keyword sub-fields of `noms` and
# `prenoms`. They are two independent nested filters, so they may be satisfied
# by different `dirigeants_pp` entries of the same document.
search = search.filter(
    "nested",
    path="dirigeants_pp",
    query=query.Q("match", **{"dirigeants_pp.noms.keyword": "arnal"}),
)
search = search.filter(
    "nested",
    path="dirigeants_pp",
    query=query.Q("match", **{"dirigeants_pp.prenoms.keyword": "nicole"}),
)

In [None]:
# Exploratory: a single nested filter, so the regexp on `noms` and the exact
# `prenoms.keyword` match must hold for the same `dirigeants_pp` entry.
# NOTE(review): Elasticsearch regexp syntax is not a glob: "arnal*" means
# "arna" followed by zero or more "l". Also, if `dirigeants_pp.noms` is an
# analyzed text field, the pattern is tested against single tokens, so a
# pattern containing a space is unlikely to match anything -- confirm mapping.
search = search.filter("nested",
                       path= "dirigeants_pp",
                       query=query.Bool(filter= [{
                
                  "regexp": {
                "dirigeants_pp.noms": "arnal* cesar"
              }
            },
            {
              "match": {
                "dirigeants_pp.prenoms.keyword": "nicole"
                }
              }
          ]))
                      

In [None]:
# Exploratory: keep documents having one `dirigeants_pp` entry whose `noms`
# matches values starting with "arnal" and with "cesar", and whose
# `prenoms.keyword` is "nicole" (all clauses apply to the same nested entry).
# Regexp syntax is not glob syntax: "arnal*" meant "arna" followed by any
# number of "l"; a prefix match needs "arnal.*".
search = search.filter(
    "nested",
    path="dirigeants_pp",
    query=query.Bool(
        filter=[
            {"regexp": {"dirigeants_pp.noms": "arnal.*"}},
            {"regexp": {"dirigeants_pp.noms": "cesar.*"}},
            {"match": {"dirigeants_pp.prenoms.keyword": "nicole"}},
        ]
    ),
)
                      

In [None]:
# Track scores, sort by relevance then by `etat_administratif_siege`
# (ascending), and keep the requested page. The lines were pasted with the
# indentation of a function body, which made this cell an IndentationError.
# NOTE: relies on `offset` and `page_size`, which are set in the next cell.
search = search.extra(track_scores=True)
search = search.sort(
    {"_score": {"order": "desc"}},
    {"etat_administratif_siege": {"order": "asc"}},
)
search = search[offset : (offset + page_size)]

In [38]:
# Search documents having a `dirigeants_pp` entry that matches every word of
# `noms` and `prenoms` and whose birth date lies within
# [min_date_nais_dirigeant, max_date_nais_dirigeant].
search = ElasticsearchSireneIndex.search()
offset = 0
page_size = 20
search = search[offset : (offset + page_size)]
min_date_nais_dirigeant = "1941-09-07"
max_date_nais_dirigeant = "2022-01-01"

noms = "arnal"
prenoms = "nicole"
# str.split() without an argument drops the empty tokens that split(' ')
# produced for repeated/trailing spaces; an empty `match` clause matches no
# document and would make the whole filter fail.
filters = [{"match": {"dirigeants_pp.noms": word}} for word in noms.split()]
filters += [{"match": {"dirigeants_pp.prenoms": word}} for word in prenoms.split()]
filters.append(
    {
        "range": {
            "dirigeants_pp.date_naissance": {
                "gte": min_date_nais_dirigeant,
                "lte": max_date_nais_dirigeant,
            }
        }
    }
)

# One nested query: every clause must hold for the same director entry.
search = search.filter(
    "nested",
    path="dirigeants_pp",
    query=query.Bool(filter=filters),
)



                      

In [53]:
# Search documents having a `dirigeants_pp` entry that matches every word of
# `noms` and `prenoms` and whose birth date lies within
# [min_date_nais_dirigeant, max_date_nais_dirigeant].
search = ElasticsearchSireneIndex.search()
offset = 0
page_size = 20
search = search[offset : (offset + page_size)]
min_date_nais_dirigeant = "1900-01-01"
max_date_nais_dirigeant = "2022-01-01"

noms = "jouppe"
prenoms = "bernard"
# str.split() without an argument drops the empty tokens that split(' ')
# produced for repeated/trailing spaces; an empty `match` clause matches no
# document and would make the whole filter fail.
filters = [{"match": {"dirigeants_pp.noms": word}} for word in noms.split()]
filters += [{"match": {"dirigeants_pp.prenoms": word}} for word in prenoms.split()]
filters.append(
    {
        "range": {
            "dirigeants_pp.date_naissance": {
                "gte": min_date_nais_dirigeant,
                "lte": max_date_nais_dirigeant,
            }
        }
    }
)

# One nested query: every clause must hold for the same director entry.
search = search.filter(
    "nested",
    path="dirigeants_pp",
    query=query.Bool(filter=filters),
)



                      

In [54]:
# Send the search built above to Elasticsearch.
results = search.execute()

INFO:elasticsearch:POST http://search.sirene.dataeng.etalab.studio:80/siren-green/_search [status:200 request:0.009s]


In [55]:
# Total number of matching documents, then each returned hit as a plain dict
# (empty fields kept, hit metadata left out).
total_results = results.hits.total.value
response = list(
    map(
        lambda hit: hit.to_dict(skip_empty=False, include_meta=False),
        results.hits,
    )
)

In [56]:
# Display the total number of matching documents.
total_results

5

In [57]:
# Display the returned page of hits.
response

[{'siren': '421485210',
  'siret_siege': '42148521000015',
  'date_creation_siege': '1999-01-12',
  'tranche_effectif_salarie_siege': 'NN',
  'date_debut_activite_siege': '2018-12-17',
  'etat_administratif_siege': 'F',
  'activite_principale_siege': '68.20B',
  'numero_voie': '15',
  'type_voie': 'RUE',
  'libelle_voie': 'D ENTRAIGUES',
  'commune': '37261',
  'libelle_commune': 'TOURS',
  'code_postal': '37000',
  'geo_id': '37261_1680_00015',
  'longitude': '0.688547',
  'latitude': '47.388091',
  'date_creation_unite_legale': '1999-01-12',
  'tranche_effectif_salarie_unite_legale': 'NN',
  'date_mise_a_jour': '2019-10-04T03:50:23',
  'etat_administratif_unite_legale': 'C',
  'nom_raison_sociale': 'MENENTRAIGUES',
  'nature_juridique_unite_legale': '6540',
  'activite_principale_unite_legale': '68.20B',
  'nombre_etablissements': 1,
  'nombre_etablissements_ouverts': 0,
  'is_siege': 'true',
  'dirigeants_pp': [{'siren': '421485210',
    'noms': 'amoudry',
    'prenoms': 'claude hug

In [16]:
from elasticsearch_dsl import query

## Indexing

In [None]:
def dict_from_row(row):
    """Convert a row exposing ``keys()`` (e.g. ``sqlite3.Row``) into a dict.

    Column names come from ``row.keys()`` and values from iterating the row,
    paired positionally; if two columns share a name, the last one wins.
    """
    column_names = row.keys()
    values = list(row)
    return {name: value for name, value in zip(column_names, values)}

In [None]:
def process_res(res):
    """Turn SQL result rows into documents ready for Elasticsearch.

    Args:
        res: iterable of dict-like rows (column name -> value). The
            ``dirigeants_pp``, ``dirigeants_pm`` and ``conventions_collectives``
            columns hold JSON arrays (built with ``json_group_array`` in the
            indexing query).

    Returns:
        list of dicts, one per row. Each is a copy of the row in which:

        * ``liste_enseignes`` and ``liste_adresses`` are de-duplicated;
        * the JSON columns are decoded;
        * ``conventions_collectives`` is removed;
        * derived fields are added: ``liste_dirigeants``,
          ``liste_conventions_collectives``, ``nom_complet``,
          ``is_entrepreneur_individuel``, ``section_activite_principale``,
          ``departement``, ``coordonnees``, ``concat_enseigne_adresse`` and
          ``concat_nom_adr_siren``.
    """
    return [_process_row(result) for result in res]


def _process_row(result):
    """Build the Elasticsearch document for a single result row."""
    # Copy the row; de-duplicate the comma-separated sign and address lists.
    mydict = {
        item: (
            unique_string(result[item])
            if item in ("liste_enseignes", "liste_adresses")
            else result[item]
        )
        for item in result
    }

    mydict['dirigeants_pp'] = json.loads(result['dirigeants_pp'])
    mydict['dirigeants_pm'] = json.loads(result['dirigeants_pm'])
    conventions = json.loads(result['conventions_collectives'])
    # Only the flat list of idcc codes is indexed, not the raw conventions.
    del mydict['conventions_collectives']

    # SQL NULL names become JSON null: map them to "" (as the concat_* fields
    # below do) instead of failing with TypeError on concatenation.
    mydict['liste_dirigeants'] = [
        (get_string(dirigeant_pp["prenoms"]) + " " + get_string(dirigeant_pp["noms"])).strip()
        for dirigeant_pp in mydict['dirigeants_pp']
    ] + [dirigeant_pm["denomination"] for dirigeant_pm in mydict['dirigeants_pm']]
    # Skip null idcc values, which used to crash on .strip().
    mydict['liste_conventions_collectives'] = [
        convention["idcc"].strip()
        for convention in conventions
        if convention["idcc"] is not None
    ]

    mydict['nom_complet'] = create_nom_complet(
        result['nature_juridique_unite_legale'],
        result['nom'],
        result['nom_usage'],
        result['nom_raison_sociale'],
        result['sigle'],
        result['prenom'],
    )
    mydict['is_entrepreneur_individuel'] = create_entrepreneur_individuel(
        result['nature_juridique_unite_legale']
    )
    mydict['section_activite_principale'] = create_section(
        result['activite_principale_unite_legale']
    )
    mydict['departement'] = create_departement(result['commune'])
    mydict['coordonnees'] = create_coordonnees(
        result['longitude'],
        result['latitude'],
    )
    # The de-duplicated lists were already computed in the copy above.
    mydict['concat_enseigne_adresse'] = (
        get_string(mydict['liste_enseignes'])
        + " "
        + get_string(mydict['liste_adresses'])
    ).strip()
    mydict['concat_nom_adr_siren'] = (
        get_string(mydict['nom_complet'])
        + " "
        + get_string(result['adresse_complete'])
        + " "
        + get_string(result['siren'])
    ).strip()
    return mydict
    

In [None]:
def doc_generator(data):
    """Yield one serialized ``Siren`` document per item of ``data``.

    Each document gets its ``siret_siege`` as Elasticsearch ``_id`` and is
    serialized with its metadata, the form consumed by
    ``helpers.parallel_bulk`` in the indexing loop.
    """
    for document in data:
        siren_doc = Siren(meta={"id": document["siret_siege"]}, **document)
        yield siren_doc.to_dict(include_meta=True)

In [None]:
# Print the time elapsed since `global_start_time` and the current memory use.
global_stats()

In [None]:
# Number of SQL rows per batch; used to estimate `nb_iter`. Keep it equal to
# the batch size fetched by the indexing loop.
chunk_size = 1500

In [None]:
# Count the rows to index and derive the estimated number of batches of
# `chunk_size` rows. Integer division keeps `nb_iter` an int (it used to be a
# float that had to be converted with int()).
(siege_count,) = cursor.execute('''SELECT count(*) FROM siretsiege;''').fetchone()
nb_iter = int(siege_count) // chunk_size + 1

In [None]:
# Display the estimated number of batches as an integer.
int(nb_iter)

In [None]:
# Reset the timer that stats() reports against.
start_time = time.time()

# Select every row of `siretsiege` (st), left-joined with `unite_legale` (ul)
# on siren. Establishment counts, sign/address lists, directors and
# collective agreements come from correlated subqueries; the last three are
# aggregated into JSON arrays with json_group_array/json_object and decoded
# later by process_res(). Rows are then read in batches by the indexing loop.
# NOTE(review): two aliases look misspelt and become indexed field names:
#   `ndice_repetition` (from st.indice_repetition) and
#   `economie_sociale_solidair_unite_legale` (missing final "e").
#   Check what the index mapping / API consumers expect before renaming them.
# NOTE(review): the f-prefix is unnecessary (the string has no placeholders).
cursor.execute(f'''
    SELECT 
        ul.siren,
        st.siret as siret_siege,
        st.date_creation as date_creation_siege,
        st.tranche_effectif_salarie as tranche_effectif_salarie_siege,
        st.date_debut_activite as date_debut_activite_siege,
        st.etat_administratif_etablissement as etat_administratif_siege,
        st.activite_principale as activite_principale_siege,
        st.complement_adresse as complement_adresse,
        st.numero_voie as numero_voie,
        st.indice_repetition as ndice_repetition,
        st.type_voie as type_voie,
        st.libelle_voie as libelle_voie,
        st.distribution_speciale as distribution_speciale,
        st.cedex as cedex,
        st.libelle_cedex as libelle_cedex,
        st.commune as commune,
        st.libelle_commune as libelle_commune,
        st.code_pays_etranger as code_pays_etranger,
        st.libelle_commune_etranger as libelle_commune_etranger,
        st.libelle_pays_etranger as libelle_pays_etranger,
        st.code_postal as code_postal,
        st.geo_id as geo_id,
        st.longitude as longitude,
        st.latitude as latitude,
        st.activite_principale_registre_metier as activite_principale_registre_metier,
        st.adresse_complete as adresse_complete,
        ul.date_creation_unite_legale as date_creation_unite_legale,
        ul.tranche_effectif_salarie_unite_legale as tranche_effectif_salarie_unite_legale,
        ul.date_mise_a_jour_unite_legale as date_mise_a_jour,
        ul.categorie_entreprise as categorie_entreprise,
        ul.etat_administratif_unite_legale as etat_administratif_unite_legale,
        ul.nom_raison_sociale as nom_raison_sociale,
        ul.nature_juridique_unite_legale as nature_juridique_unite_legale,
        ul.activite_principale_unite_legale as activite_principale_unite_legale,
        ul.economie_sociale_solidaire_unite_legale as economie_sociale_solidair_unite_legale,
        (SELECT count FROM count_etab ce WHERE ce.siren = st.siren) as nombre_etablissements,
        (SELECT count FROM count_etab_ouvert ceo WHERE ceo.siren = st.siren) as nombre_etablissements_ouverts,
        (SELECT liste_enseignes FROM enseignes le WHERE le.siren = st.siren) as liste_enseignes,
        (SELECT liste_adresses FROM adresses la WHERE la.siren = st.siren) as liste_adresses,
        ul.sigle as sigle,
        ul.prenom as prenom,
        ul.nom as nom,
        ul.nom_usage as nom_usage,
        st.is_siege as is_siege,
        (SELECT json_group_array(
                json_object(
                    'siren', siren,
                    'noms', rep_noms,
                    'prenoms', rep_prenoms,
                    'date_naissance', rep_datenaissance,
                    'ville_naissance', rep_villenaissance,
                    'pays_naissance', rep_paysnaissance,
                    'qualite', rep_qualite
                    )
                ) FROM 
                (
                    SELECT siren, rep_noms, rep_prenoms, rep_datenaissance, rep_villenaissance, rep_paysnaissance, rep_qualite from dirigeant_pp 
                    WHERE siren = st.siren
                )
            ) as dirigeants_pp,
        (SELECT json_group_array(
                json_object(
                    'siren', siren,
                    'denomination', rep_denomination,
                    'qualite', rep_qualite
                    )
                ) FROM 
                (
                    SELECT siren, rep_denomination, rep_qualite from dirigeant_pm
                    WHERE siren = st.siren
                )
            ) as dirigeants_pm,
        (SELECT json_group_array(
                json_object(
                    'siren', siren,
                    'idcc', idcc
                    )
                ) FROM 
                (
                    SELECT siren, idcc from conventions_collectives
                    WHERE siren = st.siren
                )
            ) as conventions_collectives
        
    FROM
        siretsiege st
    LEFT JOIN unite_legale ul ON ul.siren = st.siren 
    
    
''')
#ld.liste_dirigeants as liste_dirigeants
#LEFT JOIN dirigeants ld ON ul.siren = ld.siren

In [None]:
# Read the SELECT results in batches of `chunk_size` rows, turn each batch
# into Elasticsearch documents and bulk-index them.
i = 0
columns = None
while True:
    # Batch size now comes from `chunk_size` (it was a hard-coded 1500).
    rows = cursor.fetchmany(chunk_size)
    if not rows:
        # Stop here: previously a final empty batch was processed, bulk-sent
        # and counted as well.
        break
    if columns is None:
        # Column names are the same for every batch: compute them once.
        columns = tuple(description[0] for description in cursor.description)
    res = tuple(dict(zip(columns, row)) for row in rows)
    res2 = process_res(res)
    i += 1
    if i % 1000 == 0:
        print(i)
    try:
        for success, details in helpers.parallel_bulk(
            elastic_connection, doc_generator(res2), chunk_size=elastic_bulk_size
        ):
            if not success:
                raise Exception(f"A document failed to be indexed: {details}")
    except Exception as e:
        # Best effort: log the failure and continue with the next batch.
        logging.error(f"Failed to send to Elasticsearch: {e}")
    doc_count = elastic_connection.cat.count(
        index=elastic_index, params={"format": "json"}
    )[0]["count"]
    print(f"Number of documents indexed: {doc_count}")
stats()

In [None]:
# Print the total time elapsed since `global_start_time` and memory usage.
global_stats()

In [None]:
# Release the SQLite cursor used for indexing; later cells cannot reuse it.
cursor.close()

In [None]:
# Debug: print every column of the `dirigeant_pp` rows of one SIREN.
# The shared `cursor` is closed by an earlier cell (executing on it raises
# sqlite3.ProgrammingError), so use a fresh cursor from `connection` and close
# it once the rows are fetched. The SIREN is passed as a bound parameter.
debug_cursor = connection.execute(
    "SELECT * from dirigeant_pp WHERE siren = ? LIMIT 1000", ("005520135",)
)
try:
    names = [description[0] for description in debug_cursor.description]
    rows = debug_cursor.fetchall()
finally:
    debug_cursor.close()
for row in rows:
    for name, val in zip(names, row):
        print(f"{name}: {val}")

In [None]:
# Close the SQLite connection once all queries are done.
connection.close()