# Data pipeline 

## 1.1 Carga de Información

Ya que hemos configurado el manejador de base de datos ahora crearemos la DB, llamemosla `Conekta`, como requerimos de las claves del service account para realizar la conexión no tenemos un gran riesgo de seguridad al dejar explícita la contraseña y el usser.  

In [1]:
import psycopg2 # conector a PostgreSQL
import pandas as pd # manioulacion de data.frames
import numpy as np # operaciones vectorcizadas en C :D 
import gc # grabage collector para liberar memoria explicitamente 

Revisemos cómo vienen los datos crudos en el .csv, observando los primeros y últimos 50 registros. 

In [2]:
! wc -l data_prueba_tecnica.csv # para conocer el numero de lineas del dataset original

10001 data_prueba_tecnica.csv


In [3]:
cargos_head = pd.read_csv('data_prueba_tecnica.csv', nrows=50, error_bad_lines=True, warn_bad_lines =True, skip_blank_lines=False)
cargos_tail = pd.read_csv('data_prueba_tecnica.csv', nrows=50, skiprows=9950, header=None, \
                           names=list(cargos_head.columns), error_bad_lines=True, warn_bad_lines =True, skip_blank_lines=False)
cargos = cargos_head.append( cargos_tail)#, ignore_index=True)
cargos.head(100)

Unnamed: 0,id,name,company_id,amount,status,created_at,paid_at
0,,,,,,,
1,48ba4bdbfb56ceebb32f2bd0263e759be942af3d,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.00,voided,2019-03-19,
2,,,,,,,
3,05fc6f5ac66b6ee7e4253aa5d0c2299eb47aaaf4,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.00,pending_payment,2019-05-06,
4,,,,,,,
...,...,...,...,...,...,...,...
45,,,,,,,
46,03a95d46ec32a76cd3a5f51cd747aa5db89a537a,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,59.04,pending_payment,2019-04-17,
47,,,,,,,
48,abf6097b036eadac9703779cc3b5dc1697afd619,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,68.39,paid,2019-03-16,2019-03-16


In [4]:
# como ya no requerimos estos dataframes los eliminamos para liberar memoria 
del cargos_head
del cargos_tail
gc.collect()

214

En general los primeros y últimos registros lucen bien con excepción de de las líneas en blanco alternadas con cada registro con información, veamos cómo viene el dataset completo.


In [5]:
cargos_row = pd.read_csv('data_prueba_tecnica.csv', error_bad_lines=True, warn_bad_lines =True, skip_blank_lines=True, \
                        encoding = 'utf-8', verbose =True, infer_datetime_format=False)
print(cargos_row.shape)
cargos_row.head(20)

Tokenization took: 30.01 ms
Type conversion took: 28.60 ms
Parser memory cleanup took: 0.01 ms
(10000, 7)


Unnamed: 0,id,name,company_id,amount,status,created_at,paid_at
0,48ba4bdbfb56ceebb32f2bd0263e759be942af3d,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.0,voided,2019-03-19,
1,05fc6f5ac66b6ee7e4253aa5d0c2299eb47aaaf4,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.0,pending_payment,2019-05-06,
2,2cdce231c1fc6a2061bfa2f1d978351fe217245d,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.0,voided,2019-02-22,
3,81633ba310a50b673efd469c37139576982901aa,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,102.61,paid,2019-02-27,2019-02-27
4,6ccfc4c24e788e4bca448df343698782db6b0c0b,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,184.49,paid,2019-02-05,2019-02-05
5,b25f2ff15c24ea881e676be772f4dd99891ad188,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,85.27,pending_payment,2019-01-04,
6,4f9db76960dcdf6b7e2d7d71b11519e49d7b1179,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,41.0,pre_authorized,2019-05-04,
7,7e96211c7d133a6227e809abc3d3ab18992b36b9,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,3.0,pending_payment,2019-01-23,
8,6ee515d5f0a1995731fb31053278e9700d67947c,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,41.61,voided,2019-05-03,
9,cdf1a7a6f5c9b0db543371f9e4bd127b451c09c9,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e,115.84,paid,2019-01-22,2019-01-22


Para sorpresa, no hay errores ni warnings en la lectura de los datos crudos.


## 1.2 Transformación

In [6]:
print(cargos_row.describe(include='all'))
print('-------------------')
print(cargos_row.shape)
print(cargos_row.isna().sum()) #number of on.nas

                                              id        name  \
count                                       9997        9997   
unique                                      9997           4   
top     f0a45ea52e832d41f243530204891a0589fbc7e6  MiPasajefy   
freq                                           1        9899   

                                      company_id amount status  created_at  \
count                                       9996  10000  10000       10000   
unique                                         3   5771     10         143   
top     cbf1c8b09cd5b549416d49d220a40cbd317f952e    3.0   paid  2019-03-01   
freq                                        9899   1980   5892         117   

           paid_at  
count         6009  
unique         140  
top     2019-02-27  
freq            71  
-------------------
(10000, 7)
id               3
name             3
company_id       4
amount           0
status           0
created_at       0
paid_at       3991
dtype: int64


De lo anterior es importante notar los siguientes puntos:
- Existen 3 registros con `id` y `name` nulos, al igual que 4 `company_id`.
- El campo `id` parece ser de buena calidad después de arreglar el punto anterior.
- Existen 4 valores para el nombre de compañías diferentes que habrá que validar.
- También en la limpieza tendremos que considerar la distribución de los valores para la columna `amount` y detectar valores atípicos. 

Finalmente validaremos que la fecha de actualización de los registros sea posterior a la fecha de creación del registro. 


### Limpieza de nulos 

In [7]:
# vamos a seleccionar los registros con nulos para limpiarlos
def Selec_na_index( data ): 
    '''
    Inputs 
     data (pandas data.frame): Solo con las columnas de interes
    Outputs:
     index (numpy array):  Indices de data donde existen nulos 
    '''
    columnas = data.columns
    index =  [ list( data[x].index[ pd.isna( data[x] ) ])  for x in columnas ]
    index = sum( index, []) 
    index.sort()
    return( np.array(index))

Como los registros con `id` nulo contienen información les asignamos uno para no perder esa información __posteriormente revisaremos el porqué estos registros ‘llegarón’ con  `id` nulo__ 

In [8]:
index = Selec_na_index( cargos_row[['id', 'name', 'company_id', 'amount', 'status', 'created_at']])
nulos = cargos_row.iloc[index]
index = cargos_row['id'].index[ pd.isna( cargos_row['id'])]
cargos_row['id'][index] = range(0, len(index))

Para limpiar las columnas `name` y `company_id` nos valdremos de la misma información contenida en el dataset, construiremos un catálogo y sustituiremos los  nulos y valores atípicos. 


In [9]:
catalogo = cargos_row[['name', 'company_id']].drop_duplicates()
catalogo

Unnamed: 0,name,company_id
0,MiPasajefy,cbf1c8b09cd5b549416d49d220a40cbd317f952e
78,Muebles chidos,8f642dc67fccf861548dfe1c761ce22f795e91f0
262,MiPasajefy,
603,MiPasajefy,*******
731,,cbf1c8b09cd5b549416d49d220a40cbd317f952e
1320,MiPas0xFFFF,cbf1c8b09cd5b549416d49d220a40cbd317f952e
1479,MiP0xFFFF,cbf1c8b09cd5b549416d49d220a40cbd317f952e


El catálogo nos permite ver que estamos en una situación sencilla todos los valores nulos y  anormales para `name` y `company_id` son asociados al valor `MiPasajefy`. Por lo que será sencillo limpiarlos, __sin embargo valdría la pena investigar el pipeline que produce este archivo ya que es curioso que solo se esté presentando para esta compañía__ 

In [10]:
index = cargos_row['name'].index[ pd.isna( cargos_row['name'])]
cargos_row['name'][index] = catalogo.name[0]
index = cargos_row['company_id'].index[ pd.isna( cargos_row['company_id'])]
cargos_row['company_id'][index] = catalogo.company_id[0]
cargos_row = cargos_row.replace({'name':  {x : catalogo.name[0] for x in list(set(cargos_row.name)) if x not in ['Muebles chidos', 'MiPasajefy'] } } )
cargos_row = cargos_row.replace({'company_id': { x : catalogo.company_id[0] for x in list(set(cargos_row.company_id))  if x not in ['8f642dc67fccf861548dfe1c761ce22f795e91f0', 'cbf1c8b09cd5b549416d49d220a40cbd317f952e'] } } )

### Errores de encoding y validación de fechas 

Notamos que la columna`status` tiene errores de encoding, uno que puede identificarse a simple vista y otro que sustituiremos por un valor ‘Desconocido’, para no perder esta transacción y nuevamente __valdría la pena investigar el proceso que extrae este archivo porque este valor de encoding se presenta en la misma compañía en la que hemos tenido problemas__. 

In [11]:
print(set(cargos_row.status))
cargos_row = cargos_row = cargos_row.replace({'status':  { 'p&0x3fid' : 'paid',  '0xFFFF': 'desconocido'}})

{'expired', 'partially_refunded', 'p&0x3fid', 'pending_payment', 'pre_authorized', 'charged_back', 'paid', '0xFFFF', 'voided', 'refunded'}


In [12]:
temp = pd.DataFrame(data= { 'freq': cargos_row['created_at'].value_counts(), 'value': cargos_row['created_at'].value_counts().index })
print(temp)
#pd.to_datetime(temp['value'],format= '%Y-%M-%D' ).dt.time
print(pd.to_datetime(temp['value'], infer_datetime_format=True))# verificamos que el parseo sea correcto 

                     freq                value
2019-03-01            117           2019-03-01
2019-03-02            104           2019-03-02
2019-02-27            103           2019-02-27
2019-03-08            103           2019-03-08
2019-02-28            103           2019-02-28
...                   ...                  ...
2019-02-04             35           2019-02-04
2019-05-20              8           2019-05-20
20190121                1             20190121
2019-02-27T00:00:00     1  2019-02-27T00:00:00
20190516                1             20190516

[143 rows x 2 columns]
2019-03-01            2019-03-01
2019-03-02            2019-03-02
2019-02-27            2019-02-27
2019-03-08            2019-03-08
2019-02-28            2019-02-28
                         ...    
2019-02-04            2019-02-04
2019-05-20            2019-05-20
20190121              2019-01-21
2019-02-27T00:00:00   2019-02-27
20190516              2019-05-16
Name: value, Length: 143, dtype: datetime64[ns]


In [13]:
# cast de tipo de dato 
cargos_row['created_at'] = pd.to_datetime(cargos_row['created_at'], infer_datetime_format=True)
cargos_row['paid_at'] = pd.to_datetime(cargos_row['paid_at'], infer_datetime_format=True)
# validacion de fecha de pago 
index = -cargos_row['paid_at'].isna() 
temp = cargos_row['paid_at'][index] - cargos_row['created_at'][index]
temp.value_counts()  #ningun valor negativo OK

0 days    5612
1 days     395
3 days       1
2 days       1
dtype: int64

## 1.4 Dispersión de la información

Creación de la DB, las tablas, sus schemas y la relación entre ellas. 



In [14]:
try:
    conn = psycopg2.connect("host=localhost user=postgres password=conektafou")
except psycopg2.Error as e: 
    print("Error: No se pudo establecer conexión con la base de datos")
conn.set_session(autocommit=True)

cur = conn.cursor()
try:
    cur.execute("DROP DATABASE IF EXISTS Conekta")
    cur.execute("CREATE DATABASE Conekta WITH ENCODING 'utf8' ")
except psycopg2.Error as e: 
    print("Error: No se pudo crear la DB")

try:
    cur.execute("CREATE SCHEMA IF NOT EXISTS  Transaccional AUTHORIZATION postgres ")
except psycopg2.Error as e: 
    print("Error: No se pudo crear el schema Transaccional")

try:    
    cur.execute('''CREATE TABLE Transaccional.Companie (
                id varchar(24) PRIMARY KEY, 
                company_name varchar(130) NOT NULL ) ''')  
except psycopg2.Error as e: 
    print("Error: No se pudo crear la tabla Companie")
   
try:
cur.execute(''' CREATE TABLE Transaccional.Charge ( 
                            id  varchar(24) PRIMARY KEY, 
                            company_id varchar(24) NOT NULL  REFERENCES Transaccional.Companies(id),
                            amount decimal(16,2) NOT NULL, 
                            status varchar(30) NOT NULL
                            created_at timestamp NOT NULL, 
                            updated_at timestamp NOT NULL     )''')
except psycopg2.Error as e: 
    print("Error: No se pudo crear la tabla charge")



(10000, 7)