In [None]:
import csv
import psycopg2
import os
import glob
import pandas as pd
from sqlalchemy import create_engine
from psycopg2 import extras

In [None]:
# Connection details. Each value can be overridden through the standard libpq
# environment variables (PGHOST, PGDATABASE, PGUSER, PGPASSWORD, PGPORT) so
# credentials do not have to live in the notebook; the literals below are only
# fallbacks for a local development database.
# TODO: drop the hard-coded password fallback once PGPASSWORD is set in the env.
HOST = os.environ.get("PGHOST", "localhost")
DBNAME = os.environ.get("PGDATABASE", "postgres")
USER = os.environ.get("PGUSER", "postgres")
PW = os.environ.get("PGPASSWORD", "1234")
PORT = os.environ.get("PGPORT", "5433")

In [None]:
# initiate connection
# Announce before connecting so the message precedes the (possibly blocking)
# connect call instead of appearing only after it has already succeeded.
print("Connecting to Database")
conn = psycopg2.connect(host=HOST,
                        dbname=DBNAME,
                        user=USER,
                        password=PW,
                        port=PORT
                        )

Connecting to Database


In [None]:
# execute Data Model
# Notebook-global cursor. execute_values_upsert and the load cells below all use
# the name `cur`, so it must be bound under exactly that name.
cur = conn.cursor()
DDL = """
CREATE SCHEMA IF NOT EXISTS instacart;

-- Dimensions
CREATE TABLE IF NOT EXISTS instacart.user_dim (
  user_key     BIGSERIAL PRIMARY KEY,
  ext_user_id  INTEGER UNIQUE
);

CREATE TABLE IF NOT EXISTS instacart.department_dim (
  department_key BIGSERIAL PRIMARY KEY,
  department_id  INTEGER UNIQUE,
  department     TEXT
);

CREATE TABLE IF NOT EXISTS instacart.aisle_dim (
  aisle_key  BIGSERIAL PRIMARY KEY,
  aisle_id   INTEGER UNIQUE,
  aisle      TEXT
);

CREATE TABLE IF NOT EXISTS instacart.product_dim (
  product_key    BIGSERIAL PRIMARY KEY,
  product_id     INTEGER UNIQUE,
  product_name   TEXT,
  aisle_key      BIGINT REFERENCES instacart.aisle_dim(aisle_key),
  department_key BIGINT REFERENCES instacart.department_dim(department_key)
);

CREATE TABLE IF NOT EXISTS instacart.order_dim (
  order_key                BIGSERIAL PRIMARY KEY,
  order_id                 INTEGER UNIQUE,
  user_key                 BIGINT REFERENCES instacart.user_dim(user_key),
  order_number             INTEGER,
  eval_set                 TEXT,
  order_dow                SMALLINT,
  order_hour_of_day        SMALLINT,
  days_since_prior_order   SMALLINT
);

-- Fact
CREATE TABLE IF NOT EXISTS instacart.fact_order_product (
  fact_id            BIGSERIAL PRIMARY KEY,
  order_key          BIGINT NOT NULL REFERENCES instacart.order_dim(order_key),
  product_key        BIGINT NOT NULL REFERENCES instacart.product_dim(product_key),
  add_to_cart_order  SMALLINT,
  reordered          SMALLINT
);

-- Helpful indexes
CREATE INDEX IF NOT EXISTS ix_order_user     ON instacart.order_dim(user_key);
CREATE INDEX IF NOT EXISTS ix_fact_order     ON instacart.fact_order_product(order_key);
CREATE INDEX IF NOT EXISTS ix_fact_product   ON instacart.fact_order_product(product_key);
"""
# NOTE(review): fact_order_product only has a surrogate BIGSERIAL key and
# non-unique indexes. Nothing is unique on (order_key, product_key), so an
# "ON CONFLICT DO NOTHING" insert into it cannot deduplicate a re-run.
# TODO: decide on a unique index (or truncate-and-reload) before relying on reloads.
cur.execute(DDL)
conn.commit()

In [None]:
# function for inserting data faster
def execute_values_upsert(insert_sql, rows, page=10000, cursor=None):
    """Bulk-execute ``insert_sql`` over ``rows`` with psycopg2's ``execute_values``.

    Parameters
    ----------
    insert_sql : str
        Statement with a single ``VALUES %s`` placeholder. Whether it is a
        plain insert or an upsert is decided by its own ``ON CONFLICT`` clause.
    rows : sequence of tuples
        One tuple per row. An empty/falsy value is a no-op.
    page : int
        Rows sent to the server per statement (``page_size``).
    cursor : psycopg2 cursor, optional
        Cursor to execute on. Defaults to the notebook-global ``cur`` so
        existing two-argument calls keep working.

    Does not commit; the caller owns the transaction.
    """
    if not rows:
        return
    # Resolve the default lazily so the global is only needed when used.
    if cursor is None:
        cursor = cur
    extras.execute_values(cursor, insert_sql, rows, page_size=page)

In [None]:
# Location of the Instacart CSV exports, relative to the notebook's working directory.
CSV_DIR = "data"

# One path per source file, unpacked in the same order as the file names listed below.
(
    PATH_ORDERS,
    PATH_PRODUCTS,
    PATH_AISLES,
    PATH_DEPARTMENTS,
    PATH_ORDER_PRIOR,
    PATH_ORDER_TRAIN,
) = [
    os.path.join(CSV_DIR, file_name)
    for file_name in (
        "orders.csv",
        "products.csv",
        "aisles.csv",
        "departments.csv",
        "order_products__prior.csv",
        "order_products__train.csv",
    )
]


In [None]:
# insert data
# Small lookup dimensions: each CSV is upserted on its natural id, then committed.

print("Loading department_dim ...")
dept = pd.read_csv(PATH_DEPARTMENTS)
rows = list(zip(
    dept["department_id"].astype(int).tolist(),
    dept["department"].astype(str).tolist(),
))
execute_values_upsert(
    insert_sql="""
    INSERT INTO instacart.department_dim (department_id, department)
    VALUES %s
    ON CONFLICT (department_id) DO UPDATE SET department = EXCLUDED.department;
    """,
    rows=rows,
)
conn.commit()

print("Loading aisle_dim ...")
ais = pd.read_csv(PATH_AISLES)
rows = list(zip(
    ais["aisle_id"].astype(int).tolist(),
    ais["aisle"].astype(str).tolist(),
))
execute_values_upsert(
    insert_sql="""
    INSERT INTO instacart.aisle_dim (aisle_id, aisle)
    VALUES %s
    ON CONFLICT (aisle_id) DO UPDATE SET aisle = EXCLUDED.aisle;
    """,
    rows=rows,
)
conn.commit()

# -----------------------------
# Load product_dim
# -----------------------------
print("Loading product_dim ...")

# Natural id -> surrogate key lookups for product_dim's foreign keys, read back
# from the aisle/department dimensions loaded in the previous step. An id that
# is missing from its dimension maps to None and is stored as NULL.
cur.execute("SELECT aisle_id, aisle_key FROM instacart.aisle_dim")
aisle_map = dict(cur.fetchall())
cur.execute("SELECT department_id, department_key FROM instacart.department_dim")
dept_map = dict(cur.fetchall())

prod = pd.read_csv(PATH_PRODUCTS)
# itertuples avoids building a Series per row as iterrows does.
prod_rows = [
    (
        int(r.product_id),
        str(r.product_name),
        aisle_map.get(int(r.aisle_id)),
        dept_map.get(int(r.department_id)),
    )
    for r in prod.itertuples(index=False)
]

execute_values_upsert(
    """
    INSERT INTO instacart.product_dim (product_id, product_name, aisle_key, department_key)
    VALUES %s
    ON CONFLICT (product_id) DO UPDATE
      SET product_name = EXCLUDED.product_name,
          aisle_key = EXCLUDED.aisle_key,
          department_key = EXCLUDED.department_key;
    """,
    prod_rows
)
conn.commit()


# -----------------------------
# Load user_dim and order_dim
# -----------------------------
print("Loading user_dim + order_dim ...")
orders = pd.read_csv(PATH_ORDERS)

# user_dim: one row per distinct external user id.
user_ids = sorted(orders["user_id"].dropna().astype(int).unique().tolist())
user_rows = [(uid,) for uid in user_ids]
execute_values_upsert(
    """
    INSERT INTO instacart.user_dim (ext_user_id)
    VALUES %s
    ON CONFLICT (ext_user_id) DO NOTHING;
    """,
    user_rows
)
conn.commit()

# ext_user_id -> user_key lookup for order_dim.user_key. Read back from the
# table (not derived from the insert) so keys from earlier runs, which the
# DO NOTHING insert skipped, are included as well.
cur.execute("SELECT ext_user_id, user_key FROM instacart.user_dim")
user_map = dict(cur.fetchall())

# order_dim is upserted in chunks, committing after each chunk.
ORDER_CHUNK = 200_000
for i in range(0, len(orders), ORDER_CHUNK):
    chunk = orders.iloc[i:i + ORDER_CHUNK]
    rows = [
        (
            int(r.order_id),
            user_map.get(int(r.user_id)),
            int(r.order_number) if not pd.isna(r.order_number) else None,
            str(r.eval_set),
            int(r.order_dow) if not pd.isna(r.order_dow) else None,
            int(r.order_hour_of_day) if not pd.isna(r.order_hour_of_day) else None,
            int(r.days_since_prior_order) if not pd.isna(r.days_since_prior_order) else None,
        )
        for r in chunk.itertuples(index=False)
    ]
    execute_values_upsert(
        """
        INSERT INTO instacart.order_dim
          (order_id, user_key, order_number, eval_set, order_dow, order_hour_of_day, days_since_prior_order)
        VALUES %s
        ON CONFLICT (order_id) DO UPDATE
          SET user_key = EXCLUDED.user_key,
              order_number = EXCLUDED.order_number,
              eval_set = EXCLUDED.eval_set,
              order_dow = EXCLUDED.order_dow,
              order_hour_of_day = EXCLUDED.order_hour_of_day,
              days_since_prior_order = EXCLUDED.days_since_prior_order;
        """,
        rows
    )
    conn.commit()
    print(f"  inserted/updated orders rows {i:,}–{min(i+ORDER_CHUNK, len(orders)):,}")


# -----------------------------
# Load fact_order_product (prior + train)
# -----------------------------
def load_order_products(path, chunksize=500_000, order_map=None, prod_map=None):
    """Stream an order_products CSV into ``instacart.fact_order_product``.

    The CSV is read ``chunksize`` rows at a time. Each row's ``order_id`` and
    ``product_id`` are translated to surrogate keys, the chunk is bulk-inserted,
    and the transaction is committed once per chunk.

    Parameters
    ----------
    path : str
        CSV with columns order_id, product_id, add_to_cart_order, reordered.
    chunksize : int
        Rows per pandas chunk (and per commit).
    order_map, prod_map : dict, optional
        Natural id -> surrogate key. When omitted, they are read from
        ``instacart.order_dim`` / ``instacart.product_dim``, so those tables must
        already be loaded. Pass prebuilt maps to avoid re-querying on every call.

    Rows whose order or product has no key are skipped; the count is printed.

    NOTE(review): fact_order_product has no unique constraint on
    (order_key, product_key), so ``ON CONFLICT DO NOTHING`` never fires and
    re-running this duplicates facts. TODO add a unique index or truncate first.
    """
    print(f"Loading facts from {os.path.basename(path)} ...")

    if order_map is None:
        cur.execute("SELECT order_id, order_key FROM instacart.order_dim")
        order_map = dict(cur.fetchall())
    if prod_map is None:
        cur.execute("SELECT product_id, product_key FROM instacart.product_dim")
        prod_map = dict(cur.fetchall())

    total = 0
    skipped = 0
    # Context manager closes the underlying file even if a chunk fails mid-load.
    with pd.read_csv(path, chunksize=chunksize) as reader:
        for chunk in reader:
            rows = []
            # itertuples avoids building a Series per row as iterrows does.
            for r in chunk.itertuples(index=False):
                ok = order_map.get(int(r.order_id))
                pk = prod_map.get(int(r.product_id))
                if ok is None or pk is None:
                    skipped += 1
                    continue
                rows.append((
                    ok,
                    pk,
                    int(r.add_to_cart_order) if not pd.isna(r.add_to_cart_order) else None,
                    int(r.reordered) if not pd.isna(r.reordered) else None
                ))
            execute_values_upsert(
                """
                INSERT INTO instacart.fact_order_product
                  (order_key, product_key, add_to_cart_order, reordered)
                VALUES %s
                ON CONFLICT DO NOTHING;
                """,
                rows
            )
            conn.commit()
            total += len(rows)
            print(f"  +{len(rows):,} rows (total {total:,})")

    if skipped:
        print(f"  skipped {skipped:,} rows with no matching order/product key")
# Load both fact files; prior must go in before train only by convention, the
# loader itself has no ordering dependency between them.
load_order_products(PATH_ORDER_PRIOR)
load_order_products(PATH_ORDER_TRAIN)

Loading aisle_dim ...
Loading product_dim ...
Loading user_dim + order_dim ...
  inserted/updated orders rows 0–200,000
  inserted/updated orders rows 200,000–400,000
  inserted/updated orders rows 400,000–600,000
  inserted/updated orders rows 600,000–800,000
  inserted/updated orders rows 800,000–1,000,000
  inserted/updated orders rows 1,000,000–1,200,000
  inserted/updated orders rows 1,200,000–1,400,000
  inserted/updated orders rows 1,400,000–1,600,000
  inserted/updated orders rows 1,600,000–1,800,000
  inserted/updated orders rows 1,800,000–2,000,000
  inserted/updated orders rows 2,000,000–2,200,000
  inserted/updated orders rows 2,200,000–2,400,000
  inserted/updated orders rows 2,400,000–2,600,000
  inserted/updated orders rows 2,600,000–2,800,000
  inserted/updated orders rows 2,800,000–3,000,000
  inserted/updated orders rows 3,000,000–3,200,000
  inserted/updated orders rows 3,200,000–3,400,000
  inserted/updated orders rows 3,400,000–3,421,083
Loading facts from order_pr