# Fill the _etablissements_ (`siret`) index of Elasticsearch
Elasticsearch endpoint: http://elasticsearch-master:9200

In [None]:
!pip install elasticsearch_dsl
#!pip install minio

In [None]:
#!mc cp s3/projet-matchsiret/sirene.db data/

### Helper utilities
https://github.com/etalab/annuaire-entreprises-search-infra/blob/b6d7f44bffec99a1036cca21eb56804abf8e8b4a/helpers/utils.py

In [None]:

def unique_list(lst):
    """Return the items of ``lst`` without duplicates, keeping first-seen order.

    Hashable items are de-duplicated in linear time with ``dict.fromkeys``
    (dicts preserve insertion order). If an item is unhashable, fall back to
    an equality scan so any iterable of items is still accepted.

    :param lst: iterable of items
    :return: new list with the first occurrence of each item
    """
    items = list(lst)
    try:
        return list(dict.fromkeys(items))
    except TypeError:
        # Unhashable items (lists, dicts, ...): order-preserving O(n^2) scan.
        ulist = []
        for item in items:
            if item not in ulist:
                ulist.append(item)
        return ulist


def unique_string(a):
    """Split ``a`` on commas, drop repeated tokens and re-join them with spaces.

    Tokens are compared verbatim (no per-token stripping); only the outer
    whitespace of the input and of the result is removed.
    """
    tokens = a.strip().split(",")
    # dict keys keep insertion order, so the first occurrence of each token wins
    distinct_tokens = dict.fromkeys(tokens)
    return " ".join(distinct_tokens).strip()


def get_empty_string_if_none(string):
    """Return ``string`` unchanged, or ``""`` when it is ``None``."""
    return "" if string is None else string


def dict_from_row(row):
    """Build a plain dict from a row exposing ``keys()`` and positional iteration.

    Works for e.g. ``sqlite3.Row`` objects: column names become keys and the
    row values become the associated values.
    """
    return {column: value for column, value in zip(row.keys(), row)}



### Data enrichment
https://github.com/etalab/annuaire-entreprises-search-infra/blob/b6d7f44bffec99a1036cca21eb56804abf8e8b4a/data_enrichment.py

In [None]:
import json

# Default folder holding the JSON label files loaded by load_file.
labels_file_path = "./data/"


def load_file(file_name: str, directory=None):
    """Load and return the decoded JSON content of ``file_name``.

    :param file_name: name of the JSON file to read
    :param directory: folder containing the file; defaults to
        ``labels_file_path``
    :return: the decoded JSON document
    """
    import os

    folder = labels_file_path if directory is None else directory
    # Explicit UTF-8: the label files may contain accented (non-ASCII) text and
    # the platform's default encoding is not guaranteed to be UTF-8.
    with open(os.path.join(folder, file_name), encoding="utf-8") as json_file:
        return json.load(json_file)


# Section labels looked up by the first two characters of an activity code
# (see label_section_from_activite). Loaded once when this cell runs; the file
# sections_codes_naf.json must exist under labels_file_path.
sections_NAF = load_file("sections_codes_naf.json")


def format_nom_complet(
    nom_commercial=None,
    nom=None,
    nom_usage=None,
    nom_raison_sociale=None,
    sigle=None,
    prenom=None,
) -> str:
    """Build the lower-cased display name of a company or establishment.

    The official name (raison sociale) and the commercial name come first.
    Only when both are empty is a person-style name built from
    ``prenom`` / ``nom_usage`` / ``nom``: "prenom nom_usage (nom)" when both
    names are known. A ``sigle`` is appended in parentheses.

    :return: the assembled name, lower-cased and stripped ("" if nothing is known)
    """
    pieces = [part + " " for part in (nom_raison_sociale, nom_commercial) if part]

    # Individual companies usually have no official company name.
    if not pieces and (prenom or nom or nom_usage):
        if nom_usage:
            suffix = f" {nom_usage} ({nom})" if nom else f" {nom_usage}"
        elif nom:
            suffix = f" {nom}"
        else:
            suffix = ""
        pieces.append(f"{prenom or ''}{suffix}" + " ")

    if sigle:
        pieces.append(f"({sigle})")
    return "".join(pieces).lower().strip()

# Entrepreneur individuel
def is_entrepreneur_individuel(nature_juridique_unite_legale):
    """Return the string "true" when the legal nature code is "1", "10" or "1000".

    Any other value (including ``None`` or non-string codes) gives "false".
    The result is a string, not a bool.
    """
    individual_codes = ("1", "10", "1000")
    return "true" if nature_juridique_unite_legale in individual_codes else "false"


# Section activité principale
def label_section_from_activite(activite_principale_unite_legale):
    """Return the section label for an activity code, looked up in ``sections_NAF``.

    The lookup key is the first two characters of the code. ``None`` input,
    or a prefix absent from ``sections_NAF``, yields ``None``.
    """
    if activite_principale_unite_legale is None:
        return None
    prefix = activite_principale_unite_legale[:2]
    if prefix not in sections_NAF:
        return None
    return sections_NAF[prefix]


# Adresse complète
def format_adresse_complete(
    complement_adresse,
    numero_voie,
    indice_repetition,
    type_voie,
    libelle_voie,
    libelle_commune,
    libelle_cedex,
    distribution_speciale,
    commune,
    cedex,
    libelle_commune_etranger,
    libelle_pays_etranger,
):
    """Build a single-line address from its components.

    Parts are emitted in this order:

    * complement, street number, repetition index, street type, street name,
      special distribution;
    * then either the cedex code and label (when ``cedex`` is not None) or the
      commune code and label (when ``commune`` is not None);
    * then the foreign commune and country.

    Empty/None parts are skipped and the rest are joined with single spaces,
    so a missing label never leaves a doubled space inside the address.

    :return: the formatted address, or "" when every part is empty
    """
    parts = [
        complement_adresse,
        numero_voie,
        indice_repetition,
        type_voie,
        libelle_voie,
        distribution_speciale,
    ]
    # A cedex code replaces the commune code/label pair entirely.
    if cedex is not None:
        parts.extend([cedex, libelle_cedex])
    elif commune is not None:
        parts.extend([commune, libelle_commune])
    parts.extend([libelle_commune_etranger, libelle_pays_etranger])
    return " ".join(part for part in parts if part).strip()


# Département
def format_departement(commune):
    """Derive the departement code from a commune code.

    Codes whose string form starts with "97" keep three characters
    (e.g. "971"); other codes keep two. ``None`` yields ``None``.
    """
    code = str(commune)
    if code.startswith("97"):
        return code[:3]
    if commune is None:
        return None
    return code[:2]


# Coordonnées
def format_coordonnees(longitude, latitude):
    """Return "latitude,longitude" (the string stored in the geo_point field), or None if either is missing."""
    if longitude is None or latitude is None:
        return None
    return f"{latitude},{longitude}"


### Create elastic index
https://github.com/etalab/annuaire-entreprises-search-infra/blob/b6d7f44bffec99a1036cca21eb56804abf8e8b4a/elasticsearch/create_sirene_index.py

In [None]:
import logging
from typing import Optional

from elasticsearch_dsl import Index, connections


class ElasticCreateSiret:
    """
    Create (or re-create) an elasticsearch index with the ElasticsearchSireneIndex mapping.

    :param elastic_url: endpoint url of elasticsearch (required)
    :type elastic_url: str
    :param elastic_index: index to create
    :type elastic_index: str
    :param elastic_user: user for elasticsearch; when empty/None no basic-auth
        credentials are passed to the client
    :type elastic_user: str
    :param elastic_password: password for elasticsearch
    :type elastic_password: str
    :param elastic_bulk_size: bulk size, stored on the instance for later steps
    :type elastic_bulk_size: int
    :raises ValueError: if ``elastic_url`` is empty
    """

    def __init__(
        self,
        *,
        elastic_url: Optional[str] = None,
        elastic_index: Optional[str] = None,
        elastic_user: Optional[str] = None,
        elastic_password: Optional[str] = None,
        elastic_bulk_size: Optional[int] = 1500,
        **kwargs,
    ) -> None:
        # Validate before connecting: the default connection is created below,
        # so a missing URL must be rejected here, not later in execute().
        if not elastic_url:
            raise ValueError("Please provide elasticsearch url endpoint")

        self.elastic_url = elastic_url
        self.elastic_index = elastic_index
        self.elastic_user = elastic_user
        self.elastic_password = elastic_password
        self.elastic_bulk_size = elastic_bulk_size

        # initiate the default connection to elasticsearch
        connection_options = {"hosts": [self.elastic_url], "retry_on_timeout": True}
        # Only pass credentials when a user is configured; otherwise an empty
        # ("", "") pair would be handed to the client.
        if self.elastic_user:
            connection_options["http_auth"] = (self.elastic_user, self.elastic_password)
        connections.create_connection(**connection_options)

        self.elastic_connection = connections.get_connection()
        # Health is sampled once here; check_health() reads this snapshot.
        self.elastic_health = self.elastic_connection.cluster.health()
        self.elastic_status = self.elastic_health["status"]
        self.elastic_mapping = self.elastic_connection.indices.get_mapping()

        logging.info("Elasticsearch connection initiated!")

    def check_health(self):
        """Raise if the cluster status captured at init is neither green nor yellow."""
        if self.elastic_status not in ("green", "yellow"):
            raise Exception(
                f"Cluster status is {self.elastic_status}, not green nor yellow!!"
            )
        logging.info(f"Cluster status is functional: {self.elastic_status}")

    def execute(self):
        """Delete ``elastic_index`` if it exists, then create it with the ElasticsearchSireneIndex mapping."""
        if not self.elastic_url:
            raise ValueError("Please provide elasticsearch url endpoint")

        self.check_health()

        index = Index(self.elastic_index)
        if index.exists():
            logging.info(f"Index  {self.elastic_index} already exists! Deleting...")
            index.delete()
            logging.info(f"Index {self.elastic_index} deleted!")
        logging.info(f"Creating {self.elastic_index} index!")
        # Create the mapping in elasticsearch
        ElasticsearchSireneIndex.init(self.elastic_index)


### Process unites legales
https://github.com/etalab/annuaire-entreprises-search-infra/blob/b6d7f44bffec99a1036cca21eb56804abf8e8b4a/elasticsearch/process_unites_legales.py

### Process etablissements
Adapted from the unites legales processing script linked above.

In [None]:
from typing import Tuple

def _load_nested(value):
    """Decode a JSON-encoded nested column; a missing or empty value yields []."""
    return json.loads(value) if value else []


def _format_adresse(source):
    """Format the address columns read from mapping ``source`` into one line."""
    return format_adresse_complete(
        complement_adresse=source["complement_adresse"],
        numero_voie=source["numero_voie"],
        indice_repetition=source["indice_repetition"],
        type_voie=source["type_voie"],
        libelle_voie=source["libelle_voie"],
        libelle_commune=source["libelle_commune"],
        libelle_cedex=source["libelle_cedex"],
        distribution_speciale=source["distribution_speciale"],
        commune=source["commune"],
        cedex=source["cedex"],
        libelle_commune_etranger=source["libelle_commune_etranger"],
        libelle_pays_etranger=source["libelle_pays_etranger"],
    )


def _unique_non_empty(values):
    """Drop falsy values and duplicates, keeping first-seen order (deterministic, unlike set())."""
    return list(dict.fromkeys(value for value in values if value))


def process_etablissements(chunk_etablissements_sqlite: Tuple):
    """Turn a chunk of etablissement rows into elasticsearch documents.

    Assumes each row is a dict-like mapping whose iteration yields column names
    (e.g. built with dict_from_row) — TODO confirm against the caller.
    The optional ``enseignes`` / ``adresses`` columns hold JSON-encoded lists of
    objects. They are flattened into ``liste_enseignes`` / ``liste_adresses``
    and removed from the document. Derived fields are then added: address,
    full name, section, departement, coordinates and concatenated search
    fields.

    :param chunk_etablissements_sqlite: rows returned by the etablissement query
    :return: list of processed documents (one dict per row)
    """
    list_etablissements_processed = []
    for etablissement in chunk_etablissements_sqlite:
        etablissement_processed = {"enseignes": [], "adresses": []}
        for field in etablissement:
            if field in ("enseignes", "adresses"):  # nested JSON fields
                etablissement_processed[field] = _load_nested(etablissement[field])
            else:
                etablissement_processed[field] = etablissement[field]

        # Enseignes: every enseigne_1..3 value of every nested enseigne
        enseignes = etablissement_processed.pop("enseignes")
        etablissement_processed["liste_enseignes"] = _unique_non_empty(
            enseigne[key]
            for enseigne in enseignes
            for key in ("enseigne_1", "enseigne_2", "enseigne_3")
        )

        # Adresses: every nested address, formatted on one line
        adresses = etablissement_processed.pop("adresses")
        etablissement_processed["liste_adresses"] = _unique_non_empty(
            _format_adresse(adresse) for adresse in adresses
        )

        etablissement_processed["adresse_etablissement"] = _format_adresse(
            etablissement_processed
        )

        # Full name of the unite legale (not specific to the etablissement)
        etablissement_processed["nom_complet"] = format_nom_complet(
            nom_commercial=etablissement_processed["nom_commercial"],
            nom=etablissement_processed["nom"],
            nom_usage=etablissement_processed["nom_usage"],
            nom_raison_sociale=etablissement_processed["nom_raison_sociale"],
            sigle=etablissement_processed["sigle"],
            prenom=etablissement_processed["prenom"],
        )

        # Missing or NULL counts default to 0
        if etablissement_processed.get("nombre_etablissements_ouverts") is None:
            etablissement_processed["nombre_etablissements_ouverts"] = 0

        # NOTE: is_entrepreneur_individuel is not computed in this adaptation.
        etablissement_processed[
            "section_activite_principale"
        ] = label_section_from_activite(etablissement["activite_principale_siege"])
        etablissement_processed["departement"] = format_departement(
            etablissement["commune"]
        )
        etablissement_processed["coordonnees"] = format_coordonnees(
            etablissement["longitude"], etablissement["latitude"]
        )
        etablissement_processed["concat_enseigne_adresse"] = (
            etablissement_processed["liste_enseignes"]
            + etablissement_processed["liste_adresses"]
        )

        etablissement_processed["concat_nom_adr_siren"] = (
            get_empty_string_if_none(etablissement_processed["nom_complet"])
            + " "
            + get_empty_string_if_none(etablissement_processed["adresse_etablissement"])
            + " "
            + get_empty_string_if_none(etablissement["siren"])
        ).strip()
        list_etablissements_processed.append(etablissement_processed)
    return list_etablissements_processed


### Mapping of the etablissements (`siret`) index, adapted from the unites legales mapping
https://github.com/etalab/annuaire-entreprises-search-infra/blob/b6d7f44bffec99a1036cca21eb56804abf8e8b4a/elasticsearch/mapping_sirene_index.py

In [None]:
from elasticsearch_dsl import (
    Boolean,
    Date,
    Document,
    GeoPoint,
    Integer,
    Keyword,
    Text,
    analyzer,
    token_filter,
    tokenizer,
)

# Define filters
# Token filters used by annuaire_analyzer below.
# Elision: removes the listed elided French articles ("l'", "qu'", ...) from
# the start of tokens. Per the Elasticsearch elision filter docs,
# articles_case=True makes the article matching case-insensitive.
french_elision = token_filter(
    "french_elision",
    type="elision",
    articles_case=True,
    articles=[
        "l",
        "m",
        "t",
        "qu",
        "n",
        "s",
        "j",
        "d",
        "c",
        "jusqu",
        "quoiqu",
        "lorsqu",
        "puisqu",
    ],
)
# Stop-word removal using Elasticsearch's built-in "_french_" list.
french_stop = token_filter("french_stop", type="stop", stopwords="_french_")
# Stemming with the "light_french" stemmer.
french_stemmer = token_filter("french_stemmer", type="stemmer", language="light_french")
# ignore_case option deprecated, use lowercase filter before synonym filter
# The synonyms list is empty, so this filter currently maps nothing.
# NOTE(review): confirm the target Elasticsearch version accepts an empty
# synonyms list when the index settings are created.
french_synonym = token_filter(
    "french_synonym", type="synonym", expand=True, synonyms=[]
)

# Define analyzer
# Custom analyzer used by the searchable name/address fields of
# ElasticsearchSireneIndex (concat_*, geo_adresse, liste_*, nom_complet).
# Filter order matters: lowercase runs before the synonym filter (see the note
# above french_synonym), and folding/stemming run last.
# NOTE(review): "icu_tokenizer" and "icu_folding" presumably come from the
# analysis-icu plugin, which must then be installed on the cluster — confirm.
annuaire_analyzer = analyzer(
    "annuaire_analyzer",
    tokenizer=tokenizer("icu_tokenizer"),
    filter=[
        "lowercase",
        french_elision,
        french_stop,
        "icu_folding",
        french_synonym,
        "asciifolding",
        french_stemmer,
    ],
)


class ElasticsearchSireneIndex(Document):
    """
    Model-like class for persisting documents in elasticsearch.

    It is a wrapper around Document that declares the index mapping and
    settings. Each document represents one etablissement; fields not declared
    here fall back to Elasticsearch dynamic mapping.
    """

    activite_principale_siege = Keyword()  # Add index_prefixes option
    activite_principale_unite_legale = Keyword()
    activite_principale_registre_metier = Keyword()
    adresse_etablissement = Text()
    categorie_entreprise = Text()
    cedex = Keyword()
    code_pays_etranger = Text()
    code_postal = Keyword()
    commune = Keyword()
    complement_adresse = Text()
    concat_enseigne_adresse = Text(analyzer=annuaire_analyzer)
    concat_nom_adr_siren = Text(
        analyzer=annuaire_analyzer, fields={"keyword": Keyword()}
    )
    coordonnees = GeoPoint()
    date_creation_siege = Date()
    date_creation_unite_legale = Date()
    date_debut_activite_siege = Date()
    date_mise_a_jour = Date()
    departement = Keyword()
    distribution_speciale = Text()
    economie_sociale_solidaire_unite_legale = Keyword()
    enseigne = Text()
    etat_administratif_unite_legale = Keyword()
    etat_administratif_etablissement = Keyword()
    # Selected as "etat_administratif_siege" by the fill query; mapped as a
    # keyword like the other etat_administratif_* codes instead of being
    # dynamically mapped as text.
    etat_administratif_siege = Keyword()
    geo_adresse = Text(analyzer=annuaire_analyzer)
    geo_id = Keyword()
    identifiant_association_unite_legale = Keyword()
    indice_repetition = Text()
    is_entrepreneur_individuel = Boolean()
    is_siege = Boolean()
    latitude = Text()
    libelle_cedex = Text()
    libelle_commune = Text()
    libelle_commune_etranger = Text()
    libelle_pays_etranger = Text()
    libelle_voie = Text()
    liste_adresses = Text(analyzer=annuaire_analyzer)
    liste_enseignes = Text(analyzer=annuaire_analyzer)
    longitude = Text()
    nature_juridique_unite_legale = Integer()
    nom = Text()
    nom_complet = Text(analyzer=annuaire_analyzer, fields={"keyword": Keyword()})
    nom_raison_sociale = Text()
    # nombre_etablissements is not mapped: a NaN count cannot be stored in an
    # integer field.
    nombre_etablissements_ouverts = Integer()
    numero_voie = Text()
    section_activite_principale = Keyword()
    sigle = Keyword()
    siren = Keyword(required=True)
    # The etablissement identifier, selected as "siret" by the fill query; an
    # exact-match keyword rather than dynamically mapped text.
    siret = Keyword()
    # NOTE(review): documents built in this notebook carry "siret", not
    # "siret_siege"; `required` is only enforced by Document.full_clean()/save().
    siret_siege = Keyword(required=True)
    type_voie = Text()
    tranche_effectif_salarie_siege = Keyword()
    tranche_effectif_salarie_unite_legale = Keyword()

    class Index:
        settings = {"number_of_shards": 1, "number_of_replicas": 0}
        name = "siret"


### Fill database and index
https://github.com/etalab/annuaire-entreprises-search-infra/blob/b6d7f44bffec99a1036cca21eb56804abf8e8b4a/task_functions.py

In [None]:
def create_unite_legale_table(**kwargs):
    """(Re)create the ``unite_legale`` table from the INSEE StockUniteLegale file.

    Steps:

    1. Drop any existing table and create it with a unique index on siren.
    2. Stream the zip archive to DATA_DIR and unpack it there.
    3. Insert the selected, renamed columns in chunks of 100 000 rows.

    :raises requests.HTTPError: if the download answers with an error status
    """
    siren_db_conn, siren_db_cursor = connect_to_db()
    siren_db_cursor.execute("""DROP TABLE IF EXISTS unite_legale""")
    siren_db_cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS unite_legale
        (
            siren,
            date_creation_unite_legale,
            sigle,
            prenom,
            identifiant_association_unite_legale,
            tranche_effectif_salarie_unite_legale,
            date_mise_a_jour_unite_legale,
            categorie_entreprise,
            etat_administratif_unite_legale,
            nom,
            nom_usage,
            nom_raison_sociale,
            nature_juridique_unite_legale,
            activite_principale_unite_legale,
            economie_sociale_solidaire_unite_legale
        )
    """
    )
    siren_db_cursor.execute(
        """
        CREATE UNIQUE INDEX index_siren
        ON unite_legale (siren);
        """
    )
    url = "https://files.data.gouv.fr/insee-sirene/StockUniteLegale_utf8.zip"
    zip_path = DATA_DIR + "StockUniteLegale_utf8.zip"
    # Stream the archive to disk instead of holding it all in memory, fail
    # loudly on HTTP errors (instead of unpacking an error page), and close
    # the file deterministically.
    with requests.get(url, allow_redirects=True, stream=True) as response:
        response.raise_for_status()
        with open(zip_path, "wb") as zip_file:
            for block in response.iter_content(chunk_size=1024 * 1024):
                zip_file.write(block)
    shutil.unpack_archive(zip_path, DATA_DIR)

    # INSEE column -> table column; "siren" keeps its name.
    column_names = {
        "dateCreationUniteLegale": "date_creation_unite_legale",
        "sigleUniteLegale": "sigle",
        "prenom1UniteLegale": "prenom",
        "identifiantAssociationUniteLegale": "identifiant_association_unite_legale",
        "trancheEffectifsUniteLegale": "tranche_effectif_salarie_unite_legale",
        "dateDernierTraitementUniteLegale": "date_mise_a_jour_unite_legale",
        "categorieEntreprise": "categorie_entreprise",
        "etatAdministratifUniteLegale": "etat_administratif_unite_legale",
        "nomUniteLegale": "nom",
        "nomUsageUniteLegale": "nom_usage",
        "denominationUniteLegale": "nom_raison_sociale",
        "categorieJuridiqueUniteLegale": "nature_juridique_unite_legale",
        "activitePrincipaleUniteLegale": "activite_principale_unite_legale",
        "economieSocialeSolidaireUniteLegale": "economie_sociale_solidaire_unite_legale",
    }
    # usecols: parse only the needed columns, keeping each chunk small.
    df_iterator = pd.read_csv(
        DATA_DIR + "StockUniteLegale_utf8.csv",
        chunksize=100000,
        dtype=str,
        usecols=["siren", *column_names],
    )
    # Insert rows in database by chunk
    inserted = 0
    for df_unite_legale in df_iterator:
        df_unite_legale = df_unite_legale.rename(columns=column_names)
        df_unite_legale.to_sql(
            "unite_legale", siren_db_conn, if_exists="append", index=False
        )
        # Running total: a SELECT COUNT() here would rescan the whole growing
        # table after every chunk.
        inserted += len(df_unite_legale)
        logging.info(
            f"************ {inserted} records have been added to the unite_legale table!"
        )

    for count_unites_legales in siren_db_cursor.execute(
        """
        SELECT COUNT()
        FROM unite_legale
        """
    ):
        logging.info(
            f"************ {count_unites_legales} records have been added to the "
            f"unite_legale table!"
        )
    commit_and_close_conn(siren_db_conn)

In [None]:
import json
import logging
import os
import shutil
import sqlite3
from urllib.request import urlopen

import pandas as pd
import requests
from elasticsearch_dsl import connections
#from minio import Minio

# Working directory for downloads and the SQLite database. It must end with
# "/" because file paths are built by string concatenation (DATA_DIR + name).
DATA_DIR = "/home/jovyan/work/matchSIRET/indexation/data/"
DATABASE_LOCATION = DATA_DIR + "sirene.db"
ELASTIC_BULK_SIZE = 1500

# Elasticsearch connection settings. Each can be overridden by the environment
# variable of the same name, so credentials need not be written into the
# notebook. The defaults are the previous hard-coded values (empty user and
# password).
ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "")
ELASTIC_URL = os.environ.get("ELASTIC_URL", "http://elasticsearch-master:9200")
ELASTIC_USER = os.environ.get("ELASTIC_USER", "")


# Connect to database
def connect_to_db():
    """Open the SQLite database at DATABASE_LOCATION.

    :return: a ``(connection, cursor)`` pair; release it with commit_and_close_conn
    """
    connection = sqlite3.connect(DATABASE_LOCATION)
    logging.info("******************* Connecting to database! *******************")
    return connection, connection.cursor()


def commit_and_close_conn(siret_db_conn):
    """Commit pending changes on ``siret_db_conn``, then close it.

    The connection is closed even when the commit raises, so a failed commit
    does not leave the handle open. The commit error still propagates.
    """
    try:
        siret_db_conn.commit()
    finally:
        siret_db_conn.close()


def create_sqlite_database():
    """Create a fresh, empty SQLite database at DATABASE_LOCATION.

    Ensures DATA_DIR exists and deletes any previous database file first.
    """
    os.makedirs(os.path.dirname(DATA_DIR), exist_ok=True)
    if os.path.exists(DATABASE_LOCATION):
        os.remove(DATABASE_LOCATION)
        # f-string so the removed path is actually interpolated into the log.
        logging.info(
            f"******************** Existing database removed from {DATABASE_LOCATION}"
        )
    siren_db_conn = sqlite3.connect(DATABASE_LOCATION)
    logging.info(
        "******************* Creating and connecting to database! *******************"
    )
    commit_and_close_conn(siren_db_conn)


def _list_departement_codes():
    """Return the codes for which a geo_siret_<code>.csv.gz file is downloaded.

    Order: "01".."19", "2A", "2B", "21".."95" without "75", then the Paris
    codes "75101".."75120" (replacing "75"), then "971", "972", "973", "974",
    "976", "98" and the empty code.
    """
    codes = [f"{number:02d}" for number in range(1, 20)]
    codes += ["2A", "2B"]
    codes += [str(number) for number in range(21, 96) if number != 75]
    codes += [f"751{number:02d}" for number in range(1, 21)]
    codes += ["971", "972", "973", "974", "976", "98", ""]
    return codes


def create_etablissement_table():
    """(Re)create the ``siret`` table from the geo-sirene per-department files.

    Drops any existing table, creates it with an index on siret, then
    downloads each department file, renames the columns and appends the rows.
    """
    siret_db_conn, siret_db_cursor = connect_to_db()

    # Create database
    siret_db_cursor.execute("""DROP TABLE IF EXISTS siret""")
    siret_db_cursor.execute(
        """CREATE TABLE IF NOT EXISTS siret
            (
            id INTEGER NOT NULL PRIMARY KEY,
            siren,
            siret,
            date_creation,
            tranche_effectif_salarie,
            activite_principale_registre_metier,
            is_siege,
            numero_voie,
            type_voie,
            libelle_voie,
            code_postal,
            libelle_cedex,
            libelle_commune,
            commune,
            complement_adresse,
            complement_adresse_2,
            numero_voie_2,
            indice_repetition_2,
            type_voie_2,
            libelle_voie_2,
            commune_2,
            libelle_commune_2,
            cedex_2,
            libelle_cedex_2,
            cedex,
            date_debut_activite,
            distribution_speciale,
            distribution_speciale_2,
            etat_administratif_etablissement,
            enseigne_1,
            enseigne_2,
            enseigne_3,
            activite_principale,
            indice_repetition,
            nom_commercial,
            libelle_commune_etranger,
            code_pays_etranger,
            libelle_pays_etranger,
            libelle_commune_etranger_2,
            code_pays_etranger_2,
            libelle_pays_etranger_2,
            longitude,
            latitude,
            geo_adresse,
            geo_id)
            """
    )
    # TODO : add in files.data.gouv the field sigle_unite_legale ?
    siret_db_cursor.execute(
        """
        CREATE INDEX index_siret
        ON siret (siret);
        """
    )

    # Source column -> table column. usecols below is derived from this map,
    # so the two lists cannot drift apart.
    column_names = {
        "dateCreationEtablissement": "date_creation",
        "trancheEffectifsEtablissement": "tranche_effectif_salarie",
        "activitePrincipaleRegistreMetiersEtablissement": "activite_principale_registre_metier",
        "etablissementSiege": "is_siege",
        "numeroVoieEtablissement": "numero_voie",
        "typeVoieEtablissement": "type_voie",
        "libelleVoieEtablissement": "libelle_voie",
        "codePostalEtablissement": "code_postal",
        "libelleCedexEtablissement": "libelle_cedex",
        "libelleCommuneEtablissement": "libelle_commune",
        "codeCommuneEtablissement": "commune",
        "complementAdresseEtablissement": "complement_adresse",
        "complementAdresse2Etablissement": "complement_adresse_2",
        "numeroVoie2Etablissement": "numero_voie_2",
        "indiceRepetition2Etablissement": "indice_repetition_2",
        "typeVoie2Etablissement": "type_voie_2",
        "libelleVoie2Etablissement": "libelle_voie_2",
        "codeCommune2Etablissement": "commune_2",
        "libelleCommune2Etablissement": "libelle_commune_2",
        "codeCedex2Etablissement": "cedex_2",
        "libelleCedex2Etablissement": "libelle_cedex_2",
        "codeCedexEtablissement": "cedex",
        "dateDebut": "date_debut_activite",
        "distributionSpecialeEtablissement": "distribution_speciale",
        "distributionSpeciale2Etablissement": "distribution_speciale_2",
        "etatAdministratifEtablissement": "etat_administratif_etablissement",
        "enseigne1Etablissement": "enseigne_1",
        "enseigne2Etablissement": "enseigne_2",
        "enseigne3Etablissement": "enseigne_3",
        "activitePrincipaleEtablissement": "activite_principale",
        "indiceRepetitionEtablissement": "indice_repetition",
        "denominationUsuelleEtablissement": "nom_commercial",
        "libelleCommuneEtrangerEtablissement": "libelle_commune_etranger",
        "codePaysEtrangerEtablissement": "code_pays_etranger",
        "libellePaysEtrangerEtablissement": "libelle_pays_etranger",
        "libelleCommuneEtranger2Etablissement": "libelle_commune_etranger_2",
        "codePaysEtranger2Etablissement": "code_pays_etranger_2",
        "libellePaysEtranger2Etablissement": "libelle_pays_etranger_2",
    }
    # Columns loaded under their original name.
    unchanged_columns = ["siren", "siret", "geo_adresse", "geo_id", "longitude", "latitude"]

    # Upload geo data by departement
    inserted = 0
    for dep in _list_departement_codes():
        url = f"https://files.data.gouv.fr/geo-sirene/last/dep/geo_siret_{dep}.csv.gz"
        print(url)
        df_dep = pd.read_csv(
            url,
            compression="gzip",
            dtype=str,
            usecols=[*unchanged_columns, *column_names],
        )
        df_dep = df_dep.rename(columns=column_names)
        df_dep.to_sql("siret", siret_db_conn, if_exists="append", index=False)
        siret_db_conn.commit()
        # Running total: a SELECT COUNT() here would rescan the growing table
        # after every department.
        inserted += len(df_dep)
        logging.info(
            f"************ {inserted} records have been added to the siret table!"
        )
    commit_and_close_conn(siret_db_conn)


def count_nombre_etablissements():
    """(Re)build ``count_etab``: number of ``siret`` rows per siren.

    The table is dropped and recreated, with a unique index on siren, then
    filled with one row per siren found in ``siret``.
    """
    statements = (
        """DROP TABLE IF EXISTS count_etab""",
        """CREATE TABLE count_etab (siren VARCHAR(10), count INTEGER)""",
        """
        CREATE UNIQUE INDEX index_count_siren
        ON count_etab (siren);
        """,
        """
        INSERT INTO count_etab (siren, count)
        SELECT siren, count(*) as count
        FROM siret GROUP BY siren;
        """,
    )
    connection, cursor = connect_to_db()
    for statement in statements:
        cursor.execute(statement)
    commit_and_close_conn(connection)


def count_nombre_etablissements_ouverts():
    """(Re)build ``count_etab_ouvert``: per siren, the number of ``siret`` rows whose etat_administratif_etablissement is 'A'."""
    connection, cursor = connect_to_db()
    run = cursor.execute
    run("""DROP TABLE IF EXISTS count_etab_ouvert""")
    run("""CREATE TABLE count_etab_ouvert (siren VARCHAR(10), count INTEGER)""")
    run(
        """
        CREATE UNIQUE INDEX index_count_ouvert_siren
        ON count_etab_ouvert (siren);
        """
    )
    # Only rows flagged 'A' in etat_administratif_etablissement are counted.
    run(
        """
        INSERT INTO count_etab_ouvert (siren, count)
        SELECT siren, count(*) as count
        FROM siret
        WHERE etat_administratif_etablissement = 'A' GROUP BY siren;
        """
    )
    commit_and_close_conn(connection)



def create_elastic_index(**kwargs):
    """Delete (if present) and recreate the "siret" elasticsearch index; extra kwargs are ignored."""
    index_name = "siret"
    logging.info(f"******************** Index to create: {index_name}")
    ElasticCreateSiret(
        elastic_url=ELASTIC_URL,
        elastic_index=index_name,
        elastic_user=ELASTIC_USER,
        elastic_password=ELASTIC_PASSWORD,
        elastic_bulk_size=ELASTIC_BULK_SIZE,
    ).execute()


def fill_elastic_index(*, limit=10, **kwargs):
    """Index etablissements from the SQLite database into the ``siret`` index.

    Runs a single SELECT over the ``siret`` table (alias ``st``) left-joined
    to ``unite_legale`` (alias ``ul``) on ``siren``. For every etablissement
    row the query also pulls:

    * ``nombre_etablissements_ouverts`` from ``count_etab_ouvert`` (built by
      ``count_nombre_etablissements_ouverts``), so that table must exist;
    * ``enseignes`` and ``adresses``: JSON arrays built from the ``siret``
      rows sharing the row's siren, each capped at 10 entries.

    The resulting rows are streamed to Elasticsearch by
    ``index_etablissements_by_chunk``.

    :param limit: maximum number of etablissement rows to index. Defaults to
        10; pass ``None`` to index every row.
    :param kwargs: accepted and ignored.
    :returns: the document count returned by ``index_etablissements_by_chunk``.
    """
    elastic_index = "siret"
    # SQLite treats a negative LIMIT as "no upper bound".
    sql_limit = -1 if limit is None else int(limit)
    query = """SELECT
        ul.siren,
        st.siret as siret,
        st.nom_commercial as nom_commercial,
        st.date_creation as date_creation_siege,
        st.tranche_effectif_salarie as tranche_effectif_salarie_siege,
        st.date_debut_activite as date_debut_activite_siege,
        st.etat_administratif_etablissement as etat_administratif_siege,
        st.activite_principale as activite_principale_siege,
        st.complement_adresse as complement_adresse,
        st.numero_voie as numero_voie,
        st.indice_repetition as indice_repetition,
        st.type_voie as type_voie,
        st.libelle_voie as libelle_voie,
        st.distribution_speciale as distribution_speciale,
        st.cedex as cedex,
        st.libelle_cedex as libelle_cedex,
        st.commune as commune,
        st.libelle_commune as libelle_commune,
        st.code_pays_etranger as code_pays_etranger,
        st.libelle_commune_etranger as libelle_commune_etranger,
        st.libelle_pays_etranger as libelle_pays_etranger,
        st.code_postal as code_postal,
        st.geo_id as geo_id,
        st.longitude as longitude,
        st.latitude as latitude,
        st.activite_principale_registre_metier as activite_principale_registre_metier,
        ul.date_creation_unite_legale as date_creation_unite_legale,
        ul.tranche_effectif_salarie_unite_legale as tranche_effectif_salarie_unite_legale,
        ul.date_mise_a_jour_unite_legale as date_mise_a_jour,
        ul.categorie_entreprise as categorie_entreprise,
        ul.etat_administratif_unite_legale as etat_administratif_unite_legale,
        ul.nom_raison_sociale as nom_raison_sociale,
        ul.nature_juridique_unite_legale as nature_juridique_unite_legale,
        ul.activite_principale_unite_legale as activite_principale_unite_legale,
        ul.economie_sociale_solidaire_unite_legale as
        economie_sociale_solidaire_unite_legale,
        (SELECT count FROM count_etab_ouvert ceo WHERE ceo.siren = st.siren LIMIT 10) as
        nombre_etablissements_ouverts,
        (SELECT json_group_array(
            json_object(
                'enseigne_1', enseigne_1,
                'enseigne_2', enseigne_2,
                'enseigne_3', enseigne_3)
            ) FROM
            (SELECT enseigne_1, enseigne_2, enseigne_3 from siret
            WHERE siren = st.siren LIMIT 10)
        ) as enseignes,
        (SELECT json_group_array(
            json_object(
            'complement_adresse', complement_adresse,
            'numero_voie', numero_voie,
            'indice_repetition', indice_repetition,
            'type_voie', type_voie,
            'libelle_voie', libelle_voie,
            'libelle_commune', libelle_commune,
            'libelle_cedex', libelle_cedex,
            'distribution_speciale', distribution_speciale,
            'commune', commune,
            'cedex', cedex,
            'libelle_commune_etranger', libelle_commune_etranger,
            'libelle_pays_etranger', libelle_pays_etranger)
            ) FROM
            (SELECT complement_adresse, numero_voie, indice_repetition,
            type_voie, libelle_voie, libelle_commune, distribution_speciale,
            commune, cedex, libelle_commune_etranger, libelle_pays_etranger
            FROM siret
            WHERE siren = st.siren LIMIT 10)
            ) as adresses,
            ul.sigle as sigle,
            ul.prenom as prenom,
            ul.nom as nom,
            ul.nom_usage as nom_usage,
            st.is_siege as is_siege
        FROM
            siret st
        LEFT JOIN
            unite_legale ul
        ON
            ul.siren = st.siren
        LIMIT ?
        ;"""  # noqa

    siret_db_conn, siret_db_cursor = connect_to_db()
    try:
        # The row cap is bound as a parameter rather than hard-coded in the SQL.
        siret_db_cursor.execute(query, (sql_limit,))

        connections.create_connection(
            hosts=[ELASTIC_URL],
            http_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
            retry_on_timeout=True,
        )
        elastic_connection = connections.get_connection()

        doc_count = index_etablissements_by_chunk(
            cursor=siret_db_cursor,
            elastic_connection=elastic_connection,
            elastic_bulk_size=ELASTIC_BULK_SIZE,
            elastic_index=elastic_index,
        )
    finally:
        # Release the SQLite connection even if indexing fails.
        commit_and_close_conn(siret_db_conn)
    return doc_count
    



### Indexing chunks of unite legale
https://github.com/etalab/annuaire-entreprises-search-infra/blob/b6d7f44bffec99a1036cca21eb56804abf8e8b4a/elasticsearch/indexing_unite_legale.py

In [None]:
import logging
from elasticsearch.helpers import parallel_bulk


def elasticsearch_doc_generator(data):
    """Yield Elasticsearch bulk actions for the given documents.

    Each document dict is wrapped in an ``ElasticsearchSireneIndex`` whose
    Elasticsearch id is the document's ``siret`` value. It is then serialized
    with its metadata (``include_meta=True``) so it can be fed to the bulk
    helpers.

    NOTE(review): ``index_unites_legales_by_chunk`` also uses this generator;
    documents without a ``"siret"`` key would raise ``KeyError`` here, so
    confirm that processed unites legales carry that field.

    :param data: iterable of dicts, each containing a ``"siret"`` key.
    :yields: one serialized action dict per document.
    """
    for document in data:
        yield ElasticsearchSireneIndex(
            meta={"id": document["siret"]}, **document
        ).to_dict(include_meta=True)


def index_unites_legales_by_chunk(
    cursor, elastic_connection, elastic_bulk_size, elastic_index
):
    """Stream unite legale rows from a DB cursor into Elasticsearch.

    Rows are fetched ``elastic_bulk_size`` at a time and turned into
    ``{column_name: value}`` dicts. They are passed through
    ``process_unites_legales`` and bulk-indexed with ``parallel_bulk``.
    A chunk that fails to index is logged and the loop moves on to the next
    chunk (best effort).

    :param cursor: DB-API cursor on which the SELECT has already been executed.
    :param elastic_connection: Elasticsearch client used for bulk indexing
        and for the final document count.
    :param elastic_bulk_size: rows fetched per chunk; also the
        ``parallel_bulk`` chunk size.
    :param elastic_index: index name passed to ``cat.count``.
    :returns: the ``count`` field reported by ``cat.count`` for
        ``elastic_index`` after every chunk has been sent.
    """
    columns = None
    chunk_number = 0
    while True:
        rows = cursor.fetchmany(elastic_bulk_size)
        if not rows:
            # An empty fetch means the cursor is exhausted: nothing left to send.
            break
        if columns is None:
            # Column names are the same for every chunk of a single query.
            columns = tuple(column[0] for column in cursor.description)
        # Turn each row into a {column_name: value} dict.
        unites_legales = tuple(dict(zip(columns, row)) for row in rows)

        processed = process_unites_legales(unites_legales)
        chunk_number += 1
        if chunk_number % 1000 == 0:
            logging.info(f"logger={chunk_number}")
        try:
            # parallel_bulk indexes the generated actions using several threads
            # and yields one (success, details) pair per document.
            for success, details in parallel_bulk(
                elastic_connection,
                elasticsearch_doc_generator(processed),
                chunk_size=elastic_bulk_size,
            ):
                if not success:
                    raise Exception(f"A file_access document failed: {details}")
        except Exception as e:
            # Best effort: log the failure and carry on with the next chunk.
            logging.error(f"Failed to send to Elasticsearch: {e}")

    # Count once, after everything has been sent, instead of once per chunk.
    doc_count = elastic_connection.cat.count(
        index=elastic_index, params={"format": "json"}
    )[0]["count"]
    logging.info(f"Number of documents indexed: {doc_count}")
    return doc_count


### Indexing chunks of etablissements

In [None]:
def index_etablissements_by_chunk(
    cursor, elastic_connection, elastic_bulk_size, elastic_index
):
    """Stream etablissement rows from a DB cursor into Elasticsearch.

    Rows are fetched ``elastic_bulk_size`` at a time and turned into
    ``{column_name: value}`` dicts. They are passed through
    ``process_etablissements`` and bulk-indexed with ``parallel_bulk``.
    A chunk that fails to index is logged and the loop moves on to the next
    chunk (best effort).

    :param cursor: DB-API cursor on which the SELECT has already been executed.
    :param elastic_connection: Elasticsearch client used for bulk indexing
        and for the final document count.
    :param elastic_bulk_size: rows fetched per chunk; also the
        ``parallel_bulk`` chunk size.
    :param elastic_index: index name passed to ``cat.count``.
    :returns: the ``count`` field reported by ``cat.count`` for
        ``elastic_index`` after every chunk has been sent.
    """
    columns = None
    chunk_number = 0
    while True:
        rows = cursor.fetchmany(elastic_bulk_size)
        if not rows:
            # An empty fetch means the cursor is exhausted: nothing left to send.
            break
        if columns is None:
            # Column names are the same for every chunk of a single query.
            columns = tuple(column[0] for column in cursor.description)
        # Turn each row into a {column_name: value} dict.
        etablissements = tuple(dict(zip(columns, row)) for row in rows)

        # TODO: modify function `process_etablissements` to focus on the most relevant fields
        processed = process_etablissements(etablissements)
        chunk_number += 1
        if chunk_number % 1000 == 0:
            logging.info(f"logger={chunk_number}")
        try:
            # parallel_bulk indexes the generated actions using several threads
            # and yields one (success, details) pair per document.
            for success, details in parallel_bulk(
                elastic_connection,
                elasticsearch_doc_generator(processed),
                chunk_size=elastic_bulk_size,
            ):
                if not success:
                    raise Exception(f"A file_access document failed: {details}")
        except Exception as e:
            # Best effort: log the failure and carry on with the next chunk.
            logging.error(f"Failed to send to Elasticsearch: {e}")

    # Count once, after everything has been sent, instead of once per chunk.
    doc_count = elastic_connection.cat.count(
        index=elastic_index, params={"format": "json"}
    )[0]["count"]
    logging.info(f"Number of documents indexed: {doc_count}")
    return doc_count

## Summary of pipeline steps
https://github.com/etalab/annuaire-entreprises-search-infra/blob/main/DAG-insert-elk-sirene.py

# Launch the process

In [None]:
# Steps that build the SQLite tables read by fill_elastic_index.
# They are wrapped in a string literal, so running this cell does not execute
# them; remove the quotes to run the pipeline.
# NOTE: count_nombre_etablissements (table count_etab) is not listed here;
# fill_elastic_index's query reads count_etab_ouvert, not count_etab.
"""
create_sqlite_database()
create_unite_legale_table()
create_etablissement_table()
count_nombre_etablissements_ouverts()
"""


In [None]:
# Elasticsearch steps: create the "siret" index, then fill it from the SQLite
# database. Kept inside a string literal, so this cell does nothing when run.
# NOTE(review): as written, fill_elastic_index indexes at most 10 rows —
# confirm this cap before a full run.
"""
create_elastic_index()
fill_elastic_index()
"""

## Copy the etablissement table sqlite

In [None]:
def test():
    """Print a quick sanity check of the SQLite database.

    Prints the list of table names, then the row counts of ``unite_legale``,
    ``siret`` and ``count_etab_ouvert``. The connection is closed even if a
    query fails (e.g. a table has not been created yet).
    """
    conn, siret_db_cursor = connect_to_db()
    try:
        # sqlite_master works on every SQLite version; the sqlite_schema alias
        # only exists from SQLite 3.33.0 onwards.
        print(siret_db_cursor.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall())
        for table in ("unite_legale", "siret", "count_etab_ouvert"):
            print(siret_db_cursor.execute(f"SELECT  count(*) from {table}").fetchall())
    finally:
        commit_and_close_conn(conn)


test()

In [None]:
#count_nombre_etablissements_ouverts()
