In [None]:
# Installations
!pip install pandas geopandas shapely
!pip install geoalchemy2
!pip install psycopg2

In [None]:
# Imports
import os
from datetime import datetime
from typing import List, Dict, Union, Optional

import geopandas as gpd
import numpy as np
import pandas as pd
from geoalchemy2 import Geometry
from shapely.wkt import loads
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, Date, text
from sqlalchemy.orm import declarative_base, sessionmaker

In [None]:
file_path = "./data/Iowa_Liquor_Sales.csv"

if not os.path.exists(file_path):
    !kaggle datasets download -d csafrit2/iowa-liquor-sales -p ./data --unzip

In [None]:
# Reading data
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 1000)

# Read from the same path the download cell checks (`file_path`) instead of
# repeating the literal. Only the first 100 rows are loaded (`nrows=100`).
df = pd.read_csv(file_path, nrows=100, low_memory=False, encoding="utf-8-sig")

# Last expression of the cell: rendered with the notebook's rich table display.
df.head()

In [None]:
# Float Conversions
# Columns that must be numeric. Values that cannot be parsed as numbers
# become NaN because of errors='coerce'.
float_cols = [
    "County Number",
    "Category",
    "Vendor Number",
    "Bottle Volume (ml)",
    "State Bottle Cost",
    "State Bottle Retail",
    "Sale (Dollars)",
    "Volume Sold (Liters)",
    "Volume Sold (Gallons)",
]
for numeric_col in float_cols:
    df[numeric_col] = pd.to_numeric(df[numeric_col], errors='coerce')

In [None]:
# Conversions
# Target dtype -> columns cast to it. The grouping is flattened into the
# {column: dtype} mapping that DataFrame.astype expects.
# NOTE(review): the integer casts raise if a column holds NaN (possible after
# the errors='coerce' numeric step) - confirm the loaded rows contain none.
columns_by_dtype = {
    "string": [
        "Invoice/Item Number",
        "Store Name",
        "Address",
        "City",
        "Zip Code",
        "County",
        "Category Name",
        "Vendor Name",
        "Item Description",
    ],
    "int32": [
        "Store Number",
        "Category",
        "Vendor Number",
        "Item Number",
        "Bottles Sold",
    ],
    "int16": ["Pack"],
    "float32": [
        "County Number",
        "Bottle Volume (ml)",
        "State Bottle Cost",
        "State Bottle Retail",
        "Sale (Dollars)",
        "Volume Sold (Liters)",
        "Volume Sold (Gallons)",
    ],
}

df = df.astype({
    column: dtype
    for dtype, columns in columns_by_dtype.items()
    for column in columns
})

In [None]:
# Conversion: Date
# Parse month/day/year strings; entries that do not match the format become NaT.
date_strings = df["Date"]
df["Date"] = pd.to_datetime(date_strings, errors="coerce", format="%m/%d/%Y")

In [None]:
# Conversion: Store Location
def _parse_store_point(wkt_text):
    """Parse a WKT string that starts with "POINT"; return None for anything else."""
    if wkt_text.startswith("POINT"):
        return loads(wkt_text)
    return None


# Rows without a store location are removed from the whole frame before parsing.
df = df.dropna(subset=["Store Location"])
df["Store Location"] = df["Store Location"].apply(_parse_store_point)

In [None]:
# Function: transform_table
def transform_table(
    df: pd.DataFrame,
    columns: List[str],
    rename_map: Dict[str, str],
    keys: Union[List[str], str],
    aggregation: Optional[Dict[str, str]] = None
) -> pd.DataFrame:
    """
    Build a dimension/fact table from a subset of the source columns.

    The selected columns are renamed first, so `keys` and `aggregation`
    must use the NEW column names. When `aggregation` is given, rows are
    grouped by `keys` and aggregated; afterwards rows with a null key are
    removed and only the first row per key combination is kept.

    Args:
    - df (pd.DataFrame): Source DataFrame; it is not modified.
    - columns (List[str]): Source column names to keep.
    - rename_map (Dict[str, str]): Source column name -> output column name.
    - keys (Union[List[str], str]): Output column name(s) that identify a row.
    - aggregation (Optional[Dict[str, str]]): Output column name -> aggregation
      passed to `groupby(...).agg(...)`; skipped when None or empty.

    Returns:
    - pd.DataFrame: The transformed DataFrame.
    """
    # Accept a single key name as shorthand for a one-element list.
    key_columns = [keys] if isinstance(keys, str) else keys

    result = df[columns].rename(columns=rename_map)

    if aggregation:
        grouped = result.groupby(key_columns, as_index=False)
        result = grouped.agg(aggregation)

    without_null_keys = result.dropna(subset=key_columns)
    return without_null_keys.drop_duplicates(subset=key_columns)

In [None]:
# Dimension Table: Product
# Source column -> output column. The mapping's insertion order is also the
# column selection order, so the column list is derived from it.
product_renames = {
    "Item Number": "item_number",
    "Bottle Volume (ml)": "bottle_volume_ml",
    "Pack": "pack",
    "State Bottle Retail": "state_bottle_retail",
    "State Bottle Cost": "state_bottle_cost",
    "Category": "category",
    "Item Description": "item_description",
    "Category Name": "category_name",
}
product_cols = list(product_renames)

# One row per item_number (first occurrence wins).
product_df = transform_table(
    df,
    columns=product_cols,
    rename_map=product_renames,
    keys="item_number",
)

In [None]:
# Dimension Table: Store
# Source column -> output column. The mapping's insertion order is also the
# column selection order, so the column list is derived from it.
store_renames = {
    "Store Number": "store_number",
    "County": "county",
    "County Number": "county_number",
    "Store Location": "store_location",
    "Zip Code": "zip_code",
    "City": "city",
    "Address": "address",
    "Store Name": "store_name",
}
store_cols = list(store_renames)

# One row per store_number (first occurrence wins).
store_df = transform_table(
    df,
    columns=store_cols,
    rename_map=store_renames,
    keys="store_number",
)

In [None]:
# Dimension Table: Time
# Calendar table with one row per day, from January 1 of the earliest sale
# year through December 31 of the latest sale year.
min_year, max_year = df["Date"].min().year, df["Date"].max().year

start_date = f"{min_year}-01-01"
end_date = f"{max_year}-12-31"

date_range = pd.date_range(start=start_date, end=end_date, freq='D')

# Each derived column is computed from "date"; keyword order sets column order.
time_df = pd.DataFrame({"date": date_range}).assign(
    id_time=lambda t: t["date"].dt.strftime("%Y%m%d").astype(int),  # integer YYYYMMDD key
    day=lambda t: t["date"].dt.day,
    month=lambda t: t["date"].dt.month,
    year=lambda t: t["date"].dt.year,
    quarter=lambda t: t["date"].dt.quarter,
    weekday=lambda t: t["date"].dt.weekday,  # 0=Monday, 6=Sunday
    weekday_name=lambda t: t["date"].dt.day_name(),  # Name of the day
    is_weekend=lambda t: (t["weekday"] >= 5).astype("int64"),  # 1 on Saturday/Sunday, else 0
)

In [None]:
# Dimension Table: Vendor
# Source column -> output column; the selection list follows the mapping order.
vendor_renames = {
    "Vendor Number": "vendor_number",
    "Vendor Name": "vendor_name",
}
vendor_cols = list(vendor_renames)

# One row per vendor_number (first occurrence wins).
vendor_df = transform_table(
    df,
    columns=vendor_cols,
    rename_map=vendor_renames,
    keys="vendor_number",
)

In [None]:
# Fact Table: Sales
# Source column -> output column. "Date" is renamed to id_time here and is
# converted to the integer YYYYMMDD key after the aggregation.
sales_renames = {
    "Vendor Number": "vendor_number",
    "Date": "id_time",
    "Store Number": "store_number",
    "Item Number": "item_number",
    "Sale (Dollars)": "sale_dollars",
    "Bottles Sold": "bottles_sold",
    "Volume Sold (Liters)": "volume_sold_liters",
    "Volume Sold (Gallons)": "volume_sold_gallons",
}
sales_cols = list(sales_renames)

# Grain of the fact table: one row per vendor, day, store and item.
sales_keys = ["vendor_number", "id_time", "store_number", "item_number"]

# Every measure is summed within a key combination.
sales_agreggate = {
    measure: "sum"
    for measure in (
        "sale_dollars",
        "bottles_sold",
        "volume_sold_liters",
        "volume_sold_gallons",
    )
}

sales_df = transform_table(
    df,
    sales_cols,
    sales_renames,
    sales_keys,
    aggregation=sales_agreggate,
)

# Swap the timestamp for the integer YYYYMMDD key.
date_keys = sales_df["id_time"].dt.strftime("%Y%m%d")
sales_df["id_time"] = date_keys.astype(int)

In [None]:
# Creating PostgreSQL Tables and Constraints
# Connection settings are read from environment variables when they are set.
# The fallbacks are the original local values, so the notebook behaves the
# same when no variable is defined.
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "postgres")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5433")
DB_NAME = os.environ.get("DB_NAME", "liquor_sales")

engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")

# Smoke test: run a trivial query so an unreachable database fails here
# rather than later during table creation or inserts.
with engine.connect() as conn:
    result = conn.execute(text("SELECT version();"))
    print("✅ Conexión exitosa a PostgreSQL en Docker")
    for row in result:
        print(row)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()

class Product(Base):
    """ORM model for the "Product" dimension table, keyed by item_number."""
    __tablename__ = "Product"
    item_number = Column(Integer, primary_key=True)
    bottle_volume_ml = Column(Float)
    pack = Column(Integer)
    state_bottle_retail = Column(Float)
    state_bottle_cost = Column(Float)
    # NOTE(review): declared Float although the DataFrame casts "Category" to int32 - confirm intent.
    category = Column(Float)
    item_description = Column(String)
    category_name = Column(String)

class Store(Base):
    """ORM model for the "Store" dimension table, keyed by store_number."""
    __tablename__ = "Store"
    store_number = Column(Integer, primary_key=True)
    county = Column(String)
    county_number = Column(Float)
    # Geometry column restricted to POINT values with SRID 4326.
    store_location = Column(Geometry("POINT", srid=4326))
    zip_code = Column(String)
    city = Column(String)
    address = Column(String)
    store_name = Column(String)

class Time(Base):
    """ORM model for the "Time" dimension table: one row per calendar date."""
    __tablename__ = "Time"
    # Integer key; the time dimension cell fills it with the date formatted as YYYYMMDD.
    id_time = Column(Integer, primary_key=True)
    # Each date may appear only once and must be present.
    date = Column(Date, unique=True, nullable=False)
    day = Column(Integer)
    month = Column(Integer)
    year = Column(Integer)
    quarter = Column(Integer)
    weekday = Column(Integer)
    weekday_name = Column(String)
    is_weekend = Column(Integer)

class Vendor(Base):
    """ORM model for the "Vendor" dimension table, keyed by vendor_number."""
    __tablename__ = "Vendor"
    vendor_number = Column(Integer, primary_key=True)
    vendor_name = Column(String)

class Sales(Base):
    """ORM model for the "Sales" fact table.

    The primary key is the combination of the four foreign keys below, so
    there is at most one row per vendor, date, store and item.
    """
    __tablename__ = "Sales"

    # Composite primary key; each part references one dimension table.
    vendor_number = Column(Integer, ForeignKey("Vendor.vendor_number"), primary_key=True)
    id_time = Column(Integer, ForeignKey("Time.id_time"), primary_key=True)
    store_number = Column(Integer, ForeignKey("Store.store_number"), primary_key=True)
    item_number = Column(Integer, ForeignKey("Product.item_number"), primary_key=True)

    # Measures.
    sale_dollars = Column(Float)
    bottles_sold = Column(Integer)
    volume_sold_liters = Column(Float)
    volume_sold_gallons = Column(Float)

# Create the tables declared on Base's metadata in the database behind `engine`.
Base.metadata.create_all(engine)

In [None]:
# Wrap the store dimension as a GeoDataFrame, using store_location as the
# geometry column in EPSG:4326.
store_gdf = gpd.GeoDataFrame(store_df, crs="EPSG:4326", geometry="store_location")

# Distinct store numbers on each side of the Sales -> Store relationship.
stores_in_store = set(store_gdf["store_number"].unique())
stores_in_sales = set(sales_df["store_number"].unique())

# Store numbers that appear in sales but have no row in the store dimension.
stores_not_in_store = stores_in_sales.difference(stores_in_store)

In [None]:
# Migrating data to database
Session = sessionmaker(bind=engine)

# The context manager closes the session even when an insert fails. The
# explicit rollback discards the uncommitted session inserts before the
# original error is re-raised.
with Session() as session:
    try:
        # Dimension tables first: Sales has foreign keys to all of them.
        session.bulk_insert_mappings(Product, product_df.to_dict(orient="records"))
        session.bulk_insert_mappings(Time, time_df.to_dict(orient="records"))
        session.bulk_insert_mappings(Vendor, vendor_df.to_dict(orient="records"))
        # Stores are written by GeoPandas through `engine`, not through
        # `session`, so the session rollback below does not undo this step.
        store_gdf.to_postgis("Store", engine, if_exists="append", index=False)

        session.bulk_insert_mappings(Sales, sales_df.to_dict(orient="records"))

        session.commit()
    except Exception:
        session.rollback()
        raise