In [64]:
from pyspark.sql import SparkSession
import re
from pyspark.mllib.recommendation import Rating
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row
from pyspark.sql.functions import explode, col

# Part A Q1
class EvenOddCounter:
    """Count the even and odd integers found in a text file using Spark.

    Tokens are whitespace-separated. Only tokens made purely of decimal
    digits are counted, so signed values such as "-3" are ignored.
    """

    def __init__(self, appName, filePath):
        """Store the input path and obtain a SparkSession named ``appName``.

        Raises:
            Exception: whatever ``getOrCreate()`` raised. The failure is
                printed and then re-raised so the caller never ends up with
                an instance that has no ``self.spark``.
        """
        self.filePath = filePath
        try:
            self.spark = SparkSession.builder.appName(appName).getOrCreate()
        except Exception as e:
            print("Failed to create SparkSession:", e)
            raise
        print("SparkSession created successfully with appName:", appName)

    @staticmethod
    def _parity(token):
        """Return "even" or "odd" for a decimal-digit token, else None.

        ``str.isdecimal`` is used rather than ``str.isdigit`` because
        ``isdigit`` also accepts characters such as superscript digits that
        ``int()`` cannot parse.
        """
        if not token.isdecimal():
            return None
        return "even" if int(token) % 2 == 0 else "odd"

    # function for counting even and odd numbers in the file
    def countEvenOdd(self):
        """Return ``(even_count, odd_count)`` for the file at ``self.filePath``.

        The file is scanned once: every token is classified and the
        classifications are tallied with a single ``countByValue`` action.
        """
        numbers_rdd = self.spark.sparkContext.textFile(self.filePath)

        # Bind the plain function to a local name so the closures shipped to
        # the executors do not capture ``self`` (and with it the SparkSession).
        classify = self._parity

        parity_counts = (
            numbers_rdd
            # split() with no argument handles tabs, repeated spaces and "\r"
            .flatMap(lambda line: line.split())
            .map(classify)
            .filter(lambda parity: parity is not None)
            .countByValue()
        )

        return parity_counts.get("even", 0), parity_counts.get("odd", 0)

    # method to save counts to a txt file
    def saveCountsToFile(self, evenCount, oddCount, outputPath):
        """Write both counts to ``outputPath`` (overwriting it) and report the path."""
        with open(outputPath, 'w') as file:
            file.write(f"Even numbers count: {evenCount}\n")
            file.write(f"Odd numbers count: {oddCount}\n")
        print(f"Counts saved to {outputPath}")

    def stop(self):
        """Stop the underlying SparkSession."""
        self.spark.stop()

In [4]:
# Input file of integers and destination file for the even/odd summary.
filepath = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/integer.txt"
outputpath = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/output_files/Even_Odd_counts.tex"

counter = EvenOddCounter("EvenOddCounter",filepath)
try:
    even_count, odd_count = counter.countEvenOdd()
    print(f"Even Counts: {even_count} \nOdd Counts: {odd_count}")
    counter.saveCountsToFile(even_count, odd_count, outputpath)
finally:
    # Stop the Spark session even when counting or saving fails.
    counter.stop()

SparkSession created successfully with appName: EvenOddCounter
Even Counts: 514 
Odd Counts 496
Counts saved to /Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/output_files/Even_Odd_counts.tex


In [6]:
# Part A Q2
class DepartmentSalarySum:
    """Sum salaries per department from a whitespace-delimited text file.

    Each non-blank line is read as ``<department> <salary> ...``; only the
    first two fields are used.
    """

    def __init__(self, appName, filePath):
        """Obtain a SparkSession named ``appName`` and remember the input path."""
        self.spark = SparkSession.builder.appName(appName).getOrCreate()
        self.filePath = filePath

    @staticmethod
    def _parse_line(line):
        """Return ``[(department, salary)]`` for a data line, ``[]`` for a blank one.

        Splitting on any whitespace tolerates repeated spaces, tabs and a
        trailing "\\r". A non-blank line with a missing or non-numeric salary
        still raises (IndexError / ValueError) so bad data is not hidden.
        """
        parts = line.split()
        if not parts:
            return []
        return [(parts[0], float(parts[1]))]

    def calculateSalarySums(self):
        """Return a list of ``(department, total_salary)`` tuples."""
        # Load the text file into an RDD
        lines_rdd = self.spark.sparkContext.textFile(self.filePath)

        # Local alias keeps ``self`` out of the closure sent to the executors.
        parse = self._parse_line

        # Map each line to a (department, salary) pair, dropping blank lines
        department_salary_rdd = lines_rdd.flatMap(parse)

        # Reduce by key (department) to sum the salaries
        department_sums_rdd = department_salary_rdd.reduceByKey(lambda a, b: a + b)

        # Collect the results as a list of tuples
        return department_sums_rdd.collect()

    def saveDepartmentSumsToFile(self, departmentSums, outputPath):
        """Write one ``department: total`` line per entry to ``outputPath``."""
        with open(outputPath, 'w') as file:
            # ``total`` rather than ``sum`` so the builtin is not shadowed
            for department, total in departmentSums:
                file.write(f"{department}: {total}\n")
        print(f"Department salary sums saved to {outputPath}")

    def stop(self):
        """Stop the underlying SparkSession."""
        self.spark.stop()
        

In [8]:
# Input salary file and destination file for the per-department totals.
filepath2 = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/salary.txt"
outputpath2 = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/output_files/department_sums.tex"

salarycounter = DepartmentSalarySum("SalarySum",filepath2)
try:
    departmentsums = salarycounter.calculateSalarySums()
    print(f"Departments and sums:\n{departmentsums}")
    salarycounter.saveDepartmentSumsToFile(departmentsums,outputpath2)
finally:
    # Stop the Spark session, as the other driver cells do, even on failure.
    salarycounter.stop()

Departments and sums:
[('Sales', 3488491.0), ('Research', 3328284.0), ('Developer', 3221394.0), ('QA', 3360624.0), ('Marketing', 3158450.0)]
Department salary sums saved to /Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/output_files/department_sums.tex


In [2]:
# Part A Q3
class WordCount:
    """Count case-sensitive occurrences of chosen keywords in a text file."""

    def __init__(self, appName, filePath):
        """Remember the input path and obtain a SparkSession named ``appName``."""
        self.filePath = filePath
        self.spark = SparkSession.builder.appName(appName).getOrCreate()

    def CountWords(self, keyword_list):
        """Return ``(keyword, count)`` pairs for keywords that occur in the file.

        Lines are split on runs of non-word characters, so a token such as
        'WILLIAM.' is counted as 'WILLIAM'. Matching is case-sensitive.
        """

        def tokenize(text_line):
            # Punctuation acts as a separator, not as part of the word.
            return re.split(r'\W+', text_line)

        def is_keyword(token):
            return token in keyword_list

        source = self.spark.sparkContext.textFile(self.filePath)

        return (
            source.flatMap(tokenize)
            .filter(is_keyword)
            .map(lambda token: (token, 1))
            .reduceByKey(lambda left, right: left + right)
            .collect()  # bring the pairs back to the driver
        )

    def saveWordCount(self, collected_pairs, outputPath):
        """Write one ``keyword: count`` line per pair to ``outputPath``."""
        with open(outputPath, 'w') as handle:
            handle.writelines(
                f"{keyword}: {count}\n" for keyword, count in collected_pairs
            )
        print(f"Keyword counts saved to {outputPath}")

    def stop(self):
        """Stop the underlying SparkSession."""
        self.spark.stop()

In [11]:
filepath3 = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/shakespeare-2.txt" #input path
keywords = ["Shakespeare", "When", "Lord", "Library", "GUTENBERG", "WILLIAM", "COLLEGE", "WORLD"] #Keyword list
outputpath3 = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/output_files/shakespeare_wordcounts.tex" #output path

word_counter = WordCount("WordCounter",filepath3)
try:
    collected_pairs = word_counter.CountWords(keywords)
    print(f"Words and their counts:\n{collected_pairs}")
    word_counter.saveWordCount(collected_pairs, outputpath3)
finally:
    # Stop the Spark session even when counting or saving fails.
    word_counter.stop()

24/02/25 12:58:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Words and their counts:
[('Shakespeare', 22), ('WILLIAM', 128), ('COLLEGE', 98), ('When', 406), ('GUTENBERG', 100), ('Library', 5), ('WORLD', 98), ('Lord', 402)]
Keyword counts saved to /Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/output_files/shakespeare_wordcounts.tex


In [12]:
# Part A Q4

class WordCount2:
    """Find the 15 most and 15 least frequent words in a text file (case-insensitive)."""

    def __init__(self, appName, filePath):
        """Obtain a SparkSession named ``appName`` and remember the input path."""
        self.spark = SparkSession.builder.appName(appName).getOrCreate()
        self.filePath = filePath

    def CountWords(self, keyword_list=None):
        """Return ``(top_15, bottom_15)`` lists of ``(word, count)`` pairs.

        Args:
            keyword_list: Unused. Kept, now optional, so existing callers
                that still pass a list keep working.

        Lines are lower-cased and split on runs of non-word characters, so
        'WILLIAM.' and 'william' both count towards 'william'. Empty tokens
        produced by the split are discarded.
        """
        # Load the text file into an RDD
        lines_rdd = self.spark.sparkContext.textFile(self.filePath)

        words = lines_rdd.flatMap(lambda line: re.split(r'\W+', line.lower())) \
                         .filter(lambda word: word != "")

        # Both sorts below start from the same counts; cache them so the file
        # is not re-read and re-reduced for each sort.
        word_counts = words.map(lambda word: (word, 1)) \
                           .reduceByKey(lambda a, b: a + b) \
                           .cache()
        try:
            # Descending for the top 15, ascending for the bottom 15
            top_15 = word_counts.sortBy(lambda pair: pair[1], ascending=False).take(15)
            bottom_15 = word_counts.sortBy(lambda pair: pair[1]).take(15)
        finally:
            word_counts.unpersist()

        # Print
        print("Top 15 words by count (case insensitive): \n", top_15)
        print("\n\nBottom 15 words by count (case insensitive): \n", bottom_15)

        return top_15, bottom_15

    def saveWordCount(self, top_15, bot_15, outputPath):
        """Write both rankings to ``outputPath`` as ``word: count`` lines.

        Args:
            top_15: ``(word, count)`` pairs written under "Top 15 words:".
            bot_15: ``(word, count)`` pairs written under "Bottom 15 words:".
            outputPath: File to create or overwrite.
        """
        with open(outputPath, 'w') as file:
            file.write("Top 15 words:\n")
            for keyword, count in top_15:
                file.write(f"{keyword}: {count}\n")
            file.write("Bottom 15 words:\n")
            # Iterate the parameter, not a same-named global from the notebook.
            for keyword, count in bot_15:
                file.write(f"{keyword}: {count}\n")

        print(f"Keyword counts saved to {outputPath}")

    def stop(self):
        """Stop the underlying SparkSession."""
        self.spark.stop()


In [13]:
filepath4 = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/shakespeare-2.txt" #input path
outputpath4 = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/output_files/top15_bottom15.tex" #output path

word_counter2 = WordCount2("WordCounter2",filepath4)
try:
    # CountWords ignores its argument, so pass None instead of relying on a
    # `keywords` list defined in a different cell.
    top_15, bottom_15 = word_counter2.CountWords(None)
    word_counter2.saveWordCount(top_15, bottom_15, outputpath4)
finally:
    # Stop the Spark session even when counting or saving fails.
    word_counter2.stop()

Top 15 words by count (case insensitive): 
 [('the', 13717), ('and', 12974), ('i', 9874), ('of', 9382), ('to', 9351), ('a', 6725), ('you', 6154), ('my', 5699), ('that', 5409), ('in', 5341), ('d', 4369), ('is', 4344), ('not', 3917), ('for', 3887), ('with', 3817)]


Bottom 15 words by count (case insensitive): 
 [('restrictions', 1), ('online', 1), ('www', 1), ('org', 1), ('2011', 1), ('release', 1), ('january', 1), ('cdroms', 1), ('releases', 1), ('implications', 1), ('materials', 1), ('provisions', 1), ('proprietary', 1), ('underline', 1), ('punctuation', 1)]
Keyword counts saved to /Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/output_files/top15_bottom15.tex


In [66]:
# Part B
""" Note:
This is where I stopped creating output text files, since I figured there was no point: I can just use the
outputs from within this notebook, as well as copy and paste them into my PDF document. In the future I will use
Databricks, but I didn't get the memo this time, so I hope this notebook will suffice.
"""

class CreateMovieRecommender:
    """Explore a ratings CSV and train/evaluate ALS recommenders on it.

    Every data line is split on "," and read positionally as
    ``movieId, rating, userId`` (fields 0, 1 and 2). ``LoadData()`` must be
    called before any analysis, training or recommendation method.
    """

    def __init__(self, appName, filePath):
        """Obtain a SparkSession named ``appName`` and remember the CSV path."""
        self.spark = SparkSession.builder.appName(appName).getOrCreate()
        self.filePath = filePath
        self.movie_data_parsed = None  # RDD of token lists, set by LoadData()
        self.ratings = None            # DataFrame(userId, movieId, rating), built lazily

    def _require_loaded(self):
        """Return the parsed RDD, or raise RuntimeError if LoadData() was never called."""
        parsed = getattr(self, "movie_data_parsed", None)
        if parsed is None:
            raise RuntimeError("No data loaded: call LoadData() before using this method.")
        return parsed

    def _ratings_frame(self):
        """Return the ratings DataFrame, building it from the parsed rows on first use."""
        if getattr(self, "ratings", None) is None:
            parsed = self._require_loaded()
            # One Row per rating; the lambda must not reference ``self``.
            self.movie_data_ratings = parsed.map(
                lambda p: Row(userId=int(p[2]), movieId=int(p[0]), rating=float(p[1])))
            self.ratings = self.spark.createDataFrame(self.movie_data_ratings)
        return self.ratings

    def LoadData(self):
        """Read the CSV, drop the header and blank lines, and split each row on ','."""
        # Load the CSV file into an RDD
        movie_data = self.spark.sparkContext.textFile(self.filePath)

        # The first line is the header; every line equal to it is filtered out.
        header = movie_data.first()
        self.movie_data_parsed = (
            movie_data
            .filter(lambda line: line != header and line.strip() != "")
            .map(lambda line: line.split(","))
            .cache()  # reused by several actions below
        )
        # Any DataFrame built from previously loaded data is now stale.
        self.ratings = None

    # Part 1
    def GetHighestRatedMovie(self):
        """Return ``(movieId, average_rating)`` for the movie with the highest mean rating.

        ``movieId`` is returned as the raw string token from the file.
        """
        parsed = self._require_loaded()
        # Map to (movieId, (rating, 1)) for summing and counting
        movie_ratings = parsed.map(lambda tokens: (tokens[0], (float(tokens[1]), 1)))
        # Reduce by key to sum ratings and count
        movie_rating_totals = movie_ratings.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        # Calculate average ratings
        movie_average_ratings = movie_rating_totals.mapValues(lambda total_count: total_count[0] / total_count[1])
        # Sort by average rating and take the top movie
        sorted_movies = movie_average_ratings.sortBy(lambda movie_rating: movie_rating[1], ascending=False)
        return sorted_movies.first()

    def GetUserWithHighestRatings(self):
        """Return ``(userId, total_rating)`` for the user whose ratings sum highest.

        The ranking uses the *sum* of a user's ratings, not the mean.
        ``userId`` is returned as the raw string token from the file.
        """
        parsed = self._require_loaded()
        # Map to (userId, rating) for summing
        user_ratings = parsed.map(lambda tokens: (tokens[2], float(tokens[1])))
        # Reduce by key to sum ratings
        user_rating_totals = user_ratings.reduceByKey(lambda a, b: a + b)
        # Sort by total rating and take the top user
        sorted_users = user_rating_totals.sortBy(lambda user_rating: user_rating[1], ascending=False)
        return sorted_users.first()

    # Part 2: Create a train-test split and then return a trained model (This class creates models, but they're stored externally.)
    def TrainModel(self, numIterations = 10, rank = 10, regParam = 0.01, testSplit = 0.2):
        """Fit an ALS model on a random train split and return ``(model, test_data)``.

        Args:
            numIterations: Passed to ALS as ``maxIter``.
            rank: Passed to ALS as ``rank``.
            regParam: Passed to ALS as ``regParam``.
            testSplit: Weight of the held-out split; the training split gets
                ``1 - testSplit``.

        The split and the ALS initialisation both use seed 0.
        """
        ratings = self._ratings_frame()

        # Split data into train and test
        (train_data, test_data) = ratings.randomSplit([1 - testSplit, testSplit], seed = 0) # use 0 seed

        # Create ALS Object; coldStartStrategy="drop" is set so rows whose
        # prediction would be NaN are dropped before evaluation.
        als = ALS(maxIter=numIterations, rank = rank, regParam = regParam, userCol="userId",
                  itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", seed = 0) # use 0 seed

        model = als.fit(train_data) #Train

        return model, test_data

    # Part 3
    def EvaluateModel(self, model, test_data):
        """Print and return ``(rmse, mae)`` of ``model``'s predictions on ``test_data``."""
        # Evaluate the model by getting predictions
        predictions = model.transform(test_data)

        # calculate RMSE
        rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                        predictionCol="prediction")
        rmse = rmse_evaluator.evaluate(predictions)
        print("Root-mean-square error = " + str(rmse))

        # calculate MAE
        mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="rating",
                                        predictionCol="prediction")
        mae = mae_evaluator.evaluate(predictions)
        print("Mean-absolute error = " + str(mae))

        return rmse, mae

    # Part 5
    def RecommendMovies(self, model, user_id, num_recommendations):
        """Show the top ``num_recommendations`` movies for ``user_id``.

        Returns:
            The DataFrame produced by ``model.recommendForUserSubset`` (one
            row per user with a nested ``recommendations`` column), i.e. the
            un-exploded form of what is displayed.

        The ratings DataFrame is built on demand, so this also works with a
        model that was not trained through this instance.
        """
        ratings = self._ratings_frame()

        # Rows of the requested user; they supply the userId column for the lookup.
        user = ratings.filter(ratings.userId == user_id) # Get user reviews
        userSubsetRecs = model.recommendForUserSubset(user, num_recommendations) # Get Recommendations

        print(f"Top {num_recommendations} for user {user_id}:") # Print:

        # Explode the recommendations to flatten the DataFrame
        recommendations = userSubsetRecs \
            .withColumn("recommendation", explode("recommendations")) \
            .select(
                col("userId"),
                col("recommendation.*")
            )

        # Show the recommendations
        recommendations.show()

        return userSubsetRecs

    def stop(self):
        """Stop the underlying SparkSession."""
        self.spark.stop()

In [43]:
filepath5 = "/Users/declanbracken/Development/UofT_Projects/MIE_1628/A2/movies.csv" #input path

# Create movie recommendation building object
movie_recommender = CreateMovieRecommender("MovieRecommender",filepath5)

try:
    # Load the data
    movie_recommender.LoadData()

    # Part 1:
    # Get the highest rated movies
    highest_rated_movie = movie_recommender.GetHighestRatedMovie()
    print(f"Highest rated movie ID: {highest_rated_movie[0]}, with average rating: {highest_rated_movie[1]}")

    # Get the highest rated user
    highest_rated_user = movie_recommender.GetUserWithHighestRatings()
    print(f"User ID with highest rating: {highest_rated_user[0]}, total rating: {highest_rated_user[1]}")


    # Part 2:
    # Given the same model hyper-parameters, try 2 train-test splits:
    # Train the model on an 80-20 train-test split
    model8020, test_data_8020 = movie_recommender.TrainModel(rank = 10, numIterations = 10, testSplit = 0.2)
    print("\n80-20 split performance:\n")
    RMSE_8020, MAE_8020 = movie_recommender.EvaluateModel(model8020, test_data_8020)

    # Train the model on a 70-30 train-test split
    model7030, test_data_7030 = movie_recommender.TrainModel(rank = 10, numIterations = 10, testSplit = 0.3)
    print("\n70-30 split performance:\n")
    RMSE_7030, MAE_7030 = movie_recommender.EvaluateModel(model7030, test_data_7030)
finally:
    # Stop the Spark session even when loading, training or evaluation fails.
    movie_recommender.stop()


Highest rated movie ID: 32, with average rating: 2.9166666666666665
User ID with highest rating: 11, total rating: 128.0

80-20 split performance:

Root-mean-square error = 1.852018322300239
Mean-absolute error = 1.355651779338769

70-30 split performance:

Root-mean-square error = 2.469070292648919
Mean-absolute error = 1.821911167812746


In [None]:
""" 

Part 3:

MSE: 
Mean squared error is calculated as the average of the squared errors between the prediction 
and the target in a dataset: let x = prediction, x' = target value, and n = number of samples. 
    MSE = sum((x - x')^2)/n

RMSE:
Root mean squared error is nearly the same as the mean squared error, but with an additional 
square root operation applied after averaging, which brings the error back to the same units as the
ratings so that it is directly comparable to the difference between prediction and target.
    RMSE = sqrt(MSE)

MAE:
Mean absolute error is the average of the absolute difference in prediction and target values. 
The absolute error is different from squared error since it scales linearly with the difference
in value between target and prediction, and is also non-differentiable around 0.
    MAE = sum(abs(x - x'))/n

RMSE is more widely used than MAE. Both are invariant to the direction of error (ranging from 0 to infinity);
the key difference between these loss functions is in how they treat outliers and punish predictions
which are VERY wrong. With RMSE, a prediction that is very far from the target is penalized
more than proportionally, whereas with mean absolute error the penalty always
scales linearly. To summarize: RMSE increases more for values which are very wrong (often outliers or bad predictions),
while MAE is linear and treats all deviations between predictions and targets the same. For me, I'd like to be
risk-averse and use RMSE, because I want my model to be robust and never give a completely wrong prediction
(if possible).

Part 4:

To tune my collaborative filtering algorithm, I've varied the train-test split, the number of iterations, 
the matrix rank, and the regularization parameter. In varying these parameters, I attempted to minimize
the test data error using RMSE rather than MAE, as explained in the prior section.
The other tuned hyperparameters can be explained as follows:

Train-test split (not really a hyperparameter, but I'll explain my choice anyway):
    The ratio of data used for training vs evaluation. Given that this dataset is small, you want as much data as
    possible to create the model, but you also don't want to skew your evaluation results by reducing the test data
    too much. I'm opting to use slightly more training data with an 80-20 ratio, as I don't want my model to overfit
    and feel as though, in the real world, I can always collect more.

Number of iterations:
    The alternating least squares (ALS) method for collaborative filtering relies on an iterative approach in which
    a latent space is produced through U-V matrix decomposition. The matrices U and V are updated cyclically, where U
    is held constant and V is updated, then vice versa, with the goal of minimizing the least squares error between the
    original user/item matrix and the product of the U and V matrices: min( ||R − U × V||^2 ). The number of
    iterations is therefore how many of these alternating updates U and V are allowed. This value needs to be balanced:
    too few iterations leave the factors under-trained, while too many waste computation and risk overfitting.

Rank:
    Referring back to the explanation of the ALS algorithm and U-V matrix decomposition above, the rank is the 
    dimensionality of the latent space shared between matrices U and V. Rank can be thought of almost like the size
    of the embedding space: a higher rank allows the algorithm to capture greater data complexity, but also makes it more
    likely to overfit. Alternatively, a low rank means that not all the complexity of the data may be captured, which
    could result in underfitting. Rank and the number of iterations for the ALS algorithm should be tuned in tandem.

Regularization:
    The regularization parameter "regParam" is a regularization factor which effectively punishes very large values/
    weights in the model. This is used to combat overfitting, but too high a value may similarly lead to underfitting.


"""

In [68]:
# Part 4: Tuning

""" Changes made based on only RMSE results using an 80-20 split from evaluation.

Tuning changes:
    - Regularization Parameter increased from 0.01 to 0.08
    - Increased numIterations from 10 to 20
    - Increased rank from 10 to 15

Result:
    RMSE decreased from 1.852 to 0.860 !
"""

# Create movie recommendation building object
movie_recommender = CreateMovieRecommender("MovieRecommender",filepath5)

try:
    # Load the data
    movie_recommender.LoadData()

    # Train the model on an 80-20 train-test split
    model, test_data = movie_recommender.TrainModel(rank = 15, numIterations = 20, regParam = 0.08, testSplit = 0.2)
    print("\nTuned Model performance:\n")
    RMSE, MAE = movie_recommender.EvaluateModel(model, test_data)

    # Part 5:

    # Recommendations for user 1:
    recommendations_user1 = movie_recommender.RecommendMovies(model, 1, 12)

    # Recommendations for user 12:
    recommendations_user12 = movie_recommender.RecommendMovies(model, 12, 12)
finally:
    # Stop the Spark session even when training or recommending fails.
    movie_recommender.stop()


Tuned Model performance:

Root-mean-square error = 0.8605250315889759
Mean-absolute error = 0.6136316180126065
Top 12 for user 1:
+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1|     62|3.3536956|
|     1|     68|3.2473934|
|     1|     77|2.7373428|
|     1|     22|2.3228655|
|     1|     21|2.1096992|
|     1|     41|2.0874867|
|     1|     52|2.0024612|
|     1|     94|1.9991494|
|     1|     88|1.9760745|
|     1|     74|1.9546779|
|     1|     70|1.9544704|
|     1|     30|1.8418361|
+------+-------+---------+

Top 12 for user 12:
+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|    12|     46|4.5404897|
|    12|     64|4.3234367|
|    12|     27|4.2886424|
|    12|     35|4.1497645|
|    12|     55|3.8154309|
|    12|     50|3.6875954|
|    12|     48|3.6543837|
|    12|     16|3.6436348|
|    12|     94|3.5683517|
|    12|     65| 3.462337|
|    12|     90|3.3048935|
|    12|     23|3.2955499|
+------+---