# Ingest data to MySQL db

In [1]:
import os
from dotenv import load_dotenv
import pandas as pd
import numpy as np
from mysql.connector import connect, Error, IntegrityError
from pathlib import Path

Agarramos los datos

In [2]:
pwd = Path.cwd()
data_prov = pwd.parent / "data" / / "cleaned" / "proveedores-clean.csv"
data_donan = pwd.parent / "data" / "cleaned" / "donantes-clean.csv"
df = pd.read_csv(data_prov)
df2 = pd.read_csv(data_donan)
np.random.seed(42)

In [3]:
df.sample(3)

Unnamed: 0,Numero Proveedor,Nombre Proveedor,CUIT,Categoria Proveedor,Tipo de Contribuyente,Contacto,Correo Electronico,Telefono,Razon Social,Importe,Fecha,Nro_Cuenta,Ciudad
521,P00020,Juegos del Norte,30-98765437-1,Materiales,Responsable inscripto,Pablo Fernández,pablo.fernandez@empresa.zom,0123-4567,S.R.L,292966.0,2024-04-15,506100,Rawson
388,P00094,Gabriela Martínez,30-98765437-1,Agente impositivo,Monotributista,Gabriela Martínez,gabriela.martinez@empresa.zom,4567-8901,S.R.L,37732.0,2024-03-21,503100,Viedma
6,P00006,Grupo Alfa,30-97531246-1,Servicios,Responsable inscripto,Alberto Fernández,alberto.fernandez@empresa.zom,6789-0123,S.R.L,251833.0,2024-01-09,501400,San Miguel de Tucumán


In [4]:
df2.sample(3)

Unnamed: 0,Numero,Nombre,Tipo,Contacto,Correo Electronico,Telefono,Razon Social,Tipo de Contribuyente,CUIT,Fecha,Activo,Frecuencia,Importe,Nro de Cuenta,Pais
243,D00123,Merendero Solidario,ONG,Contacto VS,solidario@mail.com,(+)591 5550-9225,GOB,Monotributista,24-72582356-8,2021-02-01,True,Mensual,455378.0,405100,Bolivia
101,D00098,Pablo Fernández,Individuo,Contacto TTTT,Pablo Fernández@example.com,(011) 5000-0098,S.A,Monotributista,27-89012344-7,2024-05-05,True,Bimestral,133131.0,403101,Colombia
230,D00135,Logística Global S.R.L.,Campaña,Contacto ALG,global@logisticaglobal.com,(+)56 9 7654 3210,S.R.L,Monotributista,20-98765432-1,2020-01-01,True,Bimestral,545228.0,405100,Chile


## Mandar data a las tablas

### Conectando a la base de datos que esta en Docker

In [5]:
load_dotenv()
db_config = {
    "host": os.getenv("DATABASE_HOST"),   ## gateway of container
    "user": os.getenv("DATABASE_USER"),
    "password": os.getenv("DATABASE_PASSWORD"),
    "database": os.getenv("DATABASE_NAME")
}
def create_connection(db_config):
    conn = None
    try:
        conn = connect(**db_config)   # < --
        if conn.is_connected():
            print("Conectadisimo")
    except Error as e:
        print(f"Error: {e}")
    return conn

# conn = mysql.connector.connect(**db_config)
# cursor = conn.cursor(buffered=True)
conn = create_connection(db_config)

Conectadisimo


In [6]:
# Funcion para insertar los datos, evitar duplicados y corregir error de tipo de dato
def insert_or_ignore(query: str, data: tuple) -> None:
    # Importante que el `dato` sea una tupla en forma de `(value,)`
    # si es que los datos vienen de una sola columna
    try:
        data = tuple(int(x) if isinstance(x, (np.integer)) else x for x in data) # para Nro_Cuenta
        cursor.execute(query, data)
        conn.commit()
    except IntegrityError as e:
        print(f"Error: {e}")

In [7]:
# Creamos el cursor
def crear_cursor(conn):    
    try:
        cursor = conn.cursor(buffered=True)
    except Error as e:
        print(f"Error: {e}")
    return cursor
cursor = crear_cursor(conn)

Antes de arrancar para simplificar las cosas hay que cambiar el nombre de las columnas del `DataFrame` para que sean iguales a las de la base de datos

In [8]:
rename_cols = {"Numero Proveedor": "numero",
                   "Nombre Proveedor": "nombre",
                   "CUIT": "cuit",
                   "Contacto": "contacto",
                   "Correo Electronico": "mail",
                   "Telefono": "telefono",
                   "Categoria Proveedor": "categoria",
                   "Tipo de Contribuyente": "contribuyente",
                   "Razon Social": "razon",
                   "Ciudad": "ciudad",
                   "Nro_Cuenta": "nro_cuenta",
                   "Importe": "importe",
                   "Fecha": "fecha",}

df.rename(columns=rename_cols, inplace=True)


Funcion `ingest_table` para insterar los datos a su correspondiente table

In [9]:
df.columns

Index(['numero', 'nombre', 'cuit', 'categoria', 'contribuyente', 'contacto',
       'mail', 'telefono', 'razon', 'importe', 'fecha', 'nro_cuenta',
       'ciudad'],
      dtype='object')

Funcion para meter los datos a las tablas

In [10]:
def ingest_table(df: pd.DataFrame, table: str, col_name: str) -> None:
    count = 0
    unique_table = df[col_name].unique()
    for value in unique_table:
        query = f"""
        INSERT INTO `{table}` (`{col_name}`) VALUES (%s)
        ON DUPLICATE KEY UPDATE `{col_name}`=VALUES(`{col_name}`);
        """
        insert_or_ignore(query, (value,)) ## placeholder (%s)
        count += 1
    print(f"Cantidad de filas insertadas en {table}: {count}")


In [11]:
def get_all_tables(cursor):
    cursor.execute("SHOW TABLES")
    tables = cursor.fetchall()
    return [table[0] for table in tables]

# Usage
all_tables = get_all_tables(cursor)
print("All tables:", all_tables)

All tables: ['categoria_proveedores', 'ciudades', 'cuentas', 'donantes', 'frecuencias', 'gastos', 'ingresos', 'paises', 'proveedores', 'razones_sociales', 'tipo_contribuyentes', 'tipos']


### Tablas relacionadas a proveedores

No es la forma mas elegante pero sirve

In [12]:
# los datos deberian venir de un archivo de configuracion imagino
table_to_col = {"categoria_proveedores": "categoria",
                "tipo_contribuyentes": "contribuyente",
                "razones_sociales": "razon",
                "ciudades": "ciudad",
                "cuentas": "nro_cuenta"}

for table, col in table_to_col.items():
    ingest_table(df, table, col)

Cantidad de filas insertadas en categoria_proveedores: 13
Cantidad de filas insertadas en tipo_contribuyentes: 2
Cantidad de filas insertadas en razones_sociales: 4
Cantidad de filas insertadas en ciudades: 42
Cantidad de filas insertadas en cuentas: 8


In [13]:
tuple(table_to_col.items())

(('categoria_proveedores', 'categoria'),
 ('tipo_contribuyentes', 'contribuyente'),
 ('razones_sociales', 'razon'),
 ('ciudades', 'ciudad'),
 ('cuentas', 'nro_cuenta'))

## Proveedores

In [14]:
# datos unicos de los proveedores basado en el numero
df_proveedores = df.drop_duplicates(subset='numero')
como = "numero"
print(f"len df: {len(df)}")
print(f'dfpro: {df_proveedores[como].count()}, df: {df[como].count()}')
print(f'dfpro: {df_proveedores[como].nunique()}, df: {df[como].nunique()}')

len df: 523
dfpro: 147, df: 523
dfpro: 147, df: 147


Fetch foreign keys

In [15]:
# funciona para mapear las foreign keys con la columna pertinente
def fetch_fk(cursor, id, col, table) -> dict:
    try:
        cursor.execute(f"SELECT `{id}`, `{col}` FROM `{table}`")
    except Exception as e:
        print(f"Error {e}")
    mapping = {col: id for id, col in cursor.fetchall()}
    print(f"Fecth fkeys from {table}")
    return mapping

# complicandola de chill
# traer las fk de todas las tablas que tengan
list_fk = ["id_categoria", "id_contribuyente", "id_razon", "id_ciudad"]
table_cols: tuple[tuple] = tuple(table_to_col.items())

In [16]:
# Agarramos todas las foreign keys y las guardamos para hacerlo mas rapido
# Fetch id_categoria    
categoria_mapping = fetch_fk(cursor, list_fk[0], table_cols[0][1], table_cols[0][0])
# # Fetch id_contribuyente
contribuyente_mapping = fetch_fk(cursor, list_fk[1], table_cols[1][1], table_cols[1][0])
# # Fetch id_razon
razon_mapping = fetch_fk(cursor, list_fk[2], table_cols[2][1], table_cols[2][0])
# # Fetch id_ciudad
ciudades_mapping = fetch_fk(cursor, list_fk[3], table_cols[3][1], table_cols[3][0])

Fecth fkeys from categoria_proveedores
Fecth fkeys from tipo_contribuyentes
Fecth fkeys from razones_sociales
Fecth fkeys from ciudades


In [17]:
def ingest_proveedores(df_proveedores: pd.DataFrame, cursor) -> None:
    # to test
    count = 0
    # Step 6: Insert unique data into proveedores
    for _, row in df_proveedores.iterrows():
        # Agarramos la foreign key que mapee con el valor de la fila en cuestion
        id_categoria = categoria_mapping.get(row['categoria'])
        id_contribuyente = contribuyente_mapping.get(row['contribuyente'])
        id_razon = razon_mapping.get(row['razon'])
        id_ciudad = ciudades_mapping.get(row['ciudad'])
    
        query = """
        INSERT INTO proveedores (numero, nombre, cuit, contacto, mail, telefono, id_categoria, id_contribuyente, id_razon, id_ciudad)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE 
            nombre = VALUES(nombre), 
            cuit = VALUES(cuit), 
            contacto = VALUES(contacto), 
            mail = VALUES(mail), 
            telefono = VALUES(telefono), 
            id_categoria = VALUES(id_categoria), 
            id_contribuyente = VALUES(id_contribuyente), 
            id_razon = VALUES(id_razon), 
            id_ciudad = VALUES(id_ciudad);
        """
        data = (
            row['numero'], row['nombre'], row['cuit'], row['contacto'], 
            row['mail'], row['telefono'], id_categoria, 
            id_contribuyente, id_razon, id_ciudad
        )
    
        # Insert into proveedores table
        insert_or_ignore(query, data)
        count += 1
        #insert_or_ignore(query, (ciudad,))
    print(f"Cantidad de filas insertadas: {count}")
        
ingest_proveedores(df_proveedores, cursor)

Cantidad de filas insertadas: 147


In [18]:
# Fetch id_proveedor --> se llama 'id'
proveedor_mapping = fetch_fk(cursor, "id", "numero", "proveedores")
# Fetch id_cuenta
cuenta_mapping = fetch_fk(cursor, "id_cuenta", "nro_cuenta", "cuentas")

Fecth fkeys from proveedores
Fecth fkeys from cuentas


In [19]:
def ingest_gastos(df: pd.DataFrame) -> None:
    count = 0
    # Step 8: Insert unique data into gastos --> todo el df ya que son todas las transacciones
    for _, row in df.iterrows():  # This should iterate through the full dataset
        # Retrieve foreign key ids from pre-fetched mappings
        id_proveedor = proveedor_mapping.get(row['numero'])
        id_cuenta = cuenta_mapping.get(row['nro_cuenta'])
    
        # Only insert if foreign keys are found
        if id_proveedor is not None and id_cuenta is not None:
            query = """
            INSERT INTO gastos (importe, fecha, id_proveedor, id_cuenta)
            VALUES (%s, %s, %s, %s);
            """
            data = (row['importe'], row['fecha'], id_proveedor, id_cuenta)
            insert_or_ignore(query, data)
            count += 1
        else:
            print(f"Foreign key not found for {row['numero']} or {row['nro_cuenta']}")
    print(f"Cantidad de filas insertadas: {count}")

ingest_gastos(df)

Cantidad de filas insertadas: 523


## Donantes

Hay que reenombrar tambien las columnas matcheando las de las tablas

In [20]:
l1 = all_tables
l2 = df2.columns.to_list()
l1.extend([np.nan] * (len(l2) - len(l1)))
temp_df = pd.DataFrame({"col_df": l2, "tablas_db": l1})
temp_df


Unnamed: 0,col_df,tablas_db
0,Numero,categoria_proveedores
1,Nombre,ciudades
2,Tipo,cuentas
3,Contacto,donantes
4,Correo Electronico,frecuencias
5,Telefono,gastos
6,Razon Social,ingresos
7,Tipo de Contribuyente,paises
8,CUIT,proveedores
9,Fecha,razones_sociales


In [21]:

rename_cols = {"Numero": "numero",
                   "Nombre": "nombre",
                   "CUIT": "cuit",
                   "Contacto": "contacto",
                   "Correo Electronico": "mail",
                   "Telefono": "telefono",
                   "Frecuencia": "frecuencia",
                   "Tipo de Contribuyente": "contribuyente",
                   "Tipo": "tipo",
                   "Activo": "activo",
                   "Razon Social": "razon",
                   "Pais": "pais",
                   "Nro de Cuenta": "nro_cuenta",
                   "Importe": "importe",
                   "Fecha": "fecha",}

df2.rename(columns=rename_cols, inplace=True)

In [22]:
df2.columns

Index(['numero', 'nombre', 'tipo', 'contacto', 'mail', 'telefono', 'razon',
       'contribuyente', 'cuit', 'fecha', 'activo', 'frecuencia', 'importe',
       'nro_cuenta', 'pais'],
      dtype='object')

### Insertar datos en las tablas para despues relacionarla con la tabla `donantes`

In [23]:
# los datos deberian venir de un archivo de configuracion imagino
# tabla: columna
table_to_col = {"frecuencias": "frecuencia",
                "tipo_contribuyentes": "contribuyente",
                "razones_sociales": "razon",
                "tipos": "tipo",
                "paises": "pais",
                "cuentas": "nro_cuenta"}

for table, col in table_to_col.items():
    ingest_table(df2, table, col)

Cantidad de filas insertadas en frecuencias: 5
Cantidad de filas insertadas en tipo_contribuyentes: 3
Cantidad de filas insertadas en razones_sociales: 5
Cantidad de filas insertadas en tipos: 7
Cantidad de filas insertadas en paises: 10
Cantidad de filas insertadas en cuentas: 7


### Arrancamos con Donantes

In [24]:

# datos unicos de los proveedores basado en el numero
df_donantes = df2.drop_duplicates(subset='numero')
como = "numero"
print(f"len df: {len(df2)}")
print(f'dfpro: {df_donantes[como].count()}, df: {df2[como].count()}')
print(f'dfpro: {df_donantes[como].nunique()}, df: {df2[como].nunique()}')

len df: 608
dfpro: 138, df: 608
dfpro: 138, df: 138


Fetch foreign keys  
Hay que volver a fetchear `categoria`, `contribuyente` y `razon social` ya fuero los datos fueron actualizados.  
 Al igual que `tipo`, `pais` y `frecuencia` que son tablas nuevas `Donantes`

In [25]:
razon_mapping = fetch_fk(cursor, id="id_razon", col="razon", table="razones_sociales")
contribuyente_mapping = fetch_fk(cursor, id="id_contribuyente", col="contribuyente", table="tipo_contribuyentes")

tipo_mapping = fetch_fk(cursor, id="id_tipo", col="tipo", table="tipos")
pais_mapping = fetch_fk(cursor, id="id_pais", col="pais", table="paises")
frecuencia_mapping = fetch_fk(cursor, id="id_frecuencia", col="frecuencia", table="frecuencias")


Fecth fkeys from razones_sociales
Fecth fkeys from tipo_contribuyentes
Fecth fkeys from tipos
Fecth fkeys from paises
Fecth fkeys from frecuencias


In [26]:
ke = 0
for _, row in df_donantes.iterrows():
    frecuencia_mapping.get(row['frecuencia'])
    ke += 1
print(ke, df_donantes['frecuencia'].count())

138 138


In [27]:
def sort_dict(dictio: dict) -> dict:
    return dict(sorted(dictio.items(), key=lambda item: item[1]))

print(sort_dict(frecuencia_mapping))

{'Mensual': 1, 'Bimestral': 2, 'Anual': 3, 'Semestral': 4, 'Trimestral': 5}


In [28]:
df_donantes.iloc[:3, :]

Unnamed: 0,numero,nombre,tipo,contacto,mail,telefono,razon,contribuyente,cuit,fecha,activo,frecuencia,importe,nro_cuenta,pais
0,D00001,TechNova Solutions,Empresa,Contacto A,TechNova Solutions@example.com,(011) 5000-0001,S.R.L,Responsable Inscripto,20-12345678-9,2024-02-26,False,Mensual,292732.0,402101,Chile
1,D00002,EnergiaPlus S.A.,Empresa,Contacto B,EnergiaPlus S.A.@example.com,(011) 5000-0002,S.A,Monotributista,21-23456789-0,2024-01-12,False,Bimestral,326439.0,403101,Uruguay
2,D00003,Grupo Delta,Empresa,Contacto C,Grupo Delta@example.com,(011) 5000-0003,S.R.L,Responsable Inscripto,22-34567890-1,2024-06-08,True,Anual,276920.0,403101,Uruguay


In [29]:

def ingest_donantes(df_donantes: pd.DataFrame, cursor) -> None:
    # to test
    df_donantes = df_donantes.reset_index(drop=True)
    count = 0
    # Step 6: Insert unique data into proveedores
    for _, row in df_donantes.iterrows():
        # Agarramos la foreign key que mapee con el valor de la fila en cuestion
        id_frecuencia = frecuencia_mapping.get(row['frecuencia'])
        id_contribuyente = contribuyente_mapping.get(row['contribuyente'])
        id_razon = razon_mapping.get(row['razon'])
        id_tipo = tipo_mapping.get(row['tipo'])
        id_pais = pais_mapping.get(row['pais'])
    
        query = """
        INSERT INTO donantes (numero, nombre, cuit, contacto, mail, telefono, activo, id_frecuencia, id_contribuyente, id_razon, id_tipo, id_pais)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE 
            nombre = VALUES(nombre), 
            cuit = VALUES(cuit), 
            contacto = VALUES(contacto), 
            mail = VALUES(mail), 
            activo = VALUES(activo), 
            telefono = VALUES(telefono), 
            id_frecuencia = VALUES(id_frecuencia), 
            id_contribuyente = VALUES(id_contribuyente), 
            id_razon = VALUES(id_razon), 
            id_tipo = VALUES(id_tipo), 
            id_pais = VALUES(id_pais);
        """
        data = (
            row['numero'], row['nombre'], row['cuit'], row['contacto'], 
            row['mail'], row['telefono'], row['activo'], id_frecuencia, 
            id_contribuyente, id_razon, id_tipo, id_pais
        )
    
        # Insert into proveedores table
        insert_or_ignore(query, data)
        count += 1
    print(f"Cantidad de filas insertadas: {count}")
        
ingest_donantes(df_donantes, cursor)

Cantidad de filas insertadas: 138


In [30]:
# Fetch id_proveedor --> se llama 'id'
proveedor_mapping = fetch_fk(cursor, "id", "numero", "donantes")
# Fetch id_cuenta
cuenta_mapping = fetch_fk(cursor, "id_cuenta", "nro_cuenta", "cuentas")

Fecth fkeys from donantes
Fecth fkeys from cuentas


In [31]:

def ingest_ingresos(df: pd.DataFrame) -> None:
    count = 0
    # Step 8: Insert unique data into ingresos --> todo el df ya que son todas las transacciones
    for _, row in df.iterrows():  # This should iterate through the full dataset
        # Retrieve foreign key ids from pre-fetched mappings
        id_donante = proveedor_mapping.get(row['numero'])
        id_cuenta = cuenta_mapping.get(row['nro_cuenta'])
    
        # Only insert if foreign keys are found
        if id_donante is not None and id_cuenta is not None:
            query = """
            INSERT INTO ingresos (importe, fecha, id_donante, id_cuenta)
            VALUES (%s, %s, %s, %s);
            """
            data = (row['importe'], row['fecha'], id_donante, id_cuenta)
            insert_or_ignore(query, data)
            count += 1
        else:
            print(f"Foreign key not found for {row['numero']} or {row['nro_cuenta']}")
    print(f"Cantidad de filas insertadas: {count}")

ingest_ingresos(df2)

Cantidad de filas insertadas: 608


In [32]:
# Close the cursor and connection
cursor.close()
conn.close()

## Making sure

```mysql
## Los valores son los correctos pero faltan mas filas en la tabla transacciones
USE db_proveedores;

SELECT numero FROM proveedores WHERE id = 12;

SELECT nombre, importe, fecha
FROM transacciones 
INNER JOIN proveedores 
ON 
proveedores.id = transacciones.id_proveedor
WHERE proveedores.id = 12;

```


| c0              | c1        | c2         |
| --------------- | --------- | ---------- |
| Proveedor Ltda. | 224379.00 | 2024-04-09 |
| Proveedor Ltda. | 153447.00 | 2024-06-03 |

Puede llegar a ser que itera hasta la cantidad de proveedores unicos y el loop corta,  

Hay que revisar

In [43]:
# !pwd
# !jupyter nbconvert --to script ../test-code/final-ingestion-test.ipynb

and fails to parse leap day. The default behavior will change in Python 3.15
to either always raise an exception or to use a different default year (TBD).
To avoid trouble, add a specific year to the input & format.
See https://github.com/python/cpython/issues/70647.
  from nbconvert.nbconvertapp import main
[NbConvertApp] Converting notebook ../test-code/final-ingestion-test.ipynb to script
[NbConvertApp] Writing 15068 bytes to ../test-code/final-ingestion-test.py
