**Daniel Carvajal Correa** 🚀🎑

# PASO 1: Descarga de archivos

En la siguiente celda se encuentra el código para descargar el archivo .zip que contiene los archivos .csv para realizar la prueba. La descarga se realiza a partir del ID que se extrae del URL proporcionado en la guía.

Durante su implementación me encontré con problemas como:

- El nombre del archivo .zip presenta espacios o caracteres tales como paréntesis que dificultan su manipulación. Para solucionarlo, implementé una función llamada clean_text que se encarga de eliminar este tipo de caracteres en el nombre del archivo y luego realizar su renombramiento.

- Al ejecutar unzip cuando la celda ya se había ejecutado una vez, se realizaba la petición de renombramiento de archivos nuevamente. Para evitar esto, agregué un if que verifica si la carpeta ya ha sido extraída anteriormente.

In [None]:
import gdown
import os
import re

#El id se extrae de la URL enviada en la hoja del test 'https://drive.google.com/file/d/1ejZpGTvZa81ZGD7IRWjObFeVuYbsSvuB/view?usp=sharing'
id_file = '1ejZpGTvZa81ZGD7IRWjObFeVuYbsSvuB'
!gdown {id_file}

def clean_text(text2clean:str) -> str:
  text2clean = text2clean.replace(" ", "")
  text2clean = re.sub(r'\([^)]*\)', '', text2clean)
  return text2clean

files = os.listdir('/content')
for filename in files:
  if filename.endswith('.zip'):
    new_filename = clean_text(filename)
    os.rename(filename, new_filename)
    folder_name = new_filename.rsplit('.',1)[0]
    if not os.path.exists(folder_name):
      !unzip {new_filename}

# PASO 2: Creación de la base de datos usando SQLite
En las siguientes celdas se hace la creación de la estructura que llevará la base de datos SQLite. Para ello utilicé dos tablas, en una estarán los datos en crudo y en la otra estará la parte de estadísticas analizadas por cada INSERT que se haga.

In [None]:
import sqlite3
import pandas as pd
from datetime import datetime
db_con = sqlite3.connect('test_database.db')

In [None]:
#@markdown Esta celda crea la tabla donde se guardara la información de los csvs. 
with db_con:
  db_con.execute("""
    CREATE TABLE IF NOT EXISTS data (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      timestamp DATE,
      price REAL,
      user_id INTEGER
    )
  """)

In [None]:
#@markdown Esta celda crea la tabla donde se guardara la estadistica de los datos. 
with db_con:
  db_con.execute("""
    CREATE TABLE IF NOT EXISTS stats (
      element_count INTEGER PRIMARY KEY AUTOINCREMENT,
      summatory REAL,
      average REAL,
      max_value REAL,
      min_value REAL,
      FOREIGN KEY (element_count) REFERENCES data (id)
    )
  """)

In [None]:
#@markdown Guardar cambios
db_con.commit()

# PASO 3: Carga de datos

Para preparar la carga de datos, considere crear un patrón usando expresión regular que me permitiera identificar los archivos de tipo: 2012-1.csv y excluir el validation.csv. Como en la guía dice que puedo asumir que los archivos están correctamente ordenados por fecha, con files.sort() acomodo el orden de los archivos y los subo uno por uno.

La función load_csv2db utiliza la declaración 'with open' para abrir el archivo CSV y asegurarse de cerrarlo una vez termine de cargar el CSV. Para leer los datos utilicé pandas y la función read_csv, le ajusté el parámetro de chunksize en 1 para ir cargando línea por línea a la base de datos. Esta función también se apoya en una verificación utilizando booleanos para verificar si el registro en la base de datos fue exitoso. En caso de no ser así, se realiza un rollback.

La función insert_data recibe una línea de pandas y la inserta al final en la base de datos. Si existe algún error, devuelve False para realizar un rollback en la base de datos.

La función update_stats recibe una línea de pandas y recupera la última línea almacenada en la tabla "stats" de la base de datos. Si no se encuentra ninguna línea, se inserta la línea sin realizar ninguna verificación. En caso de existir previamente una línea, se realiza el cálculo del promedio utilizando la columna "sumatoria" y la columna que tiene el valor de cuántos datos hay almacenados en ese momento. Se verifica si hay alguna novedad en las columnas de "max" y "min" value, y se inserta la línea en la tabla. Si ocurre algún error, devuelve False para realizar un rollback en la base de datos.

La función stats2float devuelve las variables en un tipo más manipulable que un DataFrame de pandas.

In [None]:
def load_csv2db(pathfile:str) -> None:
  with open(pathfile) as data: 
      for minibatch in pd.read_csv(data, chunksize=1):
        if minibatch.isnull().values.any():
          print('This row will be ignored, there is a Nan value')
          print(minibatch)
        else:
          verify_data = insert_data(minibatch)
          verify_stats = update_stats(minibatch)
          if verify_data and verify_stats:
            db_con.commit()
          else:
            db_con.rollback()

def insert_data(data_rows) -> bool:
  try:
    data_rows.to_sql('data', db_con, if_exists='append', index=False, index_label=id)
    return True
  except Exception as e:
    print(f'An error ocurred: {e}')
    return False

def update_stats(data_rows) -> bool:
  try:
    last_row = pd.read_sql_query("SELECT * FROM stats ORDER BY element_count DESC LIMIT 1", db_con)
    insert_stats_query = "INSERT INTO stats (summatory, average, max_value, min_value) VALUES (?, ?, ?, ?)"
    price = float(data_rows['price'])
    if last_row.empty:
      with db_con:
        values = (price, price, price, price)
        db_con.execute(insert_stats_query, values)
        print(f'el recuento es:1, el promedio es:{price}, el valor maximo es:{price}, el valor minimo es:{price}') #Este print hace parte de la comprobacion de resultados
    else:
      element_count, summatory, max_value, min_value = stats2float(last_row)
      summatory += price
      average = summatory/(element_count + 1)   #el promedio se hace linea por linea utilizando una sumatoria y el numero de elementos guardados en la ultima linea de la tabla stats
      if price > max_value:
        max_value = price
      if price < min_value: 
        min_value = price
      values = (summatory, average, max_value, min_value)
      db_con.execute(insert_stats_query, values)
      print(f'el recuento es:{element_count + 1}, el promedio es:{average}, el valor maximo es:{max_value}, el valor minimo es:{min_value}') #Este print hace parte de la comprobacion de resultados
      return True
  except Exception as e:
    print(f'An error ocurred: {e}')
    return False
    

def stats2float(data_row):
  element_count = int(data_row['element_count'])
  summatory = float(data_row['summatory'])
  max_value = float(data_row['max_value'])
  min_value = float(data_row['min_value'])
  return element_count, summatory, max_value, min_value


In [None]:
csv_filename_pattern = re.compile(r'[0-9]+-[0-9]+\.[A-Za-z]+', re.IGNORECASE)
path2files = f'/content/{folder_name}'
files = os.listdir(path2files)
files.sort()
for file in files:
  if csv_filename_pattern.match(file):
    load_csv2db(f'{path2files}/{file}')

    

# COMPROBACIÓN DE RESULTADOS:

**Imprime el valor actual de las estadísticas en ejecución.**
El primer item de la comprobacion de resultados se muestra con un print en la celda anterior. En donde a medida que va integrando lineas a la base de datos va haciendo un recuento de las estadisticas requeridas.

In [None]:
#@markdown Realiza una consulta en la base de datos del: recuento total de filas, valor promedio, valor mínimo y valor máximo para el campo “price”.
with db_con:
  verification_query = db_con.execute("SELECT COUNT(price), AVG(price), MAX(price), MIN(price) FROM data")
  results = verification_query.fetchone()
  print(f'el recuento es:{results[0]}, el promedio es:{results[1]}, el valor maximo es:{results[2]}, el valor minimo es:{results[3]}') 



In [None]:
#@markdown Ejecuta el archivo “validation.csv” a través de todo el pipeline y muestra el valor de las estadísticas en ejecución.

load_csv2db('/content/dataPruebaDataEngineer/validation.csv')

In [None]:
#@markdown Realice una nueva consulta en la base de datos después de cargar “validation.csv”, para observar cómo cambiaron los valores del: recuento total de filas, valor promedio, valor mínimo y valor máximo para el campo “price”.
with db_con:
  verification_query = db_con.execute("SELECT COUNT(price), AVG(price), MAX(price), MIN(price) FROM data")
  results = verification_query.fetchone()
  print(f'el recuento es:{results[0]}, el promedio es:{results[1]}, el valor maximo es:{results[2]}, el valor minimo es:{results[3]}') 

# VISUALIZACIÓN DE LOS DATOS

In [None]:
import matplotlib.pyplot as plt
stats_frame = pd.read_sql_query("SELECT * FROM stats", db_con)
data_frame = pd.read_sql_query("SELECT * FROM data", db_con)

fig, ax = plt.subplots()
ax.plot(stats_frame['element_count'], stats_frame['average'], label='promedio')
ax.plot(stats_frame['element_count'], stats_frame['max_value'], label='valor maximo')
ax.plot(stats_frame['element_count'], stats_frame['min_value'], label='valor minimo')
ax.scatter(stats_frame['element_count'], data_frame['price'], color='red', marker='o', s=10, label='precio de cada elemento')

ax.set_xlabel('Numero de elementos agregados')
ax.set_ylabel('Precio')
ax.legend(loc='lower right')
plt.title('Grafica de promedio, maximo valor y minimo valor con relacion a cada elementro ingresado')
plt.show

# CELDAS DE UTILIDADES
las celdas de este apartado se crearon para realizar pruebas durante la creacion de este notebook, ignorar en su ejecucion.

In [None]:
#celda creada para la visualizacion de la db facilmente
query = 'SELECT * FROM data'
df = pd.read_sql_query(query, db_con)
print(df)

query = 'SELECT * FROM stats'
df = pd.read_sql_query(query, db_con)
print(df)

In [None]:
#Celda creada para eliminar las tablas y hacer pruebas de nuevo
with db_con:
  db_con.execute("DROP TABLE IF EXISTS data")
  db_con.execute("DROP TABLE IF EXISTS stats")
db_con.commit()

In [None]:
#Celda para exportar tablas a csv
import csv
cursor = db_con.cursor()
cursor.execute("SELECT * FROM stats")

rows = cursor.fetchall()

csv_file = "output.csv"

with open(csv_file, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow([i[0] for i in cursor.description])
    writer.writerows(rows)