# Challenges development

This notebook was used to develop the Python code for the challenges and also provides some visualizations. Upload it to your user folder in Databricks.

## Imports

This notebook was developed on a cluster running the DBR 1.12 ML runtime. Install the following additional libraries from PyPI on your cluster:

- `prophet`
- `thefuzz`

In [None]:
# imports
import mlflow

import pandas as pd
import pyspark.pandas as ps
import matplotlib.pyplot as plt

from random import random, randint
from prophet import Prophet
from datetime import datetime
from sklearn.cluster import KMeans
from prophet.serialize import model_to_json, model_from_json
from sklearn.decomposition import PCA

from thefuzz.process import extractOne as match_str

## Setup

In [None]:
# Allow pandas-on-Spark operations that combine different DataFrames/Series.
# Spark disables this by default because such operations need a join.
ps.options.compute.ops_on_diff_frames = True

In [None]:
# Notebook-wide handles: the `spark` session supplied by the notebook
# environment (under two aliases) and the schema the tables are read from.
session = spark
s = session
schema = "dbt_cody"

## Challenge 1: describe the data

In [None]:
# Read the orders table through the pandas-on-Spark API and preview it.
orders = session.table(f"{schema}.orders").pandas_api()
orders.head()

In [None]:
# Summary statistics for the orders frame; kept in `described` and displayed.
described = orders.describe()
described

## Challenge 2: pivot the data

In [None]:
# Read the staged order items through the pandas-on-Spark API and preview them.
order_items = session.table(f"{schema}.stg_order_items").pandas_api()
order_items.head()

In [None]:
# Read the staged products through the pandas-on-Spark API and display them.
products = session.table(f"{schema}.stg_products").pandas_api()
products

In [None]:
# Sorted list of the distinct product ids (`unique()` already removes duplicates).
product_ids = sorted(products["product_id"].unique().to_numpy())
product_ids

In [None]:
# Sum product prices per (order, product), then spread the products into
# columns: one row per order_id, one subtotal column per product_id.
order_item_product_subtotals = (
    order_items.merge(products, on="product_id")
    .groupby(["order_id", "product_id"])
    .agg(subtotal=("product_price", "sum"))
    .reset_index()
    .pivot(index="order_id", columns="product_id", values="subtotal")
    .reset_index()
)
order_item_product_subtotals.head()

In [None]:
# Prefix every pivoted product column with "subtotal_".
renames = dict(zip(product_ids, (f"subtotal_{pid}" for pid in product_ids)))
order_item_product_subtotals = order_item_product_subtotals.rename(
    columns=renames
)
order_item_product_subtotals.head()

In [None]:
# Missing values left by the pivot become a subtotal of 0.
order_item_product_subtotals = order_item_product_subtotals.fillna(value=0)
order_item_product_subtotals.head()

In [None]:
# Attach the per-product subtotal columns to each order (inner join on order_id).
orders_with_subtotals = orders.merge(
    order_item_product_subtotals, how="inner", on="order_id"
)
orders_with_subtotals.head()

## Challenge 3: flag fuzzy duplicates

In [None]:
# Read the customers table through the pandas-on-Spark API and preview it.
customers = session.table(f"{schema}.customers").pandas_api()
customers.head()

In [None]:
# Columns to discard after joining orders to customers: three named columns
# plus every column of `customers` other than `customer_name`.
drop_cols = ["customer_id", "customer_order_index", "is_first_order"] + [
    col for col in customers.columns if col != "customer_name"
]

fuzzed = orders.merge(customers, on="customer_id").drop(columns=drop_cols)
fuzzed.tail()

In [None]:
# Pull out the customer-name column; these are the values that get fuzzed below.
names = fuzzed["customer_name"]
names.head()

In [None]:
def fuzz_name(name, rng=None):
    """Return a randomly corrupted copy of ``name``.

    The name is split on single spaces and each part independently goes
    through three optional corruption steps:

    1. with probability 0.5, convert the part to all upper- or all lower-case;
    2. with probability 0.2, drop its first or its last character;
    3. with probability 0.1, visit every character and, with probability 0.3
       each, shift its code point by a random offset in [-5, 5].

    Step 3 corrupts characters individually: a hit changes only that
    position, never other occurrences of the same letter and never a
    character that was already corrupted.

    Parameters
    ----------
    name : str
        The clean name to corrupt.
    rng : optional
        Any object exposing ``random()`` and ``randint(a, b)``, for example a
        seeded ``random.Random``, to make the output reproducible. When
        omitted, the module-level ``random`` and ``randint`` functions
        (unseeded global state) are used.

    Returns
    -------
    str
        The corrupted parts joined by single spaces, with leading and
        trailing whitespace stripped.
    """
    rand = random if rng is None else rng.random
    rand_int = randint if rng is None else rng.randint

    fuzzed_parts = []
    for part in name.split(" "):
        if rand() < 0.5:
            # employee or AI is decisive: all caps or all lowercase
            part = part.upper() if rand() < 0.5 else part.lower()

        if rand() < 0.2:
            # AI dropped the first or last letter probably :/
            part = part[1:] if rand() < 0.5 else part[:-1]

        if rand() < 0.1:
            # a solar flare hit the datacenter in all regions,
            # no multi-region resiliency could have saved it :(
            corrupted = []
            for char in part:
                if rand() < 0.3:
                    # Clamp so chr() never receives an out-of-range code point.
                    code = min(max(ord(char) + rand_int(-5, 5), 0), 0x10FFFF)
                    char = chr(code)
                corrupted.append(char)
            part = "".join(corrupted)

        fuzzed_parts.append(part)

    return " ".join(fuzzed_parts).strip()

In [None]:
# Fuzz every collected customer name and preview the first ten results.
fuzzed_names = list(map(fuzz_name, names.to_numpy()))
fuzzed_names[:10]

In [None]:
# Overwrite the clean names with their fuzzed versions, in place.
# NOTE(review): the list is matched to rows by position, so this assumes
# `fuzzed` still yields rows in the order `names.to_numpy()` returned them — confirm.
fuzzed["customer_name"] = fuzzed_names
fuzzed.head()

In [None]:
# Sorted list of the distinct clean customer names (`unique()` already dedupes).
customer_names = sorted(customers["customer_name"].unique().to_numpy())
customer_names[:10]

In [None]:
# Recover the closest clean name for every fuzzed name. `match_str` (thefuzz's
# `extractOne`) returns a (best_match, score) pair; only the match is kept.
# A copy is used so `fuzzed` itself is left untouched and this cell can be
# re-run safely. The new column name also fixes the old "unfuzeed" misspelling.
unfuzzed = fuzzed.copy()
unfuzzed["customer_name_unfuzzed"] = unfuzzed["customer_name"].apply(
    lambda x: match_str(x, customer_names)[0]
)
unfuzzed.head()

## Challenge 4: cluster customers by their order history

In [None]:
# `df` is just another name for `orders_with_subtotals` (no copy is made).
df = orders_with_subtotals
df.head()

In [None]:
# Check the column dtypes before selecting the numeric features.
df.dtypes

In [None]:
# Feature matrix: every float32/float64/int64 column of `df`, as a NumPy array.
# NOTE(review): any integer id columns would be included as features too —
# confirm that is intended.
X = df.select_dtypes(include=["float32", "float64", "int64"]).to_numpy()
X

Detour: motivation! Reduce the features to three principal components so the orders can be plotted in 3D.

In [None]:
# Three components, so the projection can be drawn as a 3D scatter plot.
n_components = 3
pca = PCA(n_components)
pca

In [None]:
# Fit in place; `fit` returns the same estimator, so no rebinding is needed.
pca.fit(X)
pca

In [None]:
# Project the feature matrix onto the fitted principal components.
X_pca = pca.transform(X)
X_pca.shape

In [None]:
# Silence every Python warning from this point on.
# NOTE(review): this is a blanket, process-wide filter and will also hide
# warnings from later cells; consider narrowing it to a category or module.
import warnings

warnings.simplefilter("ignore")

In [None]:
# 3D scatter of the first three principal components, coloured by `subtotal`.
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, projection="3d")

ax.scatter(*X_pca[:, :3].T, c=df["subtotal"].to_numpy())

In [None]:
# Five clusters. The seed is fixed so centroid initialisation, and therefore
# the cluster labels, are the same on every run of the notebook.
model = KMeans(n_clusters=5, random_state=42)
model

In [None]:
# Fit in place; `fit` returns the same estimator, so no rebinding is needed.
model.fit(X)
model

In [None]:
# Assign every row of X to its closest fitted cluster.
cluster_labels = model.predict(X)
cluster_labels

In [None]:
# Same 3D projection as before, this time coloured by k-means cluster label.
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, projection="3d")

ax.scatter(*X_pca[:, :3].T, c=cluster_labels)

In [None]:
# Wrap the label array in a one-column pandas-on-Spark frame so it can be joined.
temp = ps.DataFrame({"cluster_label": cluster_labels})
temp.head()

In [None]:
# Attach the cluster labels to the orders by joining on the index.
# NOTE(review): this assumes the index of `orders_with_subtotals` is 0..n-1 in
# the same row order that produced `X` — confirm, otherwise labels land on the
# wrong rows.
orders_with_subtotals_and_clusters = orders_with_subtotals.merge(
    temp, left_index=True, right_index=True
)
orders_with_subtotals_and_clusters.head()

## Challenge 5: predict revenue by location

In [None]:
# Read weekly revenue per location through the pandas-on-Spark API and preview it.
revenue = session.table(f"{schema}.revenue_weekly_by_location").pandas_api()
revenue.head()

In [None]:
# Rename to the column names the Prophet cells below use ("ds" for the date,
# "y" for the value) and shorten the location column.
renames = dict(date_week="ds", location_name="location", revenue="y")
revenue = revenue.rename(columns=renames)
revenue.head()

In [None]:
# Sorted list of the distinct location names.
locations = sorted(revenue["location"].unique().to_numpy())
locations

In [None]:
# One Prophet model per location, each fitted on that location's rows only
# (converted to a plain pandas frame first).
models = dict(
    (name, Prophet().fit(revenue[revenue["location"] == name].to_pandas()))
    for name in locations
)
models

In [None]:
# Dates to predict on, extended by 52 * 3 further weekly periods.
# NOTE(review): built from the first location's model only — assumes every
# location shares the same history dates; confirm.
future = models[locations[0]].make_future_dataframe(periods=52 * 3, freq="W")
future.tail()

In [None]:
# Score the shared `future` frame with each location's model.
forecasts = dict(
    (name, models[name].predict(future)) for name in locations
)
forecasts

In [None]:
# Tag each forecast with its location (sets a column on the frame in place).
for location, forecast in forecasts.items():
    forecast["location"] = location

In [None]:
# Stack the per-location forecasts into one pandas frame.
# It gets its own name so the Challenge 4 `df` (orders with subtotals) is not
# clobbered, and a fresh index so row labels are not repeated across locations.
forecast_df = pd.concat(forecasts.values(), ignore_index=True)
forecast_df.head()

In [None]:
# Debugging aid: print the type of each forecast object.
for x in forecasts.values():
    print(type(x))

In [None]:
# One forecast plot per location, titled with the location name.
for location in locations:
    models[location].plot(forecasts[location])
    plt.title(location)

Now using the dbt models

TODO: finish this

In [None]:
# Load the `forecast_train_py` table produced by dbt.
# It is bound to its own name so the per-location `models` dict built earlier
# is not overwritten and the forecasting cells above stay re-runnable.
trained_models = s.table(f"{schema}.forecast_train_py").pandas_api()
trained_models.tail()

Scratch cells beyond here.

In [None]:
%sql

-- Preview the describe_py table.
SELECT * FROM dbt_cody.describe_py

In [None]:
%sql

-- Preview the describe_sql table.
SELECT * FROM dbt_cody.describe_sql

In [None]:
%sql

-- Preview the first ten rows of the pivot_py table.
SELECT * FROM dbt_cody.pivot_py LIMIT 10;

In [None]:
%sql

-- Preview the forecast_train_py table.
SELECT * FROM dbt_cody.forecast_train_py

In [None]:
%sql

-- Preview the first ten rows of the forecast_score_py table.
SELECT * FROM dbt_cody.forecast_score_py LIMIT 10;