# ETL

In [43]:
#Importamos los paquete necesarios:
import os
import pandas as pd
import glob ##New package learned
import shutil ##New package learned   
from sqlalchemy import create_engine

In [44]:
#Importamos y configuramos el modulo de logging
import logging

logging.basicConfig(
    level= logging.INFO,
    format= '%(asctime)s - %(message)s',
    filename= 'LOGGINGprocess_ETL.log',
    filemode= 'a'
)

# Extract:


### Extracción dependiendo del tipo de formato

In [45]:
def extractANDclean_CSV(file_csv):
    '''Transformamos un CSV en un datafram y lo renombramos como documento procesado
    
    IN: archivo CSV
    OUT: DataFrame
    '''
    logging.info(f"Documento {file_csv} procesado")

    return pd.read_csv(file_csv)

def extractANDclean_JSON(file_json):
    '''Transformamos un JSON en un datafram y lo renombramos como documento procesado

    IN: archivo JSON
    OUT: DataFrame
    '''
    logging.info(f"Documento {file_json} procesado")

    return pd.read_json(file_json, lines=True) ###REMARK

def extractANDclean_TXT(file_txt):
    '''Transformamos un TXT en un datafram y lo renombramos como documento procesado

    IN: archivo TXT
    OUT: DataFrame
    '''
    logging.info(f"Documento {file_txt} procesado")

    return pd.read_csv(file_txt,sep='|') ###REMARK

def extractANDclean_XLSX(file_xlsx):
    '''Transformamos un XLSX en un datafram y lo renombramos como documento procesado

    IN: archivo XLSX
    OUT: DataFrame
    '''
    logging.info(f"Documento {file_xlsx} proceado")

    return pd.read_excel(file_xlsx)

### Extracción principal de datos:

In [46]:

def extract():
    '''Función para la extracción de datos des de los diferentes tipos de documentos

    IN: None
    OUT: DataFrame total, Documentos renombrados y desplazados a la carpeta 'processed_data'
    '''
    logging.info(f"Se inicia la extraccion...")

    ######
    #Generamos y damos las rutas necesarias para poder extraer y almacenar los datasets:
    ######
    dirty_data = 'dirty_data/*' #Datos sucios


    if os.path.exists('processed_data'): #Si el directorio existe loggeamos la información:
        logging.info(f"Directorio 'processed_data' existente")
    else:
        logging.info(f"Creamos el directorio 'processed_data'")
        os.makedirs("processed_data", exist_ok=True)#Generamos nuevo directorio para datos limpios


    #Lista para almacenar los diferentes dataframes extraidos de los diferentes formatos:
    list_of_dfs = []

    #######
    #Extraemos la información de cada formato:
    #######
    for doc in glob.glob(dirty_data): ###REMARK
        
        try:
            if doc[-4:] == ".csv": list_of_dfs.append(extractANDclean_CSV(doc))
            elif doc[-5:] == ".json": list_of_dfs.append(extractANDclean_JSON(doc))
            elif doc[-5:] == ".xlsx": list_of_dfs.append(extractANDclean_XLSX(doc))
                #Presuponemos que el separador para los TXT será "|":
            elif doc[-4:] == ".txt": list_of_dfs.append(extractANDclean_TXT(doc))
            
        
        #Levantamos un error si el formato no existe:
        except:
            logging.error(f"ERROR: Formato del documento {doc} desconocido")
            raise TypeError(f"Documento con formato no contemplado: {doc}")

        
        ######
        #Desplazar el datasets utilizado:
        ######
        logging.info(f"Movemos a la ruta de 'processed_data' al docuemnto {doc[11:]}")
        os.rename(doc, 'processed_data/processed_'+doc[11:])
        

    #Concatenamos todos los df con todos los registros de los diferentes documentos:
    return pd.concat(list_of_dfs, ignore_index=True) ###REMARK

# Transform:

In [None]:
def transform(dataframe_total):
    '''Función encargada de transformar, limpiar y depurar el DataFrame total generado con la función extract.

    IN: DataFrame
    OUT: DataFrame
    '''
    #####--- HAY QUE REPLANTEAR ESTA PARTE CUANDO SE HAYA CORREGIDO ---#####
    df_2_clean = pd.read_csv(dataframe_total)
    ########################################################################


    #Columnas: id_venta[int], fecha[str, datetime], cliente[str], producto[str], precio[flo], cantidad[flo]

    #La columna 'id_venta', no corresponde al indice total sino al indice de su respectivo dataset de procedencia.
    #Corregimos el 'id_venta' al indice total de la suma de datasets:

    #Limpiamos de 'NaN' y 'ERROR' las columnas fecha, cliente, producto, precio, cantidad:

    ######
    #Normalizamos la columna fecha[str, datetime]:
    ######

    ######
    #Noramlizamos la columna cliente[str, capitalized]:
    ######


    ######
    #Normalizamos la columna producto[str, capitalized]:
    ######

    ######
    #Normalizamos la columna precio[flo]:
    ######

    ######
    #Normalizamos la columna cantidad[flo]:
    ######

# Ejecución:

In [54]:
#total_df = extract()

cleaned_df = transform('to_B_cleaned.csv')#cleaned_df = transform(total_df)

   id_venta       fecha   cliente producto precio  cantidad
0         1  2023.09.26   hicham     Mouse    -10       NaN
1         2  2023.12.27     MARTA  Teclado  ERROR       NaN
2         3  2023-05-05      luis   Cables   1200       5.0
3         4         NaN    PeDro   Teclado  ERROR       5.0
4         5  2023.08.15     sofia    Mouse   1200       2.0
