<font size="+3"><b><center>DP IMP ACDS Data Pipeline</center></b></font>

### Imports

In [1]:
%load_ext autoreload
%autoreload 2
import os
import datetime
import pandas as pd
pd.set_option("display.max_rows", 500)
pd.set_option("display.max_columns", 100)
from pyspark.sql import functions as F

from crf0a_app.configuration import spark_config
from crf0a_app.utils import system

### Spark session

In [2]:
# Obtain the Spark context and session from the project helper, with the
# driver/executor sizing used for this notebook.
spark_settings = dict(
    app_name="[app00] Test_Read_Write_Data",
    driver_cores=1,
    driver_mem="4g",
    max_executors=8,
    executor_cores=4,
    executor_mem="4g",
)
spark_context, spark_session = spark_config.get_spark(**spark_settings)

### HDFS

#### Read

In [3]:
# Cycle to load (looks like yyyyMM); it selects which input file is read.
cycle = '202201'
# HDFS path of the input CSV for that cycle
flow_filepath = "/user/sd01865/crf0a/data/iCube/PROREV_%sIMP_ACDS.csv" % (cycle,)
print(flow_filepath)

/user/sd01865/crf0a/data/iCube/PROREV_202201IMP_ACDS.csv


In [4]:
# Load the semicolon-delimited CSV from HDFS: the first line is the header
# and column types are inferred by Spark.
csv_read_options = {"header": 'True', "inferSchema": 'True', "delimiter": ';'}
df_flow = spark_session.read.options(**csv_read_options).csv(flow_filepath)
df_flow.printSchema()

root
 |-- Marque: string (nullable = true)
 |-- Pays: string (nullable = true)
 |-- Libelle pays: string (nullable = true)
 |-- Code version: string (nullable = true)
 |-- Date: integer (nullable = true)
 |-- Volume 01: integer (nullable = true)
 |-- Volume 02: integer (nullable = true)
 |-- Volume 03: integer (nullable = true)
 |-- Volume 04: integer (nullable = true)
 |-- Volume 05: integer (nullable = true)
 |-- Volume 06: integer (nullable = true)
 |-- Volume 07: integer (nullable = true)
 |-- Volume 08: integer (nullable = true)
 |-- Volume 09: integer (nullable = true)
 |-- Volume 10: integer (nullable = true)
 |-- Volume 11: integer (nullable = true)
 |-- Volume 12: integer (nullable = true)



In [5]:
# Mark df_flow for caching so later actions can reuse it.
# NOTE(review): no matching unpersist() is visible in this notebook — confirm that is intended.
df_flow.persist()

DataFrame[Marque: string, Pays: string, Libelle pays: string, Code version: string, Date: int, Volume 01: int, Volume 02: int, Volume 03: int, Volume 04: int, Volume 05: int, Volume 06: int, Volume 07: int, Volume 08: int, Volume 09: int, Volume 10: int, Volume 11: int, Volume 12: int]

In [6]:
# Print the schema, then preview the first 5 rows.
# The preview has to be the cell's last expression: Jupyter only renders the
# value of the final expression, so placing it before printSchema() computed
# the pandas frame and then silently discarded it.
df_flow.printSchema()
df_flow.limit(5).toPandas()

root
 |-- Marque: string (nullable = true)
 |-- Pays: string (nullable = true)
 |-- Libelle pays: string (nullable = true)
 |-- Code version: string (nullable = true)
 |-- Date: integer (nullable = true)
 |-- Volume 01: integer (nullable = true)
 |-- Volume 02: integer (nullable = true)
 |-- Volume 03: integer (nullable = true)
 |-- Volume 04: integer (nullable = true)
 |-- Volume 05: integer (nullable = true)
 |-- Volume 06: integer (nullable = true)
 |-- Volume 07: integer (nullable = true)
 |-- Volume 08: integer (nullable = true)
 |-- Volume 09: integer (nullable = true)
 |-- Volume 10: integer (nullable = true)
 |-- Volume 11: integer (nullable = true)
 |-- Volume 12: integer (nullable = true)



In [7]:
# Number of rows read from the CSV
df_flow.count()

9439

### Transformations

In [8]:
# Convert the integer "Date" column (yyyyMM) into a real date by casting it to
# text and parsing it with the 'yyyyMM' pattern.
# NOTE(review): this overwrites df_flow in place, so re-running the cell on the
# already converted column will presumably not parse — confirm before re-running.
date_as_text = F.col("Date").cast("string")
df_flow = df_flow.withColumn("Date", F.to_date(date_as_text, 'yyyyMM'))

In [9]:
# Preview the first 5 rows to check the converted "Date" column
df_flow.limit(5).toPandas()

Unnamed: 0,Marque,Pays,Libelle pays,Code version,Date,Volume 01,Volume 02,Volume 03,Volume 04,Volume 05,Volume 06,Volume 07,Volume 08,Volume 09,Volume 10,Volume 11,Volume 12
0,C,027987W,SOUTH AFRICA AC VT New,1CCESYTKNMURA050,2022-01-01,0,0,4,1,3,0,12,10,5,2,3,0
1,C,027987W,SOUTH AFRICA AC VT New,1CCESYVKNMURA050,2022-01-01,0,0,6,2,4,0,17,14,7,2,5,0
2,C,027987W,SOUTH AFRICA AC VT New,1CCESYTKNMURA046,2022-01-01,13,8,0,0,0,0,0,0,0,0,0,0
3,C,027987W,SOUTH AFRICA AC VT New,1CCESYVKNMURA046,2022-01-01,19,11,0,0,0,0,0,0,0,0,0,0
4,C,027987W,SOUTH AFRICA AC VT New,1CW8AFKCZMURA052,2022-01-01,17,19,9,3,7,0,28,23,12,4,8,0


In [10]:
# Get the cycle date that is used below to name the monthly volume columns.
# Only the first row of the "Date" column is fetched: collect() would ship the
# entire DataFrame to the driver just to read one cell, and selecting the
# column by name avoids depending on its position (previously index 4).
first_row = df_flow.select("Date").first()
if first_row is None:
    # first() returns None on an empty DataFrame; fail with a clear message.
    raise ValueError("df_flow is empty: cannot determine the cycle date")
the_date = first_row[0]
print(the_date)

2022-01-01


In [11]:
# Rename the "Volume 01".."Volume 12" columns to the first day of each
# successive month starting at the_date, then rename "Date" to "Cycle".
def _month_start(start, months_ahead):
    """Return the first day of the month `months_ahead` months after `start`.

    Uses exact calendar arithmetic rather than adding multiples of 32 days,
    which only lands in the right month while the accumulated drift is small.
    """
    years_over, month_index = divmod(start.month - 1 + months_ahead, 12)
    return start.replace(year=start.year + years_over, month=month_index + 1, day=1)


for month_offset in range(12):
    # "Volume 01" is the cycle month itself, "Volume 02" the month after, etc.
    df_flow = df_flow.withColumnRenamed(
        "Volume %02d" % (month_offset + 1),
        str(_month_start(the_date, month_offset)),
    )
df_flow = df_flow.withColumnRenamed("Date", "Cycle")

In [12]:
# Bring the whole Spark DataFrame to the driver as a pandas DataFrame
pd_df = df_flow.toPandas()

In [15]:
# Check the column names after the rename
print(pd_df.columns)

Index(['Marque', 'Pays', 'Libelle pays', 'Code version', 'Cycle', '2022-01-01',
       '2022-02-01', '2022-03-01', '2022-04-01', '2022-05-01', '2022-06-01',
       '2022-07-01', '2022-08-01', '2022-09-01', '2022-10-01', '2022-11-01',
       '2022-12-01'],
      dtype='object')


In [16]:
# Unpivot the table (wide -> long): each monthly column becomes one row with
# the month in 'Date' and the volume in 'Value'.
# The month columns are derived from the frame itself rather than a hard-coded
# list of 2022 dates, so the cell keeps working when `cycle` changes.
id_columns = ['Marque', 'Pays', 'Libelle pays', 'Code version', 'Cycle']
month_columns = [column for column in pd_df.columns if column not in id_columns]
pivoted_df = pd_df.melt(
    id_vars=id_columns,
    value_vars=month_columns,
    var_name='Date',
    value_name='Value',
)

In [17]:
# Display the unpivoted frame
pivoted_df

Unnamed: 0,Marque,Pays,Libelle pays,Code version,Cycle,Date,Value
0,C,027987W,SOUTH AFRICA AC VT New,1CCESYTKNMURA050,2022-01-01,2022-01-01,0
1,C,027987W,SOUTH AFRICA AC VT New,1CCESYVKNMURA050,2022-01-01,2022-01-01,0
2,C,027987W,SOUTH AFRICA AC VT New,1CCESYTKNMURA046,2022-01-01,2022-01-01,13
3,C,027987W,SOUTH AFRICA AC VT New,1CCESYVKNMURA046,2022-01-01,2022-01-01,19
4,C,027987W,SOUTH AFRICA AC VT New,1CW8AFKCZMURA052,2022-01-01,2022-01-01,17
...,...,...,...,...,...,...,...
113263,C,010850U,IRELAND ACDS GOWAN,1CK9CLYMNKU0A042,2022-01-01,2022-12-01,0
113264,C,027525U,MAURITIUS AC AXESS,1CCESYVKNMURA050,2022-01-01,2022-12-01,4
113265,C,027525U,MAURITIUS AC AXESS,1CCESYVKNMURA046,2022-01-01,2022-12-01,0
113266,C,027525U,MAURITIUS AC AXESS,1CW8AFNCZMURA052,2022-01-01,2022-12-01,0


In [18]:
# Keep only the first 4 characters of every "Code version" value
version_prefix = pivoted_df['Code version'].str.slice(stop=4)
pivoted_df['Code version'] = version_prefix

In [19]:
# Display the frame after shortening "Code version"
pivoted_df

Unnamed: 0,Marque,Pays,Libelle pays,Code version,Cycle,Date,Value
0,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,2022-01-01,0
1,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,2022-01-01,0
2,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,2022-01-01,13
3,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,2022-01-01,19
4,C,027987W,SOUTH AFRICA AC VT New,1CW8,2022-01-01,2022-01-01,17
...,...,...,...,...,...,...,...
113263,C,010850U,IRELAND ACDS GOWAN,1CK9,2022-01-01,2022-12-01,0
113264,C,027525U,MAURITIUS AC AXESS,1CCE,2022-01-01,2022-12-01,4
113265,C,027525U,MAURITIUS AC AXESS,1CCE,2022-01-01,2022-12-01,0
113266,C,027525U,MAURITIUS AC AXESS,1CW8,2022-01-01,2022-12-01,0


In [20]:
# Add the INSERTED_DATE column: every row gets today's date, evaluated once
# when the cell runs.
load_date = datetime.date.today()
pivoted_df['INSERTED_DATE'] = load_date

In [21]:
# Add the Measure column, set to the constant 'DP IMP' on every row
pivoted_df['Measure'] = 'DP IMP'

In [22]:
# Rename columns to match the destination columns in Oracle Exadata
oracle_column_names = {
    "Date": "MONTHYEAR",
    "Code version": "FAMILLE",
    "Libelle pays": "LIBELLE_PAYS",
}
pivoted_df = pivoted_df.rename(columns=oracle_column_names)
pivoted_df

Unnamed: 0,Marque,Pays,LIBELLE_PAYS,FAMILLE,Cycle,MONTHYEAR,Value,INSERTED_DATE,Measure
0,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,2022-01-01,0,2022-07-13,DP IMP
1,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,2022-01-01,0,2022-07-13,DP IMP
2,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,2022-01-01,13,2022-07-13,DP IMP
3,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,2022-01-01,19,2022-07-13,DP IMP
4,C,027987W,SOUTH AFRICA AC VT New,1CW8,2022-01-01,2022-01-01,17,2022-07-13,DP IMP
...,...,...,...,...,...,...,...,...,...
113263,C,010850U,IRELAND ACDS GOWAN,1CK9,2022-01-01,2022-12-01,0,2022-07-13,DP IMP
113264,C,027525U,MAURITIUS AC AXESS,1CCE,2022-01-01,2022-12-01,4,2022-07-13,DP IMP
113265,C,027525U,MAURITIUS AC AXESS,1CCE,2022-01-01,2022-12-01,0,2022-07-13,DP IMP
113266,C,027525U,MAURITIUS AC AXESS,1CW8,2022-01-01,2022-12-01,0,2022-07-13,DP IMP


In [23]:
# Check the column names before reordering
pivoted_df.columns

Index(['Marque', 'Pays', 'LIBELLE_PAYS', 'FAMILLE', 'Cycle', 'MONTHYEAR',
       'Value', 'INSERTED_DATE', 'Measure'],
      dtype='object')

In [24]:
# Put the columns in the order they are written out
cols = (
    'Marque',
    'Pays',
    'LIBELLE_PAYS',
    'FAMILLE',
    'MONTHYEAR',
    'Measure',
    'Value',
    'Cycle',
    'INSERTED_DATE',
)
pivoted_df = pivoted_df.loc[:, list(cols)]
pivoted_df

Unnamed: 0,Marque,Pays,LIBELLE_PAYS,FAMILLE,MONTHYEAR,Measure,Value,Cycle,INSERTED_DATE
0,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,DP IMP,0,2022-01-01,2022-07-13
1,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,DP IMP,0,2022-01-01,2022-07-13
2,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,DP IMP,13,2022-01-01,2022-07-13
3,C,027987W,SOUTH AFRICA AC VT New,1CCE,2022-01-01,DP IMP,19,2022-01-01,2022-07-13
4,C,027987W,SOUTH AFRICA AC VT New,1CW8,2022-01-01,DP IMP,17,2022-01-01,2022-07-13
...,...,...,...,...,...,...,...,...,...
113263,C,010850U,IRELAND ACDS GOWAN,1CK9,2022-12-01,DP IMP,0,2022-01-01,2022-07-13
113264,C,027525U,MAURITIUS AC AXESS,1CCE,2022-12-01,DP IMP,4,2022-01-01,2022-07-13
113265,C,027525U,MAURITIUS AC AXESS,1CCE,2022-12-01,DP IMP,0,2022-01-01,2022-07-13
113266,C,027525U,MAURITIUS AC AXESS,1CW8,2022-12-01,DP IMP,0,2022-01-01,2022-07-13


In [25]:
from crf0a_app.infra.oracle_database import OracleDatabase

In [26]:
# Instantiate the project's OracleDatabase helper with the JDBC dialect and the existing Spark session
oracle_db = OracleDatabase(dialect="jdbc", spark_session=spark_session)

In [27]:
# Convert the pandas DataFrame back into a PySpark DataFrame for the write step
sparkDF = spark_session.createDataFrame(pivoted_df)

#### Write

In [28]:
# Write the reshaped data to the Oracle target table.
# NOTE(review): with mode="append", re-running this cell presumably inserts the
# same rows again — confirm how write_df_to_oracle handles duplicates.
target_table = "BRC_SD01865.DP_IMP_ACDS"
oracle_db.write_df_to_oracle(sparkDF, target_table, mode="append")