<a href="https://colab.research.google.com/github/Shahid619/Machine-learning-/blob/main/Mvie%20Recommender%20System%20using%20Apache%20spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Installing the JDK & PySpark**

In [None]:
!apt-get update -q
!apt-get install -y openjdk-11-jdk-headless -qq > /dev/null
!pip install -q pyspark


**Downloading & Deploying Spark**

In [None]:
# Download Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Verify the download
!ls -l spark-3.4.1-bin-hadoop3.tgz

# Extract Spark only if the file exists
if os.path.isfile('spark-3.4.1-bin-hadoop3.tgz'):
    !tar xf spark-3.4.1-bin-hadoop3.tgz
else:
    print("Download failed: spark-3.4.1-bin-hadoop3.tgz not found.")


**Set Environment Variables**

In [None]:
import os

# Point Spark at the Java 11 installation and at the unpacked Spark distribution.
spark_home = "/content/spark-3.4.1-bin-hadoop3"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = spark_home

# Add Spark's bin directory to PATH exactly once, so that re-running this cell
# does not keep appending duplicate entries. A missing PATH is treated as empty.
spark_bin = f"{spark_home}/bin"
current_path = os.environ.get("PATH", "")
if spark_bin not in current_path.split(":"):
    os.environ["PATH"] = f"{current_path}:{spark_bin}" if current_path else spark_bin


**Initializing a SparkSession**

In [None]:
from pyspark.sql import SparkSession

# Build the notebook's Spark session (or reuse one that already exists),
# with the Spark UI port set to 4050.
spark = (
    SparkSession.builder
    .appName("ColabSparkApp")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

# Smoke test: create a five-row range DataFrame and print it.
df = spark.range(5)
df.show()


**Downloading and Loading the Dataset**

In [None]:
# Download the MovieLens dataset
!wget -q https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip -q ml-latest-small.zip

# Locations of the extracted MovieLens CSV files.
ratings_file = '/content/ml-latest-small/ratings.csv'
movies_file = '/content/ml-latest-small/movies.csv'

# Read both CSVs with the first line as the header and with Spark inferring
# the column types.
ratings_df = spark.read.options(header=True, inferSchema=True).csv(ratings_file)
movies_df = spark.read.options(header=True, inferSchema=True).csv(movies_file)

# Preview the first five rows of each table: ratings first, then movies.
ratings_df.show(5)
movies_df.show(5)


**Building the Recommender System**

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.sql import Row
# Import the necessary function
from pyspark.sql.functions import expr ,col# Import expr here


# Fixed seed shared by the train/test split and the ALS initialisation, so the
# reported metrics do not change from run to run.
SEED = 42

# Hold out roughly 20% of the ratings for evaluation.
(training, test) = ratings_df.randomSplit([0.8, 0.2], seed=SEED)

# Configure the ALS collaborative-filtering estimator.
# coldStartStrategy="drop" removes test rows that would otherwise get a NaN
# prediction (users or movies absent from the training split), so they do not
# turn the RMSE into NaN.
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=SEED,
)

# Fit the model on the training split.
model = als.fit(training)

# Score the held-out ratings.
predictions = model.transform(test)

# Compare predicted and actual ratings using root-mean-square error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")
# Turn the rating problem into a binary one: values at or above the threshold
# become the positive class (1.0), everything else the negative class (0.0).
threshold = 3.0

# Derive both binary columns and cast them to double in a single pass.
predictions = (
    predictions
    .withColumn(
        "binary_prediction",
        expr(f"IF(prediction >= {threshold}, 1.0, 0.0)").cast("double"),
    )
    .withColumn(
        "binary_rating",
        expr(f"IF(rating >= {threshold}, 1.0, 0.0)").cast("double"),
    )
)

# Accuracy of the thresholded predictions against the thresholded ratings.
accuracy_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="binary_rating",
    predictionCol="binary_prediction",
)
accuracy = accuracy_evaluator.evaluate(predictions)
print(f"Accuracy = {accuracy}")

# F1 score on the same pair of binary columns.
f1_evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="binary_rating",
    predictionCol="binary_prediction",
)
f1 = f1_evaluator.evaluate(predictions)
print(f"F1 Score = {f1}")

**Generate Movie Recommendations**

In [None]:
from pyspark.sql.functions import lit

# User to generate recommendations for.
user_id = 1

# Movies this user has already rated. Recommending these again would be
# pointless, so they are excluded from the candidate set below.
rated_movies = ratings_df.filter(ratings_df.userId == user_id).select("movieId").distinct()

# Candidate set: every movie in the catalogue that the user has NOT rated yet
# (left anti join keeps only rows with no match in rated_movies), tagged with
# the user's id so the model can score each (user, movie) pair.
user_df = (
    movies_df.select("movieId")
    .join(rated_movies, on="movieId", how="left_anti")
    .withColumn("userId", lit(user_id))
)

# Predict a rating for every candidate movie.
recommendations = model.transform(user_df)

# Show the 10 candidates with the highest predicted rating.
recommendations.orderBy("prediction", ascending=False).show(10)


**Display Movie Titles for Recommendations**

In [None]:
from pyspark.sql.functions import lit

# Attach the movie metadata (including the title) to each scored movie.
recommendations_with_titles = recommendations.join(movies_df, on="movieId")

# Print the ten highest-scoring titles together with their predicted rating.
(
    recommendations_with_titles
    .select("title", "prediction")
    .orderBy("prediction", ascending=False)
    .show(10)
)


NameError: name 'recommendations' is not defined