In [53]:
# import necessary libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running in local mode.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)

# Load the interaction records as a DataFrame and discard the 'timestamp'
# column in the same expression; the remaining columns are kept as read.
movie_ratings = spark.read.json('../../api/interactions_70.json').drop('timestamp')

In [59]:
# List the column names of the ratings DataFrame (displayed as the cell output).
movie_ratings.columns

['movie_id', 'rating', 'user_id']

In [61]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Split into training and testing sets (80% / 20%).
# The seed is fixed so that the split -- and therefore the RMSE printed below --
# is the same every time the notebook is re-run.
(training, test) = movie_ratings.randomSplit([.8, .2], seed=42)

# Build the recommendation model using ALS on the training data.
# coldStartStrategy='drop' removes predictions for users/movies that were not
# seen during fitting, so the evaluator does not receive NaN predictions.
# The ALS seed is pinned as well so the factor initialisation is repeatable.
als = ALS(
    maxIter=5,
    rank=4,
    regParam=0.01,
    userCol='user_id',
    itemCol='movie_id',
    ratingCol='rating',
    coldStartStrategy='drop',
    seed=42,
)

# Fit the ALS model to the training set
model = als.fit(training)

# Evaluate the model by computing the RMSE on the held-out test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print(rmse)

                                                                                

1.0194220035628454


In [120]:
from elasticsearch import Elasticsearch


# Connect to the local Elasticsearch node.
# The URL includes an explicit scheme: newer elasticsearch-py releases reject a
# bare 'host:port' string, and the model-loading cell further down already uses
# the full "http://localhost:9200" form.
client = Elasticsearch('http://localhost:9200')

# Exact-match lookup of a single user document by its user_id.
query = {
    'query': {
        'term': {'user_id': 4}
    }
}

result = client.search(index='users', body=query)

# The matching documents live under hits.hits; an empty list means no match.
user = result['hits']['hits']
user

  result = client.search(index='users', body=query)


[{'_index': 'users',
  '_type': '_doc',
  '_id': '7BOcXowBdrQGnaV-yl5h',
  '_score': 1.0,
  '_source': {'user_id': 4,
   'age': 24,
   'gender': 'M',
   'occupation': 'technician',
   'zip': '43537'}}]

In [56]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize the ALS estimator that the cross-validator will tune.
# The seed is pinned so each grid point is fitted reproducibly.
als_model = ALS(
    userCol='user_id',
    itemCol='movie_id',
    ratingCol='rating',
    coldStartStrategy='drop',
    seed=42,
)

# Create the parameter grid: 4 regularisation values x 4 ranks = 16 candidates
params = (
    ParamGridBuilder()
    .addGrid(als_model.regParam, [.01, .05, .1, .15])
    .addGrid(als_model.rank, [10, 50, 100, 150])
    .build()
)

# Instantiate the cross-validator; `evaluator` is the RMSE evaluator defined in
# the baseline cell. The seed fixes the fold assignment.
cv = CrossValidator(
    estimator=als_model,
    estimatorParamMaps=params,
    evaluator=evaluator,
    parallelism=4,
    seed=42,
)

# Tune on the training split only. Fitting on the full `movie_ratings` frame
# would let the held-out `test` rows influence the choice of hyper-parameters
# and make the later test RMSE optimistic.
best_model = cv.fit(training)

# NOTE: this rebinds `model` (previously the baseline ALS model) to the best
# model found by cross-validation.
model = best_model.bestModel

                                                                                

In [62]:
# Train the final ALS model with fixed hyper-parameters (rank=50, regParam=0.15).
# The seed is pinned so the factor initialisation, and hence the RMSE printed
# below, is repeatable across kernel restarts.
final_als = ALS(
    maxIter=10,
    rank=50,
    regParam=0.15,
    userCol='user_id',
    itemCol='movie_id',
    ratingCol='rating',
    coldStartStrategy='drop',
    seed=42,
)

final_model = final_als.fit(training)

# Score the held-out test split and report the RMSE
test_predictions = final_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.9386572512673913


In [64]:
# Persist the trained model. `save()` raises if the target path already exists,
# which makes this cell fail on a re-run; going through the writer with
# overwrite() replaces any previous copy instead.
# NOTE(review): the loading cell below reads from "../../app/model/good_model",
# not from this relative path -- confirm the model is copied there.
final_model.write().overwrite().save('good_model')

In [84]:
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import SparkSession
from elasticsearch import Elasticsearch

client = Elasticsearch(
    "http://localhost:9200",
)

# Create (or reuse) a Spark session
spark = SparkSession.builder.appName("movie_recommendation").getOrCreate()

# Load the ALS model
model_path = "../../app/model/good_model"  # Update with the actual path
als_model = ALSModel.load(model_path)

# User for whom you want to recommend movies
user_id_to_recommend = 123  # Replace with the actual user ID

# Load user interactions from Elasticsearch.
# Without an explicit "size" a search returns only the first 10 hits, so most
# of the user's rated movies would be missing and later treated as "unrated".
# 10000 is the default upper bound Elasticsearch allows for a single search
# page; a user with more interactions than that would need scroll/search_after.
user_interactions_query = {
    "size": 10000,
    "query": {
        "term": {"user_id": user_id_to_recommend}
    }
}

# Assuming your Elasticsearch index for user interactions is named 'interactions'
user_interactions_result = client.search(index="interactions", body=user_interactions_query)

# One hit per interaction document; empty list if the user has none.
user_interactions = user_interactions_result["hits"]["hits"]


  user_interactions_result = client.search(index="interactions", body=user_interactions_query)


In [110]:
# Create a DataFrame with the user's movie interactions.
# An explicit schema is supplied because Spark cannot infer column types from
# an empty list, which is what we get for a user with no recorded interactions.
# assumes user_id / movie_id in the Elasticsearch documents are integer-valued
# (int() also accepts numeric strings) -- TODO confirm against the index mapping
user_interaction_rows = [
    (int(interaction["_source"]["user_id"]), int(interaction["_source"]["movie_id"]))
    for interaction in user_interactions
]
user_interactions_df = spark.createDataFrame(
    user_interaction_rows,
    schema="user_id bigint, movie_id bigint",
)

# Get the movie IDs for which the user has not provided ratings: every item the
# model has factors for, minus the ones the user already interacted with.
# The result keeps the left-hand column name, "id".
user_unrated_movies_df = als_model.itemFactors.select("id").subtract(user_interactions_df.select("movie_id"))

In [115]:
# user_unrated_movies_df.show()

In [112]:
from pyspark.sql.functions import col
from pyspark.sql.functions import lit

# Tag every candidate movie row with the target user's ID so that each row
# forms a (user, item) pair.
user_id_to_recommend = 123  # Replace with the actual user ID
target_user_column = lit(user_id_to_recommend)
user_unrated_movies_df = user_unrated_movies_df.withColumn("user_id", target_user_column)

In [1]:
# Apply ALS model to generate predictions for unrated movies.
# The candidate frame carries the item IDs in a column named "id" (taken from
# itemFactors), while the model looks the item up by its configured item column.
# Rename before transforming so the lookup succeeds; withColumnRenamed is a
# no-op if "id" is absent, so this cell is safe to re-run.
# assumes the loaded model was fitted with itemCol='movie_id' and
# userCol='user_id', as in the training cell -- TODO confirm for the saved model
als_model_predictions = (
    als_model
    .transform(user_unrated_movies_df.withColumnRenamed("id", "movie_id"))
    .select(
        col("user_id"),
        col("movie_id"),
        col("prediction").alias("als_prediction"),
    )
)

# Display the ALS model predictions for unrated movies
als_model_predictions.show()


In [None]:

# Rename the columns to match the ALS model input.
# Only "id" (the item ID taken from itemFactors) is renamed to "movie_id"; the
# existing "user_id" column is kept as-is. Selecting "id as user_id" would
# overwrite the user ID with the movie ID.
user_unrated_movies_df = (
    user_unrated_movies_df
    .withColumnRenamed("id", "movie_id")
    .select("user_id", "movie_id")
)