The purpose of this notebook is to introduce optimization concepts that are used in assortment optimization.  The concepts will be introduced via a LEGO part optimization example. It was developed on the  **Databricks 15.4 LTS (Scala 2.12, Spark 3.5.0)** runtime.

# LEGO part optimization

%md
## Introduction 

Welcome to this tutorial on mathematical optimization, where we will dive into the core optimization components using a practical and fun example with LEGO datasets. 

Imagine you have a number of LEGO parts and you want to figure out which complete LEGO sets can be constructed from those parts.  We will explore how mathematical optimization can help us maximize the value of the sets built, considering the available parts.  

As an example, suppose you have a collection of the following parts:

- 1 red 2x4 brick
- 2 blue 2x4 bricks
- 3 green 1x1 bricks
- 1 yellow 2x2 brick

Assume you have instruction manuals for the following LEGO sets, with each set requiring the following specific parts:

- Set A: 1 red 2x4 brick, 2 blue 2x4 bricks, 1 green 1x1 brick.
- Set B: 1 red 2x4 brick, 1 blue 2x4 brick, 1 yellow 2x2 brick.
- Set C: 2 green 1x1 bricks, 1 yellow 2x2 brick.

By trial and error, we can determine that the 7 available parts can be used to create Set A and Set C, without leaving any part unused.  What if we have thousands of parts and hundreds of sets? How can we create an approach to solve this exercise more systematically?  

This problem is a great introduction to optimization techniques. Unlike classical machine learning that predicts a future outcome based on current state variables, optimization helps the decision-makers to identify the set of actions required to best achieve a particular outcome. These challenges are especially prevalent in the retail industry. Some relevant use cases include:

* **Inventory Optimization**: What is the right level of inventory to hold at various locations to ensure customer demand is easily satisfied but to also minimize locked up capital?
* **Supply Chain**: What is the right network design for us to ensure goods are distributed at the right frequency at the right locations while minimizing storage and transportation costs?
* **Product Assortment**: What is the right mix of products to provide customers to satisfy customer needs and maximize profits but also dealing with limited shelf space?

There are significant parallels between the fun LEGO example we describe above and Product Assortment in the retail industry. For example:
* **Constraints on availability**: We have limited LEGO bricks available, similar to retail stores where shelf space and product inventory is limited
* **Multiple choices**: We have to choose which LEGO sets to build with the bricks available, similar to retail stores which have to determine which products to display from a larger selection of potential SKUs
* **Substitution effects**: There are multiple LEGO sets of a similar theme(for example Star Wars). Similarly, there are multiple products of a similar type usually competing for shelf space (example: Navel Oranges vs Cara Cara Oranges)
* **Complementary products**: We could potentially purchase loose LEGO bricks to make new sets. Similarly, retailers can decide to stock new high value SKUs that increase overall purchases from a customer (example: stocking cookies might increase milk purchase from customers)

Mathematical optimization is a powerful tool to address the Product Assortment challenges in the retail industry. Our goal in these set of notebooks is to provide you with a gentle introduction to the basics of mathematical optimization, mathematical formulations for optimization, and their implementation and solution, all powered by the Databricks Data Intelligence Platform. We will use the LEGO example as the underlying problem to introduce these concepts in a fun and accessible way. Whether you are new to optimization or looking to reinforce your understanding, this tutorial provides a clear path to learning and applying mathematical programming concepts. The word programming in this context is used as a synonym for the word optimization.

We will use [Gurobi](https://www.gurobi.com/) as the optimization solver.  Gurobi is a power solver that can be used to solve a wide range of mathematical programming (optimization) problems, including linear programming (LP), mixed-integer programming (MIP), quadratic programming (QP), and more. It is designed to handle large-scale problems efficiently and can be easily integrated with your applications with its APIs in multiple languages including Python. In this tutorial, we will adjust input data and keep the optimization model small to ensure it can be solved without a commercial Gurobi license. For large scale applications, procuring a commercial license from Gurobi may be necessary. 


In this notebook, we will access the datasets used in this exercise and assemble a subset that is tailored to the specific problems we wish to tackle. Please note that the series continues with two notebooks:

- <a href="$./Optimization_Model">Optimization_Model</a>: describes the application of an optimization model on a small example to introduce key concepts and build intuition for these methods.
- <a href="$./Optimization_Model_Large">Optimization_Model_Large</a>: describes the application of the same model on a large scale dataset to showcase the utility of optimization modelling for real-world problems.

The input datasets for both notebooks above are assembled in the following cells. Please note that all datasets with a `_large` suffix are utilized in the <a href="$./Optimization_Model_Large">Optimization_Model_Large</a> notebook, while the relatively smaller datasets without the suffix are used in the <a href="$./Optimization_Model">Optimization_Model</a> notebook.

In [0]:
import pyspark.sql.functions as fn  # Importing the functions module from pyspark.sql to use built-in SQL functions
from pyspark.sql.functions import col # col() is a highly useful function that allows you to reference columns in a Spark DataFrame.

import pyspark.pandas as ps
import pandas as pd
import numpy as np

# functions needed to perturb the parts set
from pyspark.sql.functions import rand, when, lit, floor
from pyspark.sql.types import IntegerType

## Step 1: Access the Lego Dataset

The LEGO dataset is obtained from Kaggle [LEGO Database](https://www.kaggle.com/datasets/rtatman/lego-database), which is originally from [Rebrickable](https://rebrickable.com/help/lego-database/).  It consists of eight comma-separated files.  For now, we will create the containers needed to hold all these items:

In [0]:
# Create a widget to accept a catalog name
dbutils.widgets.text('catalog_name','catalog_name','Enter catalog name')
dbutils.widgets.text('schema_name','schema_name','Enter schema name')

In [0]:
# Retrieve the catalog name
catalog_name = dbutils.widgets.get('catalog_name')
schema_name = dbutils.widgets.get('schema_name')
print(catalog_name + "/" + schema_name)

In [0]:
# create catalog dynamically
catalog_sql = f"USE CATALOG {catalog_name};"
schema_sql = f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name};"
volume_sql = f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.csv_inputs;"

# Execute the commands
spark.sql(catalog_sql)
spark.sql(schema_sql)
spark.sql(volume_sql)

print(f"Catalog '{catalog_name}', schema '{schema_name}', and volume 'csv_inputs' are ready.")

Using the Catalog Explorer, navigate to the specified catalog. </p> 

<img src='https://brysmiwasb.blob.core.windows.net/demos/images/legos_catalog_explorer_catalog.PNG' width=80%></p>

Click on the *legos* schema, select the *Volumes* tab and click on the *csv_inputs* volume.  Click the *Upload Files* button in the upper right-hand corner of the screen and to bring up the file upload dialog box.
</p>
<img src='https://brysmiwasb.blob.core.windows.net/demos/images/logos_catalog_explorer_volumes.PNG' width=80%></p>

Download the eight CSV files that make up this dataset to your local system, unzip any of the larger files that Kaggle may have automatically compressed and drag and drop the CSV files to the volume upload dialog in order to upload into your Databricks environment.  (Please note, this could be done programmatically but to avoid juggling Kaggle keys, we have elected to describe a manual process.)
</p>
<img src='https://brysmiwasb.blob.core.windows.net/demos/images/legos_catalog_explorer_upload_complete.PNG' width=80%></p>

In [0]:
# Define the path dynamically using the catalog name
path = f"/Volumes/{catalog_name}/{schema_name}/csv_inputs/"

# List the contents of the directory
files = dbutils.fs.ls(path)

# Print the list of files in the directory
for file in files:
    print(file.path)

We can now read each file into a table to make it more accessible in later steps:

In [0]:
for filename in dbutils.fs.ls(f"/Volumes/{catalog_name}/{schema_name}/csv_inputs/"):
    
    # if this is a CSV input file
    if filename.name.endswith('.csv'):

        # what table are we creating
        dataset = filename.name[:-4]  # ignore the '.csv' suffix
        print(dataset)
        # read CSV and write as delta table
        (
            spark
                .read
                    .csv(
                        path=filename.path,
                        sep=',',
                        header=True,
                        inferSchema=True
                    )
                .write
                    .format('delta')
                    .mode('overwrite')
                    .option('overwriteSchema','true')
                    .saveAsTable(f"{catalog_name}.{schema_name}.{dataset}")

        )

In [0]:
query = f"SHOW TABLES IN {catalog_name}.{schema_name};"

tables = spark.sql(query)
tables.show()

%md
## Step 2: Create Datasets

Let's pretend we have all the parts from the following four Lego sets:

* [75160-1: U-wing](https://www.lego.com/en-us/product/u-wing-microfighter-75160)
* [75162-1: Y-wing](https://www.lego.com/en-us/product/y-wing-microfighter-75162)
* [75168-1: Yoda's Jedi Starfighter](https://www.lego.com/en-us/product/yoda-s-jedi-starfighter-75168)
* [75170-1: The Phantom](https://www.lego.com/en-us/product/the-phantom-75170)

These LEGO parts are used to construct our pile of "loose parts".  After we construct and solve the optimization model, we hope to see that these individual LEGO parts can be used to construct the four sets above.  

As a separate exercise, you can use other approaches to generate a list of LEGO parts and see which sets can be constructed!

In [0]:
# Read the data from tables stored earlier
inventories = spark.table(f"{catalog_name}.{schema_name}.inventories")
themes = spark.table(f"{catalog_name}.{schema_name}.themes")
colors = spark.table(f"{catalog_name}.{schema_name}.colors")
sets = spark.table(f"{catalog_name}.{schema_name}.sets")
inventory_parts = spark.table(f"{catalog_name}.{schema_name}.inventory_parts")
parts = spark.table(f"{catalog_name}.{schema_name}.parts")

###Note 
"sets" table contains over 10k LEGO sets.  We will reduce LEGO set options to keep the input data small, so that we can meet model size limit for Gurobi's free license and execute the <a href="$./Optimization_Model">Optimization_Model</a> notebook without a commercial Gurobi license. We will create reduced datasets below. 

Please note that the model solved in the notebook <a href="$./Optimization_Model_Large">Optimization_Model_Large</a> does exceed the size limit for Gurobi's free license. To execute this notebook, you will need to procure an appropriate license from [Gurobi](https://www.gurobi.com/). 

In [0]:
# Filter the 'themes' table to find the 'Star Wars' themes to keep a smaller number of LEGO sets
filtered_theme_id = (
    themes
    .filter(col('name').contains("Star Wars"))
    .select("id")  # Select the 'id' column
    .toPandas()['id']  # Convert to pandas DataFrame and select the 'id' column
    .tolist()  # Convert to a Python list
)

# More attempt to reduce data size - assume we only want to consider building sets that meet the following criteria
sets = (
    sets
        .filter(col("theme_id").isin(filtered_theme_id))
        .filter("year = 2017") # focus on just 2017 variants
        .filter("num_parts between 70 and 399") # keep part sizes manageable
    )

We need a table that has both unique LEGO set ID and part ID that defines the number of each part in each LEGO set.  To do that, we need to join the "inventories" and "inventory_parts" tables.

In [0]:
part_in_set = ( 
    inventories
        .join(inventory_parts, on=inventories['id']==inventory_parts['inventory_id'], how='inner')
        .join(sets, on='set_num', how='leftsemi')
        .groupBy(['set_num','part_num','color_id'])
            .agg(
                fn.sum('quantity').alias('quantity')
            )
    )

display(part_in_set)

Create a dataset `parts_avail` for available pieces to be allocated to LEGO sets (this is the set of "loose" pieces we have on hand).

We perturb the available inventory by removing one part common to two sets. This will provide a test for our optimization model: which LEGO sets will it choose to build given it cannot build all four anymore due to the missing part?

In [0]:
my_sets = sets.filter("set_num IN ('75160-1', '75162-1', '75168-1', '75170-1')")

display(my_sets)

parts_avail = ( 
    part_in_set
        .join(my_sets, on='set_num', how='inner')
        .groupBy(['part_num','color_id'])
            .agg(
                fn.sum('quantity').alias('quantity')
            )
    )

# Perturb inventory by removing  1 part common to two sets
parts_avail = parts_avail.withColumn(
    'quantity',
    fn.when(
        (parts_avail['part_num'] == '3020') & (parts_avail['color_id'] == 71),
        parts_avail['quantity'] - 1
    ).otherwise(parts_avail['quantity'])
)

display(parts_avail)

The following cell will create a reduced dataset to retain only relevant data (i.e. we will only keep a LEGO set, part, or color in the tables if they are relevant options within the reduced LEGO sets created above).  

In [0]:
relevant_set = ( 
    part_in_set
        .join(parts_avail, on=part_in_set['part_num']==parts_avail['part_num'], how='inner')
    ).select(part_in_set["*"])

display(relevant_set)

sets = (
    sets
        .join(relevant_set.select("set_num"), on="set_num", how="inner")
        .select(sets["*"])
).distinct()


display(sets)

parts = (parts
         .join(part_in_set.select("part_num"), on="part_num", how="inner")
         .select(parts["*"])
).distinct()

display(parts)

colors = (
    colors
    .join(part_in_set.select("color_id"), 
          colors["id"] == part_in_set["color_id"], 
          "inner")
    .select(colors["*"])
).distinct()

display(colors)

##Step 3: Persist Data for Optimization Steps

We have now created all datasets that will be utilized in the <a href="$./Optimization_Model">Optimization_Model</a> notebook! Before moving on, it would be helpful to persist newly created tables and update existing tables so we don't have to duplicate data processing steps in the Optimization_Model notebook.

In [0]:
(
    my_sets
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.my_sets")    
)

(
    relevant_set
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.relevant_set")
)

(
    sets
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.sets")
    )

(
    parts
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.parts")
    )


(
    colors
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.colors")
    )


(
    parts_avail
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.parts_avail")
    )


(
    part_in_set
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.part_in_set")
    )


##Step 4: Prepare Larger Inventory Datasets

In steps 4 and 5 we will repeat the steps above, expanding the initial number of LEGO sets to more than 200 Star Wars themed sets.
These datasets will be utilized in notebook <a href="$./Optimization_Model_Large">Optimization_Model_Large</a> notebook to define a more challenging inventory to be optimized. 

First, re-read datasets from source and store them as tables with `_large` suffix.

In [0]:
for filename in dbutils.fs.ls(f"/Volumes/{catalog_name}/{schema_name}/csv_inputs/"):
    
    # if this is a CSV input file
    if filename.name.endswith('.csv'):

        # what table are we creating
        dataset = filename.name[:-4]  # ignore the '.csv' suffix
        print(dataset)
        # read CSV and write as delta table
        (
            spark
                .read
                    .csv(
                        path=filename.path,
                        sep=',',
                        header=True,
                        inferSchema=True
                    )
                .write
                    .format('delta')
                    .mode('overwrite')
                    .option('overwriteSchema','true')
                    .saveAsTable(f"{catalog_name}.{schema_name}.{dataset}_large")

            )

In [0]:
# Read the data from tables stored earlier
inventories_large = spark.table(f"{catalog_name}.{schema_name}.inventories_large")
themes_large = spark.table(f"{catalog_name}.{schema_name}.themes_large")
colors_large = spark.table(f"{catalog_name}.{schema_name}.colors_large")
sets_large = spark.table(f"{catalog_name}.{schema_name}.sets_large")
inventory_parts_large = spark.table(f"{catalog_name}.{schema_name}.inventory_parts_large")
parts_large = spark.table(f"{catalog_name}.{schema_name}.parts_large")

We can now regenerate our initial collection by selecting all Star Wars sets released since the year 2000. The `filter_theme_id` can be reused as it already references the desire theme.

In [0]:
# Filter the sets_large to Star Wars theme, released after 2000
sets_large = (
    sets_large
    .filter(col("theme_id").isin(filtered_theme_id))
    .filter(col("year") > 2000)
    .filter("num_parts between 70 and 400") # keep part sizes manageable
)

display(sets_large)

We need a new table that has both unique LEGO set ID and part ID that defines the number of each part in each LEGO set in the large inventory.

In [0]:
part_in_set_large = ( 
    inventories_large
        .join(inventory_parts_large, on=inventories_large.id == inventory_parts_large.inventory_id, how='inner')
        .join(sets_large, on='set_num', how='leftsemi')
        .groupBy(['set_num','part_num','color_id'])
            .agg(
                fn.sum('quantity').alias('quantity')
            )
    )

display(part_in_set_large)

%md
Create a dataset `parts_avail_large` for available pieces to be allocated to LEGO sets (this is the set of "loose" pieces we have on hand). We use a randomized strategy to:
* Add or remove single pieces regardless of their quantity
* Increase or decrease significantly the total for common pieces

These changes will define an inventory such that some of the initial sets will be infeasible to build as some crucial single pieces will not be available. The common pieces modification will make it more likely that there exist compromises in which sets to build.


In [0]:
parts_avail_large = ( 
    part_in_set_large
        .join(sets_large, on='set_num', how='inner')
        .groupBy(['part_num', 'color_id'])
            .agg(
                fn.sum('quantity').alias('quantity')
            )
    )

# Perturb the quantity of each part in the set
r_seed = 13
parts_avail_large = parts_avail_large.withColumn(
    'perturbation',
    when(rand(seed=r_seed) < 0.8, lit(0))
    .when(rand(seed=r_seed) < 0.9, lit(1))
    .otherwise(lit(-1))
)

# Make some multiplication to large quantity parts
parts_avail_large = parts_avail_large.withColumn(
    'perturbation_multi',
    when((rand(seed=r_seed) < 0.3) & (col("quantity") > 60) , lit(1.5))
    .when((rand(seed=r_seed) < 0.6) & (col("quantity") > 60), lit(0.5))
    .otherwise(lit(1))
)

parts_avail_large = parts_avail_large.withColumn('quantity', col('quantity') + col('perturbation'))
parts_avail_large = parts_avail_large.withColumn('quantity', (col('quantity') * col('perturbation_multi')).cast(IntegerType()))

display(parts_avail_large)

Similarly to above, we generate the other datasets required to describe the large inventory.

In [0]:
relevant_set_large = ( 
    part_in_set_large
        .join(parts_avail_large, on=part_in_set_large['part_num']==parts_avail_large['part_num'], how='inner')
    ).select(part_in_set_large["*"])

display(relevant_set_large)

sets_large = (
    sets_large
        .join(relevant_set_large.select("set_num"), on="set_num", how="inner")
        .select(sets_large["*"])
).distinct()


display(sets_large)

parts_large = (parts_large
         .join(part_in_set_large.select("part_num"), on="part_num", how="inner")
         .select(parts_large["*"])
).distinct()

display(parts_large)

colors_large = (
    colors_large
    .join(part_in_set_large.select("color_id"), 
          colors_large["id"] == part_in_set_large["color_id"], 
          "inner")
    .select(colors_large["*"])
).distinct()

display(colors_large)

##Step 5: Persist Data for Large Inventory Optimization Steps

We have now created all datasets that will be utilized in the <a href="$./Optimization_Model_Large">Optimization_Model_Large</a> notebook! Before moving on, it would be helpful to persist newly created tables and update existing tables so we don't have to duplicate data processing steps in the Optimization_Model_Large notebook.

In [0]:
(
    relevant_set_large
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.relevant_set_large")
)

(
    sets_large
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.sets_large")
    )

(
    parts_large
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.parts_large")
    )


(
    colors_large
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.colors_large")
    )


(
    parts_avail_large
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.parts_avail_large")
    )


(
    part_in_set_large
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema','true')
        .saveAsTable(f"{catalog_name}.{schema_name}.part_in_set_large")
    )
