# Proceso de ETL

### • Puntos a tener en cuenta:

Los productos tienen más de un código?

py arrow -> read parquet

el script debe poder recibir nuevas tablas y arrojar un resultado unificado.
La carga incremental debe estar administrada por el mismo script.

al data engineer no le importan los outliers, eso es cuestión del analista

DER: Diagrama de Entidad Relacional

La carga: una carga más.. No debe relacionarse al tiempo

print shape info() isnull().sum() duplicated().value_counts()

precios en excel tienen un decimal corrido

producto tiene items con múltiples IDE y también IDE's que se repiten en múltiples productos

Los problemas que no se alcancen a solucionar hay que señalarlos 


In [1]:
# Empezamos importando las librerías que usaremos para la limpieza de los datos
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
from datetime import datetime
import re
import os

## Creación de los DataFrames en pandas
Importamos los datasets de diferentes formatos a DataFrames de pandas de manera automatizada, organizándolos en un diccionario para facilitar su acceso y limpieza.

In [2]:
# Obtenemos una lista de los archivos dentro del directorio 'datasets'
file_names = os.listdir('.\datasets')
print(file_names)

['precios_semanas_20200419_20200426.xlsx', 'precios_semana_20200413.csv', 'precios_semana_20200503.json', 'precios_semana_20200518.txt', 'producto.parquet', 'sucursal.csv']


In [3]:
# Creamos un diccionario con los nombres de cada archivo y su extensión
datasets_extensions = {}
for x in file_names:
    datasets_extensions[x] = x.split('.')

for x in datasets_extensions:
    print(f'\nKey: {x}\nValue: {datasets_extensions[x]}\n')


Key: precios_semanas_20200419_20200426.xlsx
Value: ['precios_semanas_20200419_20200426', 'xlsx']


Key: precios_semana_20200413.csv
Value: ['precios_semana_20200413', 'csv']


Key: precios_semana_20200503.json
Value: ['precios_semana_20200503', 'json']


Key: precios_semana_20200518.txt
Value: ['precios_semana_20200518', 'txt']


Key: producto.parquet
Value: ['producto', 'parquet']


Key: sucursal.csv
Value: ['sucursal', 'csv']



Ahora usaremos las extensiones y los nombres de los archivos para importar los datasets en objetos 'DataFrame' de pandas.

Habiendo estudiado un poco la naturaleza de los datasets antes de importarlos, se pudo observar que los registros
o *tablas de hecho* son las que llevan por nombre 'precios_...', mientras que las *dimensiones* son los datasets de 
'producto' y 'sucursal', pues proveen información adicional a las primeras tablas.

Así, los datasets de precios serán organizados en el diccionario 'ps_2020' (precio semanal 2020) y los otros dos en el 
diccionario 'dims' (dimensiones).

In [4]:
# Creamos los diccionarios y automatizamos la importación de los archivos contenidos en el directorio 'datasets'
ps_2020 = {}
dims = {}
for x in datasets_extensions:
    path = f'datasets/{x}'
    if x[:7] == 'precios':
        if datasets_extensions[x][1] in ['xlsx', 'xls']:
            xl_dict = pd.read_excel(path, sheet_name=None, date_parser=None)
            for sheet in xl_dict:
                name = f'{sheet[-8:-4]}-{sheet[-4:-2]}-{sheet[-2:]}'
                ps_2020[name] = pd.DataFrame(xl_dict[sheet])
        else:
            name = f'{datasets_extensions[x][0][-8:-4]}-{datasets_extensions[x][0][-4:-2]}-{datasets_extensions[x][0][-2:]}'
            if datasets_extensions[x][1] == 'csv':
                ps_2020[name] = pd.read_csv(path, encoding='UTF-16 LE')
            elif datasets_extensions[x][1] == 'json':
                ps_2020[name] = pd.read_json(path)
            elif datasets_extensions[x][1] == 'txt':
                ps_2020[name] = pd.read_csv(path, delimiter='|')
    else:
        name = datasets_extensions[x][0]
        if datasets_extensions[x][1] == 'csv':
            dims[name] = pd.read_csv(path)
        elif datasets_extensions[x][1] == 'parquet':
            dims[name] = pd.read_parquet(path)

In [13]:
for x in ps_2020:
    print(f'• DataFrame: {x}\n• Shape: {ps_2020[x].shape}\n• Columnas: {ps_2020[x].columns}\n')
for x in dims:
    print(f'• DataFrame: {x}\n• Shape: {dims[x].shape}\n• Columnas: {dims[x].columns}\n')

• DataFrame: 2020-04-26
• Shape: (474692, 3)
• Columnas: Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')

• DataFrame: 2020-04-19
• Shape: (458543, 3)
• Columnas: Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')

• DataFrame: 2020-04-13
• Shape: (472134, 3)
• Columnas: Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')

• DataFrame: 2020-05-03
• Shape: (397734, 3)
• Columnas: Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')

• DataFrame: 2020-05-18
• Shape: (415104, 3)
• Columnas: Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')

• DataFrame: producto
• Shape: (72038, 7)
• Columnas: Index(['id', 'marca', 'nombre', 'presentacion', 'categoria1', 'categoria2',
       'categoria3'],
      dtype='object')

• DataFrame: sucursal
• Shape: (2333, 12)
• Columnas: Index(['id', 'comercioId', 'banderaId', 'banderaDescripcion',
       'comercioRazonSocial', 'provincia', 'localidad', 'direccion', 'lat',
       'lng', 'sucursal

## Proceso de limpieza de datos
Una vez creadas las tablas podemos empezar el trabajo de limpieza de los datos, comenzando por buscar los registros duplicados y valores faltantes.

In [6]:
# Organizamos las columnas de los dataframes para que coincidan entre sí y facilitar la visualización
cols = ['precio', 'producto_id', 'sucursal_id']
for x in ps_2020:
    ps_2020[x] = ps_2020[x][cols]
    print(f'----------------{x}----------------\n{ps_2020[x].columns}')

----------------2020-04-26----------------
Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')
----------------2020-04-19----------------
Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')
----------------2020-04-13----------------
Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')
----------------2020-05-03----------------
Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')
----------------2020-05-18----------------
Index(['precio', 'producto_id', 'sucursal_id'], dtype='object')


In [37]:
# Usamos un ciclo para obtener la información de todos los DataFrames
for x in ps_2020:
    semana = ps_2020[x]
    print(f'\n-----DataFrame: {x}-----\n\n• Valores nulos:\n{semana.isnull().sum()}\n\n• Registros duplicados:\n{semana.duplicated().value_counts()}\n')


-----DataFrame: 2020-04-26-----

• Valores nulos:
producto_id    9302
sucursal_id       0
precio            0
dtype: int64

• Registros duplicados:
False    474692
dtype: int64


-----DataFrame: 2020-04-19-----

• Valores nulos:
producto_id    0
sucursal_id    0
precio         0
dtype: int64

• Registros duplicados:
False    458543
dtype: int64


-----DataFrame: 2020-04-13-----

• Valores nulos:
producto_id    3
sucursal_id    3
precio         0
dtype: int64

• Registros duplicados:
False    472134
dtype: int64


-----DataFrame: 2020-05-03-----

• Valores nulos:
producto_id    0
sucursal_id    0
precio         0
dtype: int64

• Registros duplicados:
False    397734
dtype: int64


-----DataFrame: 2020-05-18-----

• Valores nulos:
producto_id    3
sucursal_id    3
precio         0
dtype: int64

• Registros duplicados:
False    415104
dtype: int64



In [8]:
# Habiendo detectado registros duplicados procedemos a eliminarlos
for x in ps_2020:
    ps_2020[x].drop_duplicates(inplace=True)

In [31]:
# Ahora podemos generar una visualización preliminar de las tablas para continuar con el proceso de limpieza y poder lidiar con los valores nulos
for x in ps_2020:
    print(f'-----DataFrame: {x}-----\n{ps_2020[x].head(15)}')

-----DataFrame: 2020-04-26-----
     precio  producto_id sucursal_id  precio2
0     399.0       2288.0     2-1-092    399.0
1     299.0       2288.0     2-1-206    299.0
2     399.0       2288.0     2-2-241    399.0
3   49999.0     205870.0     9-1-430  49999.0
4   53999.0     205870.0       9-2-4  53999.0
5   53999.0     205870.0    9-3-5218  53999.0
6   58999.0     205894.0     9-1-430  58999.0
7   18999.0     205955.0     9-1-430  18999.0
8   10499.0     205979.0     9-1-430  10499.0
9    2290.0     206020.0     9-1-430   2290.0
10  27999.0     206044.0     9-1-430  27999.0
11   2190.0     206044.0     9-1-691   2190.0
12  22999.0     206051.0     9-1-430  22999.0
13  45999.0     206105.0     9-1-430  45999.0
14  38999.0     206136.0     9-1-430  38999.0
-----DataFrame: 2020-04-19-----
    precio producto_id          sucursal_id  precio2
0    29.90        2288              2-1-184    29.90
1    39.90        2288              2-1-206    39.90
2   499.99      205870              9-1-4

In [10]:
for x in ps_2020:
    print(ps_2020[x].info())

<class 'pandas.core.frame.DataFrame'>
Int64Index: 474692 entries, 0 to 478908
Data columns (total 3 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   precio       473053 non-null  float64
 1   producto_id  465390 non-null  float64
 2   sucursal_id  474692 non-null  object 
dtypes: float64(2), object(1)
memory usage: 14.5+ MB
None
<class 'pandas.core.frame.DataFrame'>
Int64Index: 458543 entries, 0 to 458542
Data columns (total 3 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   precio       456736 non-null  float64
 1   producto_id  458543 non-null  object 
 2   sucursal_id  458543 non-null  object 
dtypes: float64(1), object(2)
memory usage: 14.0+ MB
None
<class 'pandas.core.frame.DataFrame'>
Int64Index: 472134 entries, 0 to 472165
Data columns (total 3 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   precio       472133 non-null  flo

### • Primeros problemas con el formato de los datos:
Con la anterior visualización preliminar se pudieron ver tres cuestiones a tener en cuenta y que se abordarán a continuación:
1) En uno de los DataFrames (2020-05-03), el tipo de data de la columna 'precio' no es *float64* como en los demás.
2) El formato de la columna 'producto_id' es diferente en varios DataFrames.
3) Algunos valores de la columna 'sucursal_id' de al menos la tabla '2020-04-19' son interpretados en como de tipo 'datetime'.
4) 

#### 1) El tipo de data de la columna 'precio' no es *float64* en todos los DataFrames

In [32]:
# Ahora exploramos un poco los datos de '2020-05-03.precio', columna que no era del tipo 'float64'
precio_dtype = set()
strings = set()
zeros = 0
for x in ps_2020['2020-05-03'].precio:
    precio_dtype.add(str(type(x)))
    if type(x) == str:
        #print(x)
        strings.add(x)
    elif x == 0:
        zeros += 1
print(f'• Tipos de datos que encontramos en la columna precios del DataFrame 2020-05-03:\n{precio_dtype}\n')
print(f'• Valores que encontramos para los strings:\n{strings}\n')
print(f'• Apariciones de 0 en la columna:\n{zeros}')

• Tipos de datos que encontramos en la columna precios del DataFrame 2020-05-03:
{"<class 'str'>", "<class 'float'>", "<class 'int'>"}

• Valores que encontramos para los strings:
{''}

• Apariciones de 0 en la columna:
0


##### • Ya que detectamos que no hay registros en la columna con valor de '0', éste es un buen candidato para remplazar por ahora los valores faltantes de 'precio'

In [27]:
# Procedemos a detectar si en los demás DataFrames hay valores de '0' en la columna precio
has_zeros = {}
for x in ps_2020:
    has_zeros[x] = (0 in ps_2020[x].precio.unique())
for x in has_zeros:
    print(f"DataFrame: {x}\nEstá 0 en la columna 'precio': {has_zeros[x]}\n")

DataFrame: 2020-04-26
Está 0 en la columna 'precio': False

DataFrame: 2020-04-19
Está 0 en la columna 'precio': False

DataFrame: 2020-04-13
Está 0 en la columna 'precio': False

DataFrame: 2020-05-03
Está 0 en la columna 'precio': False

DataFrame: 2020-05-18
Está 0 en la columna 'precio': False



In [29]:
# Dado que ningún DataFrame tiene el dato '0' en la columna 'precio', procedemos a remplazar los valores faltantes con '0'
for x in ps_2020:
    ps_2020[x]['precio2'] = ps_2020[x]['precio'].replace('', np.nan).fillna(0)


In [35]:
# Volvemos a explorar los datos de '2020-05-03.precio', columna que no era del tipo 'float64'
precio_dtype = set()
zeros = 0
for x in ps_2020['2020-05-03'].precio2:
    precio_dtype.add(str(type(x)))
    if x == 0:
        zeros += 1
print(f'• Tipos de datos que encontramos en la columna precios2 del DataFrame 2020-05-03:\n{precio_dtype}\n')
print(f'• Apariciones de 0 en la columna:\n{zeros}')

• Tipos de datos que encontramos en la columna precios2 del DataFrame 2020-05-03:
{"<class 'float'>"}

• Apariciones de 0 en la columna:
2124


Con el anterior test podemos corroborar que la nueva columna ('precio2') ya no tiene valores del tipo *str* así que podemos borrar la columna original y quedarnos con la nueva.

In [36]:
for x in ps_2020:
    ps_2020[x].drop('precio', axis=1, inplace=True)
    ps_2020[x].rename(columns = {'precio2':'precio'}, inplace = True)

#### 2) Unificar el formato de las columnas 'producto_id', dejándolo como un *str*

In [None]:
pass

#### 3) Corregir los valores de las columnas 'sucursal_id' que se hayan remplazado por valores tipo *datetime*

In [None]:
# Verificamos cuales datasets tienen valores del tipo 'datetime' en la columna sucursal_id
datasets_w_datetime = []
for x in ps_2020:
    checker = False
    for y in ps_2020[x].sucursal_id:
        if type(y) == datetime:
            checker = True
    if checker:
        datasets_w_datetime.append(x)
print(datasets_w_datetime)

In [None]:
# Creamos una función que se pueda aplicar a la columna 'sucursal_id' y nos regrese cada valor como un str
def sucursal_id_2str(registro):
    if type(registro) == datetime:
        spl = registro.strftime('%d-%m-%Y').split('-')
        #print(f'{x}:{spl}')
        if spl[0][0] == '0':
            spl[0] = spl[0][1]
        if spl[1][0] == '0':
            spl[1] = spl[1][1]
        return str(f'{spl[0]}-{spl[1]}-{spl[2]}')
    else:
        return str(registro)

In [None]:
# Aplicamos la función a las columnas de 'sucursal_id' y guardamos los valores en una nueva columna 'sucursal_id2'
for x in ps_2020:
    ps_2020[x]['sucursal_id2'] = ps_2020[x].sucursal_id.apply(sucursal_id_2str)

In [None]:
# Verificamos cuales datasets tienen valores del tipo 'datetime' en la columna 'sucursal_id2'
datasets_w_datetime = []
for x in ps_2020:
    checker = False
    for y in ps_2020[x].sucursal_id2:
        if type(y) == datetime:
            checker = True
    if checker:
        datasets_w_datetime.append(x)
print(datasets_w_datetime)

Con el anterior chequeo podemos corroborar que la nueva columna creada ya no tiene valores del tipo 'datetime' así que podemos borrar la columna anterior y quedarnos con la nueva.

In [None]:
for x in ps_2020:
    ps_2020[x].drop('sucursal_id', axis=1, inplace=True)
    ps_2020[x].rename(columns = {'sucursal_id2':'sucursal_id'}, inplace = True)

In [None]:
for x in ps_2020:
    print(ps_2020[x].columns)

In [None]:
for x in ps_2020:
    print(ps_2020[x].info())

In [None]:
for x in ps_2020:
    semana = ps_2020[x]
    print(f'----------------{x}----------------\nShape: {semana.shape}\n\nValores nulos:\n{semana.isnull().sum()}\n\nRegistros duplicados:\n{semana.duplicated().value_counts()}')