# In [None]:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec
from sklearn.cluster import MiniBatchKMeans
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import lit
from sklearn.preprocessing import LabelEncoder
import os
import joblib

# ----------- Step 1: Spark Session Initialization -----------
# Key/value settings registered on the session builder, in this order.
_spark_settings = [
    ("spark.executor.memory", "8g"),
    ("spark.driver.memory", "8g"),
    ("spark.hadoop.hadoop.security.authentication", "simple"),
    ("spark.hadoop.hadoop.security.authorization", "false"),
    ("spark.network.timeout", "600s"),
    ("spark.executor.heartbeatInterval", "60s"),
    ("spark.driver.host", "127.0.0.1"),
    ("spark.driver.extraJavaOptions", "-Djava.security.manager=allow"),
    ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0"),
]

# Local-mode session named "NewsRecommendationALS"; "local[*]" is the master URL.
_session_builder = SparkSession.builder.appName("NewsRecommendationALS").master("local[*]")
for _setting_key, _setting_value in _spark_settings:
    _session_builder = _session_builder.config(_setting_key, _setting_value)
spark = _session_builder.getOrCreate()

# Only WARN and above are logged from here on, to keep the output readable.
spark.sparkContext.setLogLevel("WARN")
print("Spark session created successfully!")

# Directory that receives every trained model artifact written by this script.
save_dir = '/Users/n7/Desktop/ie University SAMBD Acadamics/Capstone Project/Machine Learning Codes/Trained Models'
# exist_ok=True creates the directory (and parents) if needed and is a no-op when it
# already exists, avoiding the check-then-create race of a separate os.path.exists() test.
os.makedirs(save_dir, exist_ok=True)

# ----------- Step 2: Load News and Behavior Datasets in Batches -----------
# Number of rows per chunk; kept small to minimize memory issues.
batch_size = 2000

# Column names assigned positionally to the news CSV.
news_columns = [
    "News ID",
    "Category",
    "Subcategory",
    "Title",
    "Abstract",
    "URL",
    "Entities Mentioned",
    "Entities in Abstract",
]

# Column names assigned positionally to the behavior CSV.
behavior_columns = [
    "Impression ID",
    "User ID",
    "Timestamp",
    "Displayed News List",
    "Impression List (Clicked Status)",
    "Impression Dictionary",
    "Clicked News IDs",
    "Not-Clicked News IDs",
]

news_file_path = '/Users/n7/Desktop/ie University SAMBD Acadamics/Capstone Project/Data/MINDlarge_train/Cleaned Datasets/News_cleaned.csv'
behavior_file_path = '/Users/n7/Desktop/ie University SAMBD Acadamics/Capstone Project/Data/MINDlarge_train/Cleaned Datasets/cleaned_behavior_dataset.csv'

# NOTE(review): with `names=` and no `header=`, pandas treats the first line of each file
# as data. Confirm the cleaned CSVs have no header row (otherwise pass header=0).
# Passing `chunksize` makes read_csv return an iterator of DataFrames instead of one frame.
news_df_iterator = pd.read_csv(
    news_file_path,
    sep=',',
    names=news_columns,
    chunksize=batch_size,
)
behavior_df_iterator = pd.read_csv(
    behavior_file_path,
    sep=',',
    names=behavior_columns,
    chunksize=batch_size,
)

# ----------- Step 3: Initialize Label Encoders for User ID and News ID -----------
user_encoder = LabelEncoder()
news_encoder = LabelEncoder()

# Fit the label encoders on the complete behavior file so that every batch maps the
# same ID to the same integer. IDs are gathered in sets: appending each chunk's unique
# values to a list would keep one copy per chunk, so memory would grow with the number
# of chunks rather than with the number of distinct IDs.
_unique_user_ids = set()
_unique_news_ids = set()
for behavior_df in behavior_df_iterator:
    _unique_user_ids.update(behavior_df['User ID'].unique())
    # Both columns hold comma-separated news IDs; missing cells are dropped before fitting.
    for _id_column in ('Clicked News IDs', 'Not-Clicked News IDs'):
        _unique_news_ids.update(
            behavior_df[_id_column].str.split(',').explode().dropna().unique()
        )

# Kept as lists under their original names for anything that inspects them later.
user_ids = list(_unique_user_ids)
news_ids = list(_unique_news_ids)

user_encoder.fit(user_ids)
news_encoder.fit(news_ids)

# The behavior reader is exhausted by the loop above, so both readers are reopened
# to start the batch-processing pass from the first row.
behavior_df_iterator = pd.read_csv(behavior_file_path, sep=',', names=behavior_columns, chunksize=batch_size)
news_df_iterator = pd.read_csv(news_file_path, sep=',', names=news_columns, chunksize=batch_size)

# ----------- Step 4: Counter Function -----------
# Simple batch counter function
def batch_counter(start=0):
    """Yield consecutive numbers forever, beginning at *start*.

    Args:
        start: First value to yield; each following value is one larger.

    Yields:
        start, start + 1, start + 2, ...
    """
    # itertools.count is the standard-library form of this endless counter;
    # imported locally so the function is self-contained.
    from itertools import count

    yield from count(start)

# Generator that supplies the batch number printed for each processed batch.
counter = batch_counter()

# Clustering model that is updated one batch at a time via partial_fit.
best_k = 70  # number of clusters
mini_batch_kmeans = MiniBatchKMeans(
    n_clusters=best_k,
    batch_size=500,
    random_state=42,
)

# Word2Vec model created without a corpus, so it starts with no vocabulary;
# vocabulary and weights are built up batch by batch.
word2vec_model = Word2Vec(
    vector_size=100,
    window=5,
    min_count=2,
    workers=4,
)

# Spark DataFrame holding all ALS training rows seen so far (None until the first batch).
combined_behavior_spark_df = None

# ----------- Step 5: Process Each Batch -----------
def get_article_embedding(text):
    """Return the mean Word2Vec vector of the words of *text* known to the model.

    Words missing from the model's vocabulary are skipped. When no word is known,
    a zero vector with the model's own dimensionality is returned.
    """
    word_vecs = [word2vec_model.wv[word] for word in text.split() if word in word2vec_model.wv]
    if word_vecs:
        return np.mean(word_vecs, axis=0)
    # Use the model's vector size rather than a hard-coded 100 so both stay in sync.
    return np.zeros(word2vec_model.vector_size)


def _explode_and_encode(frame, source_col, target_col):
    """Return one encoded (User ID, news ID) row per ID listed in *source_col*.

    *source_col* holds comma-separated news IDs. The result has the columns
    'User ID' and *target_col*, both mapped to integers by the fitted encoders.
    """
    exploded = (
        frame[['User ID', source_col]]
        .assign(**{target_col: frame[source_col].str.split(',')})
        .explode(target_col)
        .drop(columns=source_col)
        # Missing IDs must be removed BEFORE the string cast: astype(str) turns NaN into
        # the literal 'nan', a label the encoder was not fitted on (fitting drops NaN),
        # which makes transform() raise.
        .dropna(subset=['User ID', target_col])
    )
    exploded['User ID'] = user_encoder.transform(exploded['User ID'])
    exploded[target_col] = news_encoder.transform(exploded[target_col].astype(str))
    return exploded


# The estimator carries no fitted state, so one instance is reused for every refit.
als = ALS(userCol="User ID", itemCol="News ID", ratingCol="rating", implicitPrefs=True, coldStartStrategy="drop",
          rank=10, maxIter=10, regParam=0.1)

# NOTE(review): zip() ends with the shorter of the two readers, so the remaining chunks of
# the longer file are never processed, and a news chunk is paired with a behavior chunk
# purely by position. Confirm this is intended.
for news_df, behavior_df in zip(news_df_iterator, behavior_df_iterator):

    batch_number = next(counter)
    print(f"Processing batch number: {batch_number}")

    # ----------- Preprocess the News Dataset (for Content-Based Filtering) -----------
    # Missing fields become empty strings; otherwise one NaN field makes the whole
    # concatenation NaN, which breaks both .split() and the TF-IDF vectorizer.
    text_parts = news_df[['Category', 'Subcategory', 'Title', 'Abstract']].fillna('').astype(str)
    news_df['Text'] = (
        text_parts['Category'] + " " + text_parts['Subcategory'] + " "
        + text_parts['Title'] + " " + text_parts['Abstract']
    )

    # NOTE(review): a fresh vectorizer is fitted on every chunk, so the vectorizer saved
    # below reflects only the most recent chunk. cosine_sim_matrix is not used later in
    # this loop.
    vectorizer = TfidfVectorizer(max_features=5000)
    tfidf_matrix = vectorizer.fit_transform(news_df['Text'])
    cosine_sim_matrix = cosine_similarity(tfidf_matrix, tfidf_matrix)

    # ----------- Word2Vec for News Embedding (Incremental Training) -----------
    sentences = [text.split() for text in news_df['Text']]
    # update=True is only valid once a vocabulary exists; the very first call has to
    # build the vocabulary from scratch, otherwise gensim raises.
    has_vocabulary = len(word2vec_model.wv.index_to_key) > 0
    word2vec_model.build_vocab(sentences, update=has_vocabulary)
    word2vec_model.train(sentences, total_examples=len(sentences), epochs=5)

    news_df['Article Embedding'] = news_df['Text'].apply(get_article_embedding)

    # ----------- Mini-Batch KMeans Clustering (Incremental Update) -----------
    news_embeddings = np.vstack(news_df['Article Embedding'].values)
    mini_batch_kmeans.partial_fit(news_embeddings)

    joblib.dump(mini_batch_kmeans, os.path.join(save_dir, 'mini_batch_kmeans_news_model.pkl'))
    print("Mini-Batch KMeans Model Updated and Saved successfully")

    # ----------- ALS (Collaborative Filtering) for Clicked and Not-Clicked News -----------
    clicked_df = _explode_and_encode(behavior_df, 'Clicked News IDs', 'Clicked_News')
    not_clicked_df = _explode_and_encode(behavior_df, 'Not-Clicked News IDs', 'Not_Clicked_News')

    # Clicked items get rating 1.0, displayed-but-not-clicked items get 0.0.
    clicked_spark_df = spark.createDataFrame(clicked_df).withColumn('rating', lit(1.0))
    not_clicked_spark_df = spark.createDataFrame(not_clicked_df).withColumn('rating', lit(0.0))

    batch_behavior_spark_df = clicked_spark_df.union(
        not_clicked_spark_df.withColumnRenamed('Not_Clicked_News', 'Clicked_News')
    ).withColumnRenamed('Clicked_News', 'News ID')

    # Accumulate this batch's rows onto everything seen so far.
    if combined_behavior_spark_df is None:
        combined_behavior_spark_df = batch_behavior_spark_df
    else:
        combined_behavior_spark_df = combined_behavior_spark_df.union(batch_behavior_spark_df)

    # Progressive training: refit on all accumulated data. Best-effort, so a failure is
    # reported and the loop carries on with the next batch.
    try:
        als_model = als.fit(combined_behavior_spark_df)
        # Plain save() refuses to write to an existing path, which made every batch after
        # the first fail here; overwrite() replaces the previously saved model instead.
        als_model.write().overwrite().save(os.path.join(save_dir, 'als_model_incremental.pkl'))
        print("ALS Model Updated and Saved successfully")
    except Exception as e:
        print(f"Failed to train ALS model for batch {batch_number}: {e}")

    # Save other models for future use
    joblib.dump(vectorizer, os.path.join(save_dir, 'tfidf_vectorizer.pkl'))
    print("TF-IDF Vectorizer Model Saved successfully")
    word2vec_model.save(os.path.join(save_dir, 'word2vec_model.model'))
    print("Word2Vec Model Updated and Saved successfully")
    print("All Models Updated and Saved successfully")

    print(f"Finished processing batch number: {batch_number}\n")

print("Training Completed and models were saved successfully")
# ----------- End -----------