# Cleaning and Connections


#### 1. Importing libraries and loading the data

In [1]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
import seaborn as sns
import unicodedata
import re
import os
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

# Load the raw sales export that the rest of the notebook cleans.
df = pd.read_csv("RWventas.csv")

# DataFrame.info() writes its report to stdout itself and returns None,
# so wrapping it in print() only appends a stray "None" line to the output.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 11 columns):
 #   Column           Non-Null Count   Dtype 
---  ------           --------------   ----- 
 0   Ciudad           995449 non-null  object
 1   Fecha            995380 non-null  object
 2   Producto         995395 non-null  object
 3   Tipo_Producto    995374 non-null  object
 4   Cantidad         995470 non-null  object
 5   Precio_Unitario  995545 non-null  object
 6   Tipo_Venta       995477 non-null  object
 7   Tipo_Cliente     995579 non-null  object
 8   Descuento        995497 non-null  object
 9   Costo_Envio      995531 non-null  object
 10  Total            995472 non-null  object
dtypes: object(11)
memory usage: 83.9+ MB
None


1.1 Visualization of corrupted data.

In [2]:
# Print the distinct values of every column so corrupted entries
# (stray symbols, padding, inverted casing) can be spotted by eye.
for col_name in df.columns:
    distinct_values = df[col_name].unique()
    print(f"'{col_name}' {distinct_values}\n")

'Ciudad' ['Antofagasta' 'Monterrey' 'Valparaíso' 'Sevilla' 'Córdoba'
 'Ciudad de México' 'Houston' 'Mendoza' 'Barcelona' 'Chicago' 'Miami'
 'Pereira' 'Cali' 'Concepción' 'Rosario' 'Madrid' 'Arequipa' 'Lima'
 'Santiago' 'Bucaramanga' 'Tijuana' 'Guadalajara' 'Cusco' 'New York'
 'Buenos Aires' 'Valencia' 'Los Angeles' 'Bogotá' 'Trujillo' nan 'Puebla'
 'Medellín' 'Barranquilla' 'Cartagena' 'Barcelona***' 'Córdoba***'
 'Ciudad de México###' 'Barcelona@@@' 'Madrid###' 'Pereira@@@'
 'Santiago***' '  Barcelona   ' 'Miami###' 'vALPARAÍSO' 'cÓRDOBA'
 '  Miami   ' 'Puebla###' 'Medellín@@@' 'Sevilla***' 'Puebla***'
 '  Valparaíso   ' '  Bogotá   ' 'Bogotá###' '  Ciudad de México   '
 '  Pereira   ' 'cUSCO' 'Cali***' 'Barcelona###' 'pEREIRA' '  Cali   '
 'Buenos Aires***' 'cONCEPCIÓN' 'nEW yORK' '  Trujillo   ' '  Sevilla   '
 '  Antofagasta   ' 'Mendoza***' 'gUADALAJARA' 'Trujillo@@@' 'hOUSTON'
 '  Concepción   ' 'Cusco###' 'Madrid@@@' 'Antofagasta###' 'mEDELLÍN'
 'tIJUANA' '  Guadalajara   ' '  M

In [3]:
# Count missing values per column once; the grand total is just the sum of those counts.
mis_columns = df.isna().sum()
mis_total = mis_columns.sum()

# The newline after the heading keeps the first column name from being glued to it
# (the previous output started with "Missing data per columnCiudad").
print(f"Missing data per column\n{mis_columns}\nTotal data missing {mis_total}")

Missing data per columnCiudad             4551
Fecha              4620
Producto           4605
Tipo_Producto      4626
Cantidad           4530
Precio_Unitario    4455
Tipo_Venta         4523
Tipo_Cliente       4421
Descuento          4503
Costo_Envio        4469
Total              4528
dtype: int64
Total data missing 49831


#### 2. Cleaning the dataset

In [4]:
def clean_city(texto):
    """Normalize a text label to lowercase, accent-free letters separated by single spaces.

    Despite its name, this cleaner is applied to every categorical text column
    of the sales data, not only to city names.

    Parameters
    ----------
    texto : str or missing value
        Raw label. Anything ``pd.isna`` reports as missing (NaN, None) is
        returned unchanged so missing data stays missing.

    Returns
    -------
    str or missing value
        The label lower-cased, with accents removed, every character other
        than ``a-z`` and whitespace dropped, runs of whitespace collapsed to a
        single space, and no leading or trailing whitespace.
    """
    if pd.isna(texto):
        return texto

    # Lower-case first so the a-z filter below also keeps originally upper-case letters.
    texto = texto.lower()

    # NFD splits an accented letter into its base letter plus a combining mark
    # (Unicode category 'Mn'); dropping the marks removes the accents.
    texto = ''.join(c for c in unicodedata.normalize('NFD', texto) if unicodedata.category(c) != 'Mn')

    # Keep only letters and whitespace.
    texto = re.sub(r'[^a-z\s]', '', texto)

    # Collapse whitespace runs and trim LAST: removing junk characters can expose
    # new leading/trailing spaces (e.g. "madrid @@@" -> "madrid "), which an
    # early strip() would miss.
    texto = re.sub(r'\s+', ' ', texto).strip()

    return texto

# NOTE: this cell rewrites `df` in place and lower-cases its column names at the
# end, so it cannot be re-run without reloading the raw CSV first.

# Text normalization: run the shared cleaner over every categorical column.
cols_texto = ["Ciudad", "Producto", "Tipo_Producto", "Tipo_Venta", "Tipo_Cliente"]
for text_col in cols_texto:
    df[text_col] = df[text_col].map(clean_city)

# Numeric columns: unparseable entries become NaN (errors="coerce") and are then
# filled with 0 together with the values that were missing from the start.
numeric_cols = ["Descuento", "Precio_Unitario", "Cantidad", "Costo_Envio", "Total"]
for num_col in numeric_cols:
    df[num_col] = pd.to_numeric(df[num_col], errors="coerce").fillna(0)

# Customer type: treat a literal "nan" string as missing, then label all gaps.
df["Tipo_Cliente"] = df["Tipo_Cliente"].replace("nan", np.nan).fillna("desconocido")

# Dates: anything that cannot be parsed becomes NaT.
df["Fecha"] = pd.to_datetime(df["Fecha"], errors="coerce")

# Rebuild "Total" from its components instead of trusting the raw value:
# quantity * unit price, minus the discounted share of it, plus shipping.
subtotal = df["Cantidad"] * df["Precio_Unitario"]
discount_amount = df["Descuento"] * df["Cantidad"] * df["Precio_Unitario"]
df["Total"] = subtotal - discount_amount + df["Costo_Envio"]

# Lower-case headers for the exported file.
df.columns = df.columns.str.lower()

# Persist the cleaned data to a new CSV (without the index column).
df.to_csv("clean_sales.csv", index=False)

In [5]:
# Same inspection as before cleaning: list the distinct values of each column
# to confirm the corrupted variants are gone.
for col_name in df.columns:
    distinct_values = df[col_name].unique()
    print(f"'{col_name}' {distinct_values}\n")

'ciudad' ['antofagasta' 'monterrey' 'valparaiso' 'sevilla' 'cordoba'
 'ciudad de mexico' 'houston' 'mendoza' 'barcelona' 'chicago' 'miami'
 'pereira' 'cali' 'concepcion' 'rosario' 'madrid' 'arequipa' 'lima'
 'santiago' 'bucaramanga' 'tijuana' 'guadalajara' 'cusco' 'new york'
 'buenos aires' 'valencia' 'los angeles' 'bogota' 'trujillo' nan 'puebla'
 'medellin' 'barranquilla' 'cartagena']

'fecha' <DatetimeArray>
['2025-11-28 00:00:00', '2025-11-29 00:00:00', '2025-12-07 00:00:00',
 '2025-12-01 00:00:00', '2025-11-18 00:00:00', '2025-11-20 00:00:00',
 '2025-11-15 00:00:00', '2025-11-30 00:00:00', '2025-11-26 00:00:00',
 '2025-11-09 00:00:00', '2025-11-16 00:00:00', '2025-11-12 00:00:00',
 '2025-11-14 00:00:00', '2025-12-04 00:00:00', '2025-11-22 00:00:00',
 '2025-12-02 00:00:00', '2025-11-24 00:00:00', '2025-11-08 00:00:00',
 '2025-11-11 00:00:00',                 'NaT', '2025-11-19 00:00:00',
 '2025-11-23 00:00:00', '2025-11-13 00:00:00', '2025-11-17 00:00:00',
 '2025-12-05 00:00:00', '

#### 3. PostgreSQL connection configuration

In [12]:
# Load settings from the .env file; override=True lets values from .env replace
# variables that are already set in the process environment.
load_dotenv(override=True)

# Environment variables

DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")


URL = f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(URL)

# create_engine() does not open a connection by itself; the first real attempt is
# engine.connect(). It therefore has to live INSIDE the try block, otherwise the
# except clause can never see a connection failure.
try:
    conn = engine.connect()
except Exception as e:
    print("Error al conectar la base datos en", e)
    # Re-raise: later cells need `conn`, so continuing without it would only
    # fail further down with a less helpful NameError.
    raise
else:
    print("Connection Succesfull!!")


Connection Succesfull!!


#### 4. Setting schema

In [13]:
# Make unqualified table names resolve inside the riwi_ventas schema on this connection.
conn.execute(text("set search_path to riwi_ventas;"))

# Tables whose fill status is reported below.
tables = ["ciudad", "tipo_producto", "producto", "tipo_venta", "tipo_cliente", "factura_ventas"]

# Report whether each table already holds data.
for table_name in tables:
    # Asking for at most one row is enough to tell "empty" from "filled" and
    # avoids pulling an entire table into memory just to check for emptiness.
    verify = pd.read_sql(text(f"select 1 from {table_name} limit 1;"), conn)
    status = "Is empty." if verify.empty else "Is Filled."
    print(f"Table: '{table_name.capitalize()}' {status}")

Table: 'Ciudad' Is empty.
Table: 'Tipo_producto' Is empty.
Table: 'Producto' Is empty.
Table: 'Tipo_venta' Is empty.
Table: 'Tipo_cliente' Is empty.
Table: 'Factura_ventas' Is empty.


#### 4.1 Uploading data to PostgreSQL

In [14]:
def _build_dimension(frame, key_cols, id_col):
    """Build a lookup table with one row per distinct key and a 1-based id.

    Parameters
    ----------
    frame : pandas.DataFrame
        Source data containing the key columns.
    key_cols : list of str
        Columns whose distinct combinations define the lookup rows.
    id_col : str
        Name of the generated surrogate-key column.

    Returns
    -------
    pandas.DataFrame
        Columns ``[id_col, *key_cols]``; ids run from 1 in order of first
        appearance in ``frame``. A missing key (NaN) is kept by
        ``drop_duplicates`` as a distinct value, so it gets its own row and id.
    """
    dimension = (
        frame[key_cols]
        .drop_duplicates()
        .reset_index(drop=True)
        .reset_index()
        .rename(columns={"index": id_col})
    )
    dimension[id_col] += 1  # reset_index() numbers from 0; ids start at 1
    return dimension


cleaned_df = pd.read_csv("clean_sales.csv")

# Cities: the lookup table stores the name as "nombre_ciudad".
ciudad_df = _build_dimension(cleaned_df, ["ciudad"], "ciudad_id").rename(columns={"ciudad": "nombre_ciudad"})
cleaned_df = cleaned_df.merge(ciudad_df, left_on="ciudad", right_on="nombre_ciudad", how="left")

# Product types.
tipo_producto_df = _build_dimension(cleaned_df, ["tipo_producto"], "tipo_producto_id")
cleaned_df = cleaned_df.merge(tipo_producto_df, on="tipo_producto", how="left")

# Products: one row per distinct (product name, product type id) pair.
# The column is renamed first so it matches the name used in the exported table.
cleaned_df = cleaned_df.rename(columns={"producto": "nombre_producto"})
producto_df = _build_dimension(cleaned_df, ["nombre_producto", "tipo_producto_id"], "producto_id")
cleaned_df = cleaned_df.merge(producto_df, on=["nombre_producto", "tipo_producto_id"], how="left")

# Sale types.
tipo_venta_df = _build_dimension(cleaned_df, ["tipo_venta"], "tipo_venta_id")
cleaned_df = cleaned_df.merge(tipo_venta_df, on="tipo_venta", how="left")

# Customer types.
tipo_cliente_df = _build_dimension(cleaned_df, ["tipo_cliente"], "tipo_cliente_id")
cleaned_df = cleaned_df.merge(tipo_cliente_df, on="tipo_cliente", how="left")


# Sales (fact) table: the generated ids replace the text labels.
factura_df = cleaned_df[["fecha", "ciudad_id", "producto_id", "tipo_venta_id", "tipo_cliente_id", "cantidad", "precio_unitario", "descuento", "costo_envio", "total"]].copy()


# Export every table to its own CSV inside tables_csv/ (created if missing).
os.makedirs("tables_csv", exist_ok=True)

tables_to_export = {
    "ciudad": ciudad_df,
    "tipo_producto": tipo_producto_df,
    "producto": producto_df,
    "tipo_venta": tipo_venta_df,
    "tipo_cliente": tipo_cliente_df,
    "factura_ventas": factura_df,
}
for table_name, table_df in tables_to_export.items():
    table_df.to_csv(f"tables_csv/{table_name}.csv", index=False)

print("CSV creation sucessfull 'tables_csv'")

CSV creation sucessfull 'tables_csv'


In [15]:
# Insertion order: the lookup tables first, then factura_ventas, whose rows carry
# the ids generated for them (producto likewise carries tipo_producto_id).
# Presumably this mirrors foreign keys in the database schema - confirm against the DDL.
order_tables = [
    "ciudad",
    "tipo_producto",
    "producto",
    "tipo_venta",
    "tipo_cliente",
    "factura_ventas"
]

SEEDERS_DIR = "tables_csv"

# Rows read from a CSV and sent to the database per batch.
CHUNK_SIZE = 100000

# The context manager closes the connection on exit, so no explicit close() is needed.
with engine.connect() as conn:

    conn.execute(text("SET search_path TO riwi_ventas;"))

    for table in order_tables:
        csv_path = os.path.join(SEEDERS_DIR, f"{table}.csv")

        if not os.path.exists(csv_path):
            print(f"doesn't exist: {csv_path}")
            continue

        print(f"\nInsert into table: {table}")

        # Stream the file in chunks instead of loading it whole.
        chunk_iter = pd.read_csv(csv_path, chunksize=CHUNK_SIZE)

        for i, chunk_df in enumerate(chunk_iter, start=1):
            print(f"Chunk {i} inserting... ({len(chunk_df)} rows)")

            # Turn NaN into None so it is sent as SQL NULL. The cast to object
            # matters: in a float column, where(..., None) would leave NaN in place.
            records = (
                chunk_df.astype(object)
                .where(chunk_df.notna(), None)
                .to_dict(orient="records")
            )
            if not records:
                # An empty parameter list would run the INSERT with unbound placeholders.
                continue

            cols = chunk_df.columns.tolist()
            colnames = ", ".join(cols)
            placeholders = ", ".join([f":{c}" for c in cols])

            # Table and column names come from the local CSV file names/headers;
            # the values themselves travel as bound parameters.
            query = text(f"""
                INSERT INTO {table} ({colnames})
                VALUES ({placeholders});
            """)

            # Passing a list of dicts inserts every row of the chunk in one call.
            conn.execute(query, records)

        print(f"Table {table} was inserted sucess!!")

    # Single commit at the end: either every table is loaded or none is.
    conn.commit()

print("Data inserted sucessfull!!")


Insert into table: ciudad
Chunk 1 inserting... (34 rows)
Table ciudad was inserted sucess!!

Insert into table: tipo_producto
Chunk 1 inserting... (7 rows)
Table tipo_producto was inserted sucess!!

Insert into table: producto
Chunk 1 inserting... (63 rows)
Table producto was inserted sucess!!

Insert into table: tipo_venta
Chunk 1 inserting... (5 rows)
Table tipo_venta was inserted sucess!!

Insert into table: tipo_cliente
Chunk 1 inserting... (6 rows)
Table tipo_cliente was inserted sucess!!

Insert into table: factura_ventas
Chunk 1 inserting... (100000 rows)
Chunk 2 inserting... (100000 rows)
Chunk 3 inserting... (100000 rows)
Chunk 4 inserting... (100000 rows)
Chunk 5 inserting... (100000 rows)
Chunk 6 inserting... (100000 rows)
Chunk 7 inserting... (100000 rows)
Chunk 8 inserting... (100000 rows)
Chunk 9 inserting... (100000 rows)
Chunk 10 inserting... (100000 rows)
Table factura_ventas was inserted sucess!!
Data inserted sucessfull!!


#### 5. Visualization of Data in DB

In [None]:
# The context manager closes the connection on exit, so no explicit close() is needed.
with engine.connect() as conn:
    # Make unqualified table names resolve inside the riwi_ventas schema.
    conn.execute(text("SET search_path TO riwi_ventas"))

    # Names of all tables registered under the schema.
    table_list = pd.read_sql(text("SELECT table_name FROM information_schema.tables WHERE table_schema = 'riwi_ventas';"),conn)

    msg = f"| Tablas encontradas: {len(table_list)} |"

    # Banner framed by dashes of the same width as the message.
    print("-"*len(msg))
    print(msg)
    print("-"*len(msg))

    for idx, table in enumerate(table_list["table_name"], start=1):
        print(f"{idx}. {table}")

    print("\n-------------------------------------------\n")

    # Report which tables contain data. One row is enough to prove a table is
    # not empty, so nothing more than that is fetched.
    for table in table_list["table_name"]:
        verfy = pd.read_sql(text(f"select 1 from {table} LIMIT 1;"),conn)
        if not verfy.empty:
            print(f"Table: {table} exist and is filled")
        else:
            print(f"Table: | {table} | doesn't exist or is empty please check in the Data base.")

-------------------------
| Tablas encontradas: 6 |
-------------------------
1. tipo_producto
2. producto
3. ciudad
4. factura_ventas
5. tipo_venta
6. tipo_cliente

-------------------------------------------

Table: tipo_producto exist and is filled
Table: producto exist and is filled
Table: ciudad exist and is filled
Table: factura_ventas exist and is filled
Table: tipo_venta exist and is filled
Table: tipo_cliente exist and is filled
