# 🤖 Prototyping models for Meet Fresh recommender - Collaborative Filtering
For the Meet Fresh product solution POC, we prototype several models that could provide recommendations at different levels:

- Recommending ingredients based on customer ingredient ratings
- Recommending products based on ingredient selections

Here we outline the process of building a POC for collaborative filtering using ingredient ratings.

References:
- [Prototyping a Recommender System Step by Step Part 2: Alternating Least Square (ALS) Matrix Factorization in Collaborative Filtering](https://towardsdatascience.com/prototyping-a-recommender-system-step-by-step-part-2-alternating-least-square-als-matrix-4a76c58714a1)
- [Running ALS on MovieLens (PySpark)](https://github.com/microsoft/recommenders/blob/main/examples/00_quick_start/als_movielens.ipynb)

### Model 1: Collaborative Filtering Using Ingredient Ratings
This CF model uses only customer ratings (1-3) of ingredients and outputs predicted ratings for the ingredients a customer has not yet rated. The underlying assumption is that a customer expresses an explicit preference only by providing a rating; a missing rating does not mean they are not interested, since they may simply not know yet that they like the item.
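
As a rough reference, the explicit-feedback objective that ALS minimizes can be written as below. The notation is introduced here for illustration and is not taken from the notebook: $r_{ui}$ is the observed rating of ingredient $i$ by customer $u$, $x_u$ and $y_i$ are the latent factor vectors, $\mathcal{K}$ is the set of observed (customer, ingredient) pairs, and $\lambda$ corresponds to `regParam`. The sum runs only over observed pairs, which matches the assumption above that a missing rating carries no signal. Spark's implementation may scale the regularization term differently (for example by the number of ratings per customer or item), so treat this as a sketch of the idea rather than the exact loss.

$$
\min_{X, Y} \sum_{(u, i) \in \mathcal{K}} \left( r_{ui} - x_u^{\top} y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
$$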

Recommendations can then be generated by sorting each customer's unrated ingredients by predicted rating.
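
The sorting step can be illustrated with a small, Spark-free sketch. The function name `top_k_unrated` and the toy data are made up for illustration; the notebook itself does this with Spark DataFrames further down.

```python
from collections import defaultdict


def top_k_unrated(predictions, rated, k):
    """Return the k highest-scoring unrated ingredients per customer.

    predictions: iterable of (customer_id, ingredient_id, predicted_rating)
    rated: set of (customer_id, ingredient_id) pairs the customer already rated
    """
    by_customer = defaultdict(list)
    for customer_id, ingredient_id, score in predictions:
        # skip ingredients the customer has already rated
        if (customer_id, ingredient_id) not in rated:
            by_customer[customer_id].append((ingredient_id, score))
    # sort each customer's candidates by predicted rating, highest first
    return {
        customer_id: sorted(items, key=lambda pair: pair[1], reverse=True)[:k]
        for customer_id, items in by_customer.items()
    }


# toy predictions (hypothetical values, not model output)
predictions = [
    (11, "A", 1.4), (11, "B", 2.6), (11, "C", 2.1),
    (14, "A", 1.2), (14, "B", 0.9),
]
rated = {(11, "B")}  # customer 11 has already rated ingredient "B"

recommendations = top_k_unrated(predictions, rated, k=2)
print(recommendations)
```

Ingredient "B" has the highest predicted rating for customer 11 but is excluded because it was already rated.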

In practice, this approach is only usable once a significant amount of ratings data has accumulated. To address the cold-start problem, we could use content-based filtering and other approaches.

In [None]:
pip install recommenders --user


In [None]:
from pyspark.ml.recommendation import ALS
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation

#### Step 1 - Data pre-processing for running ALS

In [None]:
%%bigquery ratings_orig_df
SELECT * FROM `dsxl-ai-advanced-program.meetfresh.ft_customer_ingredient_ratings`


In [None]:
ratings_orig_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500029 entries, 0 to 500028
Data columns (total 4 columns):
 #   Column            Non-Null Count   Dtype 
---  ------            --------------   ----- 
 0   customer_id       500029 non-null  object
 1   ingredient_id     500029 non-null  object
 2   ingredient_name   500029 non-null  object
 3   meetfresh_rating  500029 non-null  object
dtypes: object(4)
memory usage: 15.3+ MB


In [None]:
ratings_orig_df

| | customer_id | ingredient_id | ingredient_name | meetfresh_rating |
|---|---|---|---|---|
| 0 | 156294 | B15 | Tofu Pudding | 1.0 |
| 1 | 206256 | B15 | Tofu Pudding | 1.0 |
| 2 | 248808 | B15 | Tofu Pudding | 1.0 |
| 3 | 141890 | B15 | Tofu Pudding | 1.0 |
| 4 | 116806 | B15 | Tofu Pudding | 1.0 |
| ... | ... | ... | ... | ... |
| 500024 | 35223 | UNK1002 | Almond Flakes | 3.0 |
| 500025 | 98035 | UNK1002 | Almond Flakes | 3.0 |
| 500026 | 11387 | UNK1002 | Almond Flakes | 3.0 |
| 500027 | 82186 | UNK1002 | Almond Flakes | 3.0 |


In [None]:
# spark-based API for ALS currently only supports integers for user and item ids
# map ingredient_id to 8-digit numeric id values
import random
random.seed(101)

original_ids = ratings_orig_df['ingredient_id'].unique()

while True:
    new_ids = {id_: random.randint(10_000_000, 99_999_999) for id_ in original_ids}
    if len(set(new_ids.values())) == len(original_ids):
        # all the generated id's were unique
        break
    # otherwise this will repeat until they are

ratings_orig_df['ingredient_id_int'] = ratings_orig_df['ingredient_id'].map(new_ids)
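
A deterministic alternative to the random 8-digit IDs above is to number the distinct ingredient IDs in sorted order. The sketch below is a suggestion, not what the notebook ran: `build_integer_ids` is a hypothetical helper and `"A01"` is a made-up ID added for illustration.

```python
def build_integer_ids(original_ids, start=0):
    """Map each distinct original ID to a consecutive integer.

    Sorting first makes the mapping reproducible without a random seed,
    and consecutive integers cannot collide, so no retry loop is needed.
    """
    unique_sorted = sorted(set(original_ids))
    return {original: start + offset for offset, original in enumerate(unique_sorted)}


# "B15" and "UNK1002" appear in the ratings table; "A01" is hypothetical
sample_ids = ["B15", "UNK1002", "B15", "A01"]

mapping = build_integer_ids(sample_ids)
# the reverse mapping translates model output back to the original IDs
reverse_mapping = {new: original for original, new in mapping.items()}

print(mapping)
print(reverse_mapping)
```

The resulting dictionary can be passed to `ratings_orig_df['ingredient_id'].map(...)` in the same way as `new_ids`, and keeping the reverse mapping around makes it easy to show ingredient names next to the recommendations later.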

In [None]:
# customer_id should be integer, meetfresh_rating should be float
ratings_orig_df['customer_id'] = ratings_orig_df['customer_id'].astype(int)
ratings_orig_df['meetfresh_rating'] = ratings_orig_df['meetfresh_rating'].astype(float)
ratings_orig_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500029 entries, 0 to 500028
Data columns (total 5 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   customer_id        500029 non-null  int64  
 1   ingredient_id      500029 non-null  object 
 2   ingredient_name    500029 non-null  object 
 3   meetfresh_rating   500029 non-null  float64
 4   ingredient_id_int  500029 non-null  int64  
dtypes: float64(1), int64(2), object(2)
memory usage: 19.1+ MB


In [None]:
# turn dataframes into spark df
ratings_df = ratings_orig_df[['customer_id', 'ingredient_id_int', 'meetfresh_rating']]

schema = StructType(
    [
        StructField('customer_id', IntegerType()),
        StructField('ingredient_id_int', IntegerType()),
        StructField('meetfresh_rating', FloatType())
    ]
)

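# reuse the notebook's Spark session, or create one if it does not exist yet
spark = SparkSession.builder.getOrCreate()
sparkDF = spark.createDataFrame(ratings_df, schema=schema)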
sparkDF.printSchema()
sparkDF.show()

root
 |-- customer_id: integer (nullable = true)
 |-- ingredient_id_int: integer (nullable = true)
 |-- meetfresh_rating: float (nullable = true)



23/06/30 19:58:39 WARN org.apache.spark.scheduler.TaskSetManager: Stage 0 contains a task of very large size (2908 KB). The maximum recommended task size is 100 KB.

+-----------+-----------------+----------------+
|customer_id|ingredient_id_int|meetfresh_rating|
+-----------+-----------------+----------------+
|     156294|         88000918|             1.0|
|     206256|         88000918|             1.0|
|     248808|         88000918|             1.0|
|     141890|         88000918|             1.0|
|     116806|         88000918|             1.0|
|     197318|         88000918|             1.0|
|     133389|         88000918|             1.0|
|       9423|         88000918|             1.0|
|     265892|         88000918|             1.0|
|     130654|         88000918|             1.0|
|     140511|         88000918|             1.0|
|      95722|         88000918|             1.0|
|     255165|         88000918|             1.0|
|      24086|         88000918|             1.0|
|     128695|         88000918|             1.0|
|     151125|         88000918|             1.0|
|     154479|         88000918|             1.0|
|      84830|       

                                                                                

#### Step 2 - ALS model training

In [None]:
# set up some global parameter values
# top k items to recommend
TOP_K = 10

# Column names for the dataset
COL_USER = "customer_id"
COL_ITEM = "ingredient_id_int"
COL_RATING = "meetfresh_rating"
# COL_TIMESTAMP = "Timestamp"

In [None]:
# split data into train and validation sets
train_data, validation_data = spark_random_split(sparkDF, ratio=0.75, seed=123)
print ("N train", train_data.cache().count())
print ("N test", validation_data.cache().count())


N train 374966



N test 125063


                                                                                

In [None]:
# train model with some default hyperparameters
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

model = als.fit(train_data)


In [None]:
# recommending ingredients already rated by customers does not make sense, so we remove those already rated
from pyspark.sql import functions as F

# get the cross join of all user-item pairs and score them
users = train_data.select(COL_USER).distinct()
items = train_data.select(COL_ITEM).distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)

# remove rated items: outer-join predictions with the training ratings and keep
# only the pairs without a training rating; columns are referenced through the
# "pred"/"train" aliases so both sides of each equality stay unambiguous
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
    train_data.alias("train"),
    (F.col(f"pred.{COL_USER}") == F.col(f"train.{COL_USER}"))
    & (F.col(f"pred.{COL_ITEM}") == F.col(f"train.{COL_ITEM}")),
    how='outer'
)

top_all = dfs_pred_exclude_train.filter(F.col(f"train.{COL_RATING}").isNull()) \
    .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

top_all.show()


+-----------+-----------------+----------+
|customer_id|ingredient_id_int|prediction|
+-----------+-----------------+----------+
|         11|         38792785| 1.4237518|
|         11|         54162664| 1.3280461|
|         14|         72698292| 1.1512083|
|         35|         72698292| 1.1512083|
|         80|         75157816| 1.4560395|
|         84|         38504727| 1.5660988|
|        111|         54162664|  1.363963|
|        129|         19442953| 2.4012413|
|        170|         90804821| 1.4121011|
|        185|         90804821| 0.5803766|
|        210|         35942492| 0.6856568|
|        210|         90804821|0.76456535|
|        218|         54162664| 1.1853198|
|        221|         44257992| 1.6205008|
|        242|         19442953| 0.5109888|
|        242|         38792785| 0.5643073|
|        248|         99049522| 1.3198466|
|        260|         77581358|0.84048474|
|        295|         90804821|0.75884974|
|        325|         48724825| 1.5827569|
+----------

                                                                                

#### Step 3 - ALS model rating predictions evaluation

In [None]:
# generate predicted ratings on validation data
prediction = model.transform(validation_data)
prediction.cache().show()


+-----------+-----------------+----------------+----------+
|customer_id|ingredient_id_int|meetfresh_rating|prediction|
+-----------+-----------------+----------------+----------+
|     108560|         21927066|             1.0| 1.3774933|
|        392|         21927066|             2.0|  1.022362|
|     180457|         21927066|             2.0| 1.6426476|
|     183122|         21927066|             2.0| 1.6831236|
|     267772|         21927066|             1.0| 1.3448393|
|      19131|         21927066|             2.0| 1.5322832|
|     156123|         21927066|             1.0| 1.0642309|
|     166547|         21927066|             1.0|   1.06981|
|     179412|         21927066|             2.0| 1.1142436|
|      45027|         21927066|             1.0| 0.8961086|
|      66594|         21927066|             1.0| 0.5315274|
|     125475|         21927066|             1.0| 1.1228914|
|     216362|         21927066|             1.0| 2.1285985|
|     257766|         21927066|         

In [None]:
rating_eval = SparkRatingEvaluation(validation_data, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')


Model:	ALS rating prediction
RMSE:	0.900810
MAE:	0.710196
Explained variance:	-0.246854
R squared:	-0.482952




In [None]:
# what about training data
prediction_train = model.transform(train_data)
rating_eval_train = SparkRatingEvaluation(train_data, prediction_train, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval_train.rmse(),
      "MAE:\t%f" % rating_eval_train.mae(),
      "Explained variance:\t%f" % rating_eval_train.exp_var(),
      "R squared:\t%f" % rating_eval_train.rsquared(), sep='\n')


Model:	ALS rating prediction
RMSE:	0.343862
MAE:	0.226626
Explained variance:	0.791472
R squared:	0.786017


                                                                                

The model performs much better on the training data (RMSE 0.34, R squared 0.79) than on the validation data (RMSE 0.90, R squared -0.48). This gap is a strong sign of high variance: the model is overfitting the training data. In the next step we tune hyperparameters, including regularization, to reduce the overfitting.
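
To make the negative R squared on the validation set easier to interpret, here is a minimal sketch of the metrics using their standard textbook definitions. It assumes these match what `SparkRatingEvaluation` reports; the function name `rating_metrics` and the toy ratings are illustrative only.

```python
import math


def rating_metrics(y_true, y_pred):
    """Compute RMSE, MAE and R squared for paired true/predicted ratings."""
    n = len(y_true)
    errors = [pred - true for true, pred in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)                    # residual sum of squares
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)     # total sum of squares
    return {
        "rmse": math.sqrt(ss_res / n),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1 - ss_res / ss_tot,
    }


y_true = [1.0, 2.0, 3.0, 2.0]  # toy ratings on the 1-3 scale

# always predicting the mean rating gives R squared of exactly 0
baseline = rating_metrics(y_true, [2.0, 2.0, 2.0, 2.0])

# predictions that are worse than the mean give a negative R squared
poor = rating_metrics(y_true, [3.0, 2.0, 1.0, 2.0])

print(baseline)
print(poor)
```

Under this definition, a negative R squared means the predictions have a larger squared error than a constant prediction of the mean rating would.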

#### Step 4 - ALS model hyperparameter tuning

In [None]:
def tune_ALS(train_data, validation_data, maxIter, regParams, ranks):
    """
    grid search function to select the best model based on RMSE of validation data
    
    Parameters
    ----------
    train_data: spark DF with columns ['customer_id', 'ingredient_id_int', 'meetfresh_rating']
    
    validation_data: spark DF with columns ['customer_id', 'ingredient_id_int', 'meetfresh_rating']
    
    maxIter: int, max number of learning iterations
    
    regParams: list of float, one dimension of hyper-param tuning grid
    
    ranks: list of int, one dimension of hyper-param tuning grid
    
    Return
    ------
    The best fitted ALS model with lowest RMSE score on validation data
    
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING}
    
    for rank in ranks:
        for reg in regParams:
            # get ALS model
            als = ALS(
                        rank=rank,
                        maxIter=maxIter,
                        implicitPrefs=False,
                        regParam=reg,
                        coldStartStrategy='drop',
                        nonnegative=False,
                        seed=42,
                        **header
                    )
            # train ALS model
            model = als.fit(train_data)
            # evaluate the model by computing the RMSE on the validation data
            prediction = model.transform(validation_data)
            rating_eval = SparkRatingEvaluation(validation_data, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")
            rmse = rating_eval.rmse()
            
            print('{} latent factors and regularization = {}: '
                  'validation RMSE is {}'.format(rank, reg, rmse))
            
            if rmse < min_error:
                min_error = rmse
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and '
          'regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [None]:
regParams = [0.1]
ranks = [50,60,70]

tune_ALS(train_data, validation_data, 15, regParams, ranks)


50 latent factors and regularization = 0.1: validation RMSE is 0.8371197275451394



60 latent factors and regularization = 0.1: validation RMSE is 0.8380459724351232



70 latent factors and regularization = 0.1: validation RMSE is 0.8378188338922786

The best model has 50 latent factors and regularization = 0.1


                                                                                

ALS_5fc1ad3ca6ce

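After tuning, the best configuration among those tried is regParam = 0.1 with rank = 50 (validation RMSE 0.8371). The differences across ranks 50, 60, and 70 are negligible (0.8371, 0.8380, and 0.8378), and only one regularization value was searched in this run, so this is a starting point rather than a fully tuned optimum.
Given the nature of the ratings data used for this exercise, less-than-ideal performance on the validation set is expected.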
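
Since the run above searched a single regularization value, a wider grid could be explored with a small generic helper such as the one below. This is a sketch under assumptions: `grid_search` is a hypothetical helper, and `fake_validation_rmse` is a made-up stand-in for fitting ALS and computing validation RMSE, used only so the example runs without Spark.

```python
from itertools import product


def grid_search(ranks, reg_params, fit_and_score):
    """Evaluate every (rank, regParam) pair and return the best one.

    fit_and_score: callable (rank, reg) -> validation RMSE (lower is better)
    """
    results = [
        (fit_and_score(rank, reg), rank, reg)
        for rank, reg in product(ranks, reg_params)
    ]
    best_rmse, best_rank, best_reg = min(results)
    best = {"rank": best_rank, "regParam": best_reg, "rmse": best_rmse}
    return best, results


def fake_validation_rmse(rank, reg):
    # made-up scoring surface for illustration; in the notebook this would
    # fit ALS on train_data and return rating_eval.rmse() on validation_data
    return abs(reg - 0.1) + rank / 1000


best, results = grid_search(
    ranks=[10, 50],
    reg_params=[0.01, 0.1, 1.0],
    fit_and_score=fake_validation_rmse,
)
print(best)
```

Keeping the scoring step behind a callable makes it possible to swap in the real ALS fit without changing the search loop.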

#### Step 5 - Make predictions

In [None]:
als = ALS(
        rank=50,
        maxIter=15,
        implicitPrefs=False,
        regParam=0.1,
        coldStartStrategy='drop',
        nonnegative=False,
        seed=42,
        **header
    )

model = als.fit(train_data)
prediction = model.transform(validation_data)
prediction.cache().show()


+-----------+-----------------+----------------+----------+
|customer_id|ingredient_id_int|meetfresh_rating|prediction|
+-----------+-----------------+----------------+----------+
|     108560|         21927066|             1.0| 1.7628738|
|        392|         21927066|             2.0| 1.3824956|
|     180457|         21927066|             2.0| 1.2576532|
|     183122|         21927066|             2.0|  1.451288|
|     267772|         21927066|             1.0| 1.5627604|
|      19131|         21927066|             2.0| 1.5108426|
|     156123|         21927066|             1.0| 0.9783255|
|     166547|         21927066|             1.0| 1.0476387|
|     179412|         21927066|             2.0| 1.1452771|
|      45027|         21927066|             1.0| 0.9671886|
|      66594|         21927066|             1.0|0.61436623|
|     125475|         21927066|             1.0| 1.2123389|
|     216362|         21927066|             1.0| 1.2300282|
|     257766|         21927066|         

In [None]:
validation_pred_df = prediction.toPandas()
validation_pred_df

| | customer_id | ingredient_id_int | meetfresh_rating | prediction |
|---|---|---|---|---|
| 0 | 108560 | 21927066 | 1.0 | 1.762874 |
| 1 | 392 | 21927066 | 2.0 | 1.382496 |
| 2 | 180457 | 21927066 | 2.0 | 1.257653 |
| 3 | 183122 | 21927066 | 2.0 | 1.451288 |
| 4 | 267772 | 21927066 | 1.0 | 1.562760 |
| ... | ... | ... | ... | ... |
| 105378 | 250609 | 82373836 | 2.0 | 1.760008 |
| 105379 | 252031 | 82373836 | 2.0 | 1.257197 |
| 105380 | 255183 | 82373836 | 2.0 | 2.405112 |
| 105381 | 259618 | 82373836 | 2.0 | 1.973809 |
