# RNDC ETL job

## Install libraries

In [21]:
import os
import sys

# Install the extra packages this job uses into the interpreter running the
# notebook. pip's exit status is not inspected, so a failed install only shows
# up later as an ImportError.
for package_name in ("openpyxl", "unidecode", "redshift_connector"):
    os.system(f"{sys.executable} -m pip install --quiet {package_name}")

0


## Import libraries

In [23]:
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.job import Job

import pandas as pd
import numpy as np

import redshift_connector
import psycopg2

from unidecode import unidecode
import warnings
warnings.filterwarnings('ignore')




In [5]:
# Reuse the active SparkContext when there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the Spark context in a GlueContext and keep a handle on its Spark session.
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Glue job handle bound to the context above.
# NOTE(review): no job.init()/job.commit() call appears in the cells shown here - confirm whether that is intended.
job = Job(glueContext)




## Initial definitions:

### Variable types:

In [6]:
# Column name -> Python type, passed as `dtype` to pandas.read_excel when the
# workbook is loaded. The entries are listed in the same order as before.
dict_types = dict(
    MES=int,
    COD_CONFIG_VEHICULO=str,
    CONFIG_VEHICULO=str,
    CODOPERACIONTRANSPORTE=str,
    OPERACIONTRANSPORTE=str,
    CODTIPOCONTENEDOR=str,
    TIPOCONTENEDOR=str,
    CODMUNICIPIOORIGEN=int,
    MUNICIPIOORIGEN=str,
    CODMUNICIPIODESTINO=int,
    MUNICIPIODESTINO=str,
    CODMERCANCIA=str,
    MERCANCIA=str,
    NATURALEZACARGA=str,
    VIAJESTOTALES=int,
    KILOGRAMOS=float,
    GALONES=float,
    VIAJESLIQUIDOS=int,
    VIAJESVALORCERO=int,
    KILOMETROS=float,
    VALORESPAGADOS=float,
    CODMUNICIPIOINTERMEDIO=int,
    MUNICIPIOINTERMEDIO=str,
    KILOMETROSREGRESO=float,
    KILOGRAMOSREGRESO=float,
    GALONESREGRESO=float,
)




### Dimensions and RedShift tables dictionary:

In [7]:
# Catalogue of the columns needed to build the dimension tables of the model.
# Each key is a code column of the source file; its value is a three-item list:
#   [0] the column holding the description of that code,
#   [1] the dimension table related to the code,
#   [2] the name of the identifier column that refers to that dimension.
dict_dimensions = {
    code_column: [description_column, dimension_table, id_column]
    for code_column, description_column, dimension_table, id_column in (
        ('COD_CONFIG_VEHICULO', 'CONFIG_VEHICULO', 'DIM_CONFIGURACIONES_VEHICULO', 'ID_CONFIGURACION_VEHICULO'),
        ('CODOPERACIONTRANSPORTE', 'OPERACIONTRANSPORTE', 'DIM_OPERACIONES_TRANSPORTE', 'ID_OPERACION_TRANSPORTE'),
        ('CODTIPOCONTENEDOR', 'TIPOCONTENEDOR', 'DIM_TIPOS_CONTENEDOR', 'ID_TIPO_CONTENEDOR'),
        ('CODMUNICIPIOORIGEN', 'MUNICIPIOORIGEN', 'DIM_MUNICIPIOS', 'ID_MUNICIPIO_ORIGEN'),
        ('CODMUNICIPIODESTINO', 'MUNICIPIODESTINO', 'DIM_MUNICIPIOS', 'ID_MUNICIPIO_DESTINO'),
        ('CODMERCANCIA', 'MERCANCIA', 'DIM_MERCANCIAS', 'ID_MERCANCIA'),
        ('CODMUNICIPIOINTERMEDIO', 'MUNICIPIOINTERMEDIO', 'DIM_MUNICIPIOS', 'ID_MUNICIPIO_INTERMEDIO'),
        # This dimension has no separate code: the description column is the key itself.
        ('NATURALEZACARGA', 'NATURALEZACARGA', 'DIM_NATURALEZAS_CARGA', 'ID_NATURALEZA_CARGA'),
    )
}




### Fact table columns:

In [8]:
# Catalogue of the columns needed to build the fact table of the model.
# Each key is a column of the source file; its value is a two-item list:
#   [0] the name used to select and order the fact DataFrame columns
#       (dimension codes appear here under their identifier name),
#   [1] the name of the matching column in the Redshift table.
dict_fact = {
    source_column: [ordered_column, redshift_column]
    for source_column, ordered_column, redshift_column in (
        ('MES', 'MES', 'ANO_MES'),
        ('COD_CONFIG_VEHICULO', 'ID_CONFIGURACION_VEHICULO', 'ID_CONFIGURACION_VEHICULO'),
        ('CODOPERACIONTRANSPORTE', 'ID_OPERACION_TRANSPORTE', 'ID_OPERACION_TRANSPORTE'),
        ('CODTIPOCONTENEDOR', 'ID_TIPO_CONTENEDOR', 'ID_TIPO_CONTENEDOR'),
        ('CODMUNICIPIOORIGEN', 'ID_MUNICIPIO_ORIGEN', 'ID_MUNICIPIO_ORIGEN'),
        ('CODMUNICIPIODESTINO', 'ID_MUNICIPIO_DESTINO', 'ID_MUNICIPIO_DESTINO'),
        ('NATURALEZACARGA', 'ID_NATURALEZA_CARGA', 'ID_NATURALEZA_CARGA'),
        ('CODMERCANCIA', 'ID_MERCANCIA', 'ID_MERCANCIA'),
        ('VIAJESTOTALES', 'VIAJESTOTALES', 'VIAJES_TOTALES'),
        ('KILOGRAMOS', 'KILOGRAMOS', 'KILOGRAMOS'),
        ('GALONES', 'GALONES', 'GALONES'),
        ('VIAJESLIQUIDOS', 'VIAJESLIQUIDOS', 'VIAJES_LIQUIDOS'),
        ('VIAJESVALORCERO', 'VIAJESVALORCERO', 'VIAJES_VALOR_CERO'),
        ('KILOMETROS', 'KILOMETROS', 'KILOMETROS'),
        ('VALORESPAGADOS', 'VALORESPAGADOS', 'VALORES_PAGADOS'),
        ('CODMUNICIPIOINTERMEDIO', 'ID_MUNICIPIO_INTERMEDIO', 'ID_MUNICIPIO_INTERMEDIO'),
        ('KILOMETROSREGRESO', 'KILOMETROSREGRESO', 'KILOMETROS_REGRESO'),
        ('KILOGRAMOSREGRESO', 'KILOGRAMOSREGRESO', 'KILOGRAMOS_REGRESO'),
        ('GALONESREGRESO', 'GALONESREGRESO', 'GALONES_REGRESO'),
    )
}

# Flat views over dict_fact, all in the dictionary's insertion order.
ls_fact_keys = [*dict_fact]
ls_fact_values = [*dict_fact.values()]
ls_order_values = [names[0] for names in ls_fact_values]
ls_redshift_values = [names[-1] for names in ls_fact_values]
dict_redshift = dict(zip(ls_fact_keys, ls_redshift_values))




### Redshift connection:

In [9]:
# Redshift connection settings come from the environment so that no credential
# is stored in the notebook / job script.
#   REDSHIFT_HOST, REDSHIFT_DATABASE, REDSHIFT_PORT and REDSHIFT_USER are
#   optional and default to the values this job has always used.
#   REDSHIFT_PASSWORD is required; a missing variable raises KeyError naming it.
# NOTE(review): the password that used to be hard-coded here has been exposed
# in source control and should be rotated. A secrets manager is preferable to
# plain environment variables if the runtime offers one.
conn = psycopg2.connect(
    host=os.environ.get(
        'REDSHIFT_HOST',
        'redshift-cluster-2.cg5i3fotr9gy.sa-east-1.redshift.amazonaws.com',
    ),
    database=os.environ.get('REDSHIFT_DATABASE', 'dev'),
    port=int(os.environ.get('REDSHIFT_PORT', 5439)),
    user=os.environ.get('REDSHIFT_USER', 'admin'),
    password=os.environ['REDSHIFT_PASSWORD'],
)

# Single cursor shared by every query in the cells below.
cursor = conn.cursor()




## Retrieve parameters from Lambda function:

In [10]:
# The S3 location of the workbook arrives as the job arguments --bucket and
# --object_key; getResolvedOptions raises when either one is missing.
args = getResolvedOptions(sys.argv, ['bucket', 'object_key'])
bucket, object_key = args['bucket'], args['object_key']

# For a manual run without job arguments, assign the values by hand instead:
# bucket = 'rndc-raw'
# object_key = 'estadisticas/EstadisticasRNDC_202208.xlsx'

GlueArgumentError: the following arguments are required: --bucket, --object_key


## Read the dataset

In [18]:
# Load the workbook directly from S3, forcing the column types declared in dict_types.
# NOTE(review): pandas needs an fsspec S3 backend (e.g. s3fs) to open s3:// URLs and
# the install cell does not add one - confirm the runtime already provides it.
df = pd.read_excel(f"s3://{bucket}/{object_key}", dtype=dict_types)




## Transformations

In [24]:
def _normalize_text(value):
    """Upper-case a string, pass it through unidecode and trim it; return any other value unchanged."""
    if not isinstance(value, str):
        return value
    return unidecode(value.upper()).strip()


# Exact duplicate rows are removed first, on the raw values.
# NOTE(review): rows that only become identical after the text clean-up below
# are therefore kept - confirm that is intended.
df = df.drop_duplicates()

# One element-wise pass applying upper -> unidecode -> strip, in that order.
df = df.applymap(_normalize_text)

# Municipality codes are read as integers and handled as text from here on.
for municipality_code in ('CODMUNICIPIOORIGEN', 'CODMUNICIPIODESTINO', 'CODMUNICIPIOINTERMEDIO'):
    df[municipality_code] = df[municipality_code].astype(str)

print(f"file: {object_key.split('/')[-1]} | status: refined")

file: EstadisticasRNDC_202207.xlsx | status: refined


## Dimension tables:

In [25]:
def _to_sql_rows(frame):
    """Return the rows of `frame` as plain tuples, replacing missing values with None (SQL NULL)."""
    return [
        tuple(None if pd.isna(value) else value for value in row)
        for row in frame.itertuples(index=False, name=None)
    ]


# For every dimension: take the distinct codes present in the file, compare them
# with what the dimension table already holds and insert only the missing ones.
for key in dict_dimensions:

    cod_description = dict_dimensions[key][0]
    table_name = dict_dimensions[key][1]

    # NATURALEZACARGA has no separate code column: its text is the key and it is
    # matched against the 'naturaleza_carga' column instead of 'codigo'.
    has_code = key != 'NATURALEZACARGA'
    source_columns = [key, cod_description] if has_code else [key]
    match_column = 'codigo' if has_code else 'naturaleza_carga'

    # Distinct, non-null keys found in the file. Rows with a null key are left
    # out: after the left merge below their match column is null whether or not
    # they matched, so they would be classified as new and inserted on every run.
    # Chaining also builds a new frame instead of mutating a slice of `df`.
    df_dimension = (
        df[source_columns]
        .dropna(subset=[key])
        .drop_duplicates(subset=[key])
    )

    # Retrieve the current content of the dimension table from Redshift
    query = f'SELECT * FROM {table_name}'
    cursor.execute(query)
    rows = cursor.fetchall()
    column_names = [desc[0] for desc in cursor.description]
    df_dimension_redshift = pd.DataFrame(rows, columns=column_names)

    # Identify new values: keys from the file with no match in the table
    df_merged = pd.merge(df_dimension, df_dimension_redshift, how='left', left_on=key, right_on=match_column)
    df_merged = df_merged[pd.isna(df_merged[match_column])]

    # Keep only the columns that came from the file
    left_columns = df_dimension.columns
    right_columns_to_drop = [col for col in df_merged.columns if col not in left_columns]
    df_merged = df_merged.drop(columns=right_columns_to_drop)

    if df_merged.empty:
        print(f'table: {table_name} | not new values')
        continue

    # Target columns are every table column except the 'id' column, in table
    # order; they are paired positionally with the file columns.
    ls_col_redshift = [col for col in column_names if col != 'id']
    if len(ls_col_redshift) != len(df_merged.columns):
        raise ValueError(
            f'table: {table_name} | cannot map file columns {list(df_merged.columns)} '
            f'to table columns {ls_col_redshift}'
        )
    col_text = ",".join(ls_col_redshift)
    placeholders = ",".join(['%s'] * len(ls_col_redshift))
    query = f"INSERT INTO {table_name} ({col_text}) VALUES ({placeholders})"

    # Roll back on failure so the shared connection is usable afterwards.
    try:
        cursor.executemany(query, _to_sql_rows(df_merged))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    print(f'table: {table_name} | inserted new row(s): {len(df_merged)}')

<redshift_connector.cursor.Cursor object at 0x7fd184ff9e10>
table: DIM_CONFIGURACIONES_VEHICULO | not new values
<redshift_connector.cursor.Cursor object at 0x7fd184ff9e10>
table: DIM_OPERACIONES_TRANSPORTE | not new values
<redshift_connector.cursor.Cursor object at 0x7fd184ff9e10>
table: DIM_TIPOS_CONTENEDOR | not new values
<redshift_connector.cursor.Cursor object at 0x7fd184ff9e10>
table: DIM_MUNICIPIOS | not new values
<redshift_connector.cursor.Cursor object at 0x7fd184ff9e10>
table: DIM_MUNICIPIOS | not new values
<redshift_connector.cursor.Cursor object at 0x7fd184ff9e10>
table: DIM_MERCANCIAS | not new values
<redshift_connector.cursor.Cursor object at 0x7fd184ff9e10>
table: DIM_MUNICIPIOS | not new values
<redshift_connector.cursor.Cursor object at 0x7fd184ff9e10>
table: DIM_NATURALEZAS_CARGA | not new values


## Fact table:

In [26]:
# Start from the source columns that feed the fact table.
df_fact = df[ls_fact_keys].copy()

# Swap every dimension code for the identifier stored in its dimension table.
for key, (_description, table_name, id_name) in dict_dimensions.items():
    # NATURALEZACARGA has no code column, so its text is exposed under the alias 'codigo'.
    code_expression = 'codigo' if key != 'NATURALEZACARGA' else 'naturaleza_carga AS codigo'
    query = f'SELECT id AS {id_name}, {code_expression} FROM {table_name}'
    cursor.execute(query)

    # Column names are upper-cased so the id column matches the names in ls_order_values.
    df_dimension_redshift = pd.DataFrame(
        cursor.fetchall(),
        columns=[column[0].upper() for column in cursor.description],
    )

    # Left join on the code, then discard both the code and the join column.
    df_fact = (
        df_fact
        .merge(df_dimension_redshift, how='left', left_on=key, right_on='CODIGO')
        .drop(columns=[key, 'CODIGO'])
    )

# Arrange the columns in the order expected when saving into Redshift.
df_fact = df_fact.loc[:, ls_order_values].copy()




In [None]:
# TEMPORARY
# NOTE(review): this keeps only the first 50 rows of the fact table, so the load
# below deletes the whole month from `estadisticas` and re-inserts just those 50
# rows. Remove this cell (or make the limit configurable) before a real run.
df_fact = df_fact.head(50)

In [None]:
# Reload the fact table: delete what is already stored for the month(s) present
# in the file, then insert the new rows, all within one transaction.
if df_fact.empty:
    # Nothing to load; also avoids indexing into an empty list of months.
    print('table: estadisticas | no rows to load')
else:
    # Column list and row order both come from dict_fact (ls_redshift_values /
    # ls_order_values) so they cannot drift apart. Names are lower-cased to
    # reproduce the identifiers the statement has always used.
    fact_columns = ", ".join(column.lower() for column in ls_redshift_values)
    fact_placeholders = ", ".join(['%s'] * len(ls_redshift_values))
    consulta = f"INSERT INTO estadisticas ({fact_columns}) VALUES ({fact_placeholders});"

    # Plain tuples in ls_order_values order; missing values become None (SQL NULL).
    dataset = [
        tuple(None if pd.isna(value) else value for value in row)
        for row in df_fact[ls_order_values].itertuples(index=False, name=None)
    ]

    # Every month found in the data is replaced, not only the first one, so a
    # file spanning several months does not duplicate rows when it is reloaded.
    months = [int(month) for month in df_fact['MES'].dropna().unique()]

    try:
        for ano_mes in months:
            cursor.execute("DELETE FROM estadisticas WHERE ano_mes = %s;", (ano_mes,))
        cursor.executemany(consulta, dataset)
        conn.commit()
    except Exception:
        # Undo the delete as well, and leave the connection usable.
        conn.rollback()
        raise
    print(f'table: estadisticas | inserted row(s): {len(dataset)}')

In [16]:
# Release the Redshift cursor first, then the connection it belongs to.
cursor.close()
conn.close()


