In [1]:
import os

import pandas as pd
import requests
from pyspark.sql import SparkSession

# Initialize Spark session
# Initialize the Spark session; fs.defaultFS makes the local HDFS instance the default filesystem
spark = (
    SparkSession.builder
    .appName("MovieRecommendation")
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
    .getOrCreate()
)

BASE_URL = "https://api.themoviedb.org/3"

# The TMDB API key is read from the environment so it is never stored in (or shared with)
# the notebook. NOTE: the key that used to be hardcoded here is in version history and
# should be revoked/rotated on TMDB.
API_KEY = os.environ.get("TMDB_API_KEY", "")
if not API_KEY:
    raise RuntimeError(
        "TMDB_API_KEY is not set; export your TMDB API key before starting Jupyter."
    )

def fetch_movies(page=1, language="en-US", timeout=10):
    """Fetch one page of TMDB's popular-movies list.

    Args:
        page: 1-based results page to request (defaults to the first page).
        language: language code passed through to TMDB.
        timeout: seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block forever on a stalled connection and hang the cell.

    Returns:
        The ``"results"`` list from the JSON body (one dict per movie), or ``[]``
        when TMDB answers with a non-200 status. The status is printed so the
        failure is visible rather than silent.

    Raises:
        requests.RequestException: on network errors or when the timeout expires.
    """
    response = requests.get(
        f"{BASE_URL}/movie/popular",
        # Let requests URL-encode the query string instead of concatenating it by hand
        params={"api_key": API_KEY, "language": language, "page": page},
        timeout=timeout,
    )
    if response.status_code != 200:
        print(f"TMDB request failed: HTTP {response.status_code} {response.text[:200]}")
        return []
    return response.json()["results"]

# Fetch the popular-movies page and keep only the fields the recommender uses
movies = fetch_movies()
if not movies:
    # Fail loudly here: an empty list would otherwise surface as a confusing
    # KeyError on the column selection below.
    raise RuntimeError("TMDB returned no movies; check the API key and network access.")
df_movies = pd.DataFrame(movies)[["id", "title", "vote_average", "vote_count"]]

# Convert to Spark DataFrame
spark_df_movies = spark.createDataFrame(df_movies)

# Save to HDFS; mode="overwrite" replaces the output of any previous run
spark_df_movies.write.csv("hdfs://localhost:9000/movies.csv", header=True, mode="overwrite")

print("Movie data saved to HDFS.")


25/03/02 12:22:38 WARN Utils: Your hostname, DESKTOP-H8P4CBA resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/03/02 12:22:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/02 12:22:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Movie data saved to HDFS.


In [2]:
import numpy as np

# Simulate a dense ratings matrix: one row per synthetic user, one column per movie fetched above
RATINGS_SEED = 42  # fixed so Restart & Run All regenerates exactly the same ratings
num_users = 100
num_movies = len(df_movies)
rng = np.random.default_rng(RATINGS_SEED)
ratings_matrix = rng.integers(0, 6, size=(num_users, num_movies))  # integer ratings 0..5 (high is exclusive)

# Movie titles become the column headers. Spark's file writers reject duplicate column
# names (case-insensitively under the default spark.sql.caseSensitive=false), so only
# titles that collide get their TMDB id appended; unique titles are left unchanged.
movie_titles = df_movies["title"]
is_duplicate_title = movie_titles.str.lower().duplicated(keep=False)
column_labels = movie_titles.where(
    ~is_duplicate_title,
    movie_titles + " (" + df_movies["id"].astype(str) + ")",
)
df_ratings = pd.DataFrame(ratings_matrix, columns=column_labels)

# Convert to Spark DataFrame
spark_df_ratings = spark.createDataFrame(df_ratings)

# Save to HDFS; mode="overwrite" replaces the output of any previous run
spark_df_ratings.write.csv("hdfs://localhost:9000/ratings.csv", header=True, mode="overwrite")

print("User ratings saved to HDFS.")


                                                                                

User ratings saved to HDFS.


In [3]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from sklearn.preprocessing import MinMaxScaler

# Read the movie metadata and the simulated ratings back from HDFS.
# inferSchema asks Spark to infer column types so ratings arrive as numbers, not strings.
movies_df = spark.read.options(header=True, inferSchema=True).csv("hdfs://localhost:9000/movies.csv")
ratings_df = spark.read.options(header=True, inferSchema=True).csv("hdfs://localhost:9000/ratings.csv")

# Collect the ratings table to the driver as pandas, then rescale each movie
# column independently to [0, 1] (MinMaxScaler's default range) for the network.
ratings_pd = ratings_df.toPandas()
scaler = MinMaxScaler().fit(ratings_pd)
ratings_scaled = scaler.transform(ratings_pd)

print("Data loaded and preprocessed.")


                                                                                

Data loaded and preprocessed.


In [4]:
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split

# Seed every RNG Keras relies on (Python, NumPy, backend) and the split itself, so
# Restart & Run All reproduces the same initial weights and the same held-out rows.
TRAIN_SEED = 42
keras.utils.set_random_seed(TRAIN_SEED)

# Hold out 20% of users to track reconstruction loss on rows the model does not train on
train_data, test_data = train_test_split(ratings_scaled, test_size=0.2, random_state=TRAIN_SEED)

# Take the width from the data actually loaded from HDFS rather than num_movies,
# so the network always matches the matrix it is trained on.
input_dim = train_data.shape[1]

# Symmetric dense autoencoder: 128 -> 64 -> 32 (bottleneck) -> 64 -> 128.
# The sigmoid output matches the [0, 1] range produced by MinMaxScaler.
autoencoder = keras.models.Sequential([
    # An explicit Input is preferred over passing `input_shape=` to a layer (Keras 3 warns on that)
    keras.Input(shape=(input_dim,)),
    keras.layers.Dense(128, activation='relu'),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(32, activation='relu'),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(128, activation='relu'),
    keras.layers.Dense(input_dim, activation='sigmoid'),  # reconstruct every input column
])

autoencoder.compile(optimizer='adam', loss='mse')
autoencoder.summary()

# Train the model to reconstruct its own input; keep the History for plotting loss curves
history = autoencoder.fit(
    train_data,
    train_data,
    epochs=50,
    batch_size=16,
    validation_data=(test_data, test_data),
)


2025-03-02 12:24:30.007714: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-03-02 12:24:30.304454: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-03-02 12:24:30.630010: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1740918270.903825    3070 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1740918271.012015    3070 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-02 12:24:31.671153: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU ins

Epoch 1/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 91ms/step - loss: 0.1176 - val_loss: 0.1143
Epoch 2/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step - loss: 0.1118 - val_loss: 0.1130
Epoch 3/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step - loss: 0.1135 - val_loss: 0.1115
Epoch 4/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step - loss: 0.1104 - val_loss: 0.1099
Epoch 5/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step - loss: 0.1067 - val_loss: 0.1076
Epoch 6/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step - loss: 0.1029 - val_loss: 0.1042
Epoch 7/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step - loss: 0.0995 - val_loss: 0.1010
Epoch 8/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step - loss: 0.0934 - val_loss: 0.0972
Epoch 9/50
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [

<keras.src.callbacks.history.History at 0x7fd6f0141580>

In [5]:
import numpy as np

# Score every user with the trained autoencoder, then map the sigmoid outputs
# from the MinMax-scaled [0, 1] range back to the original rating scale.
predicted_scaled = autoencoder.predict(ratings_scaled)
predicted_ratings = scaler.inverse_transform(predicted_scaled)

# Label columns with the headers the scaler/model were actually fit on (read back
# from ratings.csv), so prediction columns can never drift out of alignment with them.
predicted_df = pd.DataFrame(predicted_ratings, columns=ratings_pd.columns)
spark_df_predicted = spark.createDataFrame(predicted_df)

# Save predictions to HDFS; mode="overwrite" replaces the output of any previous run
spark_df_predicted.write.csv("hdfs://localhost:9000/predicted_ratings.csv", header=True, mode="overwrite")

print("Predicted ratings saved to HDFS.")


[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 59ms/step 




Predicted ratings saved to HDFS.


                                                                                

In [6]:
def recommend_movies(user_id, top_n=5, ratings=None, titles=None):
    """Return the titles of the ``top_n`` highest-predicted movies for one user.

    Args:
        user_id: row index into ``ratings`` (0 <= user_id < number of rows).
            Negative indices are rejected instead of silently wrapping around
            to the last users.
        top_n: how many titles to return; 0 yields an empty array. Values larger
            than the number of movies return every movie.
        ratings: 2-D array-like of predicted ratings, one row per user and one
            column per movie. Defaults to the notebook's ``predicted_ratings``.
        titles: sequence of movie titles aligned with the columns of ``ratings``.
            Defaults to ``df_movies["title"]``.

    Returns:
        NumPy array of titles, ordered from highest to lowest predicted rating.

    Raises:
        IndexError: if ``user_id`` is outside ``[0, n_users)``.
        ValueError: if ``top_n`` is negative.
    """
    if ratings is None:
        ratings = predicted_ratings
    if titles is None:
        titles = df_movies["title"].to_numpy()
    ratings = np.asarray(ratings)

    n_users = ratings.shape[0]
    if not 0 <= user_id < n_users:
        raise IndexError(f"user_id must be in [0, {n_users}), got {user_id}")
    if top_n < 0:
        raise ValueError(f"top_n must be non-negative, got {top_n}")

    # Sort descending, then slice from the front. The previous `[-top_n:]` slice
    # returned *every* movie when top_n == 0, because -0 == 0.
    top_movies = np.argsort(ratings[user_id])[::-1][:top_n]
    return np.asarray(titles)[top_movies]

# Demo: default top-5 recommendations for the user in row 0 of predicted_ratings
example_user_id = 0
example_recommendations = recommend_movies(example_user_id)
print("Recommended movies:", example_recommendations)


Recommended movies: ['Companion' 'Flight Risk' 'Dog Man' 'Mufasa: The Lion King'
 'Venom: The Last Dance']
