In [None]:
from pyspark.sql import SparkSession
import re
import numpy as np
from pyspark.sql.functions import concat_ws, col, udf, expr, monotonically_increasing_id, regexp_replace, split, row_number, lit, array_distinct, explode, lower
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.types import *
from collections import Counter
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import nltk
# Fetch the NLTK corpora used in this notebook: 'wordnet' backs WordNetLemmatizer
# (imported above) and 'stopwords' is what stopwords.words() reads -- without it a
# fresh environment raises LookupError on the next line.
nltk.download('wordnet')
nltk.download('stopwords')
# English stop-word set, consulted when filtering the vocabulary (filter_vocab).
stop_words = set(stopwords.words('english'))

In [None]:
spark = (
    SparkSession.builder
    .appName('Movie Recommendation System')
    .master("local[*]")
    # NOTE(review): with a local[*] master all work runs inside the driver JVM, so the
    # executor.* settings below are expected to have no effect; spark.driver.memory is
    # the one that matters, and only if no JVM is running yet when this cell executes.
    .config("spark.executor.memory", "32g")
    .config("spark.driver.memory", "32g")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "120s")
    .config("spark.executor.cores", "16")
    # "spark.num.executors" is not a Spark property (it was silently ignored);
    # the executor-count setting is spark.executor.instances.
    .config("spark.executor.instances", "16")
    .getOrCreate()
)

# Arrow-accelerated Spark <-> pandas conversion. This is the Spark 3.x key; the older
# "spark.sql.execution.arrow.enabled" is deprecated.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "100000")
spark.conf.set("spark.sql.shuffle.partitions", "200")

sc = spark.sparkContext

In [None]:
# Report the effective master, memory settings and default parallelism of the session.
_ctx = spark.sparkContext
_spark_conf = _ctx.getConf()
print("Master:", _ctx.master)  # Should print 'local[*]' or 'local[N]'
for _label, _key in (("Executor Memory:", "spark.executor.memory"),
                     ("Driver Memory:", "spark.driver.memory")):
    print(_label, _spark_conf.get(_key, "Not Set"))
print("Number of Partitions:", _ctx.defaultParallelism)

In [None]:
# Raw TMDB catalogue: first line is the header, column types are inferred from the data.
data = spark.read.options(header=True, inferSchema=True).csv('./Data/TMDB_all_movies.csv')

In [None]:
# Step 1: Data generator

# Columns not used for the text features.
_DROP_COLS = [
    'id',
    'vote_average',
    'vote_count',
    'status',
    'release_date',
    'revenue',
    'runtime',
    'budget',
    'imdb_id',
    'original_language',
    'original_title',
    'popularity',
    'imdb_rating',
    'imdb_votes',
    'poster_path',
    'tagline',
    'music_composer',
    'director_of_photography',
]

# Comma-separated name lists. Spaces inside each name are removed (a multi-word name
# becomes one token) and the commas become spaces (distinct names stay distinct tokens).
# This must apply to every list, not only "cast": otherwise the punctuation filter below
# deletes the commas and glues all names of the list into a single token.
_NAME_LIST_COLS = [
    'production_companies',
    'production_countries',
    'cast',
    'director',
    'writers',
    'producers',
]

# Columns concatenated, in this order, into the "Tags" text.
_TAG_SOURCE_COLS = [
    'title',
    'overview',
    'genres',
    'production_companies',
    'production_countries',
    'spoken_languages',
    'cast',
    'director',
    'writers',
    'producers',
]


def data_generator(dataframe, chunk_size):
    """Yield the movie catalogue as chunks of tokenised "Tags" documents.

    Rows without a title are dropped, unused columns are removed, every row gets a
    1-based sequential ``id``, and the text columns are merged into a lower-cased,
    letters-only token array ``Tags``.

    Args:
        dataframe: Raw movie DataFrame (as read from the TMDB CSV).
        chunk_size: Maximum number of rows per yielded chunk; must be positive.

    Yields:
        DataFrame with columns ``id``, ``title`` and ``Tags`` (array<string>), holding at
        most ``chunk_size`` rows. Each chunk is cached while the consumer works on it and
        unpersisted when the consumer advances (or closes the generator).

    Raises:
        ValueError: if ``chunk_size`` is not positive (raised on first ``next()``).
    """
    from pyspark.sql.functions import array_remove

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    movies = dataframe.dropna(subset=['title']).coalesce(16).drop(*_DROP_COLS)

    # Number rows by scan position. Ordering by a constant (lit('A')) made row_number()
    # non-deterministic, so chunks evaluated lazily at different times could disagree on
    # which movie owns which id -- duplicating some movies and losing others.
    w = Window.orderBy(monotonically_increasing_id())
    movies = movies.withColumn('id', row_number().over(w))

    for name in _NAME_LIST_COLS:
        movies = movies.withColumn(name, regexp_replace(regexp_replace(col(name), " ", ""), ",", " "))

    tags_text = concat_ws(" ", *[col(name) for name in _TAG_SOURCE_COLS])
    # Keep letters and whitespace only, lower-case, split on any whitespace run, and drop
    # the empty tokens produced by leading whitespace / empty input.
    tokens = split(lower(regexp_replace(tags_text, r"[^\p{L}\s]", "")), r"\s+")
    movies = movies.select('id', 'title', array_remove(tokens, "").alias('Tags'))

    # Materialise once so the per-chunk filters below do not re-read and re-rank the CSV.
    movies.persist()
    try:
        rows = movies.count()
        # row_number() is 1-based: ids run 1..rows inclusive.
        for start in range(1, rows + 1, chunk_size):
            end = start + chunk_size
            print(f"{start} to {end}")
            chunk = movies.filter((col("id") >= start) & (col("id") < end))
            chunk.cache()
            try:
                yield chunk
            finally:
                chunk.unpersist()
    finally:
        movies.unpersist()

In [None]:
# Step 2: Document-frequency counting (the minDF / maxDF / top-k selection is filter_vocab)
def word_occurrence(generator, input_col):
    """Union all preprocessed chunks and count, per word, how many documents contain it.

    Args:
        generator: Iterable of DataFrames with columns ``id`` (int), ``title`` (string)
            and a token-array column, e.g. the chunks yielded by ``data_generator``.
        input_col: Name of the array<string> token column to count.

    Returns:
        tuple ``(df_preprocessed, word_in_doc_freq)``:
            - ``df_preprocessed``: union of all chunks, repartitioned into 32 partitions.
            - ``word_in_doc_freq``: ``Counter`` mapping each exploded word Row (single
              field ``col``, hence ``word.col`` downstream) to the number of documents
              that contain it.
    """
    # How many documents include each word (array_distinct counts a word once per doc).
    word_in_doc_freq = Counter()

    # Empty seed carrying the chunk schema so the result is well-typed even when the
    # generator yields nothing; union() matches columns by position.
    columns = StructType([
        StructField('id', IntegerType(), False),
        StructField('title', StringType(), False),
        StructField('Tags', ArrayType(StringType()), False),
    ])
    df = spark.createDataFrame(data=sc.emptyRDD(), schema=columns)

    for chunk in generator:
        df = df.union(chunk)
        # Counted while the chunk is still cached by the generator.
        word_counts = chunk.select(explode(array_distinct(input_col))).rdd.countByValue()
        word_in_doc_freq.update(word_counts)
        print(f"Updated Vocab Length: {len(word_in_doc_freq)}")

    return df.repartition(32), word_in_doc_freq

In [None]:
# Step 3: Run the chunked preprocessing and gather per-word document frequencies
chunk_size = 100_000
vocab_size = 100_000  # not used in this cell; presumably passed to filter_vocab later
generator = data_generator(data, chunk_size)
data_processed, vocab = word_occurrence(generator, "Tags")

In [None]:
# Spot-check the tokenised output for a single known title.
data_processed.where(col("title") == "Ariel").show(truncate=False)

In [None]:
# Peek at the first raw rows, with long text columns shown in full.
data.show(n=5, truncate=False)

In [None]:
# Number of distinct words seen across all documents (before any filtering).
n_distinct_words = len(vocab)
n_distinct_words

In [None]:
def filter_vocab(word_counts, docs_count, minDF, maxDF, vocab_size, excluded_words=None):
    """Select the vocabulary by document frequency and map each kept word to an index.

    Args:
        word_counts: Mapping (or iterable of pairs) from word to the number of documents
            containing it. Keys may be plain strings or Rows exposing the word as
            ``.col`` (the form produced by ``word_occurrence``).
        docs_count: Total number of documents; must be positive.
        minDF: Minimum document fraction (inclusive) for a word to be kept.
        maxDF: Maximum document fraction (inclusive) for a word to be kept.
        vocab_size: Keep at most this many words (highest document frequency first);
            ``None`` keeps all of them.
        excluded_words: Words dropped regardless of frequency; defaults to the
            module-level ``stop_words`` set.

    Returns:
        dict mapping word -> index, indices 0..k-1 in descending document-frequency
        order (ties keep their order of appearance in ``word_counts``).

    Raises:
        ValueError: if ``docs_count`` is not positive.
    """
    if docs_count <= 0:
        raise ValueError(f"docs_count must be positive, got {docs_count}")
    if excluded_words is None:
        excluded_words = stop_words

    # Step 2.2: Apply minDF and maxDF; also drop excluded words and empty tokens.
    filtered_vocab = {}
    for word, count in dict(word_counts).items():
        token = getattr(word, 'col', word)  # Row(col=...) -> its word; str -> itself
        doc_fraction = count / docs_count
        if token and minDF <= doc_fraction <= maxDF and token not in excluded_words:
            filtered_vocab[token] = count
    print(f"Vocab Length after Filtering: {len(filtered_vocab.keys())}")

    # Step 2.3: Select top k words by frequency (sorted() is stable, so ties keep order).
    top_vocab = [word for word, _ in sorted(filtered_vocab.items(), key=lambda item: -item[1])][:vocab_size]
    return {word: idx for idx, word in enumerate(top_vocab)}