## Importar librerias

In [35]:
import pandas as pd
import os
import glob
from datetime import datetime
from sqlalchemy import create_engine

## Importar archivos (Extract)

In [2]:
## Creamos una lista con los directorios donde se encuentran los archivos de precios que constituyen nuestra tabla de hechos
path = os.getcwd() + "\\Dataset\\"
files = glob.glob(path + "/*precios*")

#un solo archivo
#new=glob.glob(path + "/precios_semana_20200413.csv")#para la carga incremental


### Lectura de archivos generalizada

In [3]:
def read_file(path): 

    for i in path:
        ext = os.path.splitext(i)[-1].lower() #deja solo las extensiones
        if ext=='.xlsx':
            xls = pd.ExcelFile(i)
            sheets=xls.sheet_names
            if len(sheets)==1:
                df=pd.read_excel(i)
                return df
            elif len(sheets)==2:
                df1=pd.read_excel(i,sheet_name=0).assign(semana=sheets[0])
                df2=pd.read_excel(i,sheet_name=1).assign(semana=sheets[1])
                return df1, df2
            else:
                print('Archivo Excel con mas de dos sheets')
        elif ext=='.csv' or ext=='.txt':
            df=pd.read_csv(i,sep=',',encoding='utf-16').assign(semana=os.path.basename(i).split('.')[0])
            return df
        elif ext=='.json':
            df=pd.read_json(i).assign(semana=os.path.basename(i).split('.')[0])
            return df
        elif ext=='.parquet':
            df=pd.read_parquet(i)
            return df
        else:
            print("I just can't")

### Lectura de archivos

Solo archivos de *precio* (Tabla de hechos)

In [4]:
df_precios = pd.read_excel(files[0], dtype=object ,sheet_name=1)
df_precios1 = pd.read_excel(files[0], sheet_name=0)
df_precios2 = pd.read_csv(files[1],sep=',',encoding='utf-16')
df_precios3 = pd.read_json(files[2])
df_delta = pd.read_csv(files[-1], sep='|')

Tabla *sucursal*

In [7]:
sucu_file = glob.glob(path + "/*sucursal*")
sucursal = pd.read_csv(sucu_file[0])
sucursal.sample(3)

Unnamed: 0,id,comercioId,banderaId,banderaDescripcion,comercioRazonSocial,provincia,localidad,direccion,lat,lng,sucursalNombre,sucursalTipo
2099,9-1-232,9,1,Vea,Jumbo Retail Argentina S.A.,AR-X,VILLA MARIA,Boulevard Velez Sarsfield 411,-32.4086,-63.2565,Villa María II.,Supermercado
1186,15-1-280,15,1,Supermercados DIA,DIA Argentina S.A,AR-B,San Nicolas,Cl Pellegrini 328,-33.334603,-60.218897,280 - San Nicolas,Autoservicio
517,10-3-674,10,3,Express,INC S.A.,AR-C,Ciudad Autónoma de Buenos Aires,Virrey Del Pino 2527,-34.567072,-58.454065,Virrey del Pino 2527,Autoservicio


Tabla de *producto* y ETL para esta tabla

Disculpas! No es lo ideal tener un ETL de tablas separados. Esto es solo por cuestiones de tiempo :D

Esto se debe incluir en la seccion Transformaciones donde se encuentran todas las funciones de transformacion para todas las tablas :)

In [6]:
#Lectura de archivo
path1=glob.glob(path + "/producto.parquet")
producto=read_file(path1) #con funcion read_file definida arriba
producto.sample(3)

Unnamed: 0,id,marca,nombre,presentacion,categoria1,categoria2,categoria3
40710,7792350001952,INALPA,Jardinera Inalpa 3 Kg,3.0 kg,,,
42246,7792850000691,CHUKER,Edulcorante Liquido Chuker 500 Cc,500.0 cc,,,
59101,7798107413641,ETNIA,Vino Tinto Roble Merlot Etnia 750 Ml,750.0 ml,,,


In [8]:
#Veamos los nulos
producto.isna().sum()

id                  0
marca               2
nombre              2
presentacion        2
categoria1      72034
categoria2      72034
categoria3      72034
dtype: int64

In [10]:
#Se eliminaran las columnas categoria ya que no aportan informacion relevante
producto.drop(columns=['categoria1','categoria2','categoria3'], inplace=True)
producto.head(1)

Unnamed: 0,id,marca,nombre,presentacion
0,1663,LA ANÓNIMA,Radicheta Atada La Anonima 1 Un,1.0 un


In [11]:
#Funcion para convertir de la columna producto.id los strings y datos tipo float a enteros
def id_change(x):
    if isinstance(x,str):
        x = x.split('-')[-1]
    elif isinstance(x,float):
        x = int(x)
    else:    
        x=x
    return int(x)

producto.id = producto.id.apply(id_change) #Aplicando la funcion

producto.drop_duplicates(subset=['id'],inplace=True) #eliminando duplicados en la columna producto.id ya que seran PK

## Transformacion



-  La funcion unDate(x) trabajaba a nivel Serie y originalmente estaba pensada para ser usada en un .apply(), por eso no estaba funcionando. Para corregirlo (no estoy seguro de que sea la mejor manera), la renombre a unDateSeries(x) y cree una unDate(df) que recibe un df y aplica unDateSeries a la serie Sucursal_id.  
-  La typeCheck() para la parte de producto no funcionaba porque trataba de aplicar la funcion en forma vectorizaa sin tener en cuenta los diferentes casos dentro de una misma columna de manera correcta. Ya modifique y quedó funcionando todo.
**A revisar**  
La carga de archivos automatizada no contempla el hecho de que la segunda hoja del sheet tiene datos anteriores a la primera hoja (estan invertidos)

In [12]:
def unDate(df):
    
    def unDateSeries(x):
        """read_excel me trae algunas sucursal_id como si fueran datetime. Esta funcion toma elementos de una serie, 
           chequea si son datetime y en caso de serlo, los convierte a str con el formato acorde al orden de identificadores de sucursal"""
        if isinstance(x,datetime):
            x=x.strftime("%#d-%#m-%Y")
        return x

    df.sucursal_id = df.sucursal_id.apply(unDateSeries)
    return df 

def nasDups(df):
    """Cosas básicas: remover duplicados y nulos"""
    df = df.copy()
    df = df.dropna()
    df = df.drop_duplicates()
    return df

def typeCheck(df):
    """En base a la exploracion inicial, se detectaron errores comunes en los diferentes campos. 
       Esta funcion soluciona todos los problemas encontrados en los diferentes archivos de la fuente de datos"""

    if df.precio.dtype != "float":
        """Algunos precios llegan con un string vacio en vez de nulo. Los convertimos a None para removerlos despues. 
           A los que tienen valores, los convertimos a float"""
        try:
            df.precio = df.precio.apply(lambda x: None if x == '' else x)
            df.precio = df.precio.astype(float)
        except:
            print("Error type precios")

    if df.sucursal_id.dtype != "str":
        try:
            df.sucursal_id = df.sucursal_id.astype('string')
        except:
            print('Error type sucursal_id')
            
    def prod(x):
        if isinstance(x,str):
            x = x.split('-')[-1]
        elif isinstance(x,float):
            x = int(x)
        else:    
            x=x
        return int(x)
    df.producto_id = df.producto_id.apply(prod)

    return df

def splitProd(df):
    """Algunos productos tenian hardcodeada la sucursal. Removemos esa porcion del string y nos quedamos 
       unicamente con el codigo de producto (12-13 char)"""
    df.producto_id = df.producto_id.str.split('-').str[-1]
    
    return df


En la tabla sucursal y producto se debe tener especial atencion en el tipo de dato, pues postgres puede complejizar la ingest

In [13]:
def etl(df):
    df = nasDups(df)
    df = unDate(df)
    df = typeCheck(df)
    #df = splitProd(df)
    return df

def etl_sucu(sucursal):
    sucursal.id=sucursal.id.astype('string')    
    sucursal.comercioId = sucursal.comercioId.astype(int)
    sucursal.banderaId = sucursal.banderaId.astype(int)
    sucursal.banderaDescripcion = sucursal.banderaDescripcion.astype('string')
    sucursal.comercioRazonSocial = sucursal.comercioRazonSocial.astype('string')
    sucursal.provincia = sucursal.provincia.astype('string')
    sucursal.localidad = sucursal.localidad.astype('string')
    sucursal.direccion = sucursal.direccion.astype('string')
    sucursal.sucursalNombre = sucursal.sucursalNombre.astype('string')
    sucursal.sucursalTipo = sucursal.sucursalTipo.astype('string')
    sucursal.columns = sucursal.columns.str.lower()
    return sucursal

def etl_prod(producto):
    
    producto=producto.astype({'marca':str,'nombre':str,'presentacion':str})
    return producto

## Aplicación de transformación a Dataframes

In [14]:
df_precios = etl(df_precios)
df_precios1 = etl(df_precios1)
df_precios2 = etl(df_precios2)
df_precios3 = etl(df_precios3)
df_delta = etl(df_delta)
sucursal = etl_sucu(sucursal)
producto=etl_prod(producto)

### Carga (Load - Postgresql)

Paso inicial de creacion de tablas.
Una vez creadas por primera vez ya no se realiza este paso pues se realizaran las cargas incrementales

In [25]:
# import psycopg2
# tabla_precio = 'DROP TABLE IF EXISTS Precios CASCADE; CREATE TABLE Precios (precio float(10), producto_id bigint, sucursal_id varchar(10), FOREIGN KEY(sucursal_id) REFERENCES Sucursal(id), FOREIGN KEY(producto_id) REFERENCES Producto(id));'
# tabla_producto = 'DROP TABLE IF EXISTS Producto CASCADE; CREATE TABLE Producto (id bigint NOT NULL, marca text, nombre text, presentacion varchar(10), PRIMARY KEY(id));'
# tabla_sucursal = 'DROP TABLE IF EXISTS Sucursal CASCADE; CREATE TABLE Sucursal (id varchar(12) NOT NULL, comercioid int, banderaid int, banderadescripcion text, comerciorazonsocial text, provincia text, localidad text, direccion text, lat varchar(20), lng varchar(20), sucursalnombre text, sucursaltipo text, PRIMARY KEY(id))'

# conn = psycopg2.connect(
#     host='localhost',
#    user='admin',
#    password='admin1234',
#    database='productDb',
#    port='5432'
# )

# cur = conn.cursor()

# cur.execute(tabla_producto)
# cur.execute(tabla_sucursal)
# cur.execute(tabla_precio)

# conn.commit()
# cur.close()

Carga sucursal

In [36]:
from sqlalchemy import create_engine


cone = create_engine('postgresql://admin:admin1234@localhost:5432/productDb', pool_size=50, max_overflow=0)

sucursal.to_sql(name='sucursal',con=cone, if_exists='append', index=False)
print('La carga se ha hecho con exito!')

La carga se ha hecho con exito!


Carga de producto

In [37]:
producto.to_sql(name='producto',con=cone, if_exists='append', index=False)
print('La carga se ha hecho con exito!')

La carga se ha hecho con exito!


Carga de precios

In [38]:
df_precios.to_sql(name='precios',con=cone, if_exists='append', index=False)
print('La carga se ha hecho con exito!')

La carga se ha hecho con exito!


In [29]:
df_precios1.to_sql(name='precios', con=cone, if_exists='append', index=False)

753

In [39]:
df_precios2.to_sql(name='precios', con=cone, if_exists='append', index=False)

131

In [40]:
df_precios3.to_sql(name='precios', con=cone, if_exists='append', index=False)

734

In [None]:
tabla_producto = 'ALTER TABLE producto ADD PRIMARY KEY(id);'
tabla_sucursal = 'ALTER TABLE sucursal ADD PRIMARY KEY(id);'

tabla_precio_2 = 'ALTER TABLE precios ADD FOREIGN KEY (sucursal_id) REFERENCES sucursal (id) MATCH SIMPLE ON UPDATE NO ACTION ON DELETE NO ACTION NOT VALID ;'
tabla_precio_3 = 'ALTER TABLE precios ADD FOREIGN KEY (producto_id) REFERENCES producto (id) MATCH SIMPLE ON UPDATE NO ACTION ON DELETE NO ACTION NOT VALID;'


Carga incremental de prueba!

In [42]:
df_delta.to_sql(name='precios', con=cone, if_exists='append', index=False)

145

HOLI NO SE ESTO PARA QUE ES, HASTA EL PUNTO ANTERIOR TODO YA ANDUVO!


In [43]:
import psycopg2

conn = psycopg2.connect(
    host='localhost',
   user='admin',
   password='admin1234',
   database='productDb',
   port='5432'
)

cur = conn.cursor()
cur.execute(tabla_producto)
cur.execute(tabla_sucursal)
cur.execute(tabla_precio_2)
cur.execute(tabla_precio_3)
conn.commit()
cur.close()