In [65]:
# Standard library imports
import os
import warnings

# Data processing and numerical libraries
import numpy as np
import pandas as pd
import scipy.sparse as sp
import dask.dataframe as dd
from sklearn.model_selection import train_test_split

# Visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Machine learning and recommendation libraries
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml import Pipeline
from transformers import AutoTokenizer, AutoModel

# MLflow for experiment tracking
import mlflow

# IPython for displaying outputs
from IPython.display import display

# Suppress warnings
warnings.filterwarnings('ignore')


In [3]:
# # split data once - no temporal features
# train, temp = spark_df.randomSplit([0.75, 0.25], seed=42)
# val, test = temp.randomSplit([.15, .10], seed=42)
# # 

# # define file paths (relative to the current directory)
# model_data_path = "../../data/interim/"

# # save each DataFrame in parquet format
# train.write.parquet(os.path.join(model_data_path, f"train_set.parquet"), mode='overwrite')
# val.write.parquet(os.path.join(model_data_path, f"val_set.parquet"), mode='overwrite')
# test.write.parquet(os.path.join(model_data_path, f"test_set.parquet"), mode='overwrite')

# from pyspark.sql import SparkSession

# ========================================
# Open sessions for necessary packages
# ========================================
# spark = None

# def open_session(close=False):
#     global spark  # 
#     if not close:
#         if spark is None or spark.sparkContext is None:
#             spark = SparkSession.builder \
#                 .appName("ALS in Spark") \
#                 .getOrCreate()
#             # set up MLflow (only needs to be done once)
#     else:
#         if spark is not None:
#             spark.stop()
#             spark = None
            
# # open_session()
# open_session(close=True)

In [42]:
# READ DATA
# model_data_path = "../data/interim/"

# train = spark.read.parquet(os.path.join(model_data_path, "train_set.parquet")).toPandas()
# val = spark.read.parquet(os.path.join(model_data_path, "val_set.parquet")).toPandas()
# test = spark.read.parquet(os.path.join(model_data_path, "test_set.parquet")).toPandas()

# train.repartition(10)


In [31]:
# # define PySpark ALS model
# als = ALS(
#     userCol="user_index",
#     itemCol="bus_index",
#     ratingCol="rating",
#     coldStartStrategy="drop"
# )


# # # Grid search through hyperparameters
# # paramGrid = (ParamGridBuilder()
# #              .addGrid(als.rank, [5, 10, 15])
# #              .addGrid(als.maxIter, [5, 10, 20])
# #              .addGrid(als.regParam, [0.01, 0.1, 0.5])
# #              .build())

# # define criterion
# evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

# # define cross-validation w simple grid
# crossval = CrossValidator(
#     estimator=als,
#     evaluator=evaluator,
#     estimatorParamMaps=paramGrid,
#     numFolds=1
# )

In [38]:
# rdd = train.rdd
# partitions = rdd.glom().collect()
# for index, partition in enumerate(partitions):
#     print(f"Partition {index} contains {len(partition)} rows.")
#     if len(partition) > 0:
#         print(f"Sample data from partition {index}: {partition[:5]}")

In [39]:

# open log 
# mlflow.set_experiment("ALS_Hyperparameter_Tuning")

# log results
# with mlflow.start_run():
    
    # fit model using cross-validation
    # cv_model = crossval.fit(train)
    
    # Log the best model
    # best_model = cv_model.bestModel
    # mlflow.spark.log_model(best_model, "best_model")
    
    # # Log metrics and model parameters for each parameter combination
    # for param_map, metric in zip(crossval.getEstimatorParamMaps(), cv_model.avgMetrics):
    #     rank = param_map[als.rank]
    #     regParam = param_map[als.regParam]
    #     maxIter = param_map[als.maxIter]
        
    #     mlflow.log_param("rank", rank)
    #     mlflow.log_param("regParam", regParam)
    #     mlflow.log_param("maxIter", maxIter)
    #     mlflow.log_metric("validation_rmse", metric)

    # # Log validation scores
    # validation_predictions = best_model.transform(val)
    # validation_rmse = evaluator.evaluate(validation_predictions)
    # mlflow.log_metric("validation_rmse", validation_rmse)


# # Log test metrics (optional, after final model selection)
# test_predictions = best_model.transform(test)
# test_rmse = evaluator.evaluate(test_predictions)
# mlflow.log_metric("test_rmse", test_rmse)


In [48]:
# # Define paths
# model_data_path = "../data/interim/"

# # read data and conver to pandas df
# train = spark.read.parquet(os.path.join(model_data_path, "train_set.parquet")).toPandas()
# val = spark.read.parquet(os.path.join(model_data_path, "val_set.parquet")).toPandas()
# test = spark.read.parquet(os.path.join(model_data_path, "test_set.parquet")).toPandas()

# def convert_to_tensors(df):
#     ratings_tensor = torch.tensor(df['rating'].values, dtype=torch.float32)
#     user_id_tensor = torch.tensor(df['user_index'].values, dtype=torch.int64)
#     bus_id_tensor = torch.tensor(df['bus_index'].values, dtype=torch.int64)
#     return ratings_tensor, user_id_tensor, bus_id_tensor

# def save_tensors(prefix, **tensors):
#     for name, tensor in tensors.items():
#         file_path = os.path.join(model_data_path, f'{prefix}_{name}.pt')
#         torch.save(tensor, file_path)

# # Convert data to tensors
# train_tensors = convert_to_tensors(train)
# val_tensors = convert_to_tensors(val)
# test_tensors = convert_to_tensors(test)

# # Save tensors
# save_tensors('train', ratings=train_tensors[0], user_id=train_tensors[1], bus_id=train_tensors[2])
# save_tensors('val', ratings=val_tensors[0], user_id=val_tensors[1], bus_id=val_tensors[2])
# save_tensors('test', ratings=test_tensors[0], user_id=test_tensors[1], bus_id=test_tensors[2])

In [None]:
mac_data_path = "../data/interim/"

data_path = mac_data_path



In [102]:
mac_data_path = "../data/processed/Final Dataframes"

data_path = mac_data_path

# # Check if Mac GPU is available
# device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

# Check if nvidia gpu is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f'Using device: {device}')

# df = pd.read_parquet(os.path.join(mac_data_path, f"final_df.parquet"))

# train, val_test = train_test_split(df, test_size=.20, shuffle=True)
# val, test = train_test_split(val_test, test_size=.5, shuffle=True)


Using device: cpu


In [141]:
class CFDataset(Dataset):
    def __init__(self, dataframe):
        self.features = {
            'user_id': torch.tensor(dataframe['user_num_id'].values, dtype=torch.long),
            'business_id': torch.tensor(dataframe['bus_num_id'].values, dtype=torch.long),
            'city_id': torch.tensor(dataframe['city_code'].values, dtype=torch.long),
            'state_id': torch.tensor(dataframe['state_code'].values, dtype=torch.long),
            'region_id': torch.tensor(dataframe['region_code'].values, dtype=torch.long),
            'dotw': torch.tensor(dataframe['day_of_week'].values, dtype=torch.long),
            'doty': torch.tensor(dataframe['day_of_year'].values, dtype=torch.long),
            'numerical_features': torch.tensor(dataframe[['user_avg_rating_norm', 
                                                          'bus_avg_rating_norm', 
                                                          'log_business_review_count_norm', 
                                                          'log_user_review_count_norm', 
                                                          'years_yelp_member_norm', 
                                                          'years_since_review_norm']]
                                               .values,
                                               dtype=torch.float),
            'tokens': torch.tensor(dataframe['tokens'].tolist(), dtype=torch.long)
        }
        self.target = torch.tensor(dataframe['mean_centered_rating'].values, dtype=torch.float)

    def __len__(self):
        return len(self.target)

    def __getitem__(self, idx):
        return {k: v[idx] for k, v in self.features.items()}, self.target[idx]

In [95]:
# Concat user and business data into one tensor
total_user_ids = torch.concat((train.users, val.users, test.users), dim=0)
total_business_ids = torch.concat((train.businesses, val.businesses, test.businesses), dim=0)

# get number of unique entities
num_unique_users = torch.unique(total_user_ids).shape[0] 
num_unique_bus = torch.unique(total_business_ids).shape[0]

# output findings
print(f"Number of unique users: {num_unique_users}")
print(f"Number of unique business's : {num_unique_bus}")

Number of unique users: 287116
Number of unique business's : 148523


In [113]:
# Check if nvidia gpu is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

Device: cpu


In [211]:
class CFmodel(nn.Module):
    def __init__(self, rank=9, num_users=287116, num_bus=148523, num_city=1273, num_regions=11, num_states=50, token_length=10):
        super().__init__()
        self.rank = rank
        self.user_emb = nn.Embedding(num_users, rank)
        self.bus_emb = nn.Embedding(num_bus, rank)
        self.city_emb = nn.Embedding(num_city, rank//3)
        self.state_emb = nn.Embedding(num_states, rank//3)
        self.region_emb = nn.Embedding(num_regions, rank//3)
        self.dotw = nn.Embedding(7, rank)
        self.doty = nn.Embedding(367, rank)
        self.fc1 = nn.Linear(rank, 1)
        self.token_len = token_length
        
        # add a layer for numerical features
        self.numerical_layer = nn.Linear(6, rank)  # 6 is the number of numerical features
    
    def forward(self, **kwargs):
        user_emb = self.user_emb(kwargs['user_id'])     
        bus_emb = self.bus_emb(kwargs['business_id']) 
        
        city_emb = self.city_emb(kwargs['city_id'])
        state_emb = self.state_emb(kwargs['state_id'])
        region_emb = self.region_emb(kwargs['region_id'])
        
        location = torch.cat((city_emb, state_emb, region_emb), dim=1)
        
        dotw = self.dotw(kwargs['dotw'])
        doty = self.doty(kwargs['doty'])
        
        numerical = self.numerical_layer(kwargs['numerical_features'])
        
        product = user_emb * bus_emb
        
        prod_loc = product + location + dotw + doty + numerical
        result = self.fc1(prod_loc)
        
        return result.squeeze()

First iteration of the model, I will be just using cosine similiarity and storing results in mlflow. 

In [214]:
# # Hyperparameters
rank = 10
batch_size = 64 
num_epochs = 1

train_dataset = CFDataset(train)
val_dataset = CFDataset(val)
test_dataset = CFDataset(test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)



In [215]:
# initialize model and define optimization

model = CFmodel()
model.to(device)

criterion = nn.MSELoss()

optimizer = torch.optim.Adam(
    model.parameters(),    
    lr=0.001,              
    betas=(0.9, 0.999),    
    eps=1e-08,             
    weight_decay=.01 
)

In [219]:

# Initialize W&B
wandb.init(project='Initial Model')  # Replace with your project and username


ModuleNotFoundError: No module named 'wandb'

In [218]:

def to_device(data, device):
    if isinstance(data, dict):
        return {k: v.to(device) for k, v in data.items()}
    return data.to(device)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CFmodel().to(device)
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.MSELoss()

# Training loop
for epoch in range(num_epochs):
    model.train()
    num_iter = 0
    for features, target in train_loader:
        features = to_device(features, device)
        target = to_device(target, device)
        
        optimizer.zero_grad()
        pred = model(**features)
        loss = criterion(pred, target)
        loss.backward()
        optimizer.step()
        
        if num_iter % 1000 == 0:
            # Validation loop
            model.eval()
            with torch.no_grad():
                val_loss = 0
                for features, target in val_loader:
                    features = to_device(features, device)
                    target = to_device(target, device)
                    
                    pred = model(**features)
                    val_loss += criterion(pred, target).item()
                
                val_loss /= len(val_loader)
            
            # Log metrics to W&B
            wandb.log({
                'epoch': epoch,
                'iteration': num_iter,
                'train_loss': loss.item(),
                'val_loss': val_loss
            })
            
            print(f"Epoch: {epoch}, Iter: {num_iter}, Train Loss: {loss.item():.4f}, Val Loss: {val_loss:.4f}")
        
        num_iter += 1

# Finish the W&B run
wandb.finish()

Iterations: 0, loss: 2.1258974075317383


KeyboardInterrupt: 

Exception ignored in: 'zmq.backend.cython._zmq.Frame.__del__'
Traceback (most recent call last):
  File "_zmq.py", line 160, in zmq.backend.cython._zmq._check_rc
KeyboardInterrupt: 


Iter:  <built-in function iter>, Train Loss: 2.1259, Val Loss: 2.5342
Iterations: 100, loss: 1.5003511905670166
Iter:  <built-in function iter>, Train Loss: 1.5004, Val Loss: 1.8582
Iterations: 200, loss: 1.8833609819412231
Iter:  <built-in function iter>, Train Loss: 1.8834, Val Loss: 1.6077
Iterations: 300, loss: 1.9653592109680176
Iter:  <built-in function iter>, Train Loss: 1.9654, Val Loss: 1.5065
Iterations: 400, loss: 1.2224197387695312


KeyboardInterrupt: 