# 1. Objet

Ce script retraite les données brutes enregistrées par le RawDataProcessor afin de :
- contrôler la conformité à certaines règles de gestion métier
- nettoyer les avoirs lorsque c'est possible
- agréger les informations à une maille commande
- calculer le canal majoritaire en poids
- effectuer d'autres agrégations si nécessaire (ex : types de tarif...)

# 2. Imports et setup technique

In [1]:
from pathlib import Path
import sys
import datetime
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from importlib import reload
# Absolute path of the directory one level above sys.path[0]; it is appended
# to sys.path (only once) ahead of the `scripts.utils` import below.
project_root = str(Path(sys.path[0]).parents[0].absolute())
if project_root not in sys.path:
    sys.path.append(project_root)
import multiprocessing as mp
    
from scripts.utils import process_df  # traitement des avoirs
    
data_path = Path('..') / 'data'
persist_path = Path('..') / 'persist'
from IPython.display import display, HTML
display(HTML("<style>.container { width:90%; }</style>"))

from dask.distributed import LocalCluster, Client
import dask.dataframe as dd
# Paramètres pour les traitements "tout-venant"
# std_client_kwargs = dict(
#     n_workers=int(0.9 * mp.cpu_count()),
#     processes=True,
#     threads_per_worker=2,
#     dashboard_address=':36000',
#     memory_limit='6GB',
# )
# Keyword arguments handed to LocalCluster by the processing cells.
std_client_kwargs = {
    'n_workers': 4,
    'processes': True,
    'threads_per_worker': 2,
    'memory_limit': '8GB',
    'dashboard_address': ':36000',
}

# Settings for building the index, which is memory-hungry.
heavy_client_kwargs = {
    'n_workers': 1,
    'threads_per_worker': 2,
    'processes': True,
    'memory_limit': '31GB',
    'dashboard_address': ':36000',
}

In [2]:
# Codes of the 'orgacom' values (presumably the sales organisations - confirm).
# Entries that are commented out are deliberately left out of the list.
# NOTE(review): in the visible notebook this list is only referenced by the
# commented-out legacy loop near the end.
orgacom_list = [
    '1ALO',
    '1BFC',
    '1CAP',
    '1CTR',
    '1EXP',
    '1LRO',
    '1LXF',
    '1NCH',
    '1OUE',
    '1PAC',
#     '1PLU', 
    '1PNO',
    '1PSU',
    '1RAA',
    '1SOU',
    '2BRE',
    '2CAE',
    '2CTR',
    '2EST',
    '2IDF',
#     '2IFC', Cash Européenne Food ? (open question left by the author)
    '2MPY',
    '2NOR',
    '2RAA',
    '2SES',
    '2SOU',
]

# Chargement des données brutes

Il est possible de suivre l'avancement du traitement à l'url http://devdm:36000 (une fois le traitement lancé).

In [3]:
%%time
# Start a temporary local Dask cluster and client; both are closed when the
# `with` block exits.
with LocalCluster(**std_client_kwargs) as cluster, Client(cluster) as client_:
    # Open the raw dataset as a Dask dataframe (no data is loaded here).
    raw_data = dd.read_parquet(persist_path / 'raw_data.parquet')
    # len() is what actually triggers a computation on the cluster.
    print(f'Il y a {len(raw_data)} lignes dans le dataset')

Il y a 246155473 lignes dans le dataset
CPU times: user 1.7 s, sys: 201 ms, total: 1.9 s
Wall time: 9.34 s


In [4]:
# timed : 6,45s

# %%time
# with LocalCluster(
#     n_workers=int(0.9 * mp.cpu_count()),
#     processes=False,
#     threads_per_worker=1,
#     dashboard_address=':36000',
# #     memory_limit='2GB',
# #     ip='tcp://localhost:9895',
# ) as cluster, Client(cluster) as client_:
#     raw_data = dd.read_parquet(persist_path / 'raw_data.parquet')
#     print(f'Il y a {len(raw_data)} lignes dans le dataset')

Il est nécessaire de recharger les catégories à la lecture du fichier parquet.
https://docs.dask.org/en/latest/dataframe-categoricals.html

In [5]:
# Re-declare the categories of every categorical column, using the categories
# carried by the first row returned by head(1).
column_types = raw_data.dtypes
categorical_fields = column_types[column_types == 'category'].index
for field in categorical_fields:
    first_row = raw_data[field].head(1)
    raw_data[field] = raw_data[field].cat.set_categories(first_row.cat.categories)

# Contrôle des données

On définit les fonctions qui vont permettre d'effectuer les contrôles.

### Contrôle des types de documents

In [6]:
# Document types that count as orders.
valid_doctypes = ['ZC01', 'ZC02', 'ZC10']
# Every other document type that is expected in the data.
invalid_doctypes = ['ZR01', 'ZR02', 'ZA01', 'ZA02', 'ZA03', 'ZA04', 'ZC20']
# All expected document types, orders first.
doctypes = [*valid_doctypes, *invalid_doctypes]

On s'assure qu'on n'a pas de type de document non prévu.

In [7]:
def ctrle_doctypes(data):
    """Print a warning when `data` contains document types not in `doctypes`.

    The per-doctype row counts of the unexpected rows are computed once and
    reused for both the test and the message, so the dataset is only scanned
    a single time.

    Args:
        data: Dask dataframe exposing a `doctype` column.

    Returns:
        None. The counts are printed, not returned.

    Note:
        Rows whose doctype is missing are not counted by `value_counts`, so
        they do not trigger the warning on their own.
    """
    unexpected_counts = (
        data.loc[~data.doctype.isin(doctypes), 'doctype']
        .value_counts()
        .compute()
    )
    # A positive total means at least one row carries an unexpected doctype.
    if unexpected_counts.sum() > 0:
        print(f'Unexpected doctypes have been encountered: '
              f'{unexpected_counts}!')

In [8]:
%%time
# Run the document-type check on a temporary local cluster; the function
# prints its findings itself.
with LocalCluster(**std_client_kwargs) as cluster, Client(cluster) as client_: 
    ctrle_doctypes(raw_data)

Unexpected doctypes have been encountered: ZC04    223011
ZA07      4064
ZB1       2497
ZB5       1155
Name: doctype, dtype: int64!
CPU times: user 21.6 s, sys: 1.75 s, total: 23.4 s
Wall time: 4min 53s


### Contrôle des CA bruts non nuls alors que le poids est nul

D'un point de vue métier, il n'est pas possible qu'une ligne de **commande** (= valid_doctype) avec un poids nul ait un CA brut, sauf pour certains articles de service.

In [9]:
def ctrle_no_weight_revenue(data, order_doctypes=valid_doctypes):
    """Count order lines that have revenue but no weight.

    Args:
        data: Dask dataframe with `doctype`, `weight`, `brutrevenue`,
            `orgacom` and `material` columns.
        order_doctypes: Document types considered to be orders.

    Returns:
        A pandas dataframe indexed by (orgacom, material) whose single
        column holds the number of offending lines.
    """
    is_order = data.doctype.isin(order_doctypes)
    suspicious = is_order & (data.weight == 0) & (data.brutrevenue != 0)
    # The matching rows are pulled into pandas before being counted.
    offending_rows = data.loc[suspicious].compute()
    line_counts = offending_rows.groupby(['orgacom', 'material'], observed=True).size()
    return line_counts.to_frame()

In [10]:
%%time
# Run the zero-weight / non-zero-revenue check on a temporary local cluster
# and keep its output in `result` for display in the next cell.
with LocalCluster(**std_client_kwargs) as cluster, Client(cluster) as client_:
    result = ctrle_no_weight_revenue(raw_data)

CPU times: user 12.1 s, sys: 1.01 s, total: 13.2 s
Wall time: 2min 18s


In [11]:
# Show every column: one per orgacom, one row per material, 0 where a
# combination has no offending line.
with pd.option_context('display.max_columns', None):
    display(
        result
        .unstack('orgacom', fill_value=0)
        .sort_index(axis=1)
        .style
        .bar(align='mid', axis=None)
    )

Unnamed: 0_level_0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
orgacom,1ALO,1BFC,1CAP,1CTR,1LRO,1LXF,1NCH,1OUE,1PAC,1PNO,1PSU,1RAA,1SOU,2BRE,2CTR,2EST,2IDF,2MPY,2NOR,2RAA,2SES,2SOU
material,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2,Unnamed: 15_level_2,Unnamed: 16_level_2,Unnamed: 17_level_2,Unnamed: 18_level_2,Unnamed: 19_level_2,Unnamed: 20_level_2,Unnamed: 21_level_2,Unnamed: 22_level_2
000000000000028084,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0
000000000000052565,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
000000000000064208,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0
000000000000156453,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1
000000000000156466,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0
000000000000156474,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
000000000000156486,0,0,0,0,0,0,0,0,0,0,0,0,0,1,2,7,0,2,3,1,2,1
000000000000162186,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0
000000000000189057,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
FC18404,0,0,0,0,0,0,0,2,1,0,0,0,0,0,0,0,0,0,0,0,0,0


L'essentiel des articles concernés sont des articles de service (forfaits livraison). Il faudrait regarder ce qui s'est passé sur le 156486, il remonte régulièrement côté ES.

### Contrôle des origines de commande

In [12]:
valid_origins = ['TV', 'VR', 'WEB', 'EDI']


def ctrle_origin(data, valid_origins=valid_origins):
    """Measure how many revenue-bearing order lines have an unexpected origin.

    Args:
        data: Dask dataframe with `brutrevenue`, `doctype`, `origin` and
            `orgacom` columns.
        valid_origins: Origins that are considered expected.

    Returns:
        A pandas dataframe with one row per (orgacom, origin) pair outside
        `valid_origins`, holding the number of such lines (`invalid`), the
        number of revenue-bearing order lines of the orgacom (`baseline`)
        and their ratio (`percentage`).
    """
    # Population shared by both counts: order lines with a non-zero revenue.
    revenue_order_lines = (data.brutrevenue != 0) & data.doctype.isin(valid_doctypes)
    unexpected_origin = ~data.origin.isin(valid_origins)

    baseline = (
        data.loc[revenue_order_lines]
        .groupby(['orgacom'], observed=True)
        .size()
        .compute()
        .rename('baseline')
    )
    invalid = (
        data.loc[revenue_order_lines & unexpected_origin]
        .groupby(['orgacom', 'origin'], observed=True)
        .size()
        .compute()
        .rename('invalid')
        .to_frame()
    )

    # Attach each orgacom's baseline to its invalid-origin rows.
    report = invalid.reset_index().merge(baseline.reset_index(), on='orgacom', how='left')
    report['percentage'] = report['invalid'] / report['baseline']
    return report

On contrôle que les origines de commande "exotiques" sont minoritaires sur le dataset.

In [13]:
%%time
# Run the origin check on a temporary local cluster and render the report,
# sorted by orgacom and origin, with the ratio shown as a percentage bar.
with LocalCluster(**std_client_kwargs) as cluster, Client(cluster) as client_:
    display(
        ctrle_origin(raw_data)
        .sort_values(['orgacom', 'origin'])
        .style
        .format({'percentage': lambda x: f'{x:.2%}'})
        .bar(subset=['percentage'], align='mid')
    )

Unnamed: 0,orgacom,origin,invalid,baseline,percentage
18,1ALO,TELE,5,14951326,0.00%
8,1ALO,DFUE,3,14951326,0.00%
2,1CAP,SCHR,100,329655,0.03%
11,1CTR,DFUE,39,7936828,0.00%
10,1NCH,SCHR,85,11628415,0.00%
9,1PSU,DFUE,8,16415373,0.00%
6,1RAA,SCHR,22,18052798,0.00%
5,1RAA,TELE,76,18052798,0.00%
19,1RAA,MUEN,1,18052798,0.00%
7,1RAA,DFUE,30,18052798,0.00%


CPU times: user 18.5 s, sys: 1.93 s, total: 20.4 s
Wall time: 5min 20s


### Retraitement des avoirs

On lance le retraitement des avoirs. La règle de gestion est qu'on va combiner le montant du tonnage, CA et marge des avoirs aux commandes si on est dans la situation suivante : 
- l'avoir est passé pour le même article, le même client (et organisation commerciale) et le même jour que le poste de commande à combiner
- il n'y a qu'un seul poste de commande existant pour ces critères (contre exemple : l'article a fait l'objet d'une commande sur 2 canaux distincts, on ne sait pas auquel affecter l'avoir)
- le CA brut et le tonnage du poste de commande résiduel doivent être tous les deux supérieurs ou égaux à 0

In [18]:
# Apply process_df (the credit-note processing imported from scripts.utils)
# to every partition of the raw data.
processed_ddf = raw_data.map_partitions(
    process_df,
    # The output is declared to have the same schema as raw_data.
    meta=raw_data,
)

Les cellules ci-dessous produisent des graphes pour illustrer le traitement, leur exécution n'est pas indispensable.

In [15]:
# %%time

# with LocalCluster(**std_client_kwargs) as cluster, Client(cluster) as client_:
#     processed_1ALO = (processed_ddf.loc[lambda x: x.orgacom == '1ALO'].compute())
#     raw_1ALO = raw_data.loc[raw_data.orgacom == '1ALO'].compute()

In [16]:
# fig, axs = plt.subplots(nrows=2, figsize=(20, 12))
# sns.lineplot(
#     data=raw_1ALO.groupby(['date', 'origin'])[['weight', 'brutrevenue', 'margin']].sum(),
#     x='date',
#     y='margin',
#     hue='origin',
#     ax=axs[0],
# )
# sns.lineplot(
#     data=processed_1ALO.groupby(['date', 'origin'])[['weight', 'brutrevenue', 'margin']].sum(),
#     x='date',
#     y='margin',
#     hue='origin',
#     ax=axs[1],
# )
# fig.suptitle('Illustration du nettoyage des avoirs - Succursale 1ALO')

In [17]:
# del(raw_1ALO, processed_1ALO)

### Calcul des agrégations

On définit d'abord les fonctions des agrégations à calculer (somme pour les indicateurs, + un indicateur pour compter les lignes), ainsi que les agrégations : 
- orders : les commandes, au sens client x date x origine
- pricetype : niveau commande avec en plus les types de prix initiaux et appliqués. Cette agrégation a été désactivée dans le dictionnaire ci-dessous, car elle n'est pas utilisée.

In [15]:
# Aggregation applied to each column: the indicators are summed, while
# 'material' is only used to count the rows of each group.
aggfuncs = dict(
    margin='sum',
    brutrevenue='sum',
    weight='sum',
    material='size',
)

# Name of each aggregation -> columns it is grouped by.
agg_defs = dict(
    orders=['date', 'orgacom', 'client', 'origin'],
    # Disabled:
    # pricetype=['date', 'orgacom', 'client', 'origin', 'pricetype_init', 'pricetype_applied'],
)

In [32]:
%%time
# around 80 minutes for 'orders' aggregation only.

with LocalCluster(**std_client_kwargs) as cluster, Client(cluster) as client_:
    # Keep only lines with an accepted origin and strictly positive revenue
    # and weight.
    has_valid_origin = processed_ddf.origin.isin(valid_origins)
    is_positive = (processed_ddf.brutrevenue > 0.) & (processed_ddf.weight > 0.)
    filtered_ddf = processed_ddf.loc[has_valid_origin & is_positive]

    # One parquet dataset per aggregation declared in agg_defs.
    for agg_name, groupers in agg_defs.items():
        aggregated = (
            filtered_ddf
            .groupby(groupers, observed=True)
            .agg(aggfuncs, split_out=5)
            .reset_index()
            # The 'size' of 'material' is the number of lines in the group.
            .rename(columns={'material': 'linecount'})
        )
        (
            aggregated
            .set_index('date')
            .repartition(partition_size="100MB")
            .to_parquet(
                persist_path / f'{agg_name}_all_SV.parquet',
                overwrite=True,
                engine='fastparquet',
            )
        )



CPU times: user 9min 9s, sys: 40.1 s, total: 9min 49s
Wall time: 1h 21min 26s


In [23]:
### Ancienne version, plus nécessaire.

# order_groupers = ['orgacom', 'date', 'client', 'origin']  # pour les commandes, on considèrera ces clefs de regroupement
# order_df_list = []  # aggrégé par date, client, canal, orgacom 
# pricetype_groupers = ['orgacom', 'date', 'client', 'origin', 'pricetype_init', 'pricetype_applied']
# pricetype_df_list = []


# for orgacom in orgacom_list:
#     print('----------------------------------------------------------------')
#     print(f'{datetime.datetime.now()} - Début du traitement pour {orgacom}')
#     print('----------------------------------------------------------------')
#     print(f'{datetime.datetime.now()} - Lecture du fichier')
#     data = pd.read_pickle(persist_path / 'rawbyoc' / f'data_{orgacom}.pkl')
#     print(f'{datetime.datetime.now()} - Traitement des avoirs')
#     data = credit_processing(data)
#     print(f'{datetime.datetime.now()} - Application des filtres: CA > 0, origine de commande ok, poids > 0')
#     data = data.loc[
#         data.origin.isin(valid_origins) & 
#         (data.brutrevenue > 0) &
#         (data.weight > 0)
#     ]
#     print(f"{datetime.datetime.now()} - Calcul de l'aggrégation 'orders'")
#     order_df = data.groupby(order_groupers, observed=True).agg(aggfuncs).rename({'origin': 'linecount'}, axis=1)
#     order_df_list.append(order_df)
# #     print(f"{datetime.datetime.now()} - Calcul de l'aggrégation 'pricetype'")
# #     pricetype_df = data.groupby(pricetype_groupers, observed=True).agg(aggfuncs).rename({'origin': 'linecount'}, axis=1)
# #     pricetype_df_list.append(pricetype_df)

# pd.concat(order_df_list, axis=0).to_pickle(persist_path / 'orders_all_SV.pkl')

# Calcul du canal majoritaire en poids

In [33]:
# Load the whole 'orders' aggregation into an in-memory pandas dataframe.
raw_orders = dd.read_parquet(persist_path / 'orders_all_SV.parquet').compute()
# Preview of the first rows (shown as the cell output).
raw_orders.head(10)

Unnamed: 0_level_0,orgacom,client,origin,margin,brutrevenue,weight,linecount
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2017-07-01,1LRO,60151,VR,225.1,763.76,83.47,20
2017-07-01,1LRO,211118,VR,81.63,230.69,47.716,13
2017-07-01,1LRO,210623,TV,85.88,273.32,43.08,13
2017-07-01,1LRO,210124,TV,213.51,798.87,93.174,17
2017-07-01,1LRO,210071,TV,81.48,178.88,18.663,10
2017-07-01,1LRO,209507,VR,68.93,208.51,39.714,16
2017-07-01,1LRO,208718,TV,14.89,80.3,8.14,3
2017-07-01,1LRO,208444,VR,26.43,68.94,5.872,3
2017-07-01,1LRO,207557,VR,42.15,133.0,23.66,4
2017-07-01,1LRO,202457,VR,294.38,1166.8,122.959,23


In [34]:
print(f'Nb orders in initial dataset: {len(raw_orders)}')
# Number of distinct (orgacom, client, date) combinations: the row count
# expected once the origins have been merged.
target_len = len(
    raw_orders
    .reset_index()[['orgacom', 'client', 'date']]
    .drop_duplicates()
)
print(f'Target order count in order dataset: {target_len}')

Nb orders in initial dataset: 15396408
Target order count in order dataset: 13648307


In [50]:
%%time
# about 1min 45s

# For every (date, orgacom, client) combination, find the origin that carries
# the largest weight.
main_origin = (
    raw_orders
    # Append the other keys to the existing index.
    .set_index(['orgacom', 'client', 'origin'], append=True)
    ['weight']
    # One column per origin; combinations that do not exist get a weight of 0.
    .unstack('origin', fill_value=0)
    # Label of the column holding the largest weight on each row.
    .idxmax(axis=1)
    .rename('main_origin')
)

CPU times: user 1min 39s, sys: 5.64 s, total: 1min 45s
Wall time: 1min 45s


In [51]:
raw_orders

Unnamed: 0_level_0,orgacom,client,origin,margin,brutrevenue,weight,linecount
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2017-07-01,1LRO,0000060151,VR,225.10,763.76,83.470,20
2017-07-01,1LRO,0000211118,VR,81.63,230.69,47.716,13
2017-07-01,1LRO,0000210623,TV,85.88,273.32,43.080,13
2017-07-01,1LRO,0000210124,TV,213.51,798.87,93.174,17
2017-07-01,1LRO,0000210071,TV,81.48,178.88,18.663,10
...,...,...,...,...,...,...,...
2021-05-31,1BFC,0000029484,TV,111.21,515.19,83.630,10
2021-05-31,1BFC,0000019288,TV,19.27,132.53,35.377,4
2021-05-31,1BFC,0000019204,EDI,14.41,83.22,41.000,8
2021-05-31,1BFC,0000055460,TV,37.83,597.28,114.374,17


In [54]:
%%time
# about 3min 30s

# Collapse the per-origin rows into a single row per (date, orgacom, client),
# summing every indicator over the origins, then attach the main origin.
# NOTE(review): groupby(..., axis=1) is deprecated in recent pandas releases -
# check against the installed version before upgrading.
orders_with_main_origin = (
    raw_orders
    .set_index(['orgacom', 'client', 'origin'], append=True)
    # Wide layout: columns become (indicator, origin) pairs, 0 where missing.
    .unstack('origin', fill_value=0)
    .rename_axis(('indicator', 'origin'), axis=1)
    # Sum across origins, leaving one column per indicator.
    .groupby('indicator', axis=1).sum()
).join(main_origin)

CPU times: user 3min 17s, sys: 1min 29s, total: 4min 46s
Wall time: 4min 43s


On vérifie que le calcul du canal majoritaire n'a pas modifié les indicateurs au total (la cellule suivante lève une exception en cas d'écart).

In [60]:
%%time
# about 3min 30s

# Compare the indicators of orders_with_main_origin (all columns except the
# last one, main_origin) with totals recomputed directly from raw_orders.
# Reference columns get the '_r' suffix.
check = orders_with_main_origin.iloc[:, :-1].join(
    raw_orders.groupby(['orgacom', 'date', 'client'], observed=True).sum(),
    rsuffix='_r',
)
for indicator in ['margin', 'brutrevenue', 'weight', 'linecount']:
    delta = (check[indicator] - check[indicator + '_r']).abs()
    # A missing delta means a value is absent on one side of the join; the
    # builtin max() could silently skip it, so it is rejected explicitly.
    if delta.isna().any():
        raise AssertionError(
            f'{indicator}: {int(delta.isna().sum())} orders could not be compared'
        )
    # Vectorised maximum instead of a Python-level loop over every row.
    if delta.max() > 0.00001:
        raise AssertionError(
            f'{indicator}: largest gap between the two totals is {delta.max()}'
        )
# Free the large temporary objects.
del(check, delta)

CPU times: user 2min 3s, sys: 1min 21s, total: 3min 25s
Wall time: 3min 23s


On peut contrôler que ce retraitement a bien produit le nombre de lignes initialement prévu (quelques cellules plus haut).

In [58]:
# Row count after merging the origins, to be compared with the target count.
print(f"Order count in order dataset: {len(orders_with_main_origin)}")

Order count in order dataset: 13648307


In [59]:
# Persist the final order-level dataset (indicators + main origin) as a pickle.
orders_with_main_origin.to_pickle(persist_path / 'orders_all_SV_with_main_origin.pkl')