# Práctica ETL con Python 🐍
- Tomar como referencia una base de datos realizada en cuatrimestres anteriores o actual
- La extracción involucre el contenido de por lo menos tres tablas relacionadas de la base de datos seleccionada en el numeral
- La transformación deberán ser como mínimo: cambio de nombre de columnas, eliminación de columnas,  eliminación de datos null y combinación de datos de los 3 dataset (tablas de la base de datos relacional) . Para la combinación utilizar el método "merge" de python (parametro inner, right o left)
- Cargar el resultado de las transformaciones en una tabla en postgres.

## Importacion de librerias

In [None]:
import pyodbc
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
import sys

## Conexion con SQL Server

In [None]:
server = 'DESKTOP-C20IQMM'
database = 'heladeria'

# Cadena de conexion
connection_string_sql_server = f'DRIVER={{SQL Server}};SERVER={server};DATABASE={database};Trusted_Connection=yes;'

# Conexion a la base de datos
connection_sql_server = pyodbc.connect(connection_string_sql_server)

## Crecion de los archivos ```.csv``` de las tablas

In [None]:
array_tables = ['clientes', 'productos', 'ventas']

queries = [f"SELECT * FROM {table}" for table in array_tables]

# Leer cada tabla y crear un array de DataFrame de pandas
data_frames = [pd.read_sql_query(query, connection_sql_server) for query in queries]

# Se cierra la conexino a SQL server
connection_sql_server.close()

# Reemplaza 'ruta_del_archivo.csv' con la ubicación y el nombre que desees para el archivo CSV
ruta_de_archivos_csv = [f"{table}_table.csv" for table in array_tables]

# Guardar el DataFrame como un archivo CSV
for i, data_frame in enumerate(data_frames):
    data_frame.to_csv(ruta_de_archivos_csv[i], index=False, encoding='utf-8')

## Lectura de los arhivos ```.csv``` generados

In [None]:
# Leemos el contenido de los archivos csv
data_clientes, data_productos, data_ventas = map(lambda path: pd.read_csv(path), ruta_de_archivos_csv)

In [None]:
data_clientes

In [None]:
data_productos

In [None]:
data_ventas

## Cambio de nombre de columnas

In [None]:
# Se renombrará la columnas <nombre> por <nombres>
# axis = 1 => especifica que es la columna
data_clientes = data_clientes.rename({'nombre': 'nombres'}, axis=1)

data_clientes

## Eliminación de columnas

In [None]:
# Se eliminan la columna 4 (TELEFONO)
# axis = 1 => especifica que es la columna
data_clientes.drop(data_clientes.columns[[3]], axis=1, inplace=True)

data_clientes

## Eliminación de datos null 

In [None]:
# inplace=True => reemplaza el DataFrime original
data_productos.dropna(subset=['descripcion'], inplace=True)

data_productos

## Combinacion  de datos de los 3 dataset (tablas de la base de datos relacional)
- Para la combinación utilizar el método "merge" de python (parametro inner, right o left)

In [None]:
data_merged_ventas_clientes = pd.merge(data_ventas,
                                       data_clientes,
                                       how='inner',
                                       left_on=["id_cliente",],
                                       right_on=["id_cliente"],
                                       suffixes=('', '_cliente')
                                       )

data_merged_ventas_clientes

In [None]:
data_merged_ventas_clientes_productos = pd.merge(data_merged_ventas_clientes,
                                                 data_productos,
                                                 how='inner',
                                                 left_on=["id_producto",],
                                                 right_on=["id_producto"],
                                                 suffixes=('', '_producto')
                                                 )

data_merged_ventas_clientes_productos

## Carga de datos

- BD : postgre
- Database : etl_practice_python

In [None]:
# Se comprueban los campos de la nueva coleccion
data_merged_ventas_clientes_productos.columns

### Query de creacion de tabla

```sql
CREATE TABLE table_merged (
  "id_venta" INT,
  "id_cliente" INT,
  "id_producto" INT,
  "fecha_venta" DATE,
  "cantidad" INT,
  "nombres" VARCHAR(50),
  "direccion" VARCHAR(50),
  "nombre" VARCHAR(50),
  "precio" DECIMAL(8, 2),
  "descripcion" VARCHAR(200)
)
```

## Conexión a postgres

In [None]:
# Declaramos un diccionario con datos para la conexion a la bd
postgre_dictionary_connection = {
  "host": "localhost",
  "user": "postgres",
  "password": "pato",
  "database": "etl_practice_python"
}

In [None]:
# Declaramos la funcion para conectar a postgre
def connect_to_postgre(dictionary_connection):
    connection = None

    try:
        print('Connecting to PostgreSQL server')
        connection = psycopg2.connect(**dictionary_connection)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1)

    print("Connection successful")
    return connection

In [None]:
# Guardamos en <connection> la conexion a postgree
connection_postgre = connect_to_postgre(postgre_dictionary_connection)

In [None]:
# Definicion una funcion para insertar datos en postgree
def execute_many(connection, data, table):
    tuples = [tuple(x) for x in data.to_numpy()]

    cols = ','.join(list(data.columns))

    query = "INSERT INTO %s(%s) VALUES (%%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s, %%s)" % (
        table, cols)

    cursor = connection.cursor()

    try:
        cursor.executemany(query, tuples)
        connection.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        connection.rollback()
        cursor.close()
        return 1
    print("execute_many() done")
    cursor.close()

In [None]:
# Ejecutamos la funcion
execute_many(connection_postgre, data_merged_ventas_clientes_productos, 'table_merged')