### Step 1: Make sure Spark is running correctly

In [1]:
# Show the version string reported by the `spark` object supplied by the
# kernel (presumably a SparkSession — confirm against the environment).
spark.version

'3.3.2'

In [2]:
# Display the `spark` object itself to confirm it exists in this kernel.
spark

### Step 2: Load the input data and create an RDD using the SparkContext (`sc`)

In [3]:
# Relative path to the ratings file; each line has the form "user,movie,rating".
input_path = "Assignment_2_movies_42.txt"

In [4]:
# Read the file into an RDD of strings, one element per line of text.
rdd = sc.textFile(input_path)

In [5]:
# collect() returns every element to the driver as a Python list; that is
# acceptable here only because the input is small (42 records).
rdd.collect()

['user4,m1,4',
 'user9,m1,5',
 'user8,m2,4',
 'user1,m1,3',
 'user1,m1,5',
 'user2,m2,3',
 'user2,m3,5',
 'user3,m3,4',
 'user6,m3,4',
 'user7,m3,34',
 'user4,m1,3',
 'user4,m1,24',
 'user4,m1,4',
 'user5,m2,4',
 'user6,m3,5',
 'user7,m3,1',
 'user3,m1,4',
 'user4,m3,5',
 'user9,m6,2',
 'user9,m6,3',
 'user9,m6,4',
 'user9,m6,1',
 'user4,m4,3',
 'user5,m4,3',
 'user9,m4,4',
 'user8,m4,2',
 'user1,m4,4',
 'user1,m4,1',
 'user1,m4,8',
 'user7,m1,5',
 'user7,m1,22',
 'user2,m2,3',
 'user2,m3,5',
 'user3,m3,4',
 'user6,m3,4',
 'user7,m3,34',
 'user4,m1,3',
 'user4,m1,24',
 'user4,m1,4',
 'user5,m2,4',
 'user9,m7,2',
 'user6,m7,3']

In [6]:
# Number of raw input lines.
rdd.count()

[Stage 1:>                                                          (0 + 2) / 2]                                                                                

42

### Step 3: Tokenize data to obtain movie ID and rating

In [7]:
def rating_pair(record):
    """Convert one comma-separated input line into a ``(movie_id, rating)`` pair.

    The second field is returned unchanged as the key and the third field is
    converted to ``float``; the first field is ignored.

    Raises:
        IndexError: if the line has fewer than three comma-separated fields.
        ValueError: if the third field is not a valid float literal.
    """
    fields = record.split(",")
    movie_id = fields[1]
    rating = float(fields[2])
    return movie_id, rating

In [8]:
# Apply rating_pair to every line, giving an RDD of (movie_id, rating) tuples.
movie_rates = rdd.map(rating_pair)

In [9]:
# Inspect the parsed (movie_id, rating) pairs on the driver.
movie_rates.collect()

[('m1', 4.0),
 ('m1', 5.0),
 ('m2', 4.0),
 ('m1', 3.0),
 ('m1', 5.0),
 ('m2', 3.0),
 ('m3', 5.0),
 ('m3', 4.0),
 ('m3', 4.0),
 ('m3', 34.0),
 ('m1', 3.0),
 ('m1', 24.0),
 ('m1', 4.0),
 ('m2', 4.0),
 ('m3', 5.0),
 ('m3', 1.0),
 ('m1', 4.0),
 ('m3', 5.0),
 ('m6', 2.0),
 ('m6', 3.0),
 ('m6', 4.0),
 ('m6', 1.0),
 ('m4', 3.0),
 ('m4', 3.0),
 ('m4', 4.0),
 ('m4', 2.0),
 ('m4', 4.0),
 ('m4', 1.0),
 ('m4', 8.0),
 ('m1', 5.0),
 ('m1', 22.0),
 ('m2', 3.0),
 ('m3', 5.0),
 ('m3', 4.0),
 ('m3', 4.0),
 ('m3', 34.0),
 ('m1', 3.0),
 ('m1', 24.0),
 ('m1', 4.0),
 ('m2', 4.0),
 ('m7', 2.0),
 ('m7', 3.0)]

### Step 4: Satisfy rules 1 & 2: keep only ratings between 2 and 5 (inclusive)

In [11]:
# Keep only pairs whose rating lies in the inclusive range [2, 5].
rdd4 = movie_rates.filter(lambda pair: 2 <= pair[1] <= 5)

In [12]:
# Number of pairs that remain after the rating-range filter.
rdd4.count()

33

In [13]:
# Inspect the pairs that passed the rating-range filter.
rdd4.collect()

[('m1', 4.0),
 ('m1', 5.0),
 ('m2', 4.0),
 ('m1', 3.0),
 ('m1', 5.0),
 ('m2', 3.0),
 ('m3', 5.0),
 ('m3', 4.0),
 ('m3', 4.0),
 ('m1', 3.0),
 ('m1', 4.0),
 ('m2', 4.0),
 ('m3', 5.0),
 ('m1', 4.0),
 ('m3', 5.0),
 ('m6', 2.0),
 ('m6', 3.0),
 ('m6', 4.0),
 ('m4', 3.0),
 ('m4', 3.0),
 ('m4', 4.0),
 ('m4', 2.0),
 ('m4', 4.0),
 ('m1', 5.0),
 ('m2', 3.0),
 ('m3', 5.0),
 ('m3', 4.0),
 ('m3', 4.0),
 ('m1', 3.0),
 ('m1', 4.0),
 ('m2', 4.0),
 ('m7', 2.0),
 ('m7', 3.0)]

### Step 5: Identify movies that received a 5-star rating

In [14]:
def fiveRatingsCheck(pair):
    """Return ``[movie_id]`` when the pair's rating equals 5, otherwise ``[]``.

    ``pair`` is indexed as ``(movie_id, rating)``. The result is always a list
    (of length one or zero) so the function can be used with ``flatMap``,
    which flattens the per-element lists into a single sequence of movie ids.
    """
    return [pair[0]] if pair[1] == 5 else []

In [15]:
# flatMap flattens the one-or-zero element lists returned by fiveRatingsCheck,
# leaving one movie id per 5-rated pair (duplicates are kept).
rdd5 = rdd4.flatMap(fiveRatingsCheck)

In [16]:
# Inspect the movie ids that received a rating of 5 (with repeats).
rdd5.collect()

['m1', 'm1', 'm3', 'm3', 'm3', 'm1', 'm3']

In [17]:
# Build the 5-star summary as a 2-tuple: a label plus the string form of the
# de-duplicated movie ids. Because this is a tuple rather than a single string,
# print() shows it in tuple syntax with both parts quoted.
# NOTE(review): set iteration order is not guaranteed, so the ids may appear in
# a different order between runs.
topMovies = ("rating-5-movies: ", str(set(rdd5.collect())))

### Step 6: Sort and shuffle phase

In [18]:
# Gather every rating under its movie id, then order the groups by movie id.
groupKey = (
    rdd4
    .groupByKey()
    .sortByKey()
)

In [19]:
# The grouped values are ResultIterable objects, so this display shows only
# their reprs rather than the ratings themselves.
groupKey.collect()

[('m1', <pyspark.resultiterable.ResultIterable at 0x7f93d90ea970>),
 ('m2', <pyspark.resultiterable.ResultIterable at 0x7f93e839f8e0>),
 ('m3', <pyspark.resultiterable.ResultIterable at 0x7f93e839f6a0>),
 ('m4', <pyspark.resultiterable.ResultIterable at 0x7f93e839f6d0>),
 ('m6', <pyspark.resultiterable.ResultIterable at 0x7f93e839f490>),
 ('m7', <pyspark.resultiterable.ResultIterable at 0x7f93e839f3d0>)]

In [20]:
# Materialize each movie's grouped ratings as a plain Python list.
groupKey1 = groupKey.mapValues(list)

In [21]:
# Inspect the (movie_id, [ratings...]) pairs.
groupKey1.collect()

[('m1', [4.0, 5.0, 3.0, 5.0, 3.0, 4.0, 4.0, 5.0, 3.0, 4.0]),
 ('m2', [4.0, 3.0, 4.0, 3.0, 4.0]),
 ('m3', [5.0, 4.0, 4.0, 5.0, 5.0, 5.0, 4.0, 4.0]),
 ('m4', [3.0, 3.0, 4.0, 2.0, 4.0]),
 ('m6', [2.0, 3.0, 4.0]),
 ('m7', [2.0, 3.0])]

### Step 7: Calculate average ratings

In [22]:
# Preview the arithmetic mean of each movie's rating list.
groupKey1.mapValues(lambda ratings: sum(ratings) / len(ratings)).collect()

[('m1', 4.0), ('m2', 3.6), ('m3', 4.5), ('m4', 3.2), ('m6', 3.0), ('m7', 2.5)]

In [23]:
# Per-movie average: (movie_id, sum of its ratings / number of its ratings).
meanValues = groupKey1.mapValues(lambda ratings: sum(ratings) / len(ratings))

In [24]:
# Preview the per-movie averages that are at least 3.0.
# Reuses `meanValues` (the averages built in cell 23) instead of rebuilding
# the identical mapValues pipeline inline.
meanValues.filter(lambda movie_avg: movie_avg[1] >= 3.0).collect()

[('m1', 4.0), ('m2', 3.6), ('m3', 4.5), ('m4', 3.2), ('m6', 3.0)]

### Step 8: Satisfy rule 3, filter out average ratings below 3

In [25]:
# Rule 3: keep only movies whose average rating is at least 3.0.
# Reuses `meanValues` (the per-movie averages built in cell 23) rather than
# recomputing the same sum/len pipeline a second time.
filteredMean = meanValues.filter(lambda movie_avg: movie_avg[1] >= 3.0)

In [26]:
# Bring the filtered (movie_id, average) pairs back to the driver as a list.
averageRatings = filteredMean.collect()

### Step 9: Output the list of 5-star movies and the average ratings

In [27]:
# Final report. `topMovies` is a (label, text) tuple, so it prints in tuple
# syntax; `averageRatings` is a list of (movie_id, average) tuples.
print(topMovies)
print(averageRatings)

('rating-5-movies: ', "{'m1', 'm3'}")
[('m1', 4.0), ('m2', 3.6), ('m3', 4.5), ('m4', 3.2), ('m6', 3.0)]
