# Managing Big Data for Connected Devices

## 420-N63-NA

## Kawser Wazed Nafi

--------------------------------------------------------------------------------------------------------------------------------------

## Additional RDD Operation
At first, create the sparksession and rdd

In [1]:
from pyspark.sql import SparkSession

ss = SparkSession.builder.master("local[4]").appName("movielens-join").getOrCreate()
sc = ss.sparkContext

### Load the Ratings Data and remove the header

In [2]:
ratingsRDDWithHeader = sc.textFile('ratings.csv')
header = ratingsRDDWithHeader.first()
print(header)
ratingsRDD = ratingsRDDWithHeader.filter(lambda x: x != header)
ratingsRDD = ratingsRDD.map(lambda x: x.split(','))
ratingsRDD.take(5)

userId,movieId,rating,timestamp


[['1', '1', '4.0', '964982703'],
 ['1', '3', '4.0', '964981247'],
 ['1', '6', '4.0', '964982224'],
 ['1', '47', '5.0', '964983815'],
 ['1', '50', '5.0', '964982931']]

### Load the Movies data and remove the header

In [3]:
moviesRDDWithHeader = sc.textFile('movies.csv')
moviesRDDWithHeader.first()
headerMovies = moviesRDDWithHeader.first()
print(headerMovies)
moviesRDD = moviesRDDWithHeader.filter(lambda x: x != headerMovies)
moviesRDD = moviesRDD.map(lambda x: x.split(','))
moviesRDD.take(5)

movieId,title,genres


[['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'],
 ['2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'],
 ['3', 'Grumpier Old Men (1995)', 'Comedy|Romance'],
 ['4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance'],
 ['5', 'Father of the Bride Part II (1995)', 'Comedy']]

### Load the tags data


In [4]:
tagsRDDWithHeader = sc.textFile('tags.csv')
tagsRDDWithHeader.first()
headerTags = tagsRDDWithHeader.first()
print(headerTags)
tagsRDD = tagsRDDWithHeader.filter(lambda x: x != headerTags)
tagsRDD = tagsRDD.map(lambda x: x.split(','))
tagsRDD.take(5)

userId,movieId,tag,timestamp


[['2', '60756', 'funny', '1445714994'],
 ['2', '60756', 'Highly quotable', '1445714996'],
 ['2', '60756', 'will ferrell', '1445714992'],
 ['2', '89774', 'Boxing story', '1445715207'],
 ['2', '89774', 'MMA', '1445715200']]

### Filterout only the Movies with Funny Tag


In [5]:
filteredFunnyTagsRDD = tagsRDD.filter(lambda x: 'funny' in x[2].lower())
filteredFunnyTagsRDD.take(5)

[['2', '60756', 'funny', '1445714994'],
 ['62', '2953', 'funny', '1525636885'],
 ['62', '3114', 'funny', '1525636913'],
 ['62', '60756', 'funny', '1528934381'],
 ['62', '68848', 'funny', '1527274322']]

### Multivalue Mapping Creation
Till now we have created a number of RDDs. At this point, our goal is to create a multicolumn/multivalue RDD mapping



In [6]:
moviesWithTagFunnyRDD = filteredFunnyTagsRDD.map(lambda x: (x[1], (x[0], x[2]))).reduceByKey(lambda x,y: ('1', 'funny')).map(lambda x: x[0]).map(lambda x: (x,1))
moviesWithTagFunnyRDD.take(5)

[('60756', 1), ('2953', 1), ('71535', 1), ('88405', 1), ('99114', 1)]

### Exercise 1
It's a big line performing a big value operation. Could you please divide the operation in small operations and check what is happening with every small operation? Please list your findings one by one and explain in your own word.

In [7]:
sampleRDD = filteredFunnyTagsRDD.map(lambda x: (x[1], (x[0], x[2])))
sampleRDD.collect()

[('60756', ('2', 'funny')),
 ('2953', ('62', 'funny')),
 ('3114', ('62', 'funny')),
 ('60756', ('62', 'funny')),
 ('68848', ('62', 'funny')),
 ('71535', ('62', 'funny')),
 ('88405', ('62', 'funny')),
 ('99114', ('62', 'funny')),
 ('107348', ('62', 'stupid but funny')),
 ('119141', ('62', 'funny')),
 ('179401', ('62', 'funny')),
 ('183611', ('62', 'funny')),
 ('101142', ('119', 'funny')),
 ('115617', ('177', 'very funny')),
 ('126548', ('256', 'funny')),
 ('39', ('357', 'funny')),
 ('60756', ('424', 'funny')),
 ('2706', ('477', 'not funny')),
 ('69757', ('477', 'Funny')),
 ('69122', ('537', 'funny')),
 ('106766', ('567', 'funny')),
 ('112852', ('567', 'funny')),
 ('134170', ('567', 'funny')),
 ('148626', ('567', 'funny')),
 ('167746', ('567', 'funny')),
 ('296', ('599', 'funny')),
 ('296', ('599', 'very funny')),
 ('1732', ('599', 'funny'))]

Here, I'm fetching for every movie with a tag that has the word 'funny' in it, case insensitive.

In [8]:
reducedSampleRDD = sampleRDD.reduceByKey(lambda x, y: ('1', 'funny'))
reducedSampleRDD.collect()

[('60756', ('1', 'funny')),
 ('2953', ('62', 'funny')),
 ('71535', ('62', 'funny')),
 ('88405', ('62', 'funny')),
 ('99114', ('62', 'funny')),
 ('179401', ('62', 'funny')),
 ('183611', ('62', 'funny')),
 ('2706', ('477', 'not funny')),
 ('69757', ('477', 'Funny')),
 ('69122', ('537', 'funny')),
 ('112852', ('567', 'funny')),
 ('148626', ('567', 'funny')),
 ('167746', ('567', 'funny')),
 ('296', ('1', 'funny')),
 ('3114', ('62', 'funny')),
 ('68848', ('62', 'funny')),
 ('107348', ('62', 'stupid but funny')),
 ('119141', ('62', 'funny')),
 ('101142', ('119', 'funny')),
 ('115617', ('177', 'very funny')),
 ('126548', ('256', 'funny')),
 ('39', ('357', 'funny')),
 ('106766', ('567', 'funny')),
 ('134170', ('567', 'funny')),
 ('1732', ('599', 'funny'))]

The .reduceByKey() transformation merged the userIDs in the RDD sampleRDD together.

In [9]:
uidRDD = reducedSampleRDD.map(lambda x: x[0])
uidRDD.collect()

['60756',
 '2953',
 '71535',
 '88405',
 '99114',
 '179401',
 '183611',
 '2706',
 '69757',
 '69122',
 '112852',
 '148626',
 '167746',
 '296',
 '3114',
 '68848',
 '107348',
 '119141',
 '101142',
 '115617',
 '126548',
 '39',
 '106766',
 '134170',
 '1732']

We're getting the userIDs from the reducedSampleRDD RDD

In [10]:
targetUsersRDD = uidRDD.map(lambda x: (x,1))
targetUsersRDD.take(5)

[('60756', 1), ('2953', 1), ('71535', 1), ('88405', 1), ('99114', 1)]

Finally, put the userIDs in tuples and append a value of '1' to each of them.

### RDDs Join Operation
We are recurring the problem we discussed in our previous exercise. We are going to find out the highest average rating movie from the movie list. This time, we are going to see only the highest average rated movie under "funny" genere.

In [11]:
moviesRDDwithKey = moviesRDD.map(lambda x: (x[0], (x[1], x[2])))
moviesRDDwithKey.take(5)

moviesWithTagFunnyRDD.join(moviesRDDwithKey).take(5)

[('60756', (1, ('Step Brothers (2008)', 'Comedy'))),
 ('99114', (1, ('Django Unchained (2012)', 'Action|Drama|Western'))),
 ('183611', (1, ('Game Night (2018)', 'Action|Comedy|Crime|Horror'))),
 ('2706', (1, ('American Pie (1999)', 'Comedy|Romance'))),
 ('69757', (1, ('(500) Days of Summer (2009)', 'Comedy|Drama|Romance')))]

From the Data we can see that we have performed a new tuple configuration, where MovieID is merged with Movie name and the Funny Genere. We should see how the data tuple is configured and distributed.

('60756', (1, ('Step Brothers (2008)', 'Comedy')))

x[0] = '60756'
x[1] = (1, ('Step Brothers (2008)', 'Comedy'))

x[1][0] = 1
x[1][1] = ('Step Brothers (2008)', 'Comedy')

x[1][1][0] = 'Step Brothers (2008)'
x[1][1][1] = 'Comedy'

map(lambda x: (x[0], x[1][1][0], x[1][1][1]))


(x[0],) + x[1][1]

Based on this analysis, we now need to perform the mapping in such a way that movieID will represent Movie Name and Genere.

In [12]:
moviesWithTagFunnyJoinedRDD = moviesWithTagFunnyRDD.join(moviesRDDwithKey).map(lambda x: (x[0],) + x[1][1]).map(lambda x: (x[0], (x[1],x[2])))
moviesWithTagFunnyJoinedRDD.take(5)

[('60756', ('Step Brothers (2008)', 'Comedy')),
 ('99114', ('Django Unchained (2012)', 'Action|Drama|Western')),
 ('183611', ('Game Night (2018)', 'Action|Comedy|Crime|Horror')),
 ('2706', ('American Pie (1999)', 'Comedy|Romance')),
 ('69757', ('(500) Days of Summer (2009)', 'Comedy|Drama|Romance'))]

After this, our next step is to match the movieID with the ratings and generate a representational tuple

In [13]:
ratingsKeyValueRDD = ratingsRDD.map(lambda x: (x[1], x[2]))
ratingsKeyValueRDD.take(5)

[('1', '4.0'), ('3', '4.0'), ('6', '4.0'), ('47', '5.0'), ('50', '5.0')]

Finally, we need to join our funny Genere movies with the ratings to find out the best average rated funny movie in the movie list.

In [14]:
ratingsFilteredWithTagFunnyRDD = ratingsKeyValueRDD.join(moviesWithTagFunnyJoinedRDD).map(lambda x: (x[0], float(x[1][0]), x[1][1][0]))
ratingsFilteredWithTagFunnyRDD = ratingsFilteredWithTagFunnyRDD.map(lambda x: (x[0], (x[1], 1, x[2]))).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2])).filter(lambda x: x[1][1] > 10).map(lambda x: (x[0], x[1][0]/x[1][1], x[1][2]))
print(ratingsFilteredWithTagFunnyRDD.max(lambda x: x[1]))

('296', 4.197068403908795, 'Pulp Fiction (1994)')


### Exercise 2
Can we get the top 5 average rated funny movies instead of just the top? Give a try

In [15]:
topFive = ratingsFilteredWithTagFunnyRDD.sortBy(lambda x: x[1]).take(5)
print(topFive)

[('2953', 2.5161290322580645, 'Home Alone 2: Lost in New York (1992)'), ('88405', 3.05, 'Friends with Benefits (2011)'), ('39', 3.293269230769231, 'Clueless (1995)'), ('107348', 3.3636363636363638, 'Anchorman 2: The Legend Continues (2013)'), ('2706', 3.378640776699029, 'American Pie (1999)')]


### Exercise 3

Create different RDDs with joined by key. For each RDD, create an RDD that is a tuple where the first element is the key which is the movie ID, and the second element is the rest of the data that we want. Use the ratings file and the movies file

In [None]:
masterRatingsRDD = sc.textFile("ratings.csv")
ratingsHeader = masterRatingsRDD.first()
ratingsRDD = masterRatingsRDD.filter(lambda x: x!= ratingsHeader).map(lambda x: (x.split(",")[1], x.split(",")[2]))
# ratingsRDD.collect()

masterMoviesRDD = sc.textFile("movies.csv")
moviesHeader = masterMoviesRDD.first()
moviesRDD = masterMoviesRDD.filter(lambda x: x!= moviesHeader)

titlesRDD = moviesRDD.map(lambda x: (x.split(",")[0], x.split(",")[1]))
genresRDD = moviesRDD.map(lambda x: (x.split(",")[0], x.split(",")[2]))

movieRatings = titlesRDD.join(ratingsRDD)
movieGenres = genresRDD.join(ratingsRDD)
movieRatingsFinal = movieRatings.map(lambda x: (x[0], (x[1][0], x[1][1])))
movieGenresFinal = movieGenres.map(lambda x: (x[0], (x[1][0], x[1][1])))
print(movieRatingsFinal.take(10))
print(movieGenresFinal.take(10))

[('1', '4.0'),
 ('3', '4.0'),
 ('6', '4.0'),
 ('47', '5.0'),
 ('50', '5.0'),
 ('70', '3.0'),
 ('101', '5.0'),
 ('110', '4.0'),
 ('151', '5.0'),
 ('157', '5.0'),
 ('163', '5.0'),
 ('216', '5.0'),
 ('223', '3.0'),
 ('231', '5.0'),
 ('235', '4.0'),
 ('260', '5.0'),
 ('296', '3.0'),
 ('316', '3.0'),
 ('333', '5.0'),
 ('349', '4.0'),
 ('356', '4.0'),
 ('362', '5.0'),
 ('367', '4.0'),
 ('423', '3.0'),
 ('441', '4.0'),
 ('457', '5.0'),
 ('480', '4.0'),
 ('500', '3.0'),
 ('527', '5.0'),
 ('543', '4.0'),
 ('552', '4.0'),
 ('553', '5.0'),
 ('590', '4.0'),
 ('592', '4.0'),
 ('593', '4.0'),
 ('596', '5.0'),
 ('608', '5.0'),
 ('648', '3.0'),
 ('661', '5.0'),
 ('673', '3.0'),
 ('733', '4.0'),
 ('736', '3.0'),
 ('780', '3.0'),
 ('804', '4.0'),
 ('919', '5.0'),
 ('923', '5.0'),
 ('940', '5.0'),
 ('943', '4.0'),
 ('954', '5.0'),
 ('1009', '3.0'),
 ('1023', '5.0'),
 ('1024', '5.0'),
 ('1025', '5.0'),
 ('1029', '5.0'),
 ('1030', '3.0'),
 ('1031', '5.0'),
 ('1032', '5.0'),
 ('1042', '4.0'),
 ('1049', '5.0

### Exercise 4
Can you join two RDDs you created at exercise 3? Please join them and see the results

In [32]:
joinedRDD = movieRatingsFinal.join(movieGenresFinal)
print(joinedRDD.take(10))

[('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '3.0'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '3.0'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '3.0'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '3.0'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '1.0'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '2.0'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '1.5'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '3.0'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '3.0'))), ('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '3.0')))]


### Exercise 5

Can you explain with the below mapping operation is performing? You should execute the following operation on your joined RDD you created in Exercise 4

In [34]:
joinedRDD.map(lambda x: (x[0], (x[1][0]+ x[1][1]))).map(lambda x: ((x[0],) + x[1])).take(10)

[('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '3.0'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '3.0'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '3.0'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '3.0'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '1.0'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '2.0'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '1.5'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '3.0'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '3.0'),
 ('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '3.0')]

The code above merges the subtuples in an entry together.

From 
```
('4', (('Waiting to Exhale (1995)', '3.0'), ('Comedy|Drama|Romance', '3.0')))
```
To
```
('4', 'Waiting to Exhale (1995)', '3.0', 'Comedy|Drama|Romance', '3.0')
```

### Exercise 6 (Advanced)

We have done enough works with multivalued mapreduce operations. Now is the time to use your own understanding to perform the multivalued ratings in your own style.

I have given your 4 files together: Links, Movies, Ratings, Tags. You have to use at least 3 of these files. You first need to derive
your proposal based on the example given over here(You proposal will not be related to find out the highest average rated movie. It could be anything else, such as adding links of the movies under Action genere) . Based on your analysis proposal, perform the data analysis as shown in this assignment.

Let's say, I want to get all movie titles along with their average ratings, and imdb ID's. 

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import count

sps = SparkSession.builder.master("local[4]").appName("movie-app").getOrCreate()
sc = sps.sparkContext

In [3]:
# links = sps.read.csv('links.csv', header=True, inferSchema=True)
# ratings = sps.read.csv('ratings.csv', header=True, inferSchema=True)
# movies = sps.read.csv('movies.csv', header=True, inferSchema=True)

# imdb_set = movies.join(links, 'movieId').join(ratings, 'movieId')
# comedy_set = imdb_set.filter(imdb_set.genres.contains('Comedy'))

# sum_df = comedy_set.select(F.sum(comedy_set.rating).alias('rating total'))

# sum_df.show()
# comedy_sum = sum_df.first()['rating total']
# total = comedy_set.count()

# print(f'Number of comedy movies: {total}')
# print(f'Average rating: {comedy_sum / total}')

link = sc.textFile('links.csv')
header = link.first()
linksRDD = link.filter(lambda x: x!= header)

movie = sc.textFile('movies.csv')
header = movie.first()
moviesRDD = movie.filter(lambda x: x!= header).map(lambda x: x.split(","))

rating = sc.textFile('ratings.csv')
header = rating.first()
ratingsRDD = rating.filter(lambda x: x!= header).map(lambda x: (x.split(",")[1], x.split(",")[2]))

linksRDD.collect()

['movieId,imdbId,tmdbId',
 '1,0114709,862',
 '2,0113497,8844',
 '3,0113228,15602',
 '4,0114885,31357',
 '5,0113041,11862',
 '6,0113277,949',
 '7,0114319,11860',
 '8,0112302,45325',
 '9,0114576,9091',
 '10,0113189,710',
 '11,0112346,9087',
 '12,0112896,12110',
 '13,0112453,21032',
 '14,0113987,10858',
 '15,0112760,1408',
 '16,0112641,524',
 '17,0114388,4584',
 '18,0113101,5',
 '19,0112281,9273',
 '20,0113845,11517',
 '21,0113161,8012',
 '22,0112722,1710',
 '23,0112401,9691',
 '24,0114168,12665',
 '25,0113627,451',
 '26,0114057,16420',
 '27,0114011,9263',
 '28,0114117,17015',
 '29,0112682,902',
 '30,0115012,37557',
 '31,0112792,9909',
 '32,0114746,63',
 '34,0112431,9598',
 '36,0112818,687',
 '38,0113442,33689',
 '39,0112697,9603',
 '40,0112749,34615',
 '41,0114279,31174',
 '42,0112819,11443',
 '43,0114272,35196',
 '44,0113855,9312',
 '45,0114681,577',
 '46,0113347,11861',
 '47,0114369,807',
 '48,0114148,10530',
 '49,0114916,8391',
 '50,0114814,629',
 '52,0113819,11448',
 '53,0110299,4913