# ETL para adquisicion de datos

Este script realiza la limpieza de datos provenientes de un archivo CSV para luego insertarlos en una base de datos. Para garantizar el correcto funcionamiento, es crucial ejecutar todas las celdas en serie, con una excepción en la segunda que debe ser ignorada si no se planea ejecutar.

A continuación, se detallan las bibliotecas necesarias y las instrucciones para su instalación:

`numpy`: Instálalo usando el comando 'pip/conda install numpy' en tu terminal o consola.

`pandas`: Instálalo usando el comando 'pip/conda install pandas' en tu terminal o consola.

`mysql`.connector: Para instalarlo con pip, usa el comando 'pip install mysql-connector-python'.

`datetime`: Esta biblioteca se instala automáticamente con Python.

En este primer bloque se carga el archivo CSV. La dirección del archivo, incluyendo su ubicación relativa a la carpeta, debe ser especificada entre comillas simples, por ejemplo: 'Subcarpeta/archivo.csv'. Además, el separador de campos debe ser definido en el argumento sep, eligiendo entre comas (`,`), tabulaciones (`   `), punto y coma (`;`) o cualquier otro separador.

Para manejar los datos faltantes, se emplea la cadena 'No reporta' como valor de relleno.

In [1]:
import pandas as pd 
import numpy as np
from datetime import datetime
Df=pd.read_csv('OneDrive_1_3-7-2024/Casos_positivos_de_COVID-19-Cund-Boy.csv', sep=';').fillna('No reporta')
Df.head(2)

Unnamed: 0,id_case,id_municipality,name_municipality,id_department,name_department,age,gender,type_contagion,status,date_symptom,date_death,date_diagnosis,date_recovery
0,489982,25,CUNDINAMARCA,25899,ZIPAQUIRA,22,F,Comunitaria,Recuperado,2/08/2020,No reporta,18/08/2020,20/12/2020
1,5932109,25,CUNDINAMARCA,25758,SOPO,50,M,Comunitaria,Recuperado,18/01/2022,No reporta,2/02/2022,5/02/2022


## En caso de desear añadir datos provenientes de más de un archivo, se debe utilizar la siguiente casilla. De lo contrario, saltar directamente a la tercera.
Para esto usar los Dataframes necesarios (D2,D3,....DN), especificando la ruta como se hizo en la primera casilla, adicionalmente usar entre los parentesis del comando `pd.concat` unicamente los Dataframes usados, `asegurarse de eliminar los sobrantes` ([D1,D2,...,DN])

In [None]:
D2=pd.read_csv('', sep=';').fillna('No reporta')
#D3=pd.read_csv('', sep=';').fillna('No reporta')
#D4=pd.read_csv('', sep=';').fillna('No reporta')
Df=pd.concat([D1,D2,D3,D4]).drop_duplicates('id_case')

### Conexion con el servidor

Para esta propuesta, se establece la conexión con el servidor local utilizando XAMPP. Es importante tener activo el servidor local y tener en cuenta que no se podrá acceder desde otra aplicación si mysql.connector está activo.

Sin embargo, es posible cambiar la ubicación del host, usuario y contraseña. Solo necesitas modificar los valores correspondientes entre comillas dobles según tus requerimientos.

In [2]:
import mysql.connector
cnx = mysql.connector.connect(
  host="localhost",
  user="root",
  password="",
  database='Casos_Covid')

cursor = cnx.cursor()

### Carga de Datos unicos
Con el objetivo de evitar peticiones innecesarias, se procede a eliminar los datos de la tabla en la base de datos cuyo campo id_case ya esté registrado. Este campo se trata como un identificador único para cada registro.

Es importante destacar que si todas las tablas excepto la última (cases) no se llenan con datos, el código las tratará como si ninguna tabla hubiera sido llenada.

In [None]:
cursor.execute("SELECT id_case FROM cases" )

result = cursor.fetchall()

id_case=list(Df["id_case"])
Data_cleaner= np.setdiff1d(id_case,result)

mask = Df['id_case'].isin(Data_cleaner)
Data = Df[mask].copy()
Data.head(2)

### Extraccion unica de Departamentos y municipios

Se parte del supuesto de que los datos provenientes de otros departamentos y municipios poseerán identificadores que no se superponen con los actuales. Sin embargo, en caso de interferencia, se deberá aplicar un enfoque similar al utilizado en la fórmula `clean_datos_previos`, que se encuentra en la siguiente casilla.

Por último, se intercambian las columnas ya que los campos de `municipios` y `departamentos` están cruzados.

In [None]:
name_department=Data.drop_duplicates(['id_municipality', 'name_municipality'], keep='last').reset_index()
name_municipality=Data.drop_duplicates(['id_department', 'name_department'], keep='last').reset_index()  

### Extraccion unica de datos sobre genero, estado y el tipo de contagio
mediante la funcion clean_datos_previos se limpian los datos para subir unicamente aquellos que no esten registrados en las tablas correspondientes, con esto se consigue que el identificador permanezca sin saltos de numero

In [None]:
def clean_datos_previos(df, columna, sql_table, sql_columna):
    data=df[columna].drop_duplicates().reset_index()
    df_list=list(df[columna])
    query="SELECT "+sql_columna+" FROM "+ sql_table 
    cursor.execute(query)
    
    result = cursor.fetchall()
    df_cleaner= np.setdiff1d(df_list,result)
    
    mask = data[columna].isin(df_cleaner)
    return data[mask].copy()
gender = clean_datos_previos(Data, 'gender', 'Gender','name')
status = clean_datos_previos(Data, 'status', 'Status','name')
type_contagion = clean_datos_previos(Data, 'type_contagion', 'Type_Contagion','name')
type_contagion.head()

### Conversion de fechas al formato admitido por SQL

In [None]:
fechas=Data[['date_symptom','date_death','date_diagnosis','date_recovery']]
for col in fechas.columns:
    Data[col] = pd.to_datetime(Data[col], format='%d/%m/%Y', errors='coerce')

Data.head(2)

### Actualización de datos en las tablas menores

In [None]:
for i in range(0,len(name_department)):
    try: 
        add_Departamento = ("INSERT INTO Department "
                            "(id_department,name) "
                            "VALUES (%(id_dep)s, %(name)s)")
        data_Departamento = ({'id_dep': int(name_department['id_municipality'][i]),
                             'name': name_department['name_municipality'][i]})
        cursor.execute(add_Departamento, data_Departamento)
    
        
    except:
        pass
    cnx.commit()

In [None]:
for i in range(0,len(name_municipality)):
    try: 
        add_Municipio = ("INSERT INTO Municipality "
                            "(id_municipality,name, id_department) "
                            "VALUES (%(id_mun)s, %(name)s, %(id_dep)s)")
        data_Municipio = ({'id_mun': int(name_municipality['id_department'][i]),
                             'name': name_municipality['name_department'][i]+', '+name_municipality['name_municipality'][i],
                            'id_dep': int(name_municipality['id_municipality'][i])})
        cursor.execute(add_Municipio, data_Municipio)
        
    except:
        pass
    cnx.commit()

In [None]:
for i in range(0,len(gender)):
    try:
        add_nombre = ("INSERT INTO Gender "
                        "(name) "
                        "VALUES (%(name)s)")
        data_nombre = ({'name': gender['gender'][i]})
        cursor.execute(add_nombre, data_nombre)
        
    except:
        pass
    cnx.commit()

In [None]:
for i in range(0,len(status)):
    try:
        add_status = ("INSERT INTO Status "
                            "(name)"
                            "VALUES (%(name)s)")
        data_status = ({'name': status['status'][i]})
        cursor.execute(add_status, data_status)
        
    except:
        pass
    cnx.commit()

In [None]:
for i in range(0,len(type_contagion)):
    try:
        add_type = ("INSERT INTO Type_Contagion "
                            "(name) "
                            "VALUES (%(name)s)")
        data_type = ({'name': type_contagion['type_contagion'][i]})
        cursor.execute(add_type, data_type)
    except:
        pass
    cnx.commit()

### Adquisicion de los identificadores de las tablas de status, gender y type_contagion 

In [None]:
cursor.execute("SELECT * FROM Status" )

result = cursor.fetchall()
st_sql=pd.DataFrame(result, columns=['id_status', 'status'])
Data=Data.merge(st_sql)

cursor.execute("SELECT * FROM Gender" )

result = cursor.fetchall()
gen_sql=pd.DataFrame(result, columns=['id_gender', 'gender'])

Data=Data.merge(gen_sql)

cursor.execute("SELECT * FROM Type_Contagion" )

result = cursor.fetchall()
gen_type_con=pd.DataFrame(result, columns=['id_type_con', 'type_contagion'])

Data=Data.merge(gen_type_con)
Data.head(2)

### Actualizacion de datos individuales 
La siguiente casilla es la mas compleja de ejecutar, ya que realiza la actualización para cada dato de forma individual.

In [None]:
for i in range(0,len(Data)):
    add_case = ("""INSERT INTO Cases 
                        (id_case,
                            id_municipality,
                            age,
                            id_gender,
                            id_type_Contagion,
                            id_status,
                            date_symptom,
                            date_death,
                            date_diagnosis,
                            date_recovery)
                     VALUES (
                            %(id_case)s,
                            %(id_municipality)s,
                            %(age)s,
                            %(id_gender)s,
                            %(id_type_Contagion)s,
                            %(id_status)s,
                            %(date_symptom)s,
                            %(date_death)s,
                            %(date_diagnosis)s,
                            %(date_recovery)s) """)
    data_case = ({    'id_case' : int(Data['id_case'][i]),
                      'id_municipality':int(Data['id_department'][i]),
                      'age':int(Data['age'][i]),
                      'id_gender':int(Data['id_gender'][i]),
                      'id_type_Contagion':int(Data['id_type_con'][i]),
                      'id_status':int(Data['id_status'][i]),
                      'date_symptom':str(Data['date_symptom'][i]),
                      'date_death':str(Data['date_death'][i]),
                      'date_diagnosis':str(Data['date_diagnosis'][i]),
                      'date_recovery':str(Data['date_recovery'][i])})
    try:
        cursor.execute(add_case, data_case)
    except:
        print('Dato ya agregado')
    cnx.commit()

### cierre de la conexion
Al ejecutar la siguiente casilla se cierra la conexion con el servidor y la base de datos deja de estar disponible para este documento

In [None]:
cursor.close()
cnx.close()