In [0]:
%sql
SELECT * FROM `recfood`.`default`.`rec_data_spark`;

food_id,userid,rating
531,40728,3
531,3753,4
531,16438,4
531,4301,3
531,7931,2
531,46901,5
531,2659,4
531,21097,3
388,28355,1
388,473,5


In [0]:

temp_table_name = "rec_data_spark_csv"

_sqldf.createOrReplaceTempView(temp_table_name)

In [0]:
permanent_table_name = "rec_data_spark_csv"

In [0]:
columnsToKeep = ['food_id', 'userid', 'rating']

_sqldf = _sqldf.select(columnsToKeep)
_sqldf.cache().count()

183310

In [0]:
for column in ['food_id', 'userid', 'rating']:
    _sqldf = _sqldf.withColumn(column, _sqldf[column].cast('int'))
    
_sqldf.printSchema()

root
 |-- food_id: integer (nullable = true)
 |-- userid: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [0]:
import numpy as np
import pandas as pd

def preview(_sqldf, n=3):
    return pd.DataFrame(_sqldf.take(n), columns=_sqldf.columns)

In [0]:
preview(_sqldf, n=3)

Unnamed: 0,food_id,userid,rating
0,531,40728,3
1,531,3753,4
2,531,16438,4


In [0]:
# Count the total number of ratings in the dataset
numerator = _sqldf.select('rating').count()
# Count the number of distinct Id’s
num_users = _sqldf.select('userid').distinct().count()
num_items = _sqldf.select('food_id').distinct().count()
# Set the denominator equal to the number of users multiplied by the number of items
denominator = num_users * num_items
# Divide the numerator by the denominator
sparsity = (1.0-(numerator * 1.0)/ denominator) * 100
print('The ratings dataframe is ', '%.2f' % sparsity + '% empty.')

The ratings dataframe is  99.83% empty.


In [0]:
train, test = _sqldf.randomSplit([0.8, 0.2], seed=42)

from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# The ALS instance
als = ALS(userCol='userid',
          itemCol='food_id',
          ratingCol='rating',
          seed=42,
          nonnegative=True,
          implicitPrefs=False,
          coldStartStrategy='drop'
          )

# The parameter grid to search
# NOTE: the parmeter lists can be reduced to two or even 
# one item if the grid search takes too long
als_paramgrid = (ParamGridBuilder()
                 .addGrid(als.rank, [50])
                 .addGrid(als.maxIter, [20])
                 .addGrid(als.regParam, [0.03])
                 .build())

# The evaluation function for determining the best model
rmse_eval = RegressionEvaluator(labelCol='rating',
                                predictionCol='prediction', 
                                metricName='rmse')

# The cross validation instance
als_cv = CrossValidator(estimator=als,
                        estimatorParamMaps=als_paramgrid,
                        evaluator=rmse_eval,
                        numFolds=10,                         
                        seed=42)

# Fit the models and find the best one!
als_best = als_cv.fit(train)

In [0]:
als_pred_train = als_best.transform(train)
als_pred_best = als_best.transform(test)

als_rmse = pd.DataFrame([ (rmse_eval.evaluate(als_pred_train.dropna()), 
                                        rmse_eval.evaluate(als_pred_best.dropna())) ],
                                     columns=['rmse_train', 'rmse_test'])

als_rmse

Unnamed: 0,rmse_train,rmse_test
0,0.058546,0.476813


In [0]:
%fs mkdirs /dbfs/FileStore/model/

In [0]:
#import joblib

joblib.dump(als_best, '/dbfs/FileStore/model/als_best.pkl')

[0;31m---------------------------------------------------------------------------[0m
[0;31mNotADirectoryError[0m                        Traceback (most recent call last)
File [0;32m<command-825299530402158>, line 3[0m
[1;32m      1[0m [38;5;66;03m#import joblib[39;00m
[0;32m----> 3[0m joblib[38;5;241m.[39mdump(als_best, [38;5;124m'[39m[38;5;124m/dbfs/FileStore/model/als_best.pkl[39m[38;5;124m'[39m)
[1;32m      4[0m dbutils[38;5;241m.[39mfs[38;5;241m.[39mcp([38;5;124m'[39m[38;5;124mfile:/dbfs/FileStore/model/als_best.pkl[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124mdbfs:/FileStore/model/als_best.pkl[39m[38;5;124m'[39m)

File [0;32m/databricks/python/lib/python3.10/site-packages/joblib/numpy_pickle.py:552[0m, in [0;36mdump[0;34m(value, filename, compress, protocol, cache_size)[0m
[1;32m    550[0m         NumpyPickler(f, protocol[38;5;241m=[39mprotocol)[38;5;241m.[39mdump(value)
[1;32m    551[0m [38;5;28;01melif[39;00m is_filename:
[0;

In [0]:
%fs ls /dbfs/FileStore/model/ 

path,name,size,modificationTime
dbfs:/dbfs/FileStore/model/als_best/,als_best/,0,1697794781788
dbfs:/dbfs/FileStore/model/als_best.pkl/,als_best.pkl/,0,1697794781788


In [0]:
dbutils.fs.cp('/dbfs/FileStore/model/als_best.pkl', 'file:/tmp/als_best.pkl', recurse=True)

True

In [0]:
preview(als_pred_best[['food_id', 'userid', 'rating', 'prediction']])

Unnamed: 0,food_id,userid,stars,prediction
0,1489,27,4,1.537455
1,258,458,5,4.334433
2,1737,458,4,4.133451


In [0]:
train_try = train

In [0]:
train.show()

In [0]:
train.count()

In [0]:
train.agg({'userid': 'max'}).show()

In [0]:
vals = [(83, 57701, 5),
        (50,57701, 5),
        (69,57701, 1),
        (0,57701, 5),
        (502,57701, 3),]

In [0]:
newRows = spark.createDataFrame(vals,train_try.columns)

In [0]:
train_try = train_try.union(newRows)

In [0]:
train_try.count()

In [0]:
train_try_pd = train_try.toPandas()

In [0]:
user_food_57695 = train_try_pd[train_try_pd['userid']==57701]
user_food_57695
#benim oyladıklarım yeni eklenen

In [0]:
# create the model with new user
"""als = ALS(userCol='userid',
          itemCol='food_id',
          ratingCol='stars',
          rank=,
          maxIter=,
          regParam=,
          nonnegative=True,
          coldStartStrategy='drop',
          implicitPrefs=False)

als_new_user = als.fit(train_try.dropna())"""


loaded_model

In [0]:
#nrecommendations = als_new_user.recommendForAllUsers(5)

nrecommendations = loaded_model.recommendForAllUsers(5)

In [0]:
from pyspark.sql.functions import split, explode, col, lower, sort_array
recommendationsDF = (nrecommendations
  .select("userid", explode("recommendations")
  .alias("recommendation"))
  .select("userid", "recommendation.*")
)

display(recommendationsDF)

In [0]:
collab_rec_57695 = recommendationsDF[recommendationsDF['userid'] == 57701].toPandas()

In [0]:
rated_57695 = train_try_pd[train_try_pd['userid']==57701]['food_id'].tolist()

In [0]:
rated_57695 #daha once ratelediklerim

In [0]:
foods_rec = []
for i in collab_rec_57695['food_id']:
    if i not in rated_57695:
        foods_rec.append(i)

In [0]:
foods_rec 
#daha once oyladiklarima gore bu recommendation systemin bana tavsiye ettigi yemekler