In [None]:
# Upload file(s) into the Colab runtime; the recorded run saved `tags.csv`.
# NOTE(review): `uploaded` is not referenced by any later cell visible here, and
# the movies/ratings CSVs below are read from Google Drive paths instead —
# confirm whether this upload is still needed.
from google.colab import files

uploaded = files.upload()





Saving tags.csv to tags.csv


In [None]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=dcfd1e515bf37edc70975143495186b8311084a23c0193d3d97a5cbbaafe8db9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
!pip install findspark


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline
import findspark

In [None]:
# Ask findspark to locate the Spark installation for this Python process.
# NOTE(review): pyspark was already imported by the import cell above, so with a
# pip-installed pyspark this call may be redundant — confirm before removing.
findspark.init()

In [None]:
# Create the Spark session shared by every later cell, or reuse the one that is
# already running in this kernel.
spark = (
    SparkSession.builder
    .appName("MovieRecommendationSystem")
    .getOrCreate()
)


In [None]:
# Bare expression: shows the session object as the cell output, as a quick check
# that the session was created.
spark

In [None]:
# Movie catalogue (movieId, title, pipe-delimited genres); column types are
# inferred from the CSV contents.
movies = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/drive/MyDrive/Bigdata/movies.csv')
)
movies.printSchema()
movies.show(5)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# User ratings (userId, movieId, rating). The timestamp column is not used by
# any later cell, so it is dropped right after loading.
ratings = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/drive/MyDrive/Bigdata/ratings.csv')
    .drop("timestamp")
)

ratings.printSchema()
ratings.show(5)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [None]:
# Attach title and genres to every rating row; the inner join keeps only
# movies that have at least one rating (and ratings whose movie is known).
movie_ratings = movies.join(ratings, on='movieId', how="inner")
movie_ratings.show(n=10)

+-------+--------------------+--------------------+------+------+
|movieId|               title|              genres|userId|rating|
+-------+--------------------+--------------------+------+------+
|      1|    Toy Story (1995)|Adventure|Animati...|     1|   4.0|
|      3|Grumpier Old Men ...|      Comedy|Romance|     1|   4.0|
|      6|         Heat (1995)|Action|Crime|Thri...|     1|   4.0|
|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|     1|   5.0|
|     50|Usual Suspects, T...|Crime|Mystery|Thr...|     1|   5.0|
|     70|From Dusk Till Da...|Action|Comedy|Hor...|     1|   3.0|
|    101|Bottle Rocket (1996)|Adventure|Comedy|...|     1|   5.0|
|    110|   Braveheart (1995)|    Action|Drama|War|     1|   4.0|
|    151|      Rob Roy (1995)|Action|Drama|Roma...|     1|   5.0|
|    157|Canadian Bacon (1...|          Comedy|War|     1|   5.0|
+-------+--------------------+--------------------+------+------+
only showing top 10 rows



In [None]:

# Hold out roughly a quarter of the rated rows for evaluation; the seed is
# fixed so the split does not change from run to run.
train_data, test_data = movie_ratings.randomSplit(weights=[0.75, 0.25], seed=42)

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import RankingEvaluator

def _rmse_and_mse(predictions):
    """Return ``(rmse, mse)`` of the ``prediction`` column against ``rating``."""
    rmse = RegressionEvaluator(
        metricName="rmse", labelCol="rating", predictionCol="prediction"
    ).evaluate(predictions)
    mse = RegressionEvaluator(
        metricName="mse", labelCol="rating", predictionCol="prediction"
    ).evaluate(predictions)
    return rmse, mse


def get_similarities(train_data,test_data):
    """Train ALS, print its test error, and build the pairwise feature frames.

    Args:
        train_data: DataFrame with at least ``userId``, ``movieId``, ``rating``
            and ``genres`` columns; used to fit ALS and the genre indexer.
        test_data: DataFrame with ``userId``, ``movieId`` and ``rating``; used
            only to compute the printed RMSE / MSE.

    Returns:
        A ``(user_similarities, content_similarities)`` tuple of lazy DataFrames:

        * ``user_similarities`` - every ordered pair of ALS user-factor rows
          (self-pairs included) with columns ``u1``, ``u1_features``, ``u2``,
          ``u2_features``.
        * ``content_similarities`` - every ordered pair of *different* movies
          with columns ``m1``, ``m1_features``, ``m2``, ``m2_features``, where
          the features are a one-element vector holding the indexed genres
          string.

    Side effects:
        Prints the "User Based" and "Content Based" RMSE / MSE blocks.
    """
    # The genre VectorAssembler below writes a "features" column, so drop a
    # pre-existing column of that name to avoid a clash.
    if "features" in train_data.columns:
        train_data = train_data.drop("features")

    #******************** User based ****************************
    # ALS only reads userCol / itemCol / ratingCol, so it is fitted directly;
    # the genre indexer and id assembler that used to sit in the same pipeline
    # fed nothing into it (and the indexer's default handleInvalid="error" may
    # reject genres that appear only in the test split).
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
    als_model = als.fit(train_data)

    # Pair every user's latent-factor row with every other user's row.
    user_factors = als_model.userFactors
    user_similarities = user_factors.alias("u1").crossJoin(user_factors.alias("u2")).select(
        col("u1.id").alias("u1"),
        col("u1.features").alias("u1_features"),
        col("u2.id").alias("u2"),
        col("u2.features").alias("u2_features"))

    # coldStartStrategy="drop" removes test rows whose user or movie was not
    # seen during training, so the metrics cover the remaining rows only.
    predictions = als_model.transform(test_data)
    rmse, mse = _rmse_and_mse(predictions)

    print("User Based")
    # Fixed: the newline escape had lost its backslash and printed "nRoot ...".
    print(f"\nRoot Mean Squared Error (RMSE): {rmse}")
    print("\nMean Squared Error (MSE) on test data: {}".format(mse))

    #****************** Content based *************************
    # The "content based" model was a second ALS fitted with the same settings
    # on the same data, i.e. the same model with the same metrics. It is no
    # longer re-trained; the figures above are reported again under this
    # heading so the printed output keeps its shape.
    print("Content Based")
    print("\nRoot Mean Squared Error (RMSE)on test data: {}".format(rmse))
    print("\nMean Squared Error (MSE) on test data: {}".format(mse))

    # Encode each row's full genres string as a numeric index and wrap it in a
    # one-element vector so it can go through the same cosine-similarity UDF.
    genres_indexer = StringIndexer(inputCol="genres", outputCol="genresIndex")
    indexed_movies = genres_indexer.fit(train_data).transform(train_data)
    genre_assembler = VectorAssembler(inputCols=["genresIndex"], outputCol="features")
    transformed_movies = genre_assembler.transform(indexed_movies)

    # One row per movie, then every ordered pair of different movies.
    content = transformed_movies.select("movieId", "features").distinct()
    content_similarities = content.alias("m1").crossJoin(content.alias("m2")).filter("m1.movieId != m2.movieId").select(
        col("m1.movieId").alias("m1"),
        col("m1.features").alias("m1_features"),
        col("m2.movieId").alias("m2"),
        col("m2.features").alias("m2_features"))

    return user_similarities,content_similarities



In [None]:
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define a UDF to calculate cosine similarity
def _as_float_list(features):
    """Convert a sequence or a vector object exposing ``toArray()`` to a list of floats."""
    values = features.toArray() if hasattr(features, "toArray") else features
    return [float(value) for value in values]


def calculate_cosine_similarity(features1, features2):
    """Cosine similarity of two feature vectors, returned as a Python float.

    Written without DenseVector so it works on plain lists (such as the float
    arrays in ALS factor rows) as well as on vector objects exposing
    ``toArray()``.

    Args:
        features1: first vector - a sequence of numbers or a vector object
            exposing ``toArray()``.
        features2: second vector, same kinds as ``features1``.

    Returns:
        ``dot(a, b) / (|a| * |b|)``. Returns 0.0 when either input is None or
        empty, or when either vector has zero norm.

    Raises:
        ValueError: if the two vectors have different lengths.
    """
    # A null feature cell should yield "no similarity", not fail the UDF.
    if features1 is None or features2 is None:
        return 0.0

    left = _as_float_list(features1)
    right = _as_float_list(features2)
    if len(left) != len(right):
        raise ValueError(
            "cosine similarity needs vectors of equal length, got {} and {}".format(
                len(left), len(right)))

    dot_product = sum(a * b for a, b in zip(left, right))
    norm_product = (sum(a * a for a in left) ** 0.5) * (sum(b * b for b in right) ** 0.5)

    # Zero-norm (all-zero or empty) vectors have no direction.
    if norm_product == 0:
        return 0.0
    return float(dot_product / norm_product)


def get_user_cosine_similarity(train_data,test_data):
    """Return the user-pair frame from get_similarities with a cosine score added.

    The result keeps the ``u1``, ``u1_features``, ``u2`` and ``u2_features``
    columns and gains a double column ``cosineSimilarity`` computed from the two
    feature columns by ``calculate_cosine_similarity``.
    """
    # Wrap the plain Python function so Spark can apply it row by row.
    similarity_udf = udf(calculate_cosine_similarity, DoubleType())

    # Only the first element (the user pairs) of the returned tuple is needed.
    user_pairs = get_similarities(train_data, test_data)[0]

    scored_pairs = user_pairs.withColumn(
        "cosineSimilarity",
        similarity_udf(col("u1_features"), col("u2_features")),
    )
    return scored_pairs


In [None]:
from pyspark.sql.functions import col
def colabrative_filering(userId,num_movies,train_data,test_data,min_similarity=0.8,min_rating=2.5):
    """Recommend movies liked by the users most similar to ``userId``.

    Args:
        userId: id of the user to recommend for.
        num_movies: maximum number of rows to return.
        train_data: training split; used to fit the model and to find the
            movies the target user has already rated.
        test_data: held-out split; only used for the printed error metrics.
        min_similarity: a neighbour's cosine similarity to the target user must
            be strictly greater than this value (default 0.8).
        min_rating: a neighbour's rating must be strictly greater than this
            value for the movie to count as liked (default 2.5).

    Returns:
        DataFrame with columns ``movieId``, ``rating``, ``title``, ``genres``
        with at most ``num_movies`` rows and one row per movie; ``rating`` is
        the mean of the qualifying neighbour ratings, highest first.

    Notes:
        Neighbour ratings come from the notebook-level ``ratings`` frame and
        titles from the notebook-level ``movies`` frame.
    """
    target_user_id = userId
    user_similarities_with_cosine = get_user_cosine_similarity(train_data, test_data)

    # Keep only pairs that start at the target user. The previous version
    # built this filter but never used it, so "similar users" were taken from
    # every pair in the matrix. The self-pair (similarity 1.0) is excluded too.
    similar_users = user_similarities_with_cosine.filter(
        (col("u1") == target_user_id)
        & (col("u2") != target_user_id)
        & (col("cosineSimilarity") > min_similarity)
    ).select("u2")

    # Movies the target user has already rated in the training split.
    target_user_movies = (
        train_data.filter(col("userId") == target_user_id).select("movieId").distinct()
    )

    # Movies rated above the threshold by the neighbours.
    movies_watched_by_similar_users = (
        similar_users.join(ratings.alias("m"), col("u2") == col("m.userId"))
        .select("m.movieId", "m.rating")
        .filter(col("rating") > min_rating)
    )

    # Drop anything the target user has already rated.
    recommended_movies_id = movies_watched_by_similar_users.join(
        target_user_movies, on="movieId", how="left_anti")

    # Collapse to one row per movie (mean neighbour rating) so the same title
    # cannot appear several times.
    recommended_movies_id = (
        recommended_movies_id.groupBy("movieId")
        .avg("rating")
        .withColumnRenamed("avg(rating)", "rating")
    )

    # Add titles/genres and rank, so limit() returns the best-rated movies
    # rather than an arbitrary subset; movieId breaks ties deterministically.
    recommended_movies = recommended_movies_id.join(movies, on="movieId").orderBy(
        col("rating").desc(), col("movieId"))

    return recommended_movies.limit(num_movies)



In [None]:
def get_content_cosine_similarity(train_data,test_data):
    """Return the movie-pair frame from get_similarities with a cosine score added.

    The result keeps the ``m1``, ``m1_features``, ``m2`` and ``m2_features``
    columns and gains a double column ``cosineSimilarity`` computed from the two
    feature columns by ``calculate_cosine_similarity``.
    """
    # Wrap the plain Python function so Spark can apply it row by row.
    similarity_udf = udf(calculate_cosine_similarity, DoubleType())

    # Only the second element (the movie pairs) of the returned tuple is needed.
    movie_pairs = get_similarities(train_data, test_data)[1]

    scored_pairs = movie_pairs.withColumn(
        "cosineSimilarity",
        similarity_udf(col("m1_features"), col("m2_features")),
    )
    return scored_pairs


In [None]:
from pyspark.sql.functions import col,desc
def content_based_recommendation(userId,num_movies,train_data,test_data):
    """Recommend unseen movies that share at least one genre with the user's history.

    Args:
        userId: id of the user to recommend for.
        num_movies: maximum number of rows to return.
        train_data: DataFrame with ``userId``, ``movieId``, ``title`` and
            pipe-delimited ``genres`` columns.
        test_data: accepted for interface compatibility; not used here.

    Returns:
        DataFrame with distinct ``title`` / ``genres`` rows (at most
        ``num_movies``) for movies the user has not rated in ``train_data``
        and that share at least one genre with a movie the user has rated.
        Empty when the user has no rated movies with genres.

    Notes:
        This function used to call ``get_content_cosine_similarity`` and then
        discard the result. That call re-trained the recommender and re-printed
        its metrics without affecting the recommendations, so it was removed.
    """
    # Function-scope import: these helpers are not imported at file level.
    from pyspark.sql.functions import array, arrays_overlap, lit, split

    target_user_id = userId
    watched = train_data.filter(col("userId") == target_user_id)

    # Individual genres across everything the user has rated. Null genre
    # strings are skipped so the split below cannot fail on None.
    user_watched_genres = watched.filter(col("genres").isNotNull()).select("genres")
    unique_genres = set(user_watched_genres.rdd.flatMap(lambda row: row[0].split("|")).collect())
    unique_genres.discard("")

    # Show the unique genres
    print("Unique Genres:", unique_genres)

    # Unknown user or no usable genres: nothing to match against.
    if not unique_genres:
        return train_data.select("title", "genres").limit(0)

    # Compare genre by genre. The previous isin() test compared the whole
    # "A|B|C" string with single genre names, so multi-genre movies never
    # matched. split() takes a regex, hence the escaped pipe.
    liked_genres = array(*[lit(genre) for genre in sorted(unique_genres)])
    movie_genres = split(col("genres"), r"\|")

    candidates = train_data.filter(
        (col("userId") != target_user_id)
        & col("genres").isNotNull()
        & (col("genres") != "")
        & arrays_overlap(movie_genres, liked_genres)
    ).select("movieId", "title", "genres").distinct()

    # Rows rated by other users can still be movies the target user has also
    # rated; remove those so only unseen movies are recommended.
    similar_movies_genres = (
        candidates.join(watched.select("movieId").distinct(), on="movieId", how="left_anti")
        .select("title", "genres")
        .distinct()
    )

    return similar_movies_genres.limit(num_movies)


In [None]:
# UI for user input
def get_user_input():
    """Prompt for the target user id and the number of movies to recommend.

    Each prompt is repeated until a whole number is entered, instead of
    raising ValueError on the first typo. The movie count must be at least 1.

    Returns:
        ``(target_user_id, num_movies)`` as a tuple of ints.
    """
    def _ask_int(prompt, minimum=None):
        # Keep asking until the answer parses as an int and meets the minimum.
        while True:
            raw = input(prompt)
            try:
                value = int(raw)
            except ValueError:
                print(f"'{raw}' is not a whole number, please try again.")
                continue
            if minimum is not None and value < minimum:
                print(f"Please enter a value of at least {minimum}.")
                continue
            return value

    target_user_id = _ask_int("Enter the user ID: ")
    num_movies = _ask_int("Enter the number of movies to recommend: ", minimum=1)
    return target_user_id, num_movies

In [None]:
# Main function
def main():
    """Ask for a user and a count, then show both recommendation lists.

    Uses the notebook-level ``train_data`` / ``test_data`` split. The user-based
    list is computed and shown first, followed by the content-based list.
    """
    chosen_user, how_many = get_user_input()

    # User-based (collaborative) recommendations.
    user_based = colabrative_filering(chosen_user, how_many, train_data, test_data)
    print("\nUser Based Recommendation:\n")
    user_based.show()

    # Genre-based (content) recommendations.
    content_based = content_based_recommendation(chosen_user, how_many, train_data, test_data)
    print("\n Content Based Recommendation:\n")
    content_based.show()



In [None]:
# Run the interactive recommender when this cell is executed (or the file is
# run as a script); main() prompts for the user id and the number of movies.
if __name__ == "__main__":
    main()

Enter the user ID: 12
Enter the number of movies to recommend: 15
User Based
nRoot Mean Squared Error (RMSE): 0.8859522380886061

Mean Squared Error (MSE) on test data: 0.7849113681742101
Content Based

Root Mean Squared Error (RMSE)on test data: 0.8859522380886061

Mean Squared Error (MSE) on test data: 0.7849113681742101

User Based Recommendation:

+-------+------+--------------------+--------------------+
|movieId|rating|               title|              genres|
+-------+------+--------------------+--------------------+
|  84374|   3.5|No Strings Attach...|      Comedy|Romance|
|  56367|   3.5|         Juno (2007)|Comedy|Drama|Romance|
|   6942|   4.0|Love Actually (2003)|Comedy|Drama|Romance|
|  51705|   4.5|Priceless (Hors d...|      Comedy|Romance|
|  81847|   3.5|      Tangled (2010)|Animation|Childre...|
|   3578|   4.0|    Gladiator (2000)|Action|Adventure|...|
|  95543|   3.5|Ice Age 4: Contin...|Adventure|Animati...|
|  66203|   3.5|He's Just Not Tha...|Comedy|Drama|Romanc

In [None]:
# Shut down the Spark session now that the notebook is finished with it.
spark.stop()