In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.4.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.4-py2.py3-none-any.whl size=317849765 sha256=41788aa65a2ed0a278d1670b01b304a4c44ced04554b2619cf21e8c5ca963b73
  Stored in directory: /root/.cache/pip/wheels/d9/1c/98/31e395a42d1735d18d42124971ecbbade844b50bb9845b6f4a
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.4


In [2]:
!pip install torch-geometric

Collecting torch-geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
Downloading torch_geometric-2.6.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m24.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch-geometric
Successfully installed torch-geometric-2.6.1


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, isnan, when, count
import pandas as pd
import numpy as np
import torch
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from torch.nn import Linear, ReLU, Dropout, BatchNorm1d
from sklearn.model_selection import train_test_split

In [4]:
spark = SparkSession.builder.appName('MovieLens') \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.rapids.sql.enabled", "true") \
    .config("spark.rapids.sql.explain", "ALL") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.rapids.memory.pinnedPool.size", "2G") \
    .getOrCreate()

In [5]:
PATH = "/kaggle/input/movielens100k/"

In [6]:
ratings_path = PATH + "ratings.csv"
movies_path = PATH + "movies.csv"

ratings_df = spark.read.csv(ratings_path, header=True, inferSchema=True)
movies_df = spark.read.csv(movies_path, header=True, inferSchema=True)

In [7]:
print(ratings_df.count())
print(movies_df.count())

100004
9125


In [8]:
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [9]:
movies_df = movies_df.drop('genres')

In [10]:
ratings_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|   1.0|1260759203|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     1|   2150|   3.0|1260759194|
|     1|   2193|   2.0|1260759198|
|     1|   2294|   2.0|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|   1.0|1260759200|
|     1|   3671|   3.0|1260759117|
+------+-------+------+----------+
only showing top 20 rows



In [11]:
ratings_df = ratings_df.drop('timestamp')

In [12]:
missing_values = ratings_df.select(
    [count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in ratings_df.columns]
)

missing_values.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0|      0|     0|
+------+-------+------+



In [13]:
missing_values = movies_df.select(
    [count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in movies_df.columns]
)

missing_values.show()

+-------+-----+
|movieId|title|
+-------+-----+
|      0|    0|
+-------+-----+



In [14]:
user_ids = ratings_df.select("userId").distinct()
movie_ids = ratings_df.select("movieId").distinct()

user_map = {row.userId: idx for idx, row in enumerate(user_ids.collect())}
movie_map = {row.movieId: idx + len(user_map) for idx, row in enumerate(movie_ids.collect())}

edges = ratings_df.rdd.map(lambda row: [user_map[row.userId], movie_map[row.movieId]]).collect()
edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous()

num_users = len(user_map)
num_movies = len(movie_map)
node_features = torch.eye(num_users + num_movies)

ratings = ratings_df.select("rating").rdd.map(lambda row: row.rating).collect()
edge_weight = torch.tensor(ratings, dtype=torch.float)
edge_weight = (edge_weight - edge_weight.min()) / (edge_weight.max() - edge_weight.min())

data = Data(x=node_features, edge_index=edge_index, edge_attr=edge_weight)

class GNNRecommender(torch.nn.Module):
    def __init__(self, num_features, hidden_channels):
        super(GNNRecommender, self).__init__()
        
        self.conv1 = GCNConv(num_features, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        
        self.bn1 = BatchNorm1d(hidden_channels)
        self.bn2 = BatchNorm1d(hidden_channels)
        self.bn3 = BatchNorm1d(hidden_channels)
        
        self.dropout = Dropout(0.3)
        
        self.lin1 = Linear(hidden_channels, hidden_channels // 2)
        self.lin2 = Linear(hidden_channels // 2, 1)
        
        self.relu = ReLU()

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        
        x = self.dropout(x)
        
        x = self.conv1(x, edge_index)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.dropout(x)
        
        x = self.conv2(x, edge_index)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.dropout(x)
       
        x = self.conv3(x, edge_index)
        x = self.bn3(x)
        x = self.relu(x)
        x = self.dropout(x)
        
        x = self.lin1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.lin2(x)
        return x

model = GNNRecommender(num_features=data.num_node_features, hidden_channels=64)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
criterion = torch.nn.HuberLoss()
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=10, factor=0.5)

def train():
    model.train()
    optimizer.zero_grad()
    out = model(data)
    loss = criterion(out[data.edge_index[0]].squeeze(), data.edge_attr)
    loss.backward()
    optimizer.step()
    return loss.item()

def calculate_metrics(pred, true):
    pred = pred.squeeze().detach().cpu()
    true = true.detach().cpu()
    rmse = torch.sqrt(torch.mean((pred - true) ** 2))
    mae = torch.mean(torch.abs(pred - true))
    return rmse.item(), mae.item()

def test():
    model.eval()
    with torch.no_grad():
        out = model(data)
        pred = out[data.edge_index[0]]
        rmse, mae = calculate_metrics(pred, data.edge_attr)
        return pred, rmse, mae

best_rmse = float('inf')
patience = 0
max_patience = 20

for epoch in range(200):
    loss = train()
    pred, rmse, mae = test()
    
    scheduler.step(rmse)
    
    print(f"Epoch {epoch}, Loss: {loss:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")
    
    if rmse < best_rmse:
        best_rmse = rmse
        patience = 0
        torch.save(model.state_dict(), 'best_model.pt')
    else:
        patience += 1
        
    if patience >= max_patience:
        print("Early stopping!")
        break

# En iyi modeli yükle
model.load_state_dict(torch.load('best_model.pt'))

Epoch 0, Loss: 0.2614, RMSE: 0.6552, MAE: 0.6130
Epoch 1, Loss: 0.2085, RMSE: 0.6444, MAE: 0.6017
Epoch 2, Loss: 0.1787, RMSE: 0.6342, MAE: 0.5909
Epoch 3, Loss: 0.1445, RMSE: 0.6237, MAE: 0.5799
Epoch 4, Loss: 0.1277, RMSE: 0.6071, MAE: 0.5627
Epoch 5, Loss: 0.1075, RMSE: 0.5858, MAE: 0.5415
Epoch 6, Loss: 0.0916, RMSE: 0.5609, MAE: 0.5166
Epoch 7, Loss: 0.0727, RMSE: 0.5345, MAE: 0.4900
Epoch 8, Loss: 0.0613, RMSE: 0.5111, MAE: 0.4662
Epoch 9, Loss: 0.0560, RMSE: 0.4906, MAE: 0.4457
Epoch 10, Loss: 0.0506, RMSE: 0.4638, MAE: 0.4188
Epoch 11, Loss: 0.0470, RMSE: 0.4400, MAE: 0.3943
Epoch 12, Loss: 0.0470, RMSE: 0.4196, MAE: 0.3730
Epoch 13, Loss: 0.0449, RMSE: 0.3936, MAE: 0.3489
Epoch 14, Loss: 0.0484, RMSE: 0.3763, MAE: 0.3329
Epoch 15, Loss: 0.0558, RMSE: 0.3600, MAE: 0.3174
Epoch 16, Loss: 0.0573, RMSE: 0.3464, MAE: 0.3041
Epoch 17, Loss: 0.0554, RMSE: 0.3345, MAE: 0.2920
Epoch 18, Loss: 0.0553, RMSE: 0.3258, MAE: 0.2835
Epoch 19, Loss: 0.0447, RMSE: 0.3206, MAE: 0.2786
Epoch 20, 

  model.load_state_dict(torch.load('best_model.pt'))


<All keys matched successfully>

In [15]:
print(model)

GNNRecommender(
  (conv1): GCNConv(9737, 64)
  (conv2): GCNConv(64, 64)
  (conv3): GCNConv(64, 64)
  (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout): Dropout(p=0.3, inplace=False)
  (lin1): Linear(in_features=64, out_features=32, bias=True)
  (lin2): Linear(in_features=32, out_features=1, bias=True)
  (relu): ReLU()
)


In [16]:
def get_recommendations(user_id, top_k=10):
    model.eval()
    
    internal_user_id = user_map[user_id]
    
    rated_movies = set(ratings_df.filter(ratings_df.userId == user_id)
                      .select("movieId")
                      .rdd.map(lambda x: x.movieId)
                      .collect())
    
    all_movies = set(movie_map.keys())
    unwatched_movies = all_movies - rated_movies
    
    test_edges = [[internal_user_id, movie_map[movie_id]] for movie_id in unwatched_movies]
    test_edge_index = torch.tensor(test_edges, dtype=torch.long).t().contiguous()
    
    with torch.no_grad():
        test_data = Data(x=data.x, edge_index=test_edge_index)
        predictions = model(test_data)
    
    predictions = predictions.squeeze()
    original_min = edge_weight.min()
    original_max = edge_weight.max()
    predictions = predictions * (original_max - original_min) + original_min
    
    movie_predictions = list(zip(unwatched_movies, predictions.tolist()))
    
    top_recommendations = sorted(movie_predictions, key=lambda x: x[1], reverse=True)[:top_k]
    
    recommended_movies = []
    for movie_id, pred_rating in top_recommendations:
        movie_info = movies_df.filter(movies_df.movieId == movie_id).select("title").first()
        movie_title = movie_info.title if movie_info else f"Unknown Movie ({movie_id})"
        
        recommended_movies.append({
            'movie_id': movie_id,
            'title': movie_title,
            'predicted_rating': round(pred_rating, 2)
        })
    
    return recommended_movies

user_id = 2
recommendations = get_recommendations(user_id, top_k=10)

print(f"\nTop 10 Movie Recommendations (User ID: {user_id}):")
for i, rec in enumerate(recommendations, 1):
    print(f"{i}. {rec['title']} (Estimated Rating: {rec['predicted_rating']:.2f})")


Top 10 Movie Recommendations (User ID: 2):
1. In the Mouth of Madness (1995) (Estimated Rating: 0.48)
2. Cliffhanger (1993) (Estimated Rating: 0.48)
3. Wallace & Gromit: A Close Shave (1995) (Estimated Rating: 0.48)
4. Favor, The (1994) (Estimated Rating: 0.48)
5. Pyromaniac's Love Story, A (1995) (Estimated Rating: 0.48)
6. Flower of My Secret, The (La flor de mi secreto) (1995) (Estimated Rating: 0.48)
7. Run of the Country, The (1995) (Estimated Rating: 0.48)
8. Fresh (1994) (Estimated Rating: 0.48)
9. Something to Talk About (1995) (Estimated Rating: 0.48)
10. Manhattan Murder Mystery (1993) (Estimated Rating: 0.48)
