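# Big Data Modeling & Analytics - Movie Ratings

This notebook does four things:

1. Parse `user,movie,rating` lines.
2. Discard ratings outside 1–5.
3. Count, for each movie, its valid ratings and its distinct raters.
4. Keep only the movies rated by at least two distinct users.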

In [172]:
from pyspark.sql import SparkSession
import numpy as np

In [173]:
# Create the Spark session for this notebook, or reuse one that already
# exists (getOrCreate).
spark = (
    SparkSession.builder
    .appName("PythonMovieRatings")
    .getOrCreate()
)

In [174]:
# Relative path to the comma-separated "user,movie,rating" input file.
input_path = "./ratings.txt"

In [175]:
# Load the input file as an RDD with one string element per line.
spark_context = spark.sparkContext
data = spark_context.textFile(input_path)

In [176]:
# Pull every raw line back to the driver for inspection. collect() loads the
# entire RDD into driver memory, so this is only suitable for a small sample.
records_as_list = data.collect()

In [177]:
# Print the raw lines exactly as read from the file (output below).
print(records_as_list)

['U1,M4,4', 'U1,M4,3', 'U1,M2,5', 'U1,M2,0', 'U1,M3,2', 'U2,M4,3', 'U2,M4,4', 'U2,M4,5', 'U3,M1,1', 'U3,M5,6', 'U3,M4,4', 'U3,M4,5', 'U4,M2,3', 'U4,M1,1', 'U4,M1,4', 'U4,M1,5', 'U5,M1,3', 'U5,M1,1', 'U6,M1,3', 'U6,M9,4']


In [203]:
def parse_rating_line(line):
    """Split one ``user,movie,rating`` line into a 3-tuple of stripped strings.

    The line is split once, and each field is stripped of surrounding
    whitespace, so input such as ``"U1, M4, 4"`` still matches the string
    rating filter applied later. Fields beyond the third are ignored.
    The rating stays a string.

    Raises:
        ValueError: if the line has fewer than three comma-separated fields.
    """
    fields = line.split(",")
    if len(fields) < 3:
        raise ValueError("malformed rating line: %r" % (line,))
    user, movie, rating = (field.strip() for field in fields[:3])
    return (user, movie, rating)


# Drop blank lines (they have no fields to parse), then parse the rest into
# (user, movie, rating) tuples.
rdd = data.filter(lambda line: line.strip()).map(parse_rating_line)

In [204]:
# Bring the parsed tuples back to the driver for inspection. collect() loads
# the whole RDD into driver memory, which is fine for this small sample.
list_Of_Records = rdd.collect()

In [205]:
# Bare trailing expression: Jupyter displays the parsed tuples below.
list_Of_Records

[('U1', 'M4', '4'),
 ('U1', 'M4', '3'),
 ('U1', 'M2', '5'),
 ('U1', 'M2', '0'),
 ('U1', 'M3', '2'),
 ('U2', 'M4', '3'),
 ('U2', 'M4', '4'),
 ('U2', 'M4', '5'),
 ('U3', 'M1', '1'),
 ('U3', 'M5', '6'),
 ('U3', 'M4', '4'),
 ('U3', 'M4', '5'),
 ('U4', 'M2', '3'),
 ('U4', 'M1', '1'),
 ('U4', 'M1', '4'),
 ('U4', 'M1', '5'),
 ('U5', 'M1', '3'),
 ('U5', 'M1', '1'),
 ('U6', 'M1', '3'),
 ('U6', 'M9', '4')]

In [181]:
# Rating strings accepted as valid scores. Anything else (e.g. the "0" and
# "6" entries in the sample data) is discarded.
VALID_RATINGS = frozenset(['1', '2', '3', '4', '5'])


def has_valid_rating(record):
    """Return True when the record's third field is an accepted rating string."""
    rating = record[2]
    return rating in VALID_RATINGS


records_after_filtering = rdd.filter(has_valid_rating)

In [182]:
# Display the records that survived the rating filter. The sample's "0" and
# "6" ratings no longer appear.
records_after_filtering.collect()

[('U1', 'M4', '4'),
 ('U1', 'M4', '3'),
 ('U1', 'M2', '5'),
 ('U1', 'M3', '2'),
 ('U2', 'M4', '3'),
 ('U2', 'M4', '4'),
 ('U2', 'M4', '5'),
 ('U3', 'M1', '1'),
 ('U3', 'M4', '4'),
 ('U3', 'M4', '5'),
 ('U4', 'M2', '3'),
 ('U4', 'M1', '1'),
 ('U4', 'M1', '4'),
 ('U4', 'M1', '5'),
 ('U5', 'M1', '3'),
 ('U5', 'M1', '1'),
 ('U6', 'M1', '3'),
 ('U6', 'M9', '4')]

In [183]:
def to_movie_user_pair(record):
    """Re-key a (user, movie, rating) record as a (movie, user) pair."""
    user = record[0]
    movie = record[1]
    return (movie, user)


mapper_output = records_after_filtering.map(to_movie_user_pair)

In [184]:
# Display the (movie, user) pairs emitted by the mapper step.
mapper_output.collect()

[('M4', 'U1'),
 ('M4', 'U1'),
 ('M2', 'U1'),
 ('M3', 'U1'),
 ('M4', 'U2'),
 ('M4', 'U2'),
 ('M4', 'U2'),
 ('M1', 'U3'),
 ('M4', 'U3'),
 ('M4', 'U3'),
 ('M2', 'U4'),
 ('M1', 'U4'),
 ('M1', 'U4'),
 ('M1', 'U4'),
 ('M1', 'U5'),
 ('M1', 'U5'),
 ('M1', 'U6'),
 ('M9', 'U6')]

In [210]:
# Group the (movie, user) pairs by movie. groupByKey shuffles every value
# across partitions. Each group's values come back as a ResultIterable,
# not a list (see the next output).
grouped_output = mapper_output.groupByKey()

In [211]:
# Display the grouped RDD. Values are ResultIterable objects, so only their
# reprs are shown here, not the users they contain.
grouped_output.collect()

[('M4', <pyspark.resultiterable.ResultIterable at 0x256bbb56668>),
 ('M2', <pyspark.resultiterable.ResultIterable at 0x256bbb4f2b0>),
 ('M3', <pyspark.resultiterable.ResultIterable at 0x256bbb68588>),
 ('M1', <pyspark.resultiterable.ResultIterable at 0x256bbb68208>),
 ('M9', <pyspark.resultiterable.ResultIterable at 0x256bbb684a8>)]

In [214]:
# Materialize each movie's grouped users as a plain list; duplicates are
# kept. mapValues only touches the value and, unlike map, preserves the
# RDD's partitioner.
grouped_key_values = grouped_output.mapValues(list)

In [215]:
# Display each movie with the full list of users who rated it (duplicates kept).
grouped_key_values.collect()

[('M4', ['U1', 'U1', 'U2', 'U2', 'U2', 'U3', 'U3']),
 ('M2', ['U1', 'U4']),
 ('M3', ['U1']),
 ('M1', ['U3', 'U4', 'U4', 'U4', 'U5', 'U5', 'U6']),
 ('M9', ['U6'])]

In [220]:
# For each movie, compute (number of valid ratings, number of distinct users).
# len/set give the same counts as np.size/np.unique, without converting each
# list to an array twice or sorting it (np.unique sorts).
reducer_result = grouped_key_values.mapValues(
    lambda users: (len(users), len(set(users)))
)

In [221]:
# Display (movie, (total valid ratings, distinct users)) for every movie.
reducer_result.collect()

[('M4', (7, 3)),
 ('M2', (2, 2)),
 ('M3', (1, 1)),
 ('M1', (7, 4)),
 ('M9', (1, 1))]

In [222]:
def has_multiple_raters(movie_stats):
    """Return True when a (movie, (total, distinct)) pair has >= 2 distinct users."""
    counts = movie_stats[1]
    distinct_users = counts[1]
    return distinct_users >= 2


filtered_reducer_result = reducer_result.filter(has_multiple_raters)

In [223]:
# Display the movies rated by at least two distinct users.
filtered_reducer_result.collect()

[('M4', (7, 3)), ('M2', (2, 2)), ('M1', (7, 4))]