# <center> Introduction to Spark In-memory Computing via Python PySpark </center>

In [1]:
import sys
import os
import pyspark

# Record which Spark installation and master this run used by echoing the
# relevant environment variables (raises KeyError if any of them is unset).
# NOTE(review): `sc` is used below but never created in this notebook; it is
# presumably injected by the PySpark kernel/launcher — confirm.
for envVar in ('SPARK_HOME', 'SPARK_MASTER_HOST', 'SPARK_MASTER_PORT', 'SPARK_MASTER_WEBUI_PORT'):
    print(os.environ[envVar])

/ifs/opt/spark/3.5.0
node042
51224
57057


In [2]:
!ls -lh $HOME/data/ml-latest

total 1.4G
-rw-r--r-- 1 lbn28 lbn28 396M Feb 21 10:19 genome-scores.csv
-rw-r--r-- 1 lbn28 lbn28  18K Feb 21 10:23 genome-tags.csv
-rw-r--r-- 1 lbn28 lbn28 1.3M Feb 21 10:23 links.csv
-rw-r--r-- 1 lbn28 lbn28 2.8M Feb 21 10:23 movies.csv
-rw-r--r-- 1 lbn28 lbn28 725M Feb 21 10:18 ratings.csv
-rw-r--r-- 1 lbn28 lbn28 9.6K Feb 21 10:23 README.txt
-rw-r--r-- 1 lbn28 lbn28  38M Feb 21 10:21 tags.csv


In [3]:
# Load the MovieLens ratings file listed above as an RDD of raw text lines.
# cache() keeps it in memory after the first action, which is why the second
# count() below is much faster than the first.
# NOTE(review): `sc` is not created in this notebook; presumably provided by the kernel — confirm.
rating_file = os.getenv('HOME') + "/data/ml-latest/ratings.csv"
ratings = sc.textFile(rating_file).cache()

In [4]:
%%time
ratings.count()

CPU times: user 4.17 ms, sys: 7.41 ms, total: 11.6 ms
Wall time: 6.57 s


27753445

In [5]:
%%time
ratings.count()

CPU times: user 2.62 ms, sys: 2.6 ms, total: 5.22 ms
Wall time: 890 ms


27753445

In [6]:
print(ratings)

/home/lbn28/data/ml-latest/ratings.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


### Find the movies with the highest average ratings over the years and identify their genres

- Find the average ratings of all movies over the years
- Identify the corresponding genres for each movie

In [9]:
ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,307,3.5,1256677221',
 '1,481,3.5,1256677456',
 '1,1091,1.5,1256677471',
 '1,1257,4.5,1256677460']

In [10]:
# RDDs are not indexable like Python lists, so positional access raises
# TypeError. Catch and show it so "Run All" continues past this demonstration.
try:
    ratings[2]
except TypeError as err:
    print(f"TypeError: {err}")

TypeError: 'RDD' object is not subscriptable

In [11]:
ratingHeader = ratings.first() #extract header
print(ratingHeader)

userId,movieId,rating,timestamp


In [12]:
# Keep only data rows: every line except the header line captured above.
ratingsOnly = ratings.filter(lambda x: x != ratingHeader)

In [13]:
ratingsOnly.take(5)

['1,307,3.5,1256677221',
 '1,481,3.5,1256677456',
 '1,1091,1.5,1256677471',
 '1,1257,4.5,1256677460',
 '1,1449,4.5,1256677264']

In [14]:
def parseMovieRating(line):
    """Turn a 'userId,movieId,rating,timestamp' line into a (movieId, rating) pair.

    movieId is kept as a string; rating is converted to float.
    """
    fields = line.split(",")
    return (fields[1], float(fields[2]))

movieRatings = ratingsOnly.map(parseMovieRating)

In [15]:
movieRatings.take(5)

[('307', 3.5), ('481', 3.5), ('1091', 1.5), ('1257', 4.5), ('1449', 4.5)]

**Possible approaches to aggregating data:**
- groupByKey and mapValues
- reduceByKey and countByKey

**groupByKey and mapValues**

In [16]:
# Gather all ratings of each movieId into a single ResultIterable value
# (shown only as an object reference in the output below).
groupByKeyRatings = movieRatings.groupByKey()
groupByKeyRatings.take(5)

[('3826', <pyspark.resultiterable.ResultIterable at 0x15552e637490>),
 ('104', <pyspark.resultiterable.ResultIterable at 0x15552e6372b0>),
 ('153', <pyspark.resultiterable.ResultIterable at 0x15552e6371c0>),
 ('165', <pyspark.resultiterable.ResultIterable at 0x15552e637730>),
 ('181', <pyspark.resultiterable.ResultIterable at 0x15552e637100>)]

In [17]:
# Materialize each ResultIterable as a plain list so the ratings can be inspected and summed.
mapValuesToListRatings = groupByKeyRatings.mapValues(list)
mapValuesToListRatings.take(5)

[('2424',
  [3.5,
   2.5,
   4.0,
   2.0,
   2.5,
   5.0,
   3.0,
   3.0,
   3.5,
   2.5,
   3.0,
   3.5,
   4.0,
   1.0,
   2.5,
   3.0,
   4.0,
   4.0,
   3.0,
   5.0,
   4.0,
   2.5,
   2.5,
   2.5,
   3.0,
   4.0,
   5.0,
   3.0,
   2.0,
   5.0,
   3.0,
   2.0,
   2.0,
   3.0,
   5.0,
   3.5,
   0.5,
   5.0,
   4.0,
   2.5,
   3.0,
   3.0,
   4.0,
   4.0,
   2.0,
   2.5,
   3.5,
   1.0,
   2.0,
   3.0,
   2.5,
   2.0,
   1.5,
   3.0,
   3.0,
   1.0,
   2.5,
   2.0,
   3.0,
   3.0,
   2.0,
   1.5,
   3.0,
   1.0,
   2.5,
   3.5,
   4.0,
   2.5,
   2.0,
   1.0,
   3.5,
   4.0,
   3.5,
   4.0,
   2.5,
   4.0,
   2.0,
   4.0,
   3.0,
   3.5,
   3.0,
   2.5,
   3.0,
   3.0,
   2.5,
   4.0,
   3.5,
   3.5,
   3.5,
   4.0,
   3.0,
   2.0,
   3.5,
   2.5,
   3.0,
   3.0,
   1.5,
   3.5,
   2.5,
   3.0,
   4.0,
   4.5,
   2.5,
   2.0,
   3.0,
   5.0,
   3.5,
   4.0,
   2.5,
   3.0,
   3.0,
   1.5,
   3.0,
   3.0,
   3.5,
   2.5,
   2.0,
   3.0,
   1.0,
   3.0,
   2.5,
   4.0,
   3.5,
   2.0

In [18]:
# Average rating per movie: the sum of its ratings divided by how many there are.
avgRatings01 = mapValuesToListRatings.mapValues(
    lambda movieRatingList: sum(movieRatingList) / len(movieRatingList)
)
avgRatings01.take(5)

[('2424', 3.187726495726496),
 ('2761', 3.8424793091039944),
 ('2791', 3.787632990415897),
 ('4306', 3.7718788707128517),
 ('25850', 3.9313346228239845)]

In [22]:
tmpX = mapValuesToListRatings.take(5)

In [24]:
tmpX[0][0]

'2424'

In [25]:
sum(tmpX[0][1]) / len(tmpX[0][1])

3.187726495726496

Is this correct? Let's check the arithmetic on a small list whose mean we can compute by hand:

In [None]:
test = [2.0, 4.0, 3.0]
sum(test) / len(test)

**reduceByKey and countByKey**

In [None]:
# countByKey is an action: it returns a local dict-like mapping of
# movieId -> number of ratings (not an RDD), which later cells look up with .get().
countsByKey = movieRatings.countByKey()

countsByKey

In [None]:
def sumValues(x, y):
    """Add two values that share a key; used as the reduceByKey combiner.

    Arguments may be single ratings or partial sums, so the operation has to be
    associative and commutative — which `+` on numbers is (up to float rounding).
    """
    return x + y

# Total of all ratings per movieId; reduceByKey combines values pairwise with sumValues.
sumRatings = movieRatings.reduceByKey(sumValues)

sumRatings.take(5)

In [None]:
# Same aggregation, using the built-in operator.add instead of a hand-written function.
import operator

sumRatings = movieRatings.reduceByKey(operator.add)
sumRatings.take(5)

In [None]:
# Average = total rating / rating count. The count comes from the local
# countsByKey mapping captured by the lambda's closure.
avgRatings02 = sumRatings.map(lambda x: (x[0], x[1] / countsByKey.get(x[0])))

avgRatings02.take(5)

How do we augment the movie ratings data with title and genre information?

In [None]:
# movies.csv sits next to ratings.csv in the same MovieLens directory (see the
# `ls` listing at the top), so build its path the same way as `rating_file`.
# `os` is already imported in the first cell.
movie_file = os.getenv('HOME') + "/data/ml-latest/movies.csv"
movies = sc.textFile(movie_file)

In [None]:
movieHeader = movies.first() #extract header
print(movieHeader)

In [None]:
# Drop the header line.
# NOTE(review): this rebinds `movies` in place, so re-running the previous cell
# afterwards would capture the first movie line as the "header". Consider a new
# name (e.g. moviesOnly, mirroring ratingsOnly).
movies = movies.filter(lambda x: x != movieHeader)

movies.take(50)

In [None]:
s = '40,"Cry, the Beloved Country (1995)",Drama'
s.rsplit(",",1)[0]

In [None]:
s.rsplit(",",1)[0].split(",",1)[1]

**NOTE:** Splitting on commas is not a robust way to parse CSV, because quoted fields (such as movie titles) may themselves contain commas, and with this approach the surrounding quotes stay in the title. Instead, consider using a CSV library supported by Spark; we will show this in the next notebook in the series. The following works for now but is difficult to understand.

In [None]:
def parseMovieLine(line):
    """Split a 'movieId,title,genres' line into (movieId, (title, genres)).

    The genres are taken after the LAST comma and the movieId before the FIRST
    comma; everything in between is the title. Commas inside a quoted title
    therefore survive, but the surrounding double quotes are kept as-is.
    """
    pieces = line.rsplit(",", 1)
    title = pieces[0].split(",", 1)[1]
    genres = pieces[1]
    movieId = line.split(",", 1)[0]
    return (movieId, (title, genres))

movieInfo = movies.map(parseMovieLine)

movieInfo.take(50)

In [None]:
# Join on movieId: each record becomes (movieId, (avgRating, (title, genres))).
augmentedRatings = avgRatings01.join(movieInfo)

augmentedRatings.take(5)

In [None]:
x = ('2959',
  (4.229890025011034, ('Fight Club (1999)', 'Action|Crime|Drama|Thriller')))

In [None]:
x[0]

In [None]:
x[1]

In [None]:
x[1][0]

*Movies with the highest average ratings (top 10):*

In [None]:
# takeOrdered sorts ascending by key, so negate the average rating to get the highest first.
augmentedRatings.takeOrdered(10, key = lambda x : -x[1][0])

*Movies with the lowest average ratings (bottom 10):*

In [None]:
# Ascending by average rating: lowest-rated movies first.
augmentedRatings.takeOrdered(10, key = lambda x : x[1][0])

### Challenge:

Modify the code so that only movies with an average rating higher than 3.75 and at least 1000 ratings are collected.

In [None]:
# Challenge thresholds.
MIN_NUM_RATINGS = 1000
MIN_AVG_RATING = 3.75

# Build (ratingSum, ratingCount) per movie from movieRatings itself, so this
# cell does not depend on the mutable module-level `countsByKey` name. If that
# name is rebound later in the notebook, `countsByKey.get(...)` can return None
# and the `>= 1000` comparison raises TypeError on re-run.
sumCountRatings = (movieRatings
                   .mapValues(lambda rating: (rating, 1))
                   .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])))

# Movies with enough ratings: (movieId, (ratingSum, ratingCount)).
highRatings = sumCountRatings.filter(lambda x: x[1][1] >= MIN_NUM_RATINGS)

# Average each remaining movie and keep only the well-rated ones.
avgHighRatings = (highRatings
                  .mapValues(lambda sumCount: sumCount[0] / sumCount[1])
                  .filter(lambda x: x[1] > MIN_AVG_RATING))

# Attach (title, genres) and show the 10 best, highest average first.
augmentedHighRatings = avgHighRatings.join(movieInfo)

augmentedHighRatings.takeOrdered(10, key=lambda x: -x[1][0])

### Find genres which have the highest average ratings over the years

- Identify the genres associated with a movie and its rating
- Each movie can have multiple genres. How do we re-key the data so that each genre, rather than each movie, becomes the key?

In [None]:
movieRatings.take(5)

In [None]:
movieInfo.take(5)

In [None]:
# Join every individual rating (not the per-movie average) with its movie's info,
# giving (movieId, (rating, (title, genres))) records.
augmentedInfo = movieRatings.join(movieInfo)

In [None]:
augmentedInfo.take(5)

In [None]:
def extractGenreRating(t):
    """Fan one joined rating record out into (genre, rating) pairs.

    `t` is expected to look like ``(movieId, (rating, (title, genres)))``, the
    shape produced by ``movieRatings.join(movieInfo)``, where ``genres`` is a
    '|'-separated string.

    Returns a list with one ``(genre, rating)`` tuple per genre, in the order
    the genres appear in the string.
    """
    genres = t[1][1][1]
    rating = t[1][0]
    return [(genre, rating) for genre in genres.split("|")]

toyStory = ('1', (3.0, ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')))
print(extractGenreRating(toyStory))

In [None]:
def countGenre(t):
    """Return ``(movieId, (numberOfGenres, rating))`` for one joined record.

    `t` is expected to look like ``(movieId, (rating, (title, genres)))`` with a
    '|'-separated ``genres`` string; the genre count is the number of
    '|'-separated fields.
    """
    numGenres = t[1][1][1].count("|") + 1
    return (t[0], (numGenres, t[1][0]))

bulletproof = ('112674', (5.0, ('Bulletproof Salesman (2008)', 'Documentary')))
print(countGenre(bulletproof))

In [None]:
# Built from the per-movie averages (augmentedRatings), so each record is
# (movieId, (numberOfGenres, avgRating)).
genreCount = augmentedRatings.map(countGenre)
genreCount.take(10)

In [None]:
# NOTE(review): this is the same join as augmentedRatings (avgRatings01 joined with
# movieInfo) and is not used again in the visible notebook — consider removing it.
genreStudy = avgRatings01.join(movieInfo)

In [None]:
# flatMap flattens each record's list of (genre, rating) pairs into a single RDD of pairs.
genreRatings = augmentedInfo.flatMap(extractGenreRating)

In [None]:
genreRatings.take(20)

In [None]:
# Number of individual ratings per genre. Use a distinct name: `countsByKey`
# already holds the per-movie rating counts that earlier cells (e.g. the
# avgRatings02 computation) look up by movieId. Overwriting it with genre
# counts would make those cells fail if re-run.
genreCountsByKey = genreRatings.countByKey()

genreCountsByKey

In [None]:
genreRatings.take(5)

### Challenge:

Calculate the average rating for each genre.