<img src="_media/logo_stellantis.png" width="300">

<font size="+3"><b><center>Tutorial 02: Example pipeline</center></b></font>

This notebook walks through each step of the example pipeline defined in `app_template/pipeline/app_pipeline.py`.

It shows how the pipeline ties together the modules of the application: configuration, infra, domain and interface.

### Imports

```python
%load_ext autoreload
%autoreload 2
import os
import datetime

import pandas as pd

# Display settings for DataFrames rendered in the notebook
pd.set_option("display.max_rows", 500)
pd.set_option("display.max_columns", 100)

# Application modules, grouped by layer
from app_template.configuration import app, data, mail, spark_config
from app_template.infra.oracle_database import OracleDatabase
from app_template.infra.repositories import flow_repository
from app_template.domain import kpi
from app_template.interface import notification
from app_template.utils import system
```

### Configuration: Application configuration

```python
# Pipeline start time
dt_start = datetime.datetime.now()
```
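This notebook does not use `dt_start` afterwards; a start timestamp like this is typically compared with the end time to report how long the run took. A minimal sketch, assuming the duration would be reported alongside the other execution KPIs (the helper `format_duration` and the `"Execution time"` label are hypothetical):

```python
import datetime


def format_duration(start: datetime.datetime, end: datetime.datetime) -> str:
    """Return the elapsed time between two datetimes as H:MM:SS (hypothetical helper)."""
    elapsed = end - start
    # Drop microseconds so the value reads cleanly in an email
    return str(datetime.timedelta(seconds=int(elapsed.total_seconds())))


# Fixed timestamps instead of datetime.datetime.now() so the result is reproducible
dt_start = datetime.datetime(2021, 1, 22, 8, 0, 0)
dt_end = datetime.datetime(2021, 1, 22, 8, 12, 34, 500000)
execution_kpis = {"Execution time": format_duration(dt_start, dt_end)}
print(execution_kpis)  # {'Execution time': '0:12:34'}
```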

```python
# Load the configuration files located in the app_template/configuration/resources directory
data_config = data.DataConfig()

data_config.infra_config
```

```text
{'flow': {'input_dirpath': '/user/brc05/data/refined/manf001_vehc_prdc_flow/year=2021/month=01',
  'output_filename': 'production_details',
  'perimeter': {'site_code': ['PY'],
   'genr_door': 'SMON',
   'start_date': '2021-01-18',
   'end_date': '2021-01-21'},
  'columns': ['site_code', 'faml_grp_labl', 'vin', 'pass_date']},
 'kpis': {'table_name': 'app00_prdc_kpis'}}
```
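`infra_config` is a plain nested dictionary, so the perimeter can be read with ordinary key access. A minimal sketch that parses the date bounds and counts the calendar days they cover, assuming both bounds are inclusive (the notebook does not say so):

```python
import datetime

# Copy of the "flow" perimeter printed above
infra_config = {
    "flow": {
        "perimeter": {
            "site_code": ["PY"],
            "genr_door": "SMON",
            "start_date": "2021-01-18",
            "end_date": "2021-01-21",
        }
    }
}

perimeter = infra_config["flow"]["perimeter"]
start = datetime.date.fromisoformat(perimeter["start_date"])
end = datetime.date.fromisoformat(perimeter["end_date"])
if start > end:
    raise ValueError("start_date must not be after end_date")

# Assumption: both bounds are inclusive, so 18, 19, 20 and 21 January are covered
n_days = (end - start).days + 1
print(perimeter["site_code"], perimeter["genr_door"], n_days)  # ['PY'] SMON 4
```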

```python
# Mailer used to send notification emails
mailer = mail.Mailer()
```

```python
# Instantiate the Spark session
_, spark_session = spark_config.get_spark(
    app_name="[app00] Test_App_Template",
    driver_cores=1,
    driver_mem="4g",
    max_executors=8,
    executor_cores=4,
    executor_mem="4g"
)
```
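The internals of `spark_config.get_spark` are not shown here. A minimal sketch of how its keyword arguments could map onto standard Spark configuration properties, assuming dynamic allocation is enabled so that `max_executors` acts as an upper bound (the helper `build_spark_conf` is hypothetical). It also makes the resource arithmetic explicit: at most 8 executors × 4 cores = 32 executor cores, and 8 × 4g = 32g of executor memory, excluding memory overhead.

```python
def build_spark_conf(app_name, driver_cores, driver_mem, max_executors,
                     executor_cores, executor_mem):
    """Map get_spark-style arguments to Spark property names (hypothetical helper)."""
    return {
        "spark.app.name": app_name,
        "spark.driver.cores": str(driver_cores),
        "spark.driver.memory": driver_mem,
        # Assumption: executors are allocated dynamically, capped at max_executors
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
        "spark.executor.cores": str(executor_cores),
        "spark.executor.memory": executor_mem,
    }


conf = build_spark_conf("[app00] Test_App_Template", 1, "4g", 8, 4, "4g")

# Upper bound on the resources all executors together can request
n_exec = int(conf["spark.dynamicAllocation.maxExecutors"])
max_cores = n_exec * int(conf["spark.executor.cores"])
max_mem_gb = n_exec * int(conf["spark.executor.memory"].rstrip("g"))
print(max_cores, max_mem_gb)  # 32 32
```

With PySpark, such a dictionary could be applied through repeated `SparkSession.builder.config(key, value)` calls before `getOrCreate()`.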

### Infra: Connection to the database to read the application input

```python
# Create the temporary directory under $UNXDATA if it does not exist yet
TEMP_DIRPATH = os.path.join(os.environ["UNXDATA"], "temp")
os.makedirs(TEMP_DIRPATH, exist_ok=True)

# Flow repository: collects the production flow data
flow_repo = flow_repository.FlowRepository(spark_session, TEMP_DIRPATH)
```

```python
# Read the data according to DataConfig, with preprocessing enabled
flow_repo.read(preprocessing=True)

# Preview the first 5 rows as a pandas DataFrame
flow_repo.df.limit(5).toPandas()
```
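The notebook does not show what `preprocessing=True` does, but the KPI step below groups on a `smon_date` column that is not among the configured `columns`, and that column is later parsed from `YYYY-MM-DD` strings. A minimal pandas sketch of one plausible preprocessing, assuming it keeps the configured perimeter and derives `smon_date` as the calendar day of `pass_date` (the function `preprocess` and the toy rows are hypothetical; the real repository works on a Spark DataFrame):

```python
import pandas as pd

# Toy rows using the columns listed in DataConfig (values are illustrative)
raw = pd.DataFrame({
    "site_code": ["PY", "PY", "XX", "PY"],
    "faml_grp_labl": ["P2QO-BEV", "P2QO-TH", "P2QO-TH", "P2QO-TH"],
    "vin": ["VIN0001", "VIN0002", "VIN0003", "VIN0004"],
    "pass_date": pd.to_datetime([
        "2021-01-18 06:15", "2021-01-19 14:30", "2021-01-19 09:00", "2021-01-25 10:00",
    ]),
})

perimeter = {"site_code": ["PY"], "start_date": "2021-01-18", "end_date": "2021-01-21"}


def preprocess(df: pd.DataFrame, perimeter: dict) -> pd.DataFrame:
    """Hypothetical preprocessing: keep the perimeter and derive smon_date."""
    out = df[df["site_code"].isin(perimeter["site_code"])].copy()
    # Assumption: smon_date is the calendar day of pass_date, stored as a string
    out["smon_date"] = out["pass_date"].dt.strftime("%Y-%m-%d")
    # ISO date strings compare chronologically; both bounds are kept (assumption)
    in_range = out["smon_date"].between(perimeter["start_date"], perimeter["end_date"])
    return out[in_range].reset_index(drop=True)


flow_df = preprocess(raw, perimeter)
print(flow_df[["vin", "smon_date"]])
```

The row from site `XX` and the row dated 2021-01-25 are dropped; the two remaining rows fall inside the perimeter.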

```python
# Write data according to DataConfig
flow_repo.write()
```
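`FlowRepository` follows a repository pattern: `read()` loads data into the `df` attribute and `write()` persists it, so the rest of the pipeline never deals with storage details. A minimal in-memory sketch of that interface, assuming `write()` saves the data under `output_filename` in the temporary directory (the class `InMemoryFlowRepository`, the CSV format and the "drop rows without a VIN" preprocessing are all hypothetical):

```python
import csv
import os
import tempfile


class InMemoryFlowRepository:
    """Hypothetical stand-in mirroring FlowRepository's read()/df/write() interface."""

    def __init__(self, rows, temp_dirpath, output_filename, columns):
        self._rows = rows
        self.temp_dirpath = temp_dirpath
        self.output_filename = output_filename
        self.columns = columns
        self.df = None  # populated by read()

    def read(self, preprocessing=False):
        # Keep only the configured columns
        data = [{c: row[c] for c in self.columns} for row in self._rows]
        if preprocessing:
            # Assumption: preprocessing drops rows without a VIN
            data = [row for row in data if row["vin"]]
        self.df = data

    def write(self):
        path = os.path.join(self.temp_dirpath, f"{self.output_filename}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=self.columns)
            writer.writeheader()
            writer.writerows(self.df)
        return path


rows = [  # illustrative rows; "extra" is not a configured column
    {"site_code": "PY", "faml_grp_labl": "P2QO-BEV", "vin": "VIN0001",
     "pass_date": "2021-01-18 06:15:00", "extra": 1},
    {"site_code": "PY", "faml_grp_labl": "P2QO-TH", "vin": "",
     "pass_date": "2021-01-18 07:00:00", "extra": 2},
]
columns = ["site_code", "faml_grp_labl", "vin", "pass_date"]

with tempfile.TemporaryDirectory() as tmp:
    repo = InMemoryFlowRepository(rows, tmp, "production_details", columns)
    repo.read(preprocessing=True)
    path = repo.write()
    with open(path, newline="") as f:
        written = list(csv.DictReader(f))

print(len(repo.df), os.path.basename(path))  # 1 production_details.csv
```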

### Domain: Business modelling

```python
# Instantiate execution KPIs
execution_kpis = {}
```

```python
# Compute production KPIs: number of vehicles per site, family group and SMON date
kpis_production = kpi.compute_n_vins(
    flow_repo.df, ["site_code", "faml_grp_labl", "smon_date"]
)

kpis_production.head(5)
```

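|   | site_code | faml_grp_labl | smon_date  | n_vin |
|---|-----------|---------------|------------|-------|
| 0 | PY        | P2QO-BEV      | 2021-01-18 | 9     |
| 1 | PY        | P2QO-BEV      | 2021-01-19 | 8     |
| 2 | PY        | P2QO-BEV      | 2021-01-20 | 16    |
| 3 | PY        | P2QO-TH       | 2021-01-18 | 26    |
| 4 | PY        | P2QO-TH       | 2021-01-19 | 43    |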
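The output has one row per (`site_code`, `faml_grp_labl`, `smon_date`) group with the vehicle count in `n_vin`. The body of `kpi.compute_n_vins` is not shown and it receives a Spark DataFrame; a minimal pandas sketch of the same aggregation, assuming `n_vin` counts distinct VINs (a plain row count would differ if the same VIN appeared more than once in a group):

```python
import pandas as pd


def compute_n_vins(df: pd.DataFrame, group_cols: list) -> pd.DataFrame:
    """Count distinct VINs per group (pandas sketch, not the app_template implementation)."""
    return (
        df.groupby(group_cols)["vin"]
        .nunique()
        .reset_index(name="n_vin")
    )


# Illustrative input: VIN0002 appears twice in the same group
flow_df = pd.DataFrame({
    "site_code": ["PY", "PY", "PY", "PY"],
    "faml_grp_labl": ["P2QO-BEV", "P2QO-BEV", "P2QO-BEV", "P2QO-TH"],
    "vin": ["VIN0001", "VIN0002", "VIN0002", "VIN0003"],
    "smon_date": ["2021-01-18", "2021-01-18", "2021-01-18", "2021-01-18"],
})
kpis = compute_n_vins(flow_df, ["site_code", "faml_grp_labl", "smon_date"])
print(kpis)
```

The duplicated VIN is counted once, giving 2 vehicles for `P2QO-BEV` and 1 for `P2QO-TH`.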


```python
# Record the total number of vehicles produced as an execution KPI
execution_kpis["Total number of vehicles produced"] = kpis_production["n_vin"].sum()
```
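`Series.sum()` on an integer column returns a NumPy integer scalar rather than a Python `int`. That is harmless when the value is formatted into text, but `json.dumps` rejects NumPy integers. If the execution KPIs were ever serialized (an assumption; the notebook only passes them to `send_email`), converting them first avoids a `TypeError`. A minimal sketch, with `to_builtin` as a hypothetical helper and the counts taken from the first rows shown above:

```python
import json

import numpy as np
import pandas as pd

kpis_production = pd.DataFrame({"n_vin": [9, 8, 16]})
total = kpis_production["n_vin"].sum()
print(isinstance(total, np.integer))  # True: a NumPy scalar, not a Python int


def to_builtin(value):
    """Convert NumPy scalars to native Python values (hypothetical helper)."""
    return value.item() if isinstance(value, np.generic) else value


execution_kpis = {"Total number of vehicles produced": total}
raw_ok = True
try:
    json.dumps(execution_kpis)
except TypeError:
    raw_ok = False  # NumPy integers are not JSON serializable

safe_kpis = {name: to_builtin(value) for name, value in execution_kpis.items()}
print(json.dumps(safe_kpis))  # {"Total number of vehicles produced": 33}
```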

```python
# Convert smon_date from "YYYY-MM-DD" strings to datetimes
kpis_production["smon_date"] = pd.to_datetime(
    kpis_production["smon_date"],
    format="%Y-%m-%d"
)
```
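Converting `smon_date` to a datetime dtype lets the column be written as a date rather than as text, presumably to match a date column in the Oracle table (an assumption; the table schema is not shown). Passing an explicit `format` also makes malformed values fail loudly instead of being guessed. A minimal sketch on illustrative values:

```python
import pandas as pd

kpis_production = pd.DataFrame({"smon_date": ["2021-01-18", "2021-01-19"], "n_vin": [9, 8]})
kpis_production["smon_date"] = pd.to_datetime(kpis_production["smon_date"], format="%Y-%m-%d")
print(kpis_production.dtypes)

# A value that does not match the format raises instead of being silently reinterpreted
rejected = False
try:
    pd.to_datetime(pd.Series(["18/01/2021"]), format="%Y-%m-%d")
except ValueError as exc:
    rejected = True
    print("rejected:", exc)
```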

### Infra: Connection to the database to write the application output

```python
# Instantiate the Oracle database client ("cx_oracle")
oracle_db = OracleDatabase("cx_oracle")
```

```python
# Write the KPIs DataFrame to the Oracle table named in DataConfig, overwriting its content
kpis_table_name = data_config.infra_config["kpis"]["table_name"]
oracle_db.write_df_to_oracle(
    kpis_production,
    kpis_table_name,
    mode="overwrite"
)
```
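`OracleDatabase.write_df_to_oracle` belongs to the application's infra layer and its internals are not shown. The sketch below illustrates what `mode="overwrite"` most likely means, replacing the table's previous rows with the new KPIs, using the standard-library `sqlite3` module as a stand-in for Oracle. The helper `write_rows` and the delete-then-insert strategy are assumptions; an implementation could equally drop and recreate the table. With cx_Oracle, the same pattern would use `:1`-style bind variables instead of `?`.

```python
import sqlite3


def write_rows(conn, table_name, columns, rows, mode="append"):
    """Write rows to table_name; mode="overwrite" deletes existing rows first (hypothetical helper)."""
    # Table and column names come from trusted configuration; only values are bound
    cols = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    with conn:  # commits on success, rolls back on error
        if mode == "overwrite":
            conn.execute(f"DELETE FROM {table_name}")
        conn.executemany(
            f"INSERT INTO {table_name} ({cols}) VALUES ({placeholders})", rows
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE app00_prdc_kpis "
    "(site_code TEXT, faml_grp_labl TEXT, smon_date TEXT, n_vin INTEGER)"
)
columns = ["site_code", "faml_grp_labl", "smon_date", "n_vin"]

# First run
write_rows(conn, "app00_prdc_kpis", columns, [("PY", "P2QO-BEV", "2021-01-18", 9)])
# Second run in overwrite mode replaces the previous content instead of appending to it
write_rows(conn, "app00_prdc_kpis", columns,
           [("PY", "P2QO-BEV", "2021-01-19", 8), ("PY", "P2QO-TH", "2021-01-18", 26)],
           mode="overwrite")

n_rows = conn.execute("SELECT COUNT(*) FROM app00_prdc_kpis").fetchone()[0]
print(n_rows)  # 2
```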

### Interface: Send an execution email

```python
# Send a success email reporting the output table and the execution KPIs
notification.send_email(
    mailer,
    "success",
    kpis_table_name=kpis_table_name,
    execution_kpis=execution_kpis
)
```
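`notification.send_email` and `mail.Mailer` hide how the message is built. A minimal sketch with the standard-library `email.message.EmailMessage`, assuming the success email lists the execution KPIs and the name of the output table. The helper `build_success_email`, the subject format, the addresses and the KPI value are hypothetical; actually sending the message would go through an SMTP client such as `smtplib.SMTP.send_message`.

```python
from email.message import EmailMessage


def build_success_email(kpis_table_name, execution_kpis, sender, recipients):
    """Build a plain-text success notification (hypothetical helper)."""
    msg = EmailMessage()
    msg["Subject"] = f"[app00] Pipeline success - {kpis_table_name}"
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    lines = [f"KPIs written to table {kpis_table_name}.", ""]
    lines += [f"- {name}: {value}" for name, value in execution_kpis.items()]
    msg.set_content("\n".join(lines))
    return msg


msg = build_success_email(
    "app00_prdc_kpis",
    {"Total number of vehicles produced": 33},  # illustrative value
    sender="noreply@example.com",
    recipients=["team@example.com"],
)
print(msg)
```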