## Installation of PySpark and MLflow

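This code cell installs PySpark and MLflow with the `%pip` magic command, which installs packages into the Python environment of the running notebook. The `dbfs:/` paths used later show that this notebook runs on Databricks, where `%pip` restarts the Python interpreter after installing (hence the "Python interpreter will be restarted" messages in the output).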

The first command, `%pip install pyspark`, installs PySpark, the Python API for Apache Spark, a general-purpose engine for distributed data processing. PySpark provides DataFrames and Spark SQL, Structured Streaming, and the MLlib machine learning library used later in this notebook.

The second command, `%pip install mlflow`, installs MLflow, an open-source platform for managing the machine learning lifecycle. It tracks experiment runs (parameters, metrics, and artifacts), packages models in a standard format, and supports model registration and deployment.

Once both commands finish, PySpark and MLflow can be imported for the data processing and modeling steps that follow.
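A quick way to confirm that both installs succeeded, after the interpreter restarts, is to read the installed package metadata. This is a minimal sketch using only the standard library's `importlib.metadata`; the helper name `installed_versions` is ours, not part of PySpark or MLflow.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package: version string, or None if the package is not installed}."""
    result = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None  # not installed in this environment
    return result


print(installed_versions(["pyspark", "mlflow"]))
```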

In [0]:
%pip install pyspark
%pip install mlflow

Python interpreter will be restarted.
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317143 sha256=9ce1244fdab02d217089622f734f0fe05321fb1d0809bf055008b2a16e313bfc
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.0
Python interpreter will be restarted.
Python interpreter will be restarted.
Collecting mlflow
  Downloading mlflow-2.4.1-py3-none-any.whl (18.1 MB)
Collecting gunicorn<21
  Downloading gunicorn-20.1.0-py3-none-any.whl (79 kB)
Collecting docker<7,>=4.0.0
  D

## Importing Required Libraries

This code cell imports the libraries used in the rest of the notebook:

- `SparkContext` is imported from `pyspark`, and `SparkSession`, `Column`, and the `functions` module (aliased as `f`) are imported from `pyspark.sql`. These provide the entry points and column functions for working with Spark and Spark SQL.

- `FPGrowth` is imported from the `pyspark.ml.fpm` module. It implements the FP-Growth algorithm for mining frequent itemsets and generating association rules from them.

- `mlflow` provides the tracking API used to start runs and log parameters and metrics, and `mlflow.spark` is MLflow's Spark flavor, which logs and loads Spark MLlib models.

By importing these libraries, we ensure that the required functionality and classes are available for performing the subsequent data processing and modeling tasks.
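To make "frequent pattern mining" concrete, the sketch below finds frequent itemsets by brute force: it counts every subset of every transaction and keeps those appearing in at least `ceil(min_support × N)` of the N transactions. This is deliberately not FP-Growth (FP-Growth avoids enumerating candidates by building a compressed FP-tree), but on small data it yields the same frequent itemsets. The function name and toy transactions are hypothetical.

```python
import math
from collections import Counter
from itertools import combinations


def frequent_itemsets(transactions, min_support):
    """Brute-force frequent itemsets (not FP-Growth): {frozenset: count} meeting min_support."""
    n = len(transactions)
    min_count = math.ceil(min_support * n)
    counts = Counter()
    for transaction in transactions:
        items = sorted(set(transaction))  # treat each transaction as a set of unique items
        for size in range(1, len(items) + 1):
            for combo in combinations(items, size):
                counts[frozenset(combo)] += 1
    return {itemset: c for itemset, c in counts.items() if c >= min_count}


transactions = [
    ["beef", "whole milk"],
    ["beef", "yogurt"],
    ["whole milk", "yogurt"],
    ["beef", "whole milk", "yogurt"],
]
for itemset, count in frequent_itemsets(transactions, 0.5).items():
    print(sorted(itemset), count)
```

Counting every subset is exponential in basket size, which is the cost FP-Growth is designed to avoid on real data.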

In [0]:
from pyspark import SparkContext
from pyspark.sql import functions as f, SparkSession, Column
from pyspark.ml.fpm import FPGrowth
import mlflow
import mlflow.spark

## Creating a Spark Session

In this code cell, a Spark Session is created using the `SparkSession.builder` object. A Spark Session is the entry point for working with structured data in Spark and provides a programming interface to interact with various Spark functionalities.

The `appName()` method sets the application name to "MarketbasketMLFlow", which identifies the application in the Spark UI.

`getOrCreate()` returns the existing Spark Session if one is already active and creates a new one otherwise, so re-running the cell does not start additional sessions. On Databricks, each notebook already has a session available as `spark`, so this call returns that session.

The Spark Session is the unified entry point for DataFrame and SQL operations, machine learning, and streaming. It also exposes the underlying `SparkContext` as `spark.sparkContext`, which a later cell uses to build an RDD.
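The get-or-create behavior described above can be illustrated with a toy class. This is a hypothetical stand-in, not PySpark's implementation: the real builder also manages configuration and the underlying `SparkContext`, which this sketch omits.

```python
class ToySession:
    """Minimal stand-in used only to illustrate get-or-create; not pyspark.sql.SparkSession."""

    _active = None  # the one shared instance, created lazily

    def __init__(self, app_name):
        self.app_name = app_name

    class Builder:
        def __init__(self):
            self._app_name = "unnamed"

        def appName(self, name):
            self._app_name = name
            return self  # returning the builder enables method chaining

        def getOrCreate(self):
            # Reuse the existing instance if there is one; otherwise create it.
            if ToySession._active is None:
                ToySession._active = ToySession(self._app_name)
            return ToySession._active


first = ToySession.Builder().appName("MarketbasketMLFlow").getOrCreate()
second = ToySession.Builder().appName("AnotherName").getOrCreate()
print(first is second, second.app_name)  # True MarketbasketMLFlow
```

In this toy, the second builder's name is simply ignored because an instance already exists.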

In [0]:
spark = SparkSession.builder.appName("MarketbasketMLFlow").getOrCreate()

## Starting an MLflow Run

This code cell starts an MLflow run with `mlflow.start_run()`. Because it is used as a context manager (`with`), the run ends automatically when the block exits, and every parameter, metric, and artifact logged inside the block is attached to that run.

Within the MLflow run, the following steps are performed:

1. Logging Spark Version: The Spark version is logged using the `mlflow.log_param()` function. This captures the version of Spark being used in the experiment.

2. Reading the Data: Two CSV files, "basket.csv" and "Groceries_data.csv", are read into Spark DataFrames with `spark.read.csv()`, treating the first row as column names (`header=True`). An "id" column is added to each DataFrame with `f.monotonically_increasing_id()`, which generates unique, increasing 64-bit IDs that are not guaranteed to be consecutive.

3. Logging Data Paths: The paths of the input CSV files are logged using the `mlflow.log_param()` function. This provides a record of the data sources used in the experiment.

4. Counting Members: `df_all` is grouped by the "Member_number" column and counted, which yields one row per member together with the number of item rows recorded for that member. The result is stored in the `num_baskets` variable; despite its name, it has one row per customer, not one per basket.

5. Logging the Member Count: `num_baskets.count()`, the number of distinct members, is logged as a metric with `mlflow.log_metric()`. It is not the number of baskets (transactions) in the dataset.

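6. Building Baskets and Removing Null Values: The first 11 columns of `df` are packed into a single array column named "basket" with `f.array()` (the "id" column added above comes last, so it is excluded). Empty CSV cells are read as nulls, so `f.array_except()` subtracts an array containing only null, dropping those entries. Because `array_except` returns its result without duplicates, each basket also ends up with unique items, which FP-Growth requires.

7. Performing Market Basket Analysis: `FPGrowth` from `pyspark.ml.fpm` is fit on the cleaned DataFrame `df_aggregated`, reading each transaction from the "basket" column (`itemsCol='basket'`). The minimum support and minimum confidence are both 0.001: an itemset must occur in at least 0.1% of baskets to count as frequent, and a rule is kept only if its confidence is at least 0.1%. The fitted `FPGrowthModel` is stored in `model`.

8. Logging the Model: `mlflow.spark.log_model()` saves the fitted model as an artifact of the run under the path "market_basket_model", so it can be reloaded later to make predictions.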

9. Retrieving the Run ID: The run ID of the active MLflow run is obtained using `mlflow.active_run().info.run_id` and stored in the `run_id` variable. This run ID can be used to reference this specific MLflow run for later analysis or retrieval of logged information.
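Step 6 can be mirrored in plain Python to make its semantics explicit. Spark documents `array_except(a, b)` as returning the elements of `a` that are not in `b`, without duplicates; the hypothetical `clean_basket` helper below applies the same idea to one row, keeping first-occurrence order (an assumption of this sketch).

```python
def clean_basket(items):
    """Drop None entries and repeated items from one basket (sketch of the array_except step)."""
    seen = set()
    cleaned = []
    for item in items:
        if item is None or item in seen:
            continue  # skip empty CSV cells and duplicates
        seen.add(item)
        cleaned.append(item)
    return cleaned


# Hypothetical row with 11 item slots; empty cells arrive as None
row = ["beef", "whole milk", None, "beef"] + [None] * 7
print(clean_basket(row))  # ['beef', 'whole milk']
```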

Together, these steps fit the market basket model and record its data sources, parameters, a summary metric, and the model itself in MLflow, which makes the experiment easier to reproduce and compare.
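The 0.001 thresholds are easier to interpret as counts. Support is the fraction of transactions that contain an itemset, so with N transactions an itemset is frequent when it appears in at least `ceil(0.001 × N)` of them; confidence is the joint count divided by the antecedent count. The sketch assumes N = 14963 and the counts 70 and 508, which are inferred from the support and confidence values printed later in this notebook (for example, 70 / 14963 ≈ 0.004678 for beef → whole milk); the helper names are hypothetical.

```python
import math


def min_transaction_count(min_support, num_transactions):
    """Smallest count c with c / num_transactions >= min_support."""
    return math.ceil(min_support * num_transactions)


def rule_is_kept(joint_count, antecedent_count, min_confidence):
    """A rule X -> Y is kept if count(X and Y) / count(X) >= min_confidence."""
    return joint_count / antecedent_count >= min_confidence


n_transactions = 14963  # assumed; inferred from the support values printed later
print(min_transaction_count(0.001, n_transactions))  # 15
print(rule_is_kept(70, 508, 0.001))  # True (inferred: 70 of 508 beef baskets contain whole milk)
```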

In [0]:
# Start an MLflow run
with mlflow.start_run():
    # Log the Spark version
    mlflow.log_param("spark_version", spark.version)

    # Input paths (DBFS locations of the uploaded CSV files)
    basket_path = "dbfs:/FileStore/shared_uploads/lalithsagardevagudi@gmail.com/basket.csv"
    groceries_path = "dbfs:/FileStore/shared_uploads/lalithsagardevagudi@gmail.com/Groceries_data.csv"

    # Read the data, adding a unique (non-consecutive) "id" column to each DataFrame
    df = spark.read.csv(basket_path, header=True).withColumn("id", f.monotonically_increasing_id())
    df_all = spark.read.csv(groceries_path, header=True).withColumn("id", f.monotonically_increasing_id())

    # Log the data paths
    mlflow.log_param("basket_data_path", basket_path)
    mlflow.log_param("groceries_data_path", groceries_path)

    # Count distinct members: groupBy returns one row per Member_number
    num_baskets = df_all.groupBy("Member_number").count()

    # Log the number of distinct members (this is not the number of baskets/transactions)
    mlflow.log_metric("num_members", num_baskets.count())

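    # Pack the first 11 columns into one array ("id" is last, so it is excluded),
    # then drop null entries; array_except also removes duplicate items, which
    # FP-Growth requires (items within a transaction must be unique)
    df_basket = df.select("id", f.array([df[c] for c in df.columns[:11]]).alias("basket"))
    df_aggregated = df_basket.select("id", f.array_except("basket", f.array(f.lit(None))).alias("basket"))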

    # Perform market basket analysis
    fp_growth = FPGrowth(minSupport=0.001, minConfidence=0.001, itemsCol='basket', predictionCol='prediction')
    model = fp_growth.fit(df_aggregated)

    # Log the model
    mlflow.spark.log_model(model, "market_basket_model")

    # Retrieve the run ID
    run_id = mlflow.active_run().info.run_id
    # print("MLflow run ID:", run_id)

2023/06/12 17:55:09 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
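The difference between members and baskets is easy to see on toy data. The sketch below assumes, hypothetically, that each row of the groceries file is (member, date, item) and that one basket is one member's purchases on one date; neither the `Date` column nor these rows are confirmed by this notebook. Under that assumption, the Spark equivalent would be something like `df_all.select("Member_number", "Date").distinct().count()`.

```python
# Hypothetical (Member_number, Date, item) rows; the Date column is an assumption
rows = [
    ("1000", "15-03-2015", "sausage"),
    ("1000", "15-03-2015", "whole milk"),
    ("1000", "24-06-2014", "soda"),
    ("1001", "15-03-2015", "beef"),
]

num_members = len({member for member, _date, _item in rows})  # what the notebook's metric counts
num_baskets = len({(member, date) for member, date, _item in rows})  # one basket per member per date
print(num_members, num_baskets)  # 2 3
```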


## Loading the Trained Model and Generating Predictions

In this code cell, the trained model is loaded using MLflow and used to generate predictions on new data. The following steps are performed:

1. Model Loading: `model_uri` is built from the `run_id` captured in the previous cell, in the form `runs:/<run_id>/market_basket_model`, which MLflow resolves to the artifact location of that run (see the log output below). `mlflow.spark.load_model()` then loads the model into the `model` variable, replacing the in-memory model from the training cell.

2. Obtaining the FPGrowthModel: The Spark flavor loads the model as a `PipelineModel`, so the fitted `FPGrowthModel` is one of its stages. The code assumes it is the last stage and retrieves it with `model.stages[-1]`, storing it in `fpgrowth_model`.

3. Creating a PySpark DataFrame: A new PySpark DataFrame called `new_df` is created to hold the new data for which predictions will be generated. The DataFrame consists of a single column named 'basket', and it contains two rows of new data: (['beef'],) and (['oil'],). The `spark.sparkContext.parallelize()` function is used to parallelize the new data, and the resulting RDD is converted to a DataFrame using the `toDF()` function. The column names are specified using the `columns` list.

4. Generating Predictions: `model.transform()` applies the loaded model to `new_df` and stores the result in `predictions`. For each input basket, FP-Growth predicts the consequents of every association rule whose antecedent is contained in the basket, excluding items that are already in the basket.

5. Showing the Recommendations: `predictions.show(5)` displays up to the first 5 rows of `predictions` (there are only two here). By default, `show()` truncates values longer than 20 characters, which is why the prediction lists end in `...`.
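The `runs:/` URI used in step 1 has a fixed shape: the scheme, the run ID, and the artifact path given to `log_model()`. A hypothetical helper that builds it, with a guard against an empty run ID, could look like this:

```python
def runs_uri(run_id, artifact_path):
    """Build an MLflow 'runs:/' URI for an artifact logged in the given run."""
    if not run_id:
        raise ValueError("run_id must be a non-empty string")
    return f"runs:/{run_id}/{artifact_path.strip('/')}"


print(runs_uri("5af8033cf02a4c9aa777b9a67284c37e", "market_basket_model"))
```

The run ID here is the one that appears in the log output of the next cell.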

By loading the trained model and generating predictions, this code cell demonstrates how to use the trained model to make recommendations for new data.
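The rule-matching behavior of `transform()` described in step 4 can be sketched in plain Python. This mirrors the semantics described in Spark's FP-Growth documentation (the consequents of all applicable rules, minus items already in the basket), not its implementation; the rules are a hypothetical toy set, and the order of predicted items in Spark's output is not something this sketch reproduces.

```python
def predict(basket, rules):
    """Predict consequents of all rules whose antecedent is a subset of the basket,
    excluding items the basket already contains (order: first time each item appears)."""
    basket_set = set(basket)
    predicted = []
    for antecedent, consequent in rules:
        if set(antecedent) <= basket_set:  # rule applies: all antecedent items present
            for item in consequent:
                if item not in basket_set and item not in predicted:
                    predicted.append(item)
    return predicted


# Hypothetical toy rules as (antecedent, consequent) pairs
rules = [
    (["beef"], ["whole milk"]),
    (["beef"], ["yogurt"]),
    (["oil"], ["rolls/buns"]),
    (["beef", "oil"], ["soda"]),
    (["oil"], ["beef"]),
]

print(predict(["beef"], rules))         # ['whole milk', 'yogurt']
print(predict(["beef", "oil"], rules))  # ['whole milk', 'yogurt', 'rolls/buns', 'soda']
```

The last rule never contributes for the two-item basket because `beef` is already in it.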

In [0]:
# Load the trained model
model_uri = f"runs:/{run_id}/market_basket_model"  # run_id was captured in the MLflow run cell
model = mlflow.spark.load_model(model_uri)

# MLflow's Spark flavor loads a PipelineModel; the FPGrowthModel is assumed to be its last stage
fpgrowth_model = model.stages[-1]

# Create a PySpark DataFrame with new data
columns = ['basket']
new_data = [(['beef'],), (['oil'],)]
rdd = spark.sparkContext.parallelize(new_data)
new_df = rdd.toDF(columns)

# Generate predictions using the model
predictions = model.transform(new_df)

# Show the recommendations
predictions.show(5)




2023/06/12 17:56:07 INFO mlflow.spark: 'runs:/5af8033cf02a4c9aa777b9a67284c37e/market_basket_model' resolved as 'dbfs:/databricks/mlflow-tracking/1564150963134392/5af8033cf02a4c9aa777b9a67284c37e/artifacts/market_basket_model'
2023/06/12 17:56:11 INFO mlflow.spark: URI 'runs:/5af8033cf02a4c9aa777b9a67284c37e/market_basket_model/sparkml' does not point to the current DFS.
2023/06/12 17:56:11 INFO mlflow.spark: File 'runs:/5af8033cf02a4c9aa777b9a67284c37e/market_basket_model/sparkml' not found on DFS. Will attempt to upload the file.
2023/06/12 17:56:13 INFO mlflow.spark: Copied SparkML model to /tmp/mlflow/c8f5da9a-44c2-45d3-b873-801c0845a3c7


+------+--------------------+
|basket|          prediction|
+------+--------------------+
|[beef]|[frankfurter, rol...|
| [oil]|[rolls/buns, yogu...|
+------+--------------------+



## Evaluating Predictions using Lift

This code cell inspects the predictions alongside the association rules that generated them, including each rule's lift. The steps are as follows:

1. Association Rules: `fpgrowth_model.associationRules` is a DataFrame with one row per rule learned by the model. As the output below shows, it has the columns `antecedent`, `consequent`, `confidence`, `lift`, and `support`.

2. Joining Predictions and Association Rules: The rules are inner-joined with `predictions` on `association_rules.antecedent == predictions.basket`. This is an exact array comparison (element by element, so order matters): only rules whose antecedent equals an input basket are kept. For the single-item baskets used here, these are exactly the rules that can fire for that basket. The result is stored in `eval_df`.

3. Lift: Lift measures how much more often the antecedent and consequent appear together than they would if they were independent: `lift(X → Y) = confidence(X → Y) / support(Y)`. A lift above 1 indicates a positive association, and a lift below 1 a negative one. Because `associationRules` already provides a `lift` column, no extra computation is needed.

4. Showing the Evaluation Results: `eval_df.show()` displays each matching rule next to the basket and prediction it belongs to.
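The difference between the exact-match join in step 2 and the subset matching that `transform()` uses only shows up with multi-item baskets. A plain-Python sketch with a hypothetical rule set:

```python
# Hypothetical rules as (antecedent, consequent) pairs
rules = [
    (["beef"], ["whole milk"]),
    (["beef", "yogurt"], ["rolls/buns"]),
    (["oil"], ["soda"]),
]


def exact_matches(rules, basket):
    # Like the join condition antecedent == basket: element by element, so order matters
    return [rule for rule in rules if rule[0] == basket]


def applicable_rules(rules, basket):
    # Like transform(): every antecedent item must be in the basket, in any order
    return [rule for rule in rules if set(rule[0]) <= set(basket)]


print(exact_matches(rules, ["beef", "yogurt"]))     # only the beef+yogurt rule
print(applicable_rules(rules, ["beef", "yogurt"]))  # the beef rule and the beef+yogurt rule
```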

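Reading confidence and lift together shows which associations are stronger than chance. In the output below, for example, beef → whole milk has a relatively high confidence (about 0.138) but a lift below 1 (about 0.87), because whole milk is common in baskets overall, while beef → frozen vegetables has a lower confidence but a lift of about 1.34.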
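The lift formula can be checked against the printed beef → whole milk row. The counts below are back-computed from the printed support, confidence, and lift values, assuming N = 14963 transactions (they reproduce all three printed values); they are an inference, not data read from the files.

```python
# Counts back-computed from the printed beef -> whole milk row, assuming N = 14963
n_transactions = 14963
count_beef = 508
count_whole_milk = 2363
count_beef_and_milk = 70

support_rule = count_beef_and_milk / n_transactions      # P(beef and whole milk)
confidence = count_beef_and_milk / count_beef            # P(whole milk | beef)
support_whole_milk = count_whole_milk / n_transactions   # P(whole milk)
lift = confidence / support_whole_milk                   # > 1 positive, < 1 negative association

print(round(support_rule, 6), round(confidence, 6), round(lift, 6))  # 0.004678 0.137795 0.872548
```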

In [0]:
# Evaluate the predictions using lift
association_rules = fpgrowth_model.associationRules

# Join the association rules with the predictions based on the common items
eval_df = association_rules.join(predictions, association_rules.antecedent == predictions.basket, "inner")

# associationRules already includes a "lift" column, computed as
# confidence(antecedent -> consequent) / support(consequent), so no extra step is needed

# Show the evaluation results
eval_df.show()

+----------+--------------------+--------------------+-------------------+--------------------+------+--------------------+
|antecedent|          consequent|          confidence|               lift|             support|basket|          prediction|
+----------+--------------------+--------------------+-------------------+--------------------+------+--------------------+
|    [beef]|              [curd]| 0.03740157480314961|  1.110396356705412|0.001269798837131...|[beef]|[frankfurter, rol...|
|    [beef]| [frozen vegetables]| 0.03740157480314961| 1.3356557608103283|0.001269798837131...|[beef]|[frankfurter, rol...|
|    [beef]|         [margarine]| 0.04133858267716536| 1.2832971215734963|0.001403461872619127|[beef]|[frankfurter, rol...|
|    [beef]|        [whole milk]|  0.1377952755905512| 0.8725479088706803|0.004678206242063757|[beef]|[frankfurter, rol...|
|    [beef]|[whipped/sour cream]| 0.04133858267716536| 0.9457939030556961|0.001403461872619127|[beef]|[frankfurter, rol...|
|    [be

## Generating Recommendations based on Association Rules

In this code cell, we demonstrate how to generate recommendations based on the association rules learned by the FPGrowth model. Here are the steps involved:

1. Specify the Input Item: You need to specify the item for which you want to generate recommendations. In the provided code, the variable `input_item` is set to "beef". You can replace it with your desired input item.

2. Filtering the Association Rules: The association rules learned by the FPGrowth model are filtered to those whose antecedent (left-hand side) contains the input item, using `F.array_contains()` inside `filter()`. The filtered rules are stored in the `recommendations` variable.

3. Ordering the Recommendations: The filtered rules are sorted with `orderBy()` by confidence, then lift, then support, all in descending order, so ties in confidence are broken by lift and ties in lift by support.

4. Showing the Recommendations: Finally, the `recommendations.show()` statement is used to display the recommendations. This will show the association rules that contain the input item, ordered by confidence, lift, and support.

By following these steps, you can generate recommendations based on the association rules and gain insights into the items that are likely to be associated with the input item.
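The filter-and-sort logic can be mirrored in plain Python on a few rows copied from the output below, plus one hypothetical non-beef rule to show the filter at work. Python's `sorted` with a tuple key and `reverse=True` orders by confidence, then lift, then support, all descending, like the `orderBy(F.desc(...), ...)` call.

```python
# Rows copied from the output below as (antecedent, consequent, confidence, lift, support);
# the "sugar" row is hypothetical and only there to show the filter
rules = [
    (["beef"], ["soda"], 0.0531496062992126, 0.5473348651446099, 0.001804450979081735),
    (["sugar"], ["whole milk"], 0.1, 1.0, 0.002),
    (["beef"], ["whole milk"], 0.1377952755905512, 0.8725479088706803, 0.004678206242063757),
    (["beef"], ["citrus fruit"], 0.0531496062992126, 1.000349130886941, 0.001804450979081735),
]


def recommend(rules, input_item):
    # Like F.array_contains(antecedent, input_item)
    matching = [rule for rule in rules if input_item in rule[0]]
    # Like orderBy(F.desc("confidence"), F.desc("lift"), F.desc("support"))
    return sorted(matching, key=lambda rule: (rule[2], rule[3], rule[4]), reverse=True)


for antecedent, consequent, confidence, lift, support in recommend(rules, "beef"):
    print(consequent, confidence, lift)
```

Citrus fruit and soda tie on confidence, so lift decides their order, matching the Spark output.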

In [0]:
from pyspark.sql import functions as F

# Specify the input item for which you want to make recommendations
input_item = "beef"  # Replace with your desired input item

# Keep only the rules whose antecedent contains the input item
rules = fpgrowth_model.associationRules
recommendations = rules.filter(F.array_contains(rules.antecedent, input_item))

# Order the filtered rules by confidence, lift, and support in descending order
recommendations = recommendations.orderBy(
    F.desc("confidence"),
    F.desc("lift"),
    F.desc("support")
)


# Show the recommendations
recommendations.show()

+----------+--------------------+--------------------+-------------------+--------------------+
|antecedent|          consequent|          confidence|               lift|             support|
+----------+--------------------+--------------------+-------------------+--------------------+
|    [beef]|        [whole milk]|  0.1377952755905512| 0.8725479088706803|0.004678206242063757|
|    [beef]|  [other vegetables]| 0.08267716535433071| 0.6771201013666396|0.002806923745238254|
|    [beef]|            [yogurt]| 0.06496062992125984| 0.7564248291920708|0.002205440085544343|
|    [beef]|      [citrus fruit]|  0.0531496062992126|  1.000349130886941|0.001804450979081735|
|    [beef]|              [soda]|  0.0531496062992126| 0.5473348651446099|0.001804450979081735|
|    [beef]|        [newspapers]| 0.04921259842519685| 1.2652373028113755|0.001670787943594199|
|    [beef]|   [root vegetables]| 0.04921259842519685| 0.7073661001308554|0.001670787943594199|
|    [beef]|        [rolls/buns]|0.04724

In [0]:
# Stop the Spark session
spark.stop()