In [27]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ALS and K-Means Example") \
    .getOrCreate()

movies_df = spark.read.csv("movies.csv", header=True)
ratings_df = spark.read.csv("ratings.csv", header=True)

joined_df = ratings_df.join(movies_df, on="Movieid", how="inner")

joined_df.show()


+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

In [28]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col

user_indexer = StringIndexer(inputCol="userId", outputCol="userIndex")
movie_indexer = StringIndexer(inputCol="movieId", outputCol="movieIndex")

indexed_data = user_indexer.fit(joined_df).transform(joined_df)
indexed_data = movie_indexer.fit(indexed_data).transform(indexed_data)

indexed_data = indexed_data.withColumn("rating", col("rating").cast(FloatType()))

(training_data, test_data) = indexed_data.randomSplit([0.8, 0.2])

als = ALS(maxIter=5, regParam=0.01, userCol="userIndex", itemCol="movieIndex", ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training_data)

predictions = model.transform(test_data)

predictions.show()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)


+-------+------+------+----------+--------------------+--------------------+---------+----------+----------+
|movieId|userId|rating| timestamp|               title|              genres|userIndex|movieIndex|prediction|
+-------+------+------+----------+--------------------+--------------------+---------+----------+----------+
|   1017|   140|   4.0|1021899101|Swiss Family Robi...|  Adventure|Children|     31.0|    2122.0|  3.038995|
|   1017|   217|   4.0| 955941990|Swiss Family Robi...|  Adventure|Children|     30.0|    2122.0| 1.7919734|
|   1033|   274|   3.5|1171932501|Fox and the Hound...|Animation|Childre...|      4.0|    1238.0| 3.3742902|
|   1033|   484|   4.5|1342300406|Fox and the Hound...|Animation|Childre...|     93.0|    1238.0|  4.045306|
|   1217|   390|   5.0|1250335792|          Ran (1985)|           Drama|War|    284.0|    1580.0|  4.686745|
|   1217|   474|   4.0|1089386649|          Ran (1985)|           Drama|War|      2.0|    1580.0| 4.1292043|
|    135|   109|   

In [41]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MovieRecommendation") \
    .getOrCreate()

# Load movies and ratings data
movies_df = spark.read.csv("movies.csv", header=True)
ratings_df = spark.read.csv("ratings.csv", header=True)

# Convert string columns to numeric
ratings_df = ratings_df \
    .withColumn("userId", ratings_df["userId"].cast("int")) \
    .withColumn("movieId", ratings_df["movieId"].cast("int")) \
    .withColumn("rating", ratings_df["rating"].cast("float"))

# Apply ALS algorithm for movie recommendation
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(ratings_df)

# Generate top 5 movie recommendations for each user
user_recs = model.recommendForAllUsers(5)
user_recs.show(truncate=False)

# Compute RMSE
predictions = model.transform(ratings_df)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)

# Apply K-Means clustering Algorithm
vector_assembler = VectorAssembler(inputCols=["userId", "movieId", "rating"], outputCol="features")
kmeans_input_df = vector_assembler.transform(ratings_df)

kmeans = KMeans(k=3, seed=1)
kmeans_model = kmeans.fit(kmeans_input_df)
clustered_df = kmeans_model.transform(kmeans_input_df)

# Rename the prediction column from KMeans to avoid conflicts
clustered_df = clustered_df.withColumnRenamed("prediction", "cluster")

# Apply ALS Algorithm to each cluster
clusters = clustered_df.select("cluster").distinct().collect()

for cluster_row in clusters:
    cluster_id = cluster_row["cluster"]
    cluster_data = clustered_df.filter(clustered_df["cluster"] == cluster_id)
    
    als_cluster = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
    model_cluster = als_cluster.fit(cluster_data)
    
    # Generate recommendations for this cluster if needed
    cluster_recs = model_cluster.recommendForAllUsers(5)
    cluster_recs.show(truncate=False)
    
    # Evaluate this cluster's predictions if needed
    cluster_predictions = model_cluster.transform(cluster_data)
    cluster_rmse = evaluator.evaluate(cluster_predictions)
    print(f"Cluster {cluster_id} RMSE:", cluster_rmse)

# You can further analyze or store the ALS models and their predictions as needed.

# Stop the Spark session
spark.stop()





+------+-------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                  |
+------+-------------------------------------------------------------------------------------------------+
|1     |[{4617, 7.531906}, {928, 6.8841333}, {6093, 6.7758484}, {5650, 6.773191}, {3814, 6.749843}]      |
|2     |[{5480, 7.9016833}, {6639, 7.7438927}, {7700, 7.690603}, {5135, 7.4496465}, {6993, 7.3859186}]   |
|3     |[{69640, 5.5210605}, {5919, 5.454347}, {70946, 5.2648187}, {4518, 5.2075057}, {7991, 5.1961794}] |
|4     |[{119141, 6.977345}, {2936, 6.7035866}, {1218, 6.6940384}, {1191, 6.6081853}, {5577, 6.439983}]  |
|5     |[{4649, 7.117719}, {1658, 6.9301386}, {135861, 6.667493}, {7169, 6.6196218}, {4678, 6.399167}]   |
|6     |[{82, 7.090411}, {55276, 6.1945457}, {3200, 5.7568674}, {2587, 5.7450943}, {2423, 5.707869}]     |
|7     |[{158872, 11.167834}, {82, 10

Cluster 0 RMSE: 0.548656877128919
