# Import Data Douane To ElasticSearch

### Import des Bibliothèques nécessaires

In [1]:
import pandas as pd
import numpy as np
import os
import json
import elasticsearch
import datetime
from elasticsearch import Elasticsearch
import elasticsearch_dsl
from datetime import datetime
from elasticsearch_dsl import Document, Date, Nested, Boolean, analyzer, InnerDoc, Completion, Keyword, Text, Integer, Keyword, Text
from elasticsearch_dsl.connections import connections

### Création de la variable my_dir (à modifier par  l'utilisateur)

In [2]:
my_dir = r'C:/Users/aaras/Desktop/Data_Douane'
os.chdir(my_dir)

#### Organisation du dossier ' Data_Douane':
* Data_Douane
    * National-2010-export
      *  Libelle_NC8_2010.txt
      *  Libelle_CPF6.txt
      *  Libelle_A129.txt
      *  National_2010_Export.txt
      *  Libelle_PAYS.txt
    * National-2010-import
      *  Libelle_NC8_2010.txt
      *  Libelle_CPF6.txt
      *  Libelle_A129.txt
      *  National_2010_Import.txt
      *  Libelle_PAYS.txt
    * [...]
    * National-2019-export
      *  Libelle_NC8_2010.txt
      *  Libelle_CPF6.txt
      *  Libelle_A129.txt
      *  National_2010_Export.txt
      *  Libelle_PAYS.txt
    * National-2019-import
      *  Libelle_NC8_2019.txt
      *  Libelle_CPF6.txt
      *  Libelle_A129.txt
      *  National_2019_Import.txt
      *  Libelle_PAYS.txt
      

### Création du DataFrame df.import_export

#### Explication des champs
* Flux : le flux correspond au mouvement de la marchandise. Ce champ est valorisé à I pour
Importation et à E pour Exportation
* Mois : c'est le mois durant lequel a eu lieu le mouvement de marchandise.
* Année: c'est l'année durant laquelle a eu lieu le mouvement de marchandise.
* Code CPF6 : C'est le niveau fin de la nomenclature Classification de produits française (CPF).
La CPF est la nomenclature statistique, nationale et centrale de produits. C'est une nomenclature de
produit et de service. Elle permet aux données du commerce extérieur d'être intégrées avec d'autres
statistiques économiques telles que la production industrielle ou la consommation dans des analyses
économiques tant globales que régionales.
* Code A129 : La nomenclature A129 correspond à une nomenclature de regroupement des niveaux
de la CPF pour répondre aux besoins de la production de données de synthèse pour l'analyse
économique et la diffusion.
* Code NC8 : C'est le code de la nomenclature combinée à 8 chiffres (NC8), utilisée pour les
obligations déclaratives des opérateurs auprès de la douane, permet une connaissance détaillée du
commerce extérieur de la France : elle compte en effet un peu moins de 10 000 rubriques.
* Code Pays : C'est un code à deux caractères alphabétiques attribué à chaque pays par la
Commission des communautés européennes.
     * À l'importation, les marchandises sont relevées au compte du pays d'origine.
     * À l'exportation, les envois sont imputés au compte de la destination finale déclarée.
* Valeur : La valeur des échanges est exprimée en euros.
* Masse : Les masses sont exprimées en kilogrammes.
* USUP : Unité supplémentaire.

##### Remarques diverses sur les fichiers utilisés :
* Vérifier que les fichier .txt sont bien encodés en utf-8. Si ce n'est pas le cas vous pouvez ouvrir le fichier dans notepad++ pour le convertir.
* Les headers des fichiers 'Libelle_NC8_xxx.txt' ont été supprimés à la main car il n'apparaissaient pas sur tous les fichiers
* Le code pays de la Namibie étant 'NA', ce code est par défaut détecté par pd.read_csv() comme un NaN. Il faut ajouter une liste de na_values et mettre keep_defaut_na = false la fonction pd.read_csv()
* Ajout de 3 lignes manquantes dans le fichier Libelle_NC8_2018.txt
    *  99500000;Marchandises de faible valeurs dans les échanges intra-communautaires;0;;1993;2018;;
    *  99988000;Composants d'ensembles industriels non classés ailleurs;0;;1993;2018;;
    *  99992000;Colis postaux non classés ailleurs;0;;1993;2018;;
* Ajout de 3 lignes manquantes dans le fichier Libelle_NC8_2019.txt
    *  99050000;Biens personnels appartenant à des personnes physiques qui transfèrent leur résidence normale;0;;2011;2019
    *  99190000;Autres biens appartenant à des personnes physiques qui transfèrent leur résidence normale;0;;2011;2019
    *  99500000;Marchandises de faible valeurs dans les échanges intra-communautaires;0;;1993;2019
    *  99992000;Colis postaux non classés ailleurs;0;;1993;2019

In [3]:
def creation_df_import_export(liste_annee):
    
    '''création d'/un dataframe contenant les données d'/import/export des douanes pour la liste d'années spécifiées en entrée'''
    
    df_columns = ['Code_Flux', 'Libelle_Flux', 'Mois', 'Annee', 'Date_DateTime', 'Code_CPF6', 'Libelle_CPF6', 'Code_A129', 'Libelle_A129', 'Code_NC8', 'Libelle_NC8', 'Code_Pays', 'Libelle_Pays', 'Valeur', 'Masse', 'USUP']
    df = pd.DataFrame(columns=df_columns)
    
    for annee in liste_annee:
        df_import_export = creation_df_import_export_annee(annee)
        df = pd.concat([df,df_import_export])
        print('Année {} ajoutée, {} import/export ajoutés depuis {}'.format(annee, df.shape[0], liste_annee[0]))

    df = df.reset_index(drop = True)
    return df

In [4]:
def creation_df_import_export_annee(annee):
    
    '''Création d'un dataframe pour l'année courante de la forme ['Code_Flux', 'Libelle_Flux', 'Mois', 'Annee', 'Date_DateTime', 'Code_CPF6', 'Libelle_CPF6', 'Code_A129', 'Libelle_A129', 'Code_NC8', 'Libelle_NC8', 'Code_Pays', 'Libelle_Pays', 'Valeur', 'Masse', 'USUP']'''
    
    df_import, df_export, df_nc8, df_pays, df_cpf6, df_a129 = csv_to_df(annee) #on charge les 6 fichiers (import, export, et table de conversion nc8, a129, cpf6, pays)
    df = pd.concat([df_import,df_export]) # on fusionne les imports et les exports
    df = pd.merge(df, df_nc8, how='left') # on ajoute le libelle nc8
    df = pd.merge(df, df_nc8, how='left') # on ajoute le libelle nc8    
    df = pd.merge(df, df_a129, how='left') # on ajoute le libelle a129
    df = pd.merge(df, df_cpf6, how='left') # on ajoute le libelle cpf6
    df = pd.merge(df, df_pays, how='left') # on ajoute le libelle pays
    df = df_add_date_datetime(df) # on ajopute une colonne date au format datetime
    df = df_add_libelle_flux(df) # on ajoute import/export en toute lettre
    new_columns_order = ['Code_Flux', 'Libelle_Flux', 'Mois', 'Annee', 'Date_DateTime', 'Code_CPF6', 'Libelle_CPF6', 'Code_A129', 'Libelle_A129', 'Code_NC8', 'Libelle_NC8', 'Code_Pays', 'Libelle_Pays', 'Valeur', 'Masse', 'USUP']
    df = df.reindex(columns = new_columns_order)
    return df

In [5]:
def csv_to_df(annee):
    
    '''création d'un dataframe contenant les données d'import/export des douanes pour l'année spécifiée en entrée'''
    
    import_dir = os.path.join(my_dir,'National-'+str(annee)+'-import')
    export_dir = os.path.join(my_dir,'National-'+str(annee)+'-export')
    na_values_liste = ['#N/A', '#N/A', 'N/A', '#NA', '-1.#IND', '-1.#QNAN', '-NaN', '-nan', '1.#IND', '1.#QNAN',' N/A',' NULL', 'NaN', 'n/a','nan', 'null']
    columns_name = ['Code_Flux', 'Mois', 'Annee', 'Code_CPF6','Code_A129','Code_NC8','Code_Pays','Valeur', 'Masse', 'USUP']
    dtype_dico = {'Code_Flux': object , 'Mois': int , 'Annee': int, 'Code_CPF6': object, 'Code_A129': object, 'Code_NC8':object,'Code_Pays':object, 'Valeur':int, 'Masse':int, 'USUP':int}
    os.chdir(import_dir)
    df_import = pd.read_csv('National_'+str(annee)+'_Import.txt', sep=";", header=None, encoding = 'utf-8', names=columns_name, dtype=dtype_dico, keep_default_na=False, na_values = na_values_liste)
    os.chdir(export_dir)
    df_export = pd.read_csv('National_'+str(annee)+'_Export.txt', sep=";", header=None, encoding = 'utf-8', names=columns_name, dtype=dtype_dico, keep_default_na=False, na_values = na_values_liste)
    df_nc8 = pd.read_csv('Libelle_NC8_'+str(annee)+'.txt', sep=";", header=None, names = ['Code_NC8','Libelle_NC8'], encoding = 'utf-8',  usecols = [0,1], dtype={'Code_NC8':object,'Libelle_NC8':object} )
    df_a129 = pd.read_csv('Libelle_A129.txt', sep=";", header=None, names = ['Code_A129','Libelle_A129','c','d'], usecols = ['Code_A129','Libelle_A129'], dtype={'Code_A129':object,'Libelle_A129':object})
    df_cpf6 = pd.read_csv('Libelle_CPF6.txt', sep=";", header=None, names = ['Code_CPF6', 'Libelle_CPF6','c','d'], usecols = ['Code_CPF6', 'Libelle_CPF6'],dtype={'Code_CPF6':object,'Libelle_CPF6':object})
    df_pays = pd.read_csv('Libelle_PAYS.txt', sep=";", header=None, names = ['Code_Pays', 'Libelle_Pays', 'c','d'],usecols=['Code_Pays', 'Libelle_Pays'], dtype={'Code_Pays':object,'Libelle_Pays':object},keep_default_na=False, na_values = na_values_liste)
    return df_import, df_export, df_nc8, df_pays, df_cpf6, df_a129

In [6]:
def df_add_date_datetime(df):
    
    '''A partir des colonnes Annee et Mois du dataframe, création d'une nouvelle colonne Date au format datetime'''
    
    df = df.assign(Date_DateTime = pd.to_datetime({'year':df.Annee,'month':df.Mois,'day':1}))
    return df

In [7]:
def df_add_libelle_flux(df):
    
    '''Création d'une nouvelle colonne spécifiant en toute lettre s'il s'agit d'un import ou d'un export'''
    
    df_flux = pd.DataFrame([['I','Import'],['E','Export']],columns=['Code_Flux', 'Libelle_Flux'])
    df = pd.merge(df, df_flux,how='left')
    return df

In [None]:
liste_annee = ['2010','2011','2012','2013','2014','2015','2016','2017','2018','2019']
df = creation_df_import_export(liste_annee)

Année 2010 ajoutée, 3728478 import/export ajoutés depuis 2010
Année 2011 ajoutée, 7503145 import/export ajoutés depuis 2010
Année 2012 ajoutée, 11392481 import/export ajoutés depuis 2010


In [None]:
os.chdir(my_dir)
df.to_csv('National_2010_2019_Import_Export.csv', index=False) #créé un fichier de 15 Go donc peut être pas une bonne idée :(

In [None]:
df_test = df.head(10000)
df_test

## Export vers ElasticSearch


In [None]:
df_export = df.head(10000)

In [None]:
df['Date_DateTime'] = df['Date_DateTime'].astype(str)
df_export


### Méthode 1:

In [None]:
INDEX="douane_2"
TYPE= "record"

def rec_to_actions(df):
    import json
    for record in df.to_dict(orient="records"):
        yield ('{ "index" : { "_index" : "%s", "_type" : "%s" }}'% (INDEX, TYPE))
        yield (json.dumps(record))

from elasticsearch import Elasticsearch
e = Elasticsearch() # no args, connect to localhost:9200
if not e.indices.exists(INDEX):
    raise RuntimeError('index does not exists, use `curl -X PUT "localhost:9200/%s"` and try again'%INDEX)

r = e.bulk(rec_to_actions(df)) # return a dict

print(not r["errors"])

### Méthode 2 

In [None]:
connections.create_connection(hosts=['localhost'])

class Import_Export_Douane(Document):
    flux = Keyword()
    mois = Integer() 
    annee = Integer()
    code_CPF6 = Keyword()
    libelle_CPF6 = Text()
    code_A129 = Keyword()
    libelle_A129 = Text()
    code_NC8 = Keyword()
    libelle_NC8 = Text()
    code_pays = Keyword()
    libelle_pays = Keyword()
    valeur = Integer()
    masse = Integer()
    usup = Integer()
    
    class Index:
        name = 'import_export_douane'

    def save(self, ** kwargs):
        return super(Import_Export_Douane, self).save(** kwargs)

    def is_published(self):
        return datetime.now() >= self.published_from

In [None]:
def import_df(df):
    lis = []
    for i in range (df.shape[0]):
        info_export = df.loc[i].tolist() #['Flux', 'Mois', 'Annee', 'Code_CPF6','Code_A129','Code_NC8','Code_Pays','Valeur', 'Masse', 'USUP'] 
         # create and save and article
        doc = Import_Export_Douane(meta={'id': i}, tags=['test'])
        doc.flux = info_export[0]
        doc.mois =  info_export[1]
        doc.annee = info_export[2]
        doc.code_CPF6 = info_export[3]
        doc.libelle_CPF6 = info_export[4]
        doc.code_A129 = info_export[5]
        doc.libelle_A129 = info_export[6]
        doc.code_NC8 = info_export[7]
        doc.libelle_NC8 = info_export[8]
        doc.code_pays = info_export[9]
        doc.libelle_pays = info_export[10]
        doc.valeur = info_export[11]
        doc.masse = info_export[12]
        doc.usup = info_export[13]
        doc.published_from = datetime.now()
        doc.save(doc_type="test")
        print('Export n°{} importé'.format(i))

    

In [None]:
connections.create_connection(hosts=['localhost'])
import_df(df)