# Data engineering test

***
#### 1. Download the compressed folder that contains the data and put it in a folder dedicated to this challenge.✔️
##### <font color='green'>Solution:</font>
<font color='green'>Files downloaded to the path `./batched_files`.</font>
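The notebook does not show how the archive was unpacked. If you prefer to script this step, here is a minimal sketch using the standard-library `zipfile` module. It assumes the download is a ZIP archive whose CSV files sit at its top level. The archive name `data.zip` in the usage line is hypothetical, because the original does not give one. The default folder matches the `./batched_files` path used throughout.

```python
import zipfile
from pathlib import Path
from typing import List


def extract_archive(archive_path: str, folder: str = "./batched_files") -> List[str]:
    """Extract a ZIP archive into `folder` and return the sorted CSV names found there.

    Assumes the CSV files sit at the top level of the archive (hypothetical layout).
    """
    target = Path(folder)
    target.mkdir(parents=True, exist_ok=True)  # folder dedicated to this challenge
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(target)  # only extract archives from a trusted source
    return sorted(p.name for p in target.glob("*.csv"))


# Hypothetical archive name; the original does not say what the download is called.
# extract_archive("data.zip")
```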

***
#### 2. Build a pipeline that is able to:✔️
> a) Load all the .CSV files except the one named “validation.csv” (at no point may the pipeline hold the whole dataset, i.e. all 5 .CSV files, in memory at the same time).
>
> b) Store the data from the .CSV files in a database of your choice (e.g. PostgreSQL, MySQL). The design of this database is up to you: create whatever table or tables you think are necessary, with the schema you consider appropriate, but bear in mind that all the .CSV files must go into the same database.
>
> c) As the data is loaded, keep track of:
>> - The count of rows loaded into the database.
>> - The mean, minimum and maximum values of the “price” field.
##### <font color='green'>Solution:</font>

<font color='green'>i. Identify the batch files and sort them</font>

In [10]:
import os

import pandas as pd

batch_folder = "./batched_files/"
# Keep only the CSV batches, sorted by name, and set the validation file aside
list_of_files = sorted(f for f in os.listdir(batch_folder) if f.endswith(".csv"))
list_of_files.remove("validation.csv")
list_of_files

['2012-1.csv', '2012-2.csv', '2012-3.csv', '2012-4.csv', '2012-5.csv']
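Requirement a) asks that the five files never sit in memory together. The cell above only lists the files. One way to honour the constraint is a generator that opens the files one after another and yields rows as it reads them, so that only the current row and the reader's buffer are held at any time. The sketch below uses the standard-library `csv` module instead of pandas. `iter_batch_rows` is a hypothetical helper, not part of the original notebook. With pandas, `pd.read_csv(path, chunksize=...)` gives a similar effect one chunk at a time.

```python
import csv
from pathlib import Path
from typing import Dict, Iterator, Tuple


def iter_batch_rows(
    folder: str, exclude: str = "validation.csv"
) -> Iterator[Tuple[str, Dict[str, str]]]:
    """Yield (file_name, row) pairs from every CSV in `folder`, one row at a time.

    Files are visited in sorted order and `exclude` is skipped, mirroring the
    list built in the cell above. Only one file is open at any moment.
    """
    paths = sorted(p for p in Path(folder).glob("*.csv") if p.name != exclude)
    for path in paths:
        with path.open(newline="") as handle:
            for row in csv.DictReader(handle):
                yield path.name, row  # values are still strings at this point
```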

<font color='green'>ii. Connect to the database</font>

In [11]:
import pandas as pd
from sqlalchemy import create_engine
import sqlalchemy
# Define the database connection parameters
db_host = "localhost"
db_user = "pragma"
db_password = "password"
db_name = "pragma"
# Create a SQLAlchemy engine
connection_string = f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}"
engine = create_engine(connection_string)
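The notebook does not show how the `purchases` table was created, although the queries in section 3 use `COUNT(id)`. `DataFrame.to_sql` with `index=False` only creates columns for the DataFrame's own columns. The `id` column therefore suggests that the table was created beforehand, or that the CSV files carry an `id` column. Below is a minimal sketch of such a schema. It runs against the standard-library `sqlite3` module as a stand-in for MySQL, so it is self-contained.

The column set (`id`, `date`, `price`) is an assumption based on the columns the code uses. Any other CSV column would need its own column here. In MySQL the key would be declared as `id INT AUTO_INCREMENT PRIMARY KEY`.

```python
import sqlite3

# Hypothetical schema: an auto-incrementing surrogate key plus the two columns
# the notebook manipulates. Other CSV columns would need to be added here.
PURCHASES_DDL = """
CREATE TABLE IF NOT EXISTS purchases (
    id    INTEGER PRIMARY KEY AUTOINCREMENT,
    date  TEXT,
    price REAL  -- nullable: the loader keeps rows whose price is missing
)
"""


def create_purchases_table(conn: sqlite3.Connection) -> None:
    """Create the `purchases` table if it does not already exist."""
    with conn:  # commits on success, rolls back on error
        conn.execute(PURCHASES_DDL)


conn = sqlite3.connect(":memory:")
create_purchases_table(conn)
conn.execute("INSERT INTO purchases (date, price) VALUES (?, ?)", ("2012-06-01", 50.0))
conn.execute("INSERT INTO purchases (date, price) VALUES (?, ?)", ("2012-06-02", None))
print(conn.execute("SELECT id, price FROM purchases").fetchall())
# → [(1, 50.0), (2, None)]
```

With the table in place, `to_sql(..., if_exists="append", index=False)` fills `date` and `price`, and the database assigns `id`.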

<font color='green'>iii. For each row of the batch, read the record, insert it into the database and update the running statistics</font>

In [12]:
import numpy as np
import pandas as pd
import sqlalchemy
from typing import Tuple

def extract_batch(batch_file_path: str) -> pd.DataFrame:
    """
    Read a CSV batch file into a pandas DataFrame.

    Parameters
    ----------
    batch_file_path: path of the batch file

    The `timestamp` column (month/day/year) is parsed into a new `date`
    column and then dropped.
    """
    batch = pd.read_csv(batch_file_path)
    batch["date"] = pd.to_datetime(batch["timestamp"], format="%m/%d/%Y")
    batch = batch.drop(columns="timestamp")
    return batch

def load_batch(
        batch: pd.DataFrame,
        engine: sqlalchemy.engine.Engine,
        price_max: float,
        price_min: float,
        price_mean: float,
        price_count: int,
        row_count: int,
        print_stats: bool = True,
    ) -> Tuple[float, float, float, int, int]:
    """
    Insert `batch` into the `purchases` table one row at a time, updating
    (and optionally printing) the running statistics after every insert.
    Rows with a missing price are inserted and counted in `row_count`, but
    ignored by the price statistics. Returns the updated
    (price_max, price_min, price_mean, price_count, row_count).
    """
    for i in range(len(batch)):
        row = batch.iloc[[i]]  # one-row DataFrame: keeps the column dtypes
        row.to_sql(name="purchases", con=engine, if_exists="append", index=False)
        row_count += 1  # update the stats only after the insert succeeded
        price = float(row["price"].iloc[0])
        if not np.isnan(price):
            price_max = max(price_max, price)
            price_min = min(price_min, price)
            price_mean = (price_mean * price_count + price) / (price_count + 1)
            price_count += 1
        if print_stats:
            print("row_count: ", row_count, end=" ")
            print("price_max: ", price_max, end=" ")
            print("price_min: ", price_min, end=" ")
            print("price_mean: ", price_mean)
    return price_max, price_min, price_mean, price_count, row_count
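`load_batch` passes five separate values through its arguments and its return tuple. Below is a minimal sketch of the same bookkeeping gathered into one object. `RunningStats` is a hypothetical helper, not part of the original notebook.

The sketch swaps in the incremental update `mean += (x - mean) / n`. This is algebraically the same as the `(mean*n + x)/(n+1)` update above, but it avoids rebuilding the running sum. Like `load_batch`, it skips missing prices while still counting the row.

The first five sample prices are back-calculated from the first five lines of the output printed by the loading cell. The trailing NaN is illustrative.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunningStats:
    """Running count/min/max/mean of `price`, updated one row at a time."""

    row_count: int = 0       # every inserted row, with or without a price
    price_count: int = 0     # rows whose price is present
    price_min: float = math.inf
    price_max: float = -math.inf
    price_mean: float = 0.0

    def update(self, price: Optional[float]) -> None:
        self.row_count += 1
        if price is None or math.isnan(price):
            return  # missing price: counted as a row, ignored by the price stats
        self.price_count += 1
        self.price_min = min(self.price_min, price)
        self.price_max = max(self.price_max, price)
        # Incremental mean: move the old mean by (deviation of the new value) / n
        self.price_mean += (price - self.price_mean) / self.price_count

    def __str__(self) -> str:
        return (f"row_count: {self.row_count} price_max: {self.price_max} "
                f"price_min: {self.price_min} price_mean: {self.price_mean}")


stats = RunningStats()
for price in [50.0, 87.0, 64.0, 20.0, 14.0, float("nan")]:
    stats.update(price)
    print(stats)
```

Keeping `row_count` and `price_count` separate matters because the mean must be divided by the number of priced rows, not by the number of inserted rows.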

In [13]:
# initialize the running statistics
price_max = -np.inf
price_mean = 0.
price_min = np.inf
price_count = 0
row_count = 0

# populate the database
for file_path in list_of_files:
    batch_file_path = f"{batch_folder}{file_path}"
    batch = extract_batch(batch_file_path)
    price_max, price_min, price_mean, price_count, row_count = load_batch(
        batch, engine, price_max, price_min, price_mean, price_count, row_count)


row_count:  1 price_max:  50.0 price_min:  50.0 price_mean:  50.0
row_count:  2 price_max:  87.0 price_min:  50.0 price_mean:  68.5
row_count:  3 price_max:  87.0 price_min:  50.0 price_mean:  67.0
row_count:  4 price_max:  87.0 price_min:  20.0 price_mean:  55.25
row_count:  5 price_max:  87.0 price_min:  14.0 price_mean:  47.0
row_count:  6 price_max:  95.0 price_min:  14.0 price_mean:  55.0
row_count:  7 price_max:  95.0 price_min:  14.0 price_mean:  60.714285714285715
row_count:  8 price_max:  95.0 price_min:  14.0 price_mean:  60.875
row_count:  9 price_max:  95.0 price_min:  14.0 price_mean:  59.22222222222222
row_count:  10 price_max:  97.0 price_min:  14.0 price_mean:  63.0
row_count:  11 price_max:  97.0 price_min:  14.0 price_mean:  60.09090909090909
row_count:  12 price_max:  97.0 price_min:  14.0 price_mean:  60.09090909090909
row_count:  13 price_max:  97.0 price_min:  14.0 price_mean:  58.166666666666664
row_count:  14 price_max:  97.0 price_min:  14.0 price_mean:  59.384

***
#### 3. Checking the results:✔️
Print the current value of the running statistics, ideally after each row is inserted.
> - Query the database for the total row count and the mean, minimum and maximum values of the “price” field.
> - Run the “validation.csv” file through the whole pipeline and show the value of the running statistics.
> - Run a new query on the database after loading “validation.csv” to see how the total row count and the mean, minimum and maximum values of the “price” field changed.

##### <font color='green'>Solution:</font>

<font color='green'>Database query before validation</font>

In [14]:
price_stats = pd.read_sql("SELECT COUNT(id) AS count, MAX(price) AS max, MIN(price) AS min, AVG(price) AS mean FROM purchases", con=engine)
price_stats

|   | count | max | min | mean |
|---|---|---|---|---|
| 0 | 143 | 100.0 | 10.0 | 57.88489 |
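A quick way to confirm that the running statistics and the database agree is to compare them with a single aggregate query. `COUNT(id)` counts rows, while `AVG`, `MIN` and `MAX` ignore `NULL` prices. This matches how `load_batch` counts every row but skips missing prices. The sketch assumes missing prices end up stored as `NULL`.

The code uses an in-memory `sqlite3` database as a stand-in for MySQL, and `COUNT(*)` in place of `COUNT(id)`. The two are equivalent when `id` is a non-null key. The helper name `check_against_db` is hypothetical.

```python
import math
import sqlite3
from typing import List, Optional


def check_against_db(conn: sqlite3.Connection, row_count: int, price_count: int,
                     price_min: float, price_max: float, price_mean: float,
                     rel_tol: float = 1e-9) -> bool:
    """Return True if the running statistics agree with the aggregates in `purchases`."""
    db_rows, db_prices, db_min, db_max, db_mean = conn.execute(
        "SELECT COUNT(*), COUNT(price), MIN(price), MAX(price), AVG(price) FROM purchases"
    ).fetchone()
    if db_mean is None:  # no non-NULL prices yet: only the counts can be compared
        return db_rows == row_count and price_count == 0
    return (
        db_rows == row_count          # every row, priced or not
        and db_prices == price_count  # COUNT(price) skips NULLs
        and db_min == price_min
        and db_max == price_max
        and math.isclose(db_mean, price_mean, rel_tol=rel_tol)  # float sums may differ slightly
    )


# Tiny demo: one row has no price and is stored as NULL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE purchases (id INTEGER PRIMARY KEY, price REAL)")
prices: List[Optional[float]] = [50.0, 87.0, None, 64.0]
conn.executemany("INSERT INTO purchases (price) VALUES (?)", [(p,) for p in prices])
present = [p for p in prices if p is not None]
print(check_against_db(conn, len(prices), len(present), min(present), max(present),
                       sum(present) / len(present)))
# → True
```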


<font color='green'>Run the pipeline on the validation file</font>

In [15]:
batch_file_path = f"{batch_folder}validation.csv"
batch = extract_batch(batch_file_path)
price_max, price_min, price_mean, price_count, row_count = load_batch(
    batch, engine, price_max, price_min, price_mean, price_count, row_count)

# release the pooled connections; SQLAlchemy opens new ones if the engine is used again (as in the next cell)
engine.dispose()

row_count:  144 price_max:  100.0 price_min:  10.0 price_mean:  57.657142857142865
row_count:  145 price_max:  100.0 price_min:  10.0 price_mean:  57.32624113475178
row_count:  146 price_max:  100.0 price_min:  10.0 price_mean:  57.01408450704226
row_count:  147 price_max:  100.0 price_min:  10.0 price_mean:  57.25874125874127
row_count:  148 price_max:  100.0 price_min:  10.0 price_mean:  57.076388888888886
row_count:  149 price_max:  100.0 price_min:  10.0 price_mean:  57.110344827586204
row_count:  150 price_max:  100.0 price_min:  10.0 price_mean:  56.80821917808219
row_count:  151 price_max:  100.0 price_min:  10.0 price_mean:  57.006802721088434


In [17]:
price_stats = pd.read_sql("SELECT COUNT(id) AS count, MAX(price) AS max, MIN(price) AS min, AVG(price) AS mean FROM purchases", con=engine)
price_stats

|   | count | max | min | mean |
|---|---|---|---|---|
| 0 | 151 | 100.0 | 10.0 | 57.0068 |
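The two queries in this section show how loading “validation.csv” moves the aggregates. Those changes can also be predicted without re-querying, because summaries of two disjoint sets of rows combine exactly. The minimum and maximum are taken pairwise, and the mean is the count-weighted average `(n_a * mean_a + n_b * mean_b) / (n_a + n_b)`.

Below is a minimal sketch with hypothetical `Summary` and `merge_summaries` names and made-up numbers. Here `n` must be the number of non-missing prices (the notebook's `price_count`), since `AVG` ignores `NULL`s.

```python
from typing import NamedTuple


class Summary(NamedTuple):
    n: int        # number of non-missing prices (price_count, not row_count)
    low: float    # minimum price
    high: float   # maximum price
    mean: float   # mean price


def merge_summaries(a: Summary, b: Summary) -> Summary:
    """Combine the price summaries of two disjoint sets of rows."""
    total = a.n + b.n
    if total == 0:
        return a  # nothing priced on either side
    mean = (a.n * a.mean + b.n * b.mean) / total  # count-weighted mean
    return Summary(total, min(a.low, b.low), max(a.high, b.high), mean)


before = Summary(n=3, low=50.0, high=87.0, mean=67.0)
new_batch = Summary(n=2, low=14.0, high=20.0, mean=17.0)
print(merge_summaries(before, new_batch))
# → Summary(n=5, low=14.0, high=87.0, mean=47.0)
```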
