In [7]:
from pyspark.sql import SparkSession

# Start a Spark application on the standalone cluster master.
spark = SparkSession \
    .builder \
    .appName('Ass1_Q0') \
    .master('spark://spark-master:7077') \
    .getOrCreate()
# Only report warnings and errors so the cell output stays readable.
spark.sparkContext.setLogLevel('WARN')
sc = spark.sparkContext

# Read the file as plain text: every RDD element is one line of the CSV.
rdd = sc.textFile('hdfs://namenode:9000/input_files/movies.csv')

# Print the first 5 lines of the RDD.
for line in rdd.take(5):
    print(line)

# The first line of the file is the header "movieId,title,genres"; drop it.
header = rdd.first()
rdd = rdd.filter(lambda x: x != header)


def extract_title(line):
    """Return the title column (second field) of one movies.csv line.

    The line is parsed with the csv module rather than ``str.split(',')``
    because a title that itself contains a comma is written as a quoted
    field; a plain split would cut such a title at its first comma and keep
    the opening quote character.

    Args:
        line: one raw text line of movies.csv (header already removed).

    Returns:
        The unquoted title, or an empty string when the line has fewer than
        two fields.
    """
    import csv  # local import keeps the function self-contained when shipped to executors

    fields = next(csv.reader([line]), [])
    return fields[1] if len(fields) > 1 else ''


# Map every line to a tuple (title, len_of_title); the title is extracted once per line.
rdd = rdd.map(extract_title).map(lambda title: (title, len(title)))

# Keep the tuple with the larger length at every step of the reduce; on a tie
# the left-hand tuple is kept.
result = rdd.reduce(lambda y, x: x if x[1] > y[1] else y)

print(result)

spark.stop()

                                                                                

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance


                                                                                

("Dragon Ball Z the Movie: The World's Strongest (a.k.a. Dragon Ball Z: The Strongest Guy in The World) (Doragon bôru Z: Kono yo de ichiban tsuyoi yatsu) (1990)", 158)


In [9]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for question 1
spark = (
    SparkSession.builder
    .appName('Ass1_Q1')
    .master('spark://spark-master:7077')
    .getOrCreate()
)

# Grab the SparkContext and restrict logging to WARN and above
sc = spark.sparkContext
sc.setLogLevel('WARN')

# Load ratings.csv from HDFS as an RDD of raw text lines
rdd = sc.textFile('hdfs://namenode:9000/input_files/ratings.csv')

# Parse one ratings.csv line and keep it only when it was rated in 2018
def parse_and_filter(line):
    """Turn a ratings.csv line into ``(userId, (rating, 1))`` for 2018 ratings.

    Args:
        line: one raw text line in the form ``userId,movieId,rating,timestamp``.

    Returns:
        ``(userId, (rating, 1))`` when the timestamp lies in the 2018 window,
        otherwise ``None`` (this includes the header row). The trailing ``1``
        is a counter so sums and counts can be reduced together.

    Raises:
        IndexError: if the line has fewer than four comma-separated fields.
        ValueError: if the rating or timestamp field is not numeric.
    """
    # The header row starts with the first column name
    if line.startswith("userId"):
        return None

    parts = line.split(',')
    user = parts[0]
    score = float(parts[2])
    rated_at = int(parts[3])

    # Read as Unix epoch seconds, the bounds are 2018-01-01 00:00:00 UTC
    # (inclusive) and 2019-01-01 00:00:00 UTC (exclusive)
    in_2018 = 1514764800 <= rated_at < 1546300800
    return (user, (score, 1)) if in_2018 else None

# Parse every line, then drop the None placeholders (header and non-2018 rows)
parsed_rdd = rdd.map(parse_and_filter)
mapped_rdd = parsed_rdd.filter(lambda record: record is not None)

# Per user, add up the ratings and the counters component-wise
reduced_rdd = mapped_rdd.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

# Average rating per user = rating total / number of ratings
average_rating_rdd = reduced_rdd.mapValues(lambda total_count: total_count[0] / total_count[1])

# Show five (userId, average) pairs
for result in average_rating_rdd.take(5):
    print(result)

# Release the cluster resources held by this session
spark.stop()

                                                                                

('65364', 3.6328125)
('65533', 3.5259515570934257)
('65602', 3.5416666666666665)
('65659', 2.6544117647058822)
('65752', 5.0)


In [11]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for question 2
spark = (
    SparkSession.builder
    .appName('Ass1_Q2')
    .master('spark://spark-master:7077')
    .getOrCreate()
)

# Grab the SparkContext and restrict logging to WARN and above
sc = spark.sparkContext
sc.setLogLevel('WARN')

# Load both input files from HDFS as RDDs of raw text lines
rdd_rating = sc.textFile('hdfs://namenode:9000/input_files/ratings.csv')
rdd_movies = sc.textFile('hdfs://namenode:9000/input_files/movies.csv')

# Step 1: Extract movie IDs for 'Comedy' and 'Romance' genres from movies.csv
def parse_movie(line):
    """Map one movies.csv line to ``(genre, movie_id)`` pairs for Comedy/Romance.

    The line is parsed with the csv module instead of ``str.split(',')``:
    a title containing a comma is stored as a quoted field, and a plain split
    would shift part of the title into the genres position, so such movies
    would never be recognised as Comedy or Romance.

    Args:
        line: one raw text line in the form ``movieId,title,genres`` where
            genres are separated by ``|``.

    Returns:
        A list with ``('Comedy', movie_id)`` and/or ``('Romance', movie_id)``
        (in that order) for each of the two genres the movie belongs to.
        The list is empty for the header row, for movies in neither genre,
        and for lines with fewer than three fields.
    """
    import csv  # local import keeps the function self-contained when shipped to executors

    if line.startswith("movieId"):  # Skip header row
        return []

    fields = next(csv.reader([line]), [])
    if len(fields) < 3:  # blank or malformed line: nothing to emit
        return []

    movie_id = fields[0]
    genres = fields[2].split('|')

    return [(genre, movie_id) for genre in ('Comedy', 'Romance') if genre in genres]

# Map every movie line to its (genre, movie_id) pairs; parse_movie drops the header
genre_movie_ids = rdd_movies.flatMap(parse_movie)

# Bring the pairs to the driver with a single Spark job and split them into one
# set of movie IDs per genre (sets give constant-time membership tests later)
genre_pairs = genre_movie_ids.collect()
comedy_movie_ids = {movie_id for genre, movie_id in genre_pairs if genre == 'Comedy'}
romance_movie_ids = {movie_id for genre, movie_id in genre_pairs if genre == 'Romance'}

# Step 2: Turn each ratings.csv line into a (movie_id, rating) pair
def parse_rating(line):
    """Parse one ratings.csv line into ``(movie_id, rating)``.

    Args:
        line: one raw text line whose first three comma-separated fields are
            the user id, the movie id and the rating.

    Returns:
        ``(movie_id, rating)`` with the movie id as a string and the rating
        as a float, or ``None`` for the header row.

    Raises:
        IndexError: if the line has fewer than three fields.
        ValueError: if the rating field is not numeric.
    """
    is_header = line.startswith("userId")
    if is_header:
        return None

    columns = line.split(',')
    return (columns[1], float(columns[2]))

# Parse every rating line, then drop the None produced for the header row
parsed_ratings = rdd_rating.map(parse_rating)
ratings_rdd = parsed_ratings.filter(lambda record: record is not None)

# Tag each rating with every tracked genre its movie belongs to
def map_ratings_to_genres(rating_tuple):
    """Expand a ``(movie_id, rating)`` pair into per-genre ``(genre, (rating, 1))`` pairs.

    Membership is looked up in the module-level ``comedy_movie_ids`` and
    ``romance_movie_ids`` collections.

    Args:
        rating_tuple: a ``(movie_id, rating)`` pair.

    Returns:
        A list holding ``('Comedy', (rating, 1))`` and/or
        ``('Romance', (rating, 1))`` (in that order); empty when the movie is
        in neither collection. The trailing ``1`` is a counter for averaging.
    """
    movie_id, rating = rating_tuple
    tagged = []
    for genre, members in (('Comedy', comedy_movie_ids), ('Romance', romance_movie_ids)):
        if movie_id in members:
            tagged.append((genre, (rating, 1)))
    return tagged

# One (genre, (rating, 1)) record per rating and matching genre
mapped_ratings_rdd = ratings_rdd.flatMap(map_ratings_to_genres)

# Per genre, add up the ratings and the counters component-wise
reduced_ratings_rdd = mapped_ratings_rdd.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

# Average rating per genre = rating total / number of ratings
average_ratings_rdd = reduced_ratings_rdd.mapValues(lambda total_count: total_count[0] / total_count[1])

# Bring the (genre, average) pairs to the driver and report them
results = average_ratings_rdd.collect()
for result in results:
    print(f"Average rating for {result[0]} movies: {result[1]:.2f}")

# Release the cluster resources held by this session
spark.stop()

                                                                                

Average rating for Romance movies: 3.53
Average rating for Comedy movies: 3.41


In [12]:
from pyspark.sql import SparkSession
from math import sqrt

# Build (or reuse) the Spark session for question 3
spark = (
    SparkSession.builder
    .appName('Ass1_Q3')
    .master('spark://spark-master:7077')
    .getOrCreate()
)

# Grab the SparkContext and restrict logging to WARN and above
sc = spark.sparkContext
sc.setLogLevel('WARN')

# Load ratings.csv from HDFS as an RDD of raw text lines
rdd = sc.textFile('hdfs://namenode:9000/input_files/ratings.csv')

# The two users to compare; kept as strings because the parsed user ids are strings
user_A = "20"
user_B = "30"

# Part 1: Parse the ratings so they can be filtered per user
def parse_rating(line):
    """Parse one ratings.csv line into ``(user_id, (movie_id, rating))``.

    Args:
        line: one raw text line whose first three comma-separated fields are
            the user id, the movie id and the rating.

    Returns:
        ``(user_id, (movie_id, rating))`` with both ids as strings and the
        rating as a float, or ``None`` for the header row.

    Raises:
        IndexError: if the line has fewer than three fields.
        ValueError: if the rating field is not numeric.
    """
    is_header = line.startswith("userId")
    if is_header:
        return None

    columns = line.split(',')
    movie_and_score = (columns[1], float(columns[2]))
    return (columns[0], movie_and_score)

# Parse every line and drop the None produced for the header row
ratings_rdd = rdd.map(parse_rating).filter(lambda x: x is not None)

# Keep only the (movie_id, rating) pairs of user A and of user B.
# Each per-user RDD feeds more than one action (the norm computation and the
# join), so it is cached to avoid re-reading and re-parsing the whole ratings
# file for every action.
ratings_user_A = ratings_rdd.filter(lambda x: x[0] == user_A).map(lambda x: x[1]).cache()
ratings_user_B = ratings_rdd.filter(lambda x: x[0] == user_B).map(lambda x: x[1]).cache()

# Join on movie ID: one (movie_id, (rating_A, rating_B)) record per movie rated by both users
common_ratings = ratings_user_A.join(ratings_user_B)

# Part 2: Compute the denominator of the formula
# The denominator is the product of the two users' Euclidean norms
def compute_euclidean_norm(ratings):
    """Return the Euclidean (L2) norm of an iterable of numeric ratings.

    Args:
        ratings: iterable of numbers.

    Returns:
        The square root of the sum of the squared values as a float;
        ``0.0`` for an empty iterable.
    """
    squared_total = 0
    for value in ratings:
        squared_total += value ** 2
    return sqrt(squared_total)

# Collect each user's ratings on the driver and take their Euclidean norms
scores_A = ratings_user_A.map(lambda pair: pair[1]).collect()
scores_B = ratings_user_B.map(lambda pair: pair[1]).collect()
norm_A = compute_euclidean_norm(scores_A)
norm_B = compute_euclidean_norm(scores_B)

denominator = norm_A * norm_B

# Part 3: Compute the numerator of the formula
# Multiply the two users' ratings for every movie they both rated, then sum
rating_products = common_ratings.map(lambda joined: joined[1][0] * joined[1][1])
numerator = rating_products.sum()

# Part 4: Compute the similarity score
# A zero denominator would make the ratio undefined; report 0 in that case
if denominator != 0:
    similarity_score = numerator / denominator
else:
    similarity_score = 0

print(f"Similarity score between user {user_A} and user {user_B}: {similarity_score:.4f}")

# Release the cluster resources held by this session
spark.stop()

                                                                                

Similarity score between user 20 and user 30: 0.0460


### Q4. Devise a solution for effectively calculating the similarity score matrix, which encompasses all similarity scores for every pair of users. (20%)

In [None]:
# Imported here, as in the other cells, so the cell does not depend on an
# earlier cell having imported SparkSession.
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession \
    .builder \
    .appName('Ass1_Q4') \
    .master('spark://spark-master:7077') \
    .getOrCreate()

# Set log level to WARN to reduce unnecessary output
spark.sparkContext.setLogLevel('WARN')
sc = spark.sparkContext

# Read CSV file from HDFS into RDD
rdd = sc.textFile('hdfs://namenode:9000/input_files/ratings.csv')


# Parse ratings
def parse_rating(line):
    """Parse one ratings.csv line into ``(user_id, (movie_id, rating))``.

    Args:
        line: one raw text line whose first three comma-separated fields are
            the user id, the movie id and the rating.

    Returns:
        ``(user_id, (movie_id, rating))`` with both ids as strings and the
        rating as a float, or ``None`` for the header row.

    Raises:
        IndexError: if the line has fewer than three fields.
        ValueError: if the rating field is not numeric.
    """
    if line.startswith("userId"):  # Skip header row
        return None

    fields = line.split(',')
    user_id = fields[0]
    movie_id = fields[1]
    rating = float(fields[2])

    return (user_id, (movie_id, rating))


# Parse every line and drop the None produced for the header row
ratings_rdd = rdd.map(parse_rating).filter(lambda x: x is not None)

# NOTE(review): this cell only prepares ratings_rdd; no pairwise similarity
# computation is visible here yet.


In [None]:
# Imported here, as in the other cells, so the cell does not depend on an
# earlier cell having imported SparkSession.
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession \
    .builder \
    .appName('Ass1_Q4') \
    .master('spark://spark-master:7077') \
    .getOrCreate()

# Set log level to WARN to reduce unnecessary output
spark.sparkContext.setLogLevel('WARN')
sc = spark.sparkContext

# Read CSV file from HDFS into RDD
rdd = sc.textFile('hdfs://namenode:9000/input_files/ratings.csv')


# Parse ratings
def parse_rating(line):
    """Parse one ratings.csv line into ``(user_id, (movie_id, rating))``.

    Args:
        line: one raw text line whose first three comma-separated fields are
            the user id, the movie id and the rating.

    Returns:
        ``(user_id, (movie_id, rating))`` with both ids as strings and the
        rating as a float, or ``None`` for the header row.

    Raises:
        IndexError: if the line has fewer than three fields.
        ValueError: if the rating field is not numeric.
    """
    if line.startswith("userId"):  # Skip header row
        return None

    fields = line.split(',')
    user_id = fields[0]
    movie_id = fields[1]
    rating = float(fields[2])

    return (user_id, (movie_id, rating))


# Parse every line and drop the None produced for the header row
ratings_rdd = rdd.map(parse_rating).filter(lambda x: x is not None)

# NOTE(review): this cell only prepares ratings_rdd; no pairwise similarity
# computation is visible here yet.