#### Step 1: Place the required data set "ratings.txt" and "movies.txt" in HDFS

In [1]:
!hdfs dfs -put ~/training_materials/data/ratings.txt /user/cloudera
!hdfs dfs -put ~/training_materials/data/movies.txt /user/cloudera 

#### Step 2: Review the contents of the "ratings.txt" file

In [2]:
!hdfs dfs -cat ratings.txt | head -5

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
cat: Unable to write to output stream.


#### Step 3: Review the contents of the "movies.txt" file

In [1]:
!hdfs dfs -cat movies.txt | head -5

cat: `movies.txt': No such file or directory


#### Step 4: Load the "ratings.txt" file in memory 

In [1]:
ratings=sc.textFile("ratings.txt")

#### Step 5: Split the RDD based on the delimiter used in file i.e. “::”

In [2]:
ratings_split=ratings.map(lambda line: line.split('::'))

#### Step 6:  Create a Pair RDD with key being movie ID and value being its rating 

In [3]:
ratings_pairs=ratings_split.map(lambda fields: (str(fields[0]),int(fields[2])))

#### Step 7: Group the ratings by key and observe the output

In [4]:
ratings_group=ratings_pairs.groupByKey()
ratings_group.take(5)

[('1869', <pyspark.resultiterable.ResultIterable at 0x7f835082d090>),
 ('344', <pyspark.resultiterable.ResultIterable at 0x7f835082d250>),
 ('346', <pyspark.resultiterable.ResultIterable at 0x7f835082d290>),
 ('340', <pyspark.resultiterable.ResultIterable at 0x7f835082d2d0>),
 ('342', <pyspark.resultiterable.ResultIterable at 0x7f835082d310>)]

-groupByKey() groups the values with the same key

#### Step 8: Calculate the average rating for all movies

In [5]:
ratings_average=ratings_group.mapValues(lambda x: sum(x)/len(x))

#### Step 9: Take 10 elements and observe the output

In [6]:
ratings_average.take(10)

[('1869', 3),
 ('344', 2),
 ('346', 4),
 ('340', 3),
 ('342', 3),
 ('348', 3),
 ('2318', 2),
 ('2316', 3),
 ('2314', 3),
 ('2312', 3)]

#### Step 10: Now count the number of movies for a certain rating

In [8]:
ratings_newpairs=ratings_average.map(lambda fields: (fields[1],fields[0]))
ratings_count=ratings_newpairs.countByKey()

-countByKey() return the count of every key i.e. the number of times a particular key is present

#### Step 11: Review the output

In [10]:
print ratings_count

defaultdict(<type 'int'>, {1: 2, 2: 243, 3: 2700, 4: 1007})


#### Step 12: Load the "movies.txt" file in memory

In [11]:
movies=sc.textFile("movies.txt")

#### Step 13: Split the RDD based upon '::' as delimeter 

In [13]:
movies_split=movies.map(lambda line: line.split('::'))

#### Step 14: Create a Pair RDD with movie ID being the key and movie name being its value and then review the output

In [15]:
movies_pairs=movies_split.map(lambda fields: (fields[0],fields[1]))
movies_pairs.take(10)

[(u'1', u'Toy Story (1995)'),
 (u'2', u'Jumanji (1995)'),
 (u'3', u'Grumpier Old Men (1995)'),
 (u'4', u'Waiting to Exhale (1995)'),
 (u'5', u'Father of the Bride Part II (1995)'),
 (u'6', u'Heat (1995)'),
 (u'7', u'Sabrina (1995)'),
 (u'8', u'Tom and Huck (1995)'),
 (u'9', u'Sudden Death (1995)'),
 (u'10', u'GoldenEye (1995)')]

#### Step  15: Now join the movies-rdd in Step 13 and ratings-rdd in Step 8 to find the average rating of movies along with their names

In [16]:
movies_join=movies_pairs.join(ratings_average)

#### Step 16: Take 10 elements and observe the output

In [17]:
movies_join.take(10)

[(u'3922', (u'Bikini Beach (1964)', 2)),
 (u'1142', (u'Get Over It (1996)', 3)),
 (u'3926', (u'Voyage to the Bottom of the Sea (1961)', 3)),
 (u'2068', (u'Fanny and Alexander (1982)', 3)),
 (u'304', (u'Roommates (1995)', 3)),
 (u'1564', (u"Roseanna's Grave (For Roseanna) (1997)", 3)),
 (u'124', (u"Star Maker, The (Uomo delle stelle, L') (1995)", 4)),
 (u'164', (u'Devil in a Blue Dress (1995)', 4)),
 (u'1160', (u'Six of a Kind (1934)', 3)),
 (u'3571', (u'Time Code (2000)', 3))]

#### Step 17: Save the RDD as text file and review the output

In [18]:
movies_join.saveAsTextFile("movies_join")
!hdfs dfs -cat movies_join/part* | head -10

(u'3922', (u'Bikini Beach (1964)', 2))
(u'1142', (u'Get Over It (1996)', 3))
(u'3926', (u'Voyage to the Bottom of the Sea (1961)', 3))
(u'2068', (u'Fanny and Alexander (1982)', 3))
(u'304', (u'Roommates (1995)', 3))
(u'1564', (u"Roseanna's Grave (For Roseanna) (1997)", 3))
(u'124', (u"Star Maker, The (Uomo delle stelle, L') (1995)", 4))
(u'164', (u'Devil in a Blue Dress (1995)', 4))
(u'1160', (u'Six of a Kind (1934)', 3))
(u'3571', (u'Time Code (2000)', 3))
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.
