# HU 1: Connection and Data Loading from PostgreSQL

**Project:** Sales Analysis with Python & Power BI  
**User Story:** HU 1  
**Objective:** Connect to the `Ventas` database (staging area), extract necessary tables, and prepare data for analysis.

### Description
In this notebook, we perform the initial **ETL (Extract, Transform, Load)** process. We encountered a raw CSV file (`ventas.csv`) with schema misalignment issues. Our goal is to ingest this raw file, fix the structural errors, and load it into a **PostgreSQL** database as a staging table (`raw_sales`) for further cleaning.

### Tasks Covered
1. [x] Configure connection with PostgreSQL using `SQLAlchemy`.
2. [x] Extract and fix raw data schema (headers misalignment).
3. [x] Export fixed data to PostgreSQL (Staging).
4. [x] Document the process.

In [5]:
import pandas as pd
import sys
import os

# Add the src directory to the system path to import local modules
sys.path.append(os.path.abspath(os.path.join('..', 'src')))

from db.db_connection import get_db_engine

## 1. Data Extraction & Schema Analysis

We start by loading the raw CSV file to inspect its structure. Preliminary observation suggests a mismatch between the headers and the actual data content (e.g., City names appearing in the 'Date' column).

In [6]:
# Path to the raw CSV
csv_path = '../data/raw/ventas.csv'

# Read the CSV
# Use header=0 to verify the mismatch initially
df_raw = pd.read_csv(csv_path)

# Display the first few rows to analyze the misalignment
df_raw.head()

Unnamed: 0,Fecha,Producto,Tipo_Producto,Cantidad,Precio_Unitario,Ciudad,Pais,Tipo_Venta,Tipo_Cliente,Descuento,Costo_Envio
0,Santiago,2025-10-30,Arepa,Abarrotes,2.0,3681.0,Online,Minorista,0.2,0.0,5889.0
1,Córdoba,2025-11-17,Arepa,Abarrotes,7.0,2321.0,Distribuidor,Gobierno,0.15,0.0,13809.0
2,Barranquilla,2025-10-22,Leche,Lácteo,9.0,3540.0,Distribuidor,Gobierno,0.2,0.0,25488.0
3,New York,2025-10-20,Cereal,Lácteo,3.0,3287.0,Tienda_Física,Gobierno,0.05,0.0,9367.0
4,Madrid,2025-10-20,Leche,Hogar,2.0,3414.0,Distribuidor,Mayorista,0.0,0.0,6828.0


## 2. Transformation: Schema Alignment

Upon inspection, the original headers are shifted or incorrectly labeled. We have identified the correct data mapping based on the column content:

- **Column 0:** Contains Cities -> Renamed to `City`
- **Column 1:** Contains Dates -> Renamed to `Date`
- **Column 2:** Contains Product Names -> Renamed to `Product`
- ... *(and so on for the rest of the columns)*

The following code reloads the dataset enforcing the correct schema.

In [7]:
# Define the correct column names based on data observation
# Observation: Data starts with City, then Date, etc.
correct_columns = [
    "City",             # Santiago
    "Date",             # 2025-10-30
    "Product",          # Arepa
    "Product_Type",     # Abarrotes
    "Quantity",         # 2.0
    "Unit_Price",       # 3681.0
    "Sales_Type",       # Online
    "Client_Type",      # Minorista
    "Discount",         # 0.2
    "Shipping_Cost",    # 0.0
    "Total_Sales"       # 5889.0
]

# Reload the dataset ignoring the original header (header=0 usually takes row 0 as header, 
# but since row 0 is just wrong names, let's overwrite them)
df_fixed = pd.read_csv(csv_path, header=0, names=correct_columns)

# Let's inspect again
df_fixed.head()

Unnamed: 0,City,Date,Product,Product_Type,Quantity,Unit_Price,Sales_Type,Client_Type,Discount,Shipping_Cost,Total_Sales
0,Santiago,2025-10-30,Arepa,Abarrotes,2.0,3681.0,Online,Minorista,0.2,0.0,5889.0
1,Córdoba,2025-11-17,Arepa,Abarrotes,7.0,2321.0,Distribuidor,Gobierno,0.15,0.0,13809.0
2,Barranquilla,2025-10-22,Leche,Lácteo,9.0,3540.0,Distribuidor,Gobierno,0.2,0.0,25488.0
3,New York,2025-10-20,Cereal,Lácteo,3.0,3287.0,Tienda_Física,Gobierno,0.05,0.0,9367.0
4,Madrid,2025-10-20,Leche,Hogar,2.0,3414.0,Distribuidor,Mayorista,0.0,0.0,6828.0


## 3. Loading: Staging to PostgreSQL

Now that the schema is structurally correct (though the data might still contain nulls or duplicates), we load it into our **Staging Area** in PostgreSQL.

We use the table name `raw_sales` to indicate that this data has not yet undergone deep cleaning (duplicates removal, type casting, etc.).

> **Note:** We use `chunksize=10000` to optimize memory usage during the upload process, as the dataset contains over 1.25 million records.

In [8]:
# Initialize the engine
engine = get_db_engine()

# Define the table name. 
# 'raw_sales' indicates this data is structually correct but technically dirty (nulls, duplicates)
table_name = 'raw_sales'

if engine:
    try:
        print(f"Starting data load into '{table_name}'...")
        
        # Write records stored in a DataFrame to a SQL database.
        # chunksize=10000: Loads data in batches to avoid memory issues.
        df_fixed.to_sql(
            name=table_name, 
            con=engine, 
            if_exists='replace', 
            index=False,
            chunksize=10000 
        )
        
        print(f"Success! Loaded {len(df_fixed)} rows into table '{table_name}'.")
        
    except Exception as e:
        print(f"Error loading data to SQL: {e}")
else:
    print("Could not initiate database engine.")

Successfully created engine for database: hu_db
Starting data load into 'raw_sales'...
Success! Loaded 1250000 rows into table 'raw_sales'.


In [9]:
# Verification query
# We use pandas read_sql to quickly check the table existance and row count
try:
    with engine.connect() as connection:
        # Check first 5 rows from DB
        query_preview = f"SELECT * FROM {table_name} LIMIT 5;"
        df_check = pd.read_sql(query_preview, connection)
        
        # Check total count
        query_count = f"SELECT COUNT(*) as total_rows FROM {table_name};"
        df_count = pd.read_sql(query_count, connection)
        
        total_rows = df_count['total_rows'][0]
        print(f"Total rows in Database: {total_rows}")
        
    display(df_check)

except Exception as e:
    print(f"Verification failed: {e}")

Total rows in Database: 1250000


Unnamed: 0,City,Date,Product,Product_Type,Quantity,Unit_Price,Sales_Type,Client_Type,Discount,Shipping_Cost,Total_Sales
0,Santiago,2025-10-30,Arepa,Abarrotes,2.0,3681.0,Online,Minorista,0.2,0.0,5889.0
1,Córdoba,2025-11-17,Arepa,Abarrotes,7.0,2321.0,Distribuidor,Gobierno,0.15,0.0,13809.0
2,Barranquilla,2025-10-22,Leche,Lácteo,9.0,3540.0,Distribuidor,Gobierno,0.2,0.0,25488.0
3,New York,2025-10-20,Cereal,Lácteo,3.0,3287.0,Tienda_Física,Gobierno,0.05,0.0,9367.0
4,Madrid,2025-10-20,Leche,Hogar,2.0,3414.0,Distribuidor,Mayorista,0.0,0.0,6828.0


## 4. Validation & Conclusion

We successfully established a connection to the database and loaded the dataset.
- **Source:** `ventas.csv`
- **Destination:** PostgreSQL table `raw_sales`
- **Status:** Ready for Cleaning (HU 2)

The verification query confirms that the row count in the database matches the source dataframe.