# End-to-end Snowpark jaffle_shop in Python

A just-for-fun walkthrough that builds the jaffle_shop models end to end with Snowpark DataFrames in Python.

## Imports

In [None]:
# tracking
import mlflow

# pydata/ml
import sklearn as sklearn

import numpy as np
import pandas as pd
import lightgbm as lgb

from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split

# viz
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt

# snowflake
import yaml
import snowflake.snowpark

from snowflake.snowpark import types
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, udf, sql_expr

## Setup

In [None]:
# setup viz defaults
# Apply seaborn's theme first, then override the default figure size.
sns.set_theme()
sns.set(rc={"figure.figsize": (16, 18)})
# Select seaborn's "darkgrid" style, then apply matplotlib's
# "dark_background" style sheet. The style sheet is applied last, so
# presumably its settings take precedence wherever the two overlap.
sns.set_style("darkgrid")
plt.style.use(["dark_background"])

## Snowpark session

In [None]:
# Fields copied verbatim from the dbt profile into the Snowpark connection
# parameters (order is preserved in the resulting dict).
profile_keys = (
    "account",
    "user",
    "role",
    "warehouse",
    "database",
    "schema",
    "authenticator",
)

# Load the dbt profiles file and select the "dev" output of the "snowflake"
# profile.
with open("/home/vscode/.dbt/profiles.yml", "r") as f:
    profiles = yaml.safe_load(f)

dev_profile = profiles["snowflake"]["outputs"]["dev"]

# A key missing from the profile raises KeyError, exactly as a per-key
# lookup would.
conn_params = {key: dev_profile[key] for key in profile_keys}

# Trailing expression so the notebook displays the parameters.
conn_params

In [None]:
# Configure a session builder with the connection parameters, then open the
# Snowpark session used by every cell below.
session_builder = Session.builder.configs(conn_params)
s = session_builder.create()

## Raw data (sources)

In [None]:
# Reference the raw_customers source table through the session and print a
# 5-row preview.
raw_customers = s.table("raw_customers")
raw_customers.show(5)

In [None]:
# Reference the raw_orders source table through the session and print a
# 5-row preview.
raw_orders = s.table("raw_orders")
raw_orders.show(5)

In [None]:
# Reference the raw_payments source table through the session and print a
# 5-row preview.
raw_payments = s.table("raw_payments")
raw_payments.show(5)

## Staging data

In [None]:
# Staging model for customers: same columns as the raw table, with `id`
# renamed to `customer_id`.
customers_rename = {"id": "customer_id"}
# snowpark: the column names are upper-cased before they are matched
customers_rename = {
    old.upper(): new.upper() for old, new in customers_rename.items()
}

# pandas equivalent:
#   stg_customers = raw_customers.rename(customers_rename)
#
# Option A (single select that aliases each renamed column):
#   stg_customers = raw_customers.select(
#       *[
#           col(col_name).as_(customers_rename[col_name])
#           if col_name in customers_rename
#           else col_name
#           for col_name in raw_customers.schema.names
#       ]
#   )

# Start from every raw column, then apply the renames one at a time.
stg_customers = raw_customers.select("*")
for old_name, new_name in customers_rename.items():
    stg_customers = stg_customers.rename(stg_customers[old_name], new_name)

stg_customers.show(5)

In [None]:
# Staging model for orders: `id` becomes `order_id` and `user_id` becomes
# `customer_id`; all other columns pass through unchanged.
orders_rename = {"id": "order_id", "user_id": "customer_id"}
# snowpark: the column names are upper-cased before they are matched
orders_rename = {old.upper(): new.upper() for old, new in orders_rename.items()}

# pandas equivalent:
#   stg_orders = raw_orders.rename(columns=orders_rename)
#
# Option A (single select that aliases each renamed column):
#   stg_orders = raw_orders.select(
#       *[
#           col(col_name).as_(orders_rename[col_name])
#           if col_name in orders_rename
#           else col_name
#           for col_name in raw_orders.schema.names
#       ]
#   )

# Start from every raw column, then apply the renames one at a time.
stg_orders = raw_orders.select("*")
for old_name, new_name in orders_rename.items():
    stg_orders = stg_orders.rename(stg_orders[old_name], new_name)

stg_orders.show(5)

In [None]:
# Staging model for payments: `id` becomes `payment_id` and `amount` is
# divided by 100.
payments_rename = {"id": "payment_id"}
# snowpark: the column names are upper-cased before they are matched
payments_rename = {
    old.upper(): new.upper() for old, new in payments_rename.items()
}

# pandas equivalent:
#   stg_payments = raw_payments.rename(columns=payments_rename)
#   stg_payments["amount"] /= 100  # this makes cents into dollars

payment_columns = []
for col_name in raw_payments.schema.names:
    if col_name == "AMOUNT":
        # Left out here; the converted amount is appended after the loop.
        continue
    if col_name in payments_rename:
        payment_columns.append(col(col_name).as_(payments_rename[col_name]))
    else:
        payment_columns.append(col_name)

# this makes cents into dollars
payment_columns.append((raw_payments["amount"] / 100).as_("amount"))

stg_payments = raw_payments.select(*payment_columns)

stg_payments.show(5)

## Final models

In [None]:
# Per-customer order summary: first order date, most recent order date and
# number of orders.
#
# pandas
# customer_orders = (
#     stg_orders.groupby("customer_id")
#     .agg(
#         first_order=("order_date", "min"),
#         most_recent_order=("order_date", "max"),
#         number_of_orders=("order_id", "count"),
#     )
#     .reset_index()
# )

customer_orders = (
    stg_orders.group_by("customer_id")
    .agg(
        [
            (stg_orders["order_date"], "min"),
            (stg_orders["order_date"], "max"),
            (stg_orders["order_id"], "count"),
        ]
    )
    # The (column, function) form of agg() leaves the outputs with
    # auto-generated names, so assign the intended names positionally:
    # the grouping key first, then the aggregates in the order listed above.
    .to_df("customer_id", "first_order", "most_recent_order", "number_of_orders")
)

customer_orders.show(5)

In [None]:
# Print a 5-row preview of the staged orders again, for reference.
stg_orders.show(5)

In [None]:
# Per-customer payment total: payments are left-joined to orders to pick up
# customer_id, then the amounts are summed per customer.
#
# pandas
# customer_payments = (
#     stg_payments.merge(stg_orders, on="order_id", how="left")
#     .groupby("customer_id")
#     .agg(total_amount=("amount", "sum"))
#     .reset_index()
# )
customer_payments = (
    stg_payments.join(stg_orders, using_columns=["order_id"], join_type="left")
    .group_by("customer_id")
    .agg(
        [
            (stg_payments["amount"], "sum"),
        ]
    )
    # Give the sum an explicit name; otherwise it keeps an auto-generated
    # one and the later `total_amount` rename has nothing to match.
    .to_df("customer_id", "total_amount")
)

customer_payments.show(5)

In [None]:
# Final customers model: staged customers enriched with their order summary
# and payment total.
customers_rename = {"total_amount": "customer_lifetime_value"}
customers_rename = {
    key.upper(): value.upper() for key, value in customers_rename.items()
}  # snowpark

# pandas (copilot actually wrote this, minus the renaming -- maybe would have
# if I'd added the dictionary?)
# customers = (
#     stg_customers.merge(customer_orders, on="customer_id", how="left")
#     .merge(customer_payments, on="customer_id", how="left")
#     .rename(columns=customers_rename)
# )

# Snowpark DataFrames have no pandas-style merge(); use join() with the same
# keyword form as the customer_payments cell.
customers = stg_customers.join(
    customer_orders, using_columns=["customer_id"], join_type="left"
).join(customer_payments, using_columns=["customer_id"], join_type="left")

# Like pandas' rename(columns=...), silently skip names that are not present.
for col_name in customers_rename:
    if col_name in customers.columns:
        customers = customers.rename(
            customers[col_name], customers_rename[col_name]
        )

customers.show(5)

In [None]:
# Per-order payment breakdown: one `<method>_amount` column per payment
# method plus the order's `total_amount`.
payment_methods = ["credit_card", "coupon", "bank_transfer", "gift_card"]

order_payments_renames = {
    f"{payment_method}": f"{payment_method}_amount"
    for payment_method in payment_methods
}

# pandas
# order_payments_totals = stg_payments.groupby("order_id").agg(
#     total_amount=("amount", "sum")
# )
# order_payments = (
#     stg_payments.groupby(["order_id", "payment_method"])
#     .agg(payment_method_amount=("amount", "sum"))
#     .reset_index()
#     .pivot(index="order_id", columns="payment_method", values="payment_method_amount")
#     .rename(columns=order_payments_renames)
#     .merge(order_payments_totals, on="order_id", how="left")
#     .reset_index()
# )

order_payments_totals = (
    stg_payments.group_by("order_id")
    .agg([(stg_payments["amount"], "sum")])
    # Name the aggregate explicitly instead of keeping the generated name.
    .to_df("order_id", "total_amount")
)

order_payments = (
    # Keep only the key, the pivot column and the value column: the pivot
    # groups by every column that is left over.
    stg_payments.select("order_id", "payment_method", "amount")
    .pivot("payment_method", payment_methods)
    .sum("amount")
    # The pivoted columns come out in `payment_methods` order; rename them
    # positionally to their `<method>_amount` names.
    .to_df(
        "order_id",
        *[order_payments_renames[method] for method in payment_methods],
    )
    .join(order_payments_totals, using_columns=["order_id"], join_type="left")
)

order_payments.show(5)

In [None]:
# Final orders model: staged orders enriched with the per-order payment
# breakdown; the order total is exposed as `amount`.
orders_renames = {"total_amount": "amount"}
orders_renames = {
    key.upper(): value.upper() for key, value in orders_renames.items()
}  # snowpark

# pandas
# orders = stg_orders.merge(order_payments, on="order_id", how="left").rename(
#     columns=orders_renames
# )

# Snowpark DataFrames have no pandas-style merge(); use join() with the same
# keyword form as the customer_payments cell.
orders = stg_orders.join(
    order_payments, using_columns=["order_id"], join_type="left"
)

# Like pandas' rename(columns=...), silently skip names that are not present.
for col_name in orders_renames:
    if col_name in orders.columns:
        orders = orders.rename(orders[col_name], orders_renames[col_name])

orders.show(5)