### COMP5349 Week 5 Spark Lab Sample Code

This is a sample notebook showing basic Spark RDD operations.
The program has two input data sources: *ratings.csv* and *movies.csv*.
The *movies.csv* file contains movie information. Each row represents one movie and has the following format:
```
movieId,title,genres
```
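A movie title can itself contain commas, so splitting a *movies.csv* line on `,` is fragile. The sketch below assumes standard CSV quoting and `|`-separated genres (the MovieLens convention). `parse_movie` is a hypothetical helper and is not part of the lab's `functions` module:

```python
import csv

def parse_movie(line):
    """Parse one movies.csv row into (movieId, title, [genres]).

    Hypothetical helper: assumes standard CSV quoting and '|'-separated genres.
    """
    # csv.reader handles quoted fields such as "American President, The (1995)"
    movie_id, title, genres = next(csv.reader([line]))
    return int(movie_id), title, genres.split("|")

print(parse_movie('11,"American President, The (1995)",Comedy|Drama|Romance'))
# (11, 'American President, The (1995)', ['Comedy', 'Drama', 'Romance'])
```

In the notebook, the same function could be passed to `movieData.map(parse_movie)` once the header line has been filtered out.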

The *ratings.csv* file contains rating information. Each row represents one rating of one movie by one user, and has the following format:

```
userId,movieId,rating,timestamp
```
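Every field of a *ratings.csv* line arrives as a string, so it usually needs converting before ratings can be averaged or compared. This is a minimal sketch that assumes the timestamp is Unix epoch seconds in UTC. `parse_rating` is a hypothetical name:

```python
from datetime import datetime, timezone

def parse_rating(line):
    """Parse one ratings.csv row into (userId, movieId, rating, timestamp).

    Hypothetical helper: assumes no quoted fields and an epoch-seconds timestamp (UTC).
    """
    user_id, movie_id, rating, timestamp = line.split(",")
    return (int(user_id), int(movie_id), float(rating),
            datetime.fromtimestamp(int(timestamp), tz=timezone.utc))

print(parse_rating("1,31,2.5,86400"))
```

Re-keying the parsed tuples by `movieId` is what would let the ratings RDD be joined with the movies RDD.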

Spark can read data from various data sources. This example obtains data from an external cluster and writes the output to your own HDFS.

Click the *run cell* button on the menu to run the following two cells.

In [1]:
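import findspark
findspark.init()  # locate the Spark installation before anything imports pyspark
from pyspark import SparkContext
from functions import *  # local lab helpers, e.g. extractColumns
from datetime import datetime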

In [2]:
sc = SparkContext(appName="Top10_result")

In [3]:

# Change input_path to point to your own HDFS location if needed.
# If Spark can read the Hadoop configuration, a relative path also works.
input_path = "../data/"
#'hdfs://soit-hdp-pro-1.ucc.usyd.edu.au/share/movie/small/'

# A relative path is used to specify the output directory.
# Relative paths are resolved against your HDFS home directory: /user/<yourUserName>
output_path = 'Top10_result'

data = sc.textFile(input_path + "AllVideos_short.csv")
# ratings = sc.textFile(input_path + "ratings.csv")
# movieData = sc.textFile(input_path + "movies.csv")
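The rule in the comments about relative output paths can be modelled in a few lines. This is a minimal sketch that assumes HDFS's default working directory of `/user/<yourUserName>`. `resolve_hdfs_path` is hypothetical, and the real resolution is done by Hadoop's filesystem client, not by code like this:

```python
import posixpath

def resolve_hdfs_path(path, user):
    """Model how a relative HDFS path is resolved (hypothetical helper).

    Relative paths are placed under /user/<user>; absolute paths and
    full URIs such as hdfs://host/... are returned unchanged.
    """
    if path.startswith("/") or "://" in path:
        return path
    return posixpath.join("/user", user, path)

print(resolve_hdfs_path("Top10_result", "alice"))  # /user/alice/Top10_result
```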

In [4]:
header = data.first()

In [5]:
allVideos = data.filter(lambda line:line!=header).map(extractColumns)
allVideos.collect()

[(('SbOwzAl9ZfQ', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'Entertainment', 4182, 361)),
 (('klOV6Xh-DnI', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'People & Blogs', 271, 174)),
 (('6L2ZF7Qzsbk', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'News & Politics', 10105, 266)),
 (('hcY52MFWMDM', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'News & Politics', 378, 171)),
 (('_OXDcGPVAa4', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'Howto & Style', 57781, 681)),
 (('Q9kK6NWZR1U', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'Music', 506, 67)),
 (('c9VTD3n_IDs', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'People & Blogs', 2277, 69)),
 (('XzULSsZYMRc', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'News & Politics', 7745, 659)),
 (('uijjYNtl_UM', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'Entertainment', 20155, 912)),
 (('cOJ68MQm2ac', 'MX'),
  (datetime.datetime(2017, 11, 14, 0, 0), 'Entertainment', 83582, 2194)),
 (('rZZEeeAVgog', 'MX'),
  (da
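The `functions` module that provides `extractColumns` is not shown here. The sketch below is a hypothetical stand-in that reproduces the output shape above, a `((video_id, country), (trending_date, category, likes, dislikes))` pair. Both the column layout and the `yy.dd.mm` date format are **assumptions** and may not match the real *AllVideos_short.csv*:

```python
import csv
from datetime import datetime

def extract_columns_sketch(line):
    """Hypothetical stand-in for extractColumns.

    ASSUMED column layout: video_id,trending_date,category,likes,dislikes,country
    ASSUMED date format: yy.dd.mm (e.g. 17.14.11 -> 14 Nov 2017)
    """
    video_id, trending, category, likes, dislikes, country = next(csv.reader([line]))
    trending_date = datetime.strptime(trending, "%y.%d.%m")
    # Key on (video, country) so groupByKey later gathers each video's
    # trending records per country.
    return (video_id, country), (trending_date, category, int(likes), int(dislikes))

# Same header-skipping pattern as the Spark cell, on an illustrative line.
lines = ["video_id,trending_date,category,likes,dislikes,country",
         "SbOwzAl9ZfQ,17.14.11,Entertainment,4182,361,MX"]
header = lines[0]
records = [extract_columns_sketch(l) for l in lines if l != header]
print(records[0])
# (('SbOwzAl9ZfQ', 'MX'), (datetime.datetime(2017, 11, 14, 0, 0), 'Entertainment', 4182, 361))
```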

In [6]:
GroupBy_VideoId_Country = allVideos.groupByKey(1)
GroupBy_VideoId_Country.collect()

[(('SbOwzAl9ZfQ', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d3860>),
 (('klOV6Xh-DnI', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d3160>),
 (('6L2ZF7Qzsbk', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d39b0>),
 (('hcY52MFWMDM', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d38d0>),
 (('_OXDcGPVAa4', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d3a90>),
 (('Q9kK6NWZR1U', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d3b00>),
 (('c9VTD3n_IDs', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d3ba8>),
 (('XzULSsZYMRc', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d39e8>),
 (('uijjYNtl_UM', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d3c88>),
 (('cOJ68MQm2ac', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d3cf8>),
 (('rZZEeeAVgog', 'MX'),
  <pyspark.resultiterable.ResultIterable at 0x7fa3239d3d68>),
 (('kTT472QeJGg', 'MX'),
  <pyspark.resulti
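The `ResultIterable` objects above are wrappers around each group's values. To see the values themselves you could materialize them, for example with `GroupBy_VideoId_Country.mapValues(list).collect()`. The plain-Python model below uses a hypothetical `group_by_key`, not Spark's implementation, to show what each group holds. Note that Spark does not guarantee the order of values within a group, which is why the next step sorts them by date:

```python
from collections import defaultdict

def group_by_key(pairs):
    """Plain-Python model of RDD.groupByKey(): gather every value per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())

pairs = [(("SbOwzAl9ZfQ", "MX"), "record 1"),
         (("klOV6Xh-DnI", "MX"), "record 2"),
         (("SbOwzAl9ZfQ", "MX"), "record 3")]
print(group_by_key(pairs))
# [(('SbOwzAl9ZfQ', 'MX'), ['record 1', 'record 3']), (('klOV6Xh-DnI', 'MX'), ['record 2'])]
```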

In [16]:
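def sortByDate(video_list):
    # video_list is one (key, records) pair produced by groupByKey
    VideoId_Country, TrendingList = video_list
    # Newest first: record[0] is the trending date
    sortedByDate = sorted(TrendingList, key=lambda record: record[0], reverse=True)

    # Keep only videos that trended at least twice; return their two latest records
    if len(sortedByDate) >= 2:
        return VideoId_Country, sortedByDate[:2]
    return None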
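`sortByDate` sorts every record in a group just to keep two of them. `heapq.nlargest` selects the top *n* directly and returns them newest first, which gives the same result as `sorted(..., reverse=True)[:n]`. This is a minimal sketch with the hypothetical name `latest_two`. It runs without Spark on records shaped like the output above:

```python
import heapq
from datetime import datetime

def latest_two(record):
    """Keep the two most recent trending records for one (videoId, country) key.

    Returns None when the key has fewer than two records, like sortByDate.
    """
    key, trending_list = record
    newest = heapq.nlargest(2, trending_list, key=lambda rec: rec[0])
    return (key, newest) if len(newest) == 2 else None

sample = (("SbOwzAl9ZfQ", "MX"),
          [(datetime(2017, 11, 14), "Entertainment", 4182, 361),
           (datetime(2017, 11, 15), "Entertainment", 5891, 553)])
print(latest_two(sample)[1][0][0])  # 2017-11-15 00:00:00
```

In the notebook it would be a drop-in replacement: `GroupBy_VideoId_Country.map(latest_two)`. For groups of only a handful of records, the speed difference from sorting is negligible.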

In [18]:

# Keep the two most recent records per key; drop keys for which sortByDate returned None
a = GroupBy_VideoId_Country.map(sortByDate).filter(lambda x: x is not None)

a.collect()

[(('SbOwzAl9ZfQ', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'Entertainment', 5891, 553),
   (datetime.datetime(2017, 11, 14, 0, 0), 'Entertainment', 4182, 361)]),
 (('6L2ZF7Qzsbk', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'News & Politics', 18377, 586),
   (datetime.datetime(2017, 11, 14, 0, 0), 'News & Politics', 10105, 266)]),
 (('hcY52MFWMDM', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'News & Politics', 846, 393),
   (datetime.datetime(2017, 11, 14, 0, 0), 'News & Politics', 378, 171)]),
 (('_OXDcGPVAa4', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'Howto & Style', 93269, 1792),
   (datetime.datetime(2017, 11, 14, 0, 0), 'Howto & Style', 57781, 681)]),
 (('c9VTD3n_IDs', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'People & Blogs', 3809, 138),
   (datetime.datetime(2017, 11, 14, 0, 0), 'People & Blogs', 2277, 69)]),
 (('7jmJtdqI6YE', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'Entertainment', 347, 78),
   (datetime.datetime(2017, 11, 14, 0,
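The `map` + `filter` pair uses `None` as a "drop this key" marker. A common alternative is `flatMap`, where the function returns a list with zero or one elements, so nothing needs filtering afterwards. In the notebook this would read `GroupBy_VideoId_Country.flatMap(keep_if_trending_twice)`. The plain-Python sketch below models that; `keep_if_trending_twice` and `flat_map` are hypothetical names:

```python
from datetime import datetime

def keep_if_trending_twice(record):
    """Return [(key, two newest records)], or [] if the key trended only once."""
    key, trending_list = record
    newest = sorted(trending_list, key=lambda rec: rec[0], reverse=True)[:2]
    # An empty list means flatMap emits nothing for this key.
    return [(key, newest)] if len(newest) == 2 else []

def flat_map(func, records):
    """Plain-Python model of RDD.flatMap: concatenate the lists func returns."""
    return [out for rec in records for out in func(rec)]

groups = [(("6L2ZF7Qzsbk", "MX"),
           [(datetime(2017, 11, 14), "News & Politics", 10105, 266),
            (datetime(2017, 11, 15), "News & Politics", 18377, 586)]),
          (("Q9kK6NWZR1U", "MX"),
           [(datetime(2017, 11, 14), "Music", 506, 67)])]
print(len(flat_map(keep_if_trending_twice, groups)))  # 1
```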

In [19]:
a.map(Function).take(5)

[(('SbOwzAl9ZfQ', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'Entertainment', 5891, 553),
   (datetime.datetime(2017, 11, 14, 0, 0), 'Entertainment', 4182, 361)]),
 (('6L2ZF7Qzsbk', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'News & Politics', 18377, 586),
   (datetime.datetime(2017, 11, 14, 0, 0), 'News & Politics', 10105, 266)]),
 (('hcY52MFWMDM', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'News & Politics', 846, 393),
   (datetime.datetime(2017, 11, 14, 0, 0), 'News & Politics', 378, 171)]),
 (('_OXDcGPVAa4', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'Howto & Style', 93269, 1792),
   (datetime.datetime(2017, 11, 14, 0, 0), 'Howto & Style', 57781, 681)]),
 (('c9VTD3n_IDs', 'MX'),
  [(datetime.datetime(2017, 11, 15, 0, 0), 'People & Blogs', 3809, 138),
   (datetime.datetime(2017, 11, 14, 0, 0), 'People & Blogs', 2277, 69)])]

In [None]:
data.map(extractColumns).collect()

#### What to expect
If successful, you will not see any output. The output is written to HDFS. Use ```hdfs dfs -cat Top10_result/part-00000``` on the command line to read the content of the output. Similar to MapReduce, Spark names its output files ```part-xxxxx```.
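When an RDD is saved with `saveAsTextFile`, Spark writes one `part-xxxxx` file per partition, zero-padded to five digits. The helper below is a hypothetical illustration of that naming pattern. Because `groupByKey(1)` collapses the data into a single partition, and `map` and `filter` keep the partition count, saving `a` (for example with `a.saveAsTextFile(output_path)`) would be expected to produce only `part-00000`:

```python
def part_file_names(num_partitions):
    """Names of the part files written for an RDD with num_partitions partitions."""
    return ["part-%05d" % i for i in range(num_partitions)]

print(part_file_names(3))  # ['part-00000', 'part-00001', 'part-00002']
```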

If not successful, an error message is printed as output. The most likely causes are that the input file does not exist or that the output path already exists. Spark creates a new output directory whenever it writes output and will not overwrite an existing one, so remove the existing directory or change `output_path` before running the program again.
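One way to avoid the "output path already exists" error on repeated runs is to give each run its own directory name. This is a minimal sketch using the `datetime` module the notebook already imports. `fresh_output_path` is a hypothetical helper:

```python
from datetime import datetime

def fresh_output_path(base, now=None):
    """Append a timestamp to base so each run writes to a new directory."""
    now = now or datetime.now()
    return "%s_%s" % (base, now.strftime("%Y%m%d_%H%M%S"))

print(fresh_output_path("Top10_result", datetime(2017, 11, 15, 9, 30, 5)))
# Top10_result_20171115_093005
```

Alternatively, delete the previous output before rerunning with `hdfs dfs -rm -r Top10_result`.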

**Remember to run the following cell to close the SparkContext, whether the Spark program runs with or without errors.**

In [None]:
# Always run this to close the SparkContext, whether your Spark program runs with or without errors.
sc.stop()