In [1]:
# Context creation
from dtm_tools.init_spark import CreateContext

context = CreateContext(app_name='dad_lot2_1',
                        conf='big_heavy_d')       # conf est optionnel, defaut : small_light_d
spark = context.get_spark()
hive   = context.get_hive(spark)

import pandas as pd 
import os
import pyspark.sql.types as T
import pyspark.sql.functions as F
from datetime import datetime

I - Authentification validée
I - Configuration:  big_heavy_d
I - En attente de l'initialisation de l'application Spark ....
I - Application Spark lancée !


In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import re

In [3]:
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    "Union des tables"
    return reduce(DataFrame.unionAll, dfs)

def select_between_dates(df, date1, date2, date_type):

    "Récupère un df et sélectionne les lignes émis entre deux strings dates passées en paramètres selon la colonne date passée en parametre"
    date1 = datetime.strptime(date1, '%d/%m/%Y')
    date2 = datetime.strptime(date2, '%d/%m/%Y')
    df_out = (df.filter((df[date_type] >= date1) & (df[date_type] <= date2)))
    n_rows_deleted = df.count() - df_out.count()
    print("Nombre de lignes en période d'analyse: " + str(df_out.count()))
    return(df_out)

In [4]:
deltac = spark.sql("select * from db_deltac.deltac_extrn")
deltad = spark.sql("select * from db_deltad.deltad_extrn")
deltax = spark.sql("select * from db_deltax.deltax_extrn")

deltag = unionAll(deltac,deltad,deltax)

#Features casting and creation
##Conversion en int des masses et valeur
deltag = deltag.withColumn("qt_masse_nette", deltag["qt_masse_nette"].cast(T.IntegerType()))
deltag = deltag.withColumn("mt_valeur_douane", deltag["mt_valeur_douane"].cast(T.IntegerType()))
##Creation d'une colonne nc4 
deltag = deltag.withColumn('nc6',deltag.cd_marchandise.substr(1, 6))
deltag = deltag.withColumn('nc4',deltag.cd_marchandise.substr(1, 4))

##Creation d'un numéro DAUs sans préfixe
deltag = deltag.withColumn("dau_number",F.col("id").substr(6,14))

## Creation Id article
deltag = deltag.withColumn("id_article",F.concat(F.col("dau_number"),
                                                         F.lit("_"),
                                                         F.col("cd_numero_article")))
#Filter Import 
deltag = deltag.filter(F.col("cd_type_flux").rlike("IMP"))

In [5]:
#Selection du périmètre par dates d'emission 
from datetime import datetime
deltag_sub = select_between_dates(deltag, date1 = "01/02/2017", date2 = "31/12/2021", date_type = "dt_validation")

Nombre de lignes en période d'analyse: 127838204


In [6]:
deltag_sub = deltag_sub.cache()
deltag_sub.count()

127838204

In [7]:
decs = deltag_sub.groupBy("lb_pays_origine","cd_marchandise").agg(

    F.count("id_article").alias("n_articles"),
    F.sum("mt_valeur_douane").alias("mt_valeur_douane"),
    F.sum("qt_masse_nette").alias("qt_masse_nette")

)

In [8]:
decs_pd = decs.toPandas()

In [9]:
print(decs_pd["n_articles"].describe([0.1,0.25,0.5,0.75,0.9]))
print(decs_pd["mt_valeur_douane"].describe([0.1,0.25,0.3,0.4,0.5,0.75,0.9]))

count    2.558510e+05
mean     4.996588e+02
std      5.607397e+04
min      1.000000e+00
10%      1.000000e+00
25%      1.000000e+00
50%      5.000000e+00
75%      3.100000e+01
90%      2.150000e+02
max      2.817988e+07
Name: n_articles, dtype: float64
count    2.547950e+05
mean     3.837792e+06
std      1.041842e+08
min      0.000000e+00
10%      1.580000e+02
25%      1.187000e+03
30%      2.042000e+03
40%      5.554000e+03
50%      1.425900e+04
75%      1.691775e+05
90%      1.577151e+06
max      2.033298e+10
Name: mt_valeur_douane, dtype: float64


In [10]:
sub_decs_pd = decs_pd[(decs_pd["mt_valeur_douane"] > 1000000) & (decs_pd["n_articles"] > 100)]
sub_decs_pd.columns = ['lb_pays_origine_1', 'cd_marchandise_1', 'n_articles_1', 'mt_valeur_douane_1','qt_masse_nette_1']
sub_decs_pd = sub_decs_pd[['lb_pays_origine_1', 'cd_marchandise_1']]
selected_ncs = spark.createDataFrame(sub_decs_pd) 

In [11]:
deltag_sub_merged = deltag_sub.join(F.broadcast(selected_ncs),
                                    (deltag_sub.cd_marchandise == selected_ncs.cd_marchandise_1) &
                                    (deltag_sub.lb_pays_origine == selected_ncs.lb_pays_origine_1), how = "inner")

In [12]:
deltag_sub_merged = deltag_sub_merged.cache()
deltag_sub_merged.count()

64704959

In [13]:
decs_lot2 = deltag_sub_merged.select("dt_validation",
                "lb_pays_provenance",
                "lb_pays_origine",
                "mt_valeur_douane",
                "cd_marchandise",
                "cd_operateur_importateur",
                "lb_operateur_importateur_nom",
                "id",
                "qt_masse_nette",
                "id_article")

In [14]:
decs_lot2 = decs_lot2.withColumn("ratio_mv", F.col("mt_valeur_douane") / F.col("qt_masse_nette"))
decs_lot2 = decs_lot2.withColumn("date", F.regexp_replace(F.col('dt_validation'),"T"," "))
decs_lot2 = decs_lot2.withColumn("date", F.substring(F.col("date"),1,22))
decs_lot2 = decs_lot2.withColumn("date", F.to_timestamp(F.col("date")))

In [15]:
decs_lot2_grp = decs_lot2.groupBy("lb_pays_origine","cd_marchandise", F.window(F.col("date"),"1 week")).agg(

#     F.collect_list("id_article").alias("id_articles"),
#     F.collect_list("dt_validation").alias("dt_validation"),
#     F.collect_list("mt_valeur_douane").alias("mt_valeur_douane"),
#     F.collect_list("qt_masse_nette").alias("qt_masse_nette"),
#     F.collect_list("ratio_mv").alias("ratio_mv")
    F.sum("mt_valeur_douane").alias("mt_valeur_douane"),
    F.sum("qt_masse_nette").alias("qt_masse_nette"),
)

In [16]:
decs_lot2_grp = decs_lot2_grp.cache()
print(decs_lot2_grp.count())


2717511


In [17]:
def get_item(liste):
    return(liste[0])
#Get firstdate
first_elem_udf = F.udf(lambda x: get_item(x), T.TimestampType())
decs_lot2_grp = decs_lot2_grp.withColumn("dates", first_elem_udf(F.col("window")))
decs_lot2_grp = decs_lot2_grp.withColumn("dates", F.date_format(F.col("dates"),"yyyy-MM-dd"))
decs_lot2_grp = decs_lot2_grp.cache()
decs_lot2_grp.count()

2717511

In [18]:
from datetime import datetime
def get_full_period_data(startdate,
                         endate,
                         resampling_parameter):
    """
    Creating a spark dataframe with one column filled with 0 for each dates between, Start and enddate.
    Parms : Startdate (string), Endate (string), Resampling parameter
    """
    #Setting-up analysis period dataframe : 2 cols dates and sum filled by zeros
    startdate = datetime.strptime(startdate, '%Y-%m-%d')
    endate = datetime.strptime(endate, '%Y-%m-%d')
    analysis_period = pd.date_range(startdate,endate,freq=resampling_parameter)
    analysis_period = pd.DataFrame(analysis_period)
    analysis_period.columns = ["dates"]
    analysis_period["mt_valeur_douane"] = 0
    analysis_period["qt_masse_nette"] = 0
    analysis_period["dates"] = analysis_period["dates"].apply(lambda x: x.strftime('%Y-%m-%d'))
    analysis_period_sq = spark.createDataFrame(analysis_period)
    return(analysis_period_sq)

In [19]:
#Creating a 4 years dataframe with values filled with 0 
full_period = get_full_period_data("2017-02-01","2021-12-31","W-THU")
full_period = full_period.repartition(10)
#Crossjoin in order to product a table with by weeks x cd_marchandise x pays origine filled of zeros
##Selecting list of NCs x Lb pays origine
ncs_orgin = decs_lot2_grp.select("cd_marchandise","lb_pays_origine").distinct()
ncs_orgin = ncs_orgin.cache()
ncs_orgin.count()
df_0_filled = full_period.crossJoin(F.broadcast(ncs_orgin))
##Changing colnames for future join 
for c in df_0_filled.columns:
    df_0_filled = df_0_filled.withColumnRenamed(c,c+"_0")
##Caching
df_0_filled = df_0_filled.cache()
df_0_filled.count()  
decs_lot2_grp = decs_lot2_grp.repartition(200)

#Fulljoin with decs
full_join = df_0_filled.join(decs_lot2_grp, ((df_0_filled.dates_0 == decs_lot2_grp.dates) & 
                                            (df_0_filled.lb_pays_origine_0 == decs_lot2_grp.lb_pays_origine) & 
                                             (df_0_filled.cd_marchandise_0 == decs_lot2_grp.cd_marchandise)), how = "full_outer")

full_join = full_join.cache()
full_join.count()

##Selecting rows wich have no data 
left_only = full_join.filter(F.col("lb_pays_origine").isNull())
##Renaming cols
selected_cols = ['dates_0',
 'mt_valeur_douane_0',
 'qt_masse_nette_0',
 'cd_marchandise_0',
 'lb_pays_origine_0']
left_only = left_only.select(selected_cols)

for c in left_only.columns: 
    left_only = left_only.withColumnRenamed(c, c[:-2])

#Dropping windows
decs_lot2_grp = decs_lot2_grp.drop("window")

##Appending rows with 0 
left_only = left_only.select(decs_lot2_grp.columns)
augement_decs_lot_2_grp = decs_lot2_grp.union(left_only)

#Final data 
augement_decs_lot_2_grp = augement_decs_lot_2_grp.cache()
augement_decs_lot_2_grp.count()

5300886

In [2]:
augement_decs_lot_2_grp.write.parquet("lot_2_dad.parquet")
!hadoop fs -copyToLocal /user/nnougue/lot_2_dad.parquet /home/nnougue/poc_monitoring_dad/data/outputs