# Big Data Processing Coursework





In this short notebook, we will load and explore the movielens dataset. Specifically, this notebook covers:

Loading data in memory
Creating SQLContext
Creating Spark DataFrame
Group data by columns
Operating on columns
Running SQL Queries from a Spark DataFrame
Loading in a DataFrame

Build a recommendation system which uses transactional data linking a user and an item to get a list of items to recommend to the user. There are 2 approaches:
## Collaborative Filtering
In the MovieLens dataset, there are movies previously rated by a user, and the view is to attempt to identify if users who previously behaved similarly, ie liked/ disliked similar movies in the past, will have similar behaviors in the future. The input information is a user and the output is a list of items and their associated score. 
## Content based Filtering
In the MovieLens dataset, there are further information contained about each individual item, ie the movie and there are additional information supplied such as tags by users, genre information which can be used to further compare similar items.  The input information would be a model and the output information a list of items and their associated score.

## Our approach
For this project, item based collaborative filtering has been selected. By comparing every pair of items (movies X, Y), try to find users who rated both items. Create a vector for each item and calculate the correlation btetween these vectors. When someone rates an item, we take this as our input and recommend other items (movies) most correlated with that item. 

First, let's get the data that we will working with in this notebook. We are using two files from the MovieLens dataset.  As part of this project, data was already set up, however the way to obtain the data has been added for completeness.

In [1]:
#!wget --quiet http://www.grouplens.org/system/files/ml-100k.zip | unzip -q -o -d /data/movie-ratings/ | hadoop fs -put - /data/movie-ratings/

In [2]:
!hadoop fs -ls  '/data/movie-ratings'

Found 5 items
drwxr-xr-x   - alvarogr ECS640U          0 2017-12-11 11:52 /data/movie-ratings/cv
drwxr-xr-x   - hdfs     bigdata          0 2015-12-01 10:57 /data/movie-ratings/ml-10M100K
-rw-r--r--   3 hdfs     bigdata     522197 2015-12-01 10:17 /data/movie-ratings/movies.dat
-rw-r--r--   3 hdfs     bigdata  265105635 2015-12-01 10:17 /data/movie-ratings/ratings.dat
-rw-r--r--   3 hdfs     bigdata    3584119 2015-12-01 10:17 /data/movie-ratings/tags.dat


In [3]:
!hadoop fs -ls  '/data/movie-ratings/ml-10M100K'

Found 6 items
-rw-r--r--   3 hdfs bigdata      11135 2015-12-01 10:57 /data/movie-ratings/ml-10M100K/README.html
-rw-r--r--   3 hdfs bigdata        753 2015-12-01 10:57 /data/movie-ratings/ml-10M100K/allbut.pl
-rw-r--r--   3 hdfs bigdata     522197 2015-12-01 10:57 /data/movie-ratings/ml-10M100K/movies.dat
-rw-r--r--   3 hdfs bigdata  265105635 2015-12-01 10:57 /data/movie-ratings/ml-10M100K/ratings.dat
-rw-r--r--   3 hdfs bigdata       1092 2015-12-01 10:57 /data/movie-ratings/ml-10M100K/split_ratings.sh
-rw-r--r--   3 hdfs bigdata    3584119 2015-12-01 10:57 /data/movie-ratings/ml-10M100K/tags.dat


In [4]:
!hadoop fs -ls  '/data/movie-ratings/cv'

Found 2 items
drwxr-xr-x   - alvarogr ECS640U          0 2017-12-11 11:52 /data/movie-ratings/cv/10-item
drwxr-xr-x   - alvarogr ECS640U          0 2017-12-11 11:53 /data/movie-ratings/cv/5-fold


In [5]:
!hadoop fs -cat /data/movie-ratings/ml-10M100K/README.html 

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en" xml:lang="en">
  <head>
    <meta http-equiv="Content-Type" content="text/html;charset=utf-8" />
    <style type="text/css">
      h1 {
        color:#fc3;
        font-family:"Lucida Grande",Verdana,sans-serif; 
        font-size: 150%; 
        font-weight: normal; 
        margin:34px 0 0;
        background-color: #7A0019;
      }
      p {
        margin-left: 20px;
      }
      p.file_line_structure {
        margin-left: 40px;
      }
      table {
        margin-left: 30px;
      }
      th {
        text-align:left;
      }
    </style>

    <title>MovieLens 10M/100k Data Set README</title>
  </head>
  <body>
    <h1>
        Summary
    </h1>
    <p>
      This data set contains 10000054 ratings and 95580 tags 
      applied to 10681 movies by 71567 users of the 
      online 

## Importing External files/Libraries¶

Ensure the environment variables for loading sparkfrom jupyter notebook is setup correctly.

In [6]:
#!/usr/bin/python
import os
os.environ['SPARK_HOME']='/usr/lib/spark'

To use print function from python 3, use the from future command.  Ensure that jupyter notebook can find spark by using the findspark library, this references SPARK_HOME environment variable set up earlier.  Any external libraries imported need to be installed using pip install example, however if lacking admin permissions do pip install example --user  for example pip install sys --user.  Any error that has ImportError: No module named example means a pip install is required for a module named example.

In [29]:
#!/usr/bin/python
from __future__ import print_function 


import findspark
findspark.init()

from pyspark import SparkConf, SparkContext,sql

import sys
import re
import random
import array
import numpy as np
import scipy.sparse as sps
import plotly

## Load PySpark

In [8]:
sc = SparkContext(appName = "MovieLens").getOrCreate()
sqlContext = sql.SQLContext(sc)

## Custom Functions 

Functions for parsing movielens data and functions for comparing item similarity. Add the files to the spark context. Import the functions to be used here to use shorthand notation and not do filename.function

In [9]:
sc.addPyFile('movielensfcn.py')
sc.addPyFile('similarity.py')

In [10]:
from movielensfcn import parseMovies,parseRatings, removeDuplicates, itemItem
from similarity import cosine_similarity, jaccard_similarity, pearson_similarity

## Resilient Distributed Dataset (RDD) creation 

###  Review the raw data files
Inspect the files to see what we are dealing with. Review the contents of the movies.dat and ratings.dat files.

In [11]:
ratings_file = "/data/movie-ratings/ratings.dat"
movies_file = "/data/movie-ratings/movies.dat"

Read the text file from Hadoop File System (HDFS) and return it as an RDD of strings. An RDD is a distibuted collection which is automatically distributed by spark and you can set how many partitions it is distributed over.

In [12]:
ratings_raw = sc.textFile(ratings_file)
movies_raw = sc.textFile(movies_file)

## Sanity checking

According to the read me there are 10681 movies and 10000054 ratings.  Double check the data corresponds to this. and check timings for larger file set to see if optimization of partitions is required. 

In [35]:
%timeit numMovies = movies_raw.count()
print('There are {0} rows in the {1} file'.format(numMovies,movies_file))
# tt = time() - t0
# print('Count completed in {0} seconds'.format(tt))

The slowest run took 185.06 times longer than the fastest. This could mean that an intermediate result is being cached.
1 loop, best of 3: 87.4 ms per loop
There are 10681 rows in the /data/movie-ratings/movies.dat file


In [37]:
%timeit numRatings = ratings_raw.count()
print('There are {0} rows in the {1} file'.format(numRatings, ratings_file))

1 loop, best of 3: 20.1 s per loop


NameError: name 'numRatings' is not defined

Since results as expected.  Proceed with notebook.

Since there are approximately 1M records, it may be faster to set the number of partitions on spark.  Since the movie file is relatively small with approximately 10K records we can hold in memory using collect

In [15]:
numPartitions =1000

## Inspecting the Raw Data

Using take(n) on the RDD to return an array of n elements 

In [16]:
ratings_raw.take(5)

[u'1::122::5::838985046',
 u'1::185::5::838983525',
 u'1::231::5::838983392',
 u'1::292::5::838983421',
 u'1::316::5::838983392']

In [17]:
movies_raw.take(5)

[u'1::Toy Story (1995)::Adventure|Animation|Children|Comedy|Fantasy',
 u'2::Jumanji (1995)::Adventure|Children|Fantasy',
 u'3::Grumpier Old Men (1995)::Comedy|Romance',
 u'4::Waiting to Exhale (1995)::Comedy|Drama|Romance',
 u'5::Father of the Bride Part II (1995)::Comedy']

<ol>
    <li>There is no header file.</li>
    <li>Notice that the columns are separated by :: </li>
    <li>Ratings File Format: user::movie::rating::timestamp</li>
    <li>Movies File format: movie::titleandyear::genre.  The genre fields are separated by |</li>
</ol>

## Data Preprocessing and Feature Selection

Preliminary investigations has shown that file ending with .dat are separated by ::, so split the contents to get the corresponding fields for user, movie, rating.

In [18]:
print('The data will be split into {0} partitions'.format(numPartitions))
print('Processing the following files: \nRatings:{0}\nMovies:{1}'.format(ratings_file, movies_file))

The data will be split into 1000 partitions
Processing the following files: 
Ratings:/data/movie-ratings/ratings.dat
Movies:/data/movie-ratings/movies.dat


In [None]:
# Map transformation


In [26]:
if (ratings_file.find('.dat') != -1):
    movies_tmp= movies_raw.map(lambda line: re.split(r'::'))
    movies= movies_tmp.map(lambda line: (int(line[0]),(line[1],line[2])))
    ratings_tmp = ratings_raw.map(lambda line: re.split(r'::'))\
        .partitionBy(numPartitions)
    ratings = ratings_tmp.map(lambda line: (int(line[0]),(int(line[1]),float(line[2]))))\
                         .partitionBy(numPartitions)
else:
    ratings_header = ratings_raw.take(1)[0]
    movies_header = movies_raw.take(1)[0]
    movies= movies_raw.filter(lambda line: line!=movies_header)\
                    .map(lambda line: re.split(r',',line)).map(lambda line: (int(line[1]),(line[0],line[2])))
    ratings = ratings_raw.filter(lambda line: line!=ratings_header)\
                    .map(lambda line: re.split(r',',line))\
                    .map(lambda x: (int(line[1]),(int(line[0]),float(line[2]))))\
                    .partitionBy(numPartitions)


In [27]:
ratings_tmp.take(5)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 40, 138.37.32.71, executor 6): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/worker.py", line 111, in main
    process()
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1702, in add_shuffle_key
  File "<ipython-input-26-b4bea9263dba>", line 4, in <lambda>
TypeError: split() takes at least 2 arguments (1 given)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1457)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1445)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1444)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1444)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1668)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1627)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1616)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1862)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1875)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1888)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/worker.py", line 111, in main
    process()
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1702, in add_shuffle_key
  File "<ipython-input-26-b4bea9263dba>", line 4, in <lambda>
TypeError: split() takes at least 2 arguments (1 given)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


If the file has a header, strip it out by filtering the header contents.  Make the user the key for ratings.

In [None]:
movies.toDF(['item_id', "title", "genre"])

In [None]:
ratings.take(5)

In [None]:
# ratingsDF = ratings.toDF(['user_id','item','rating'])

In [None]:
RatingsDF = ratings.toDF(['item_id','userid_rating'])

Let's check the type of RatingsDF

In [None]:
type(RatingsDF)

The printSchema() method gives more details about the DataFrame’s schema and structure:

In [None]:
RatingsDF.printSchema()

In [None]:
RatingsDF.show(5)

How many movies do we have in the movies file?

In [None]:
numMovies = ratings.values().map(lambda line: line[1]).distinct().count()
print("number of movies: {0}".format(numMovies))

How many users have rated our movies?

In [None]:
numUsers = ratings.values().map(lambda line: line[0]).distinct().count()
print("number of users: {0}".format(numUsers))

In [None]:
numRatings = ratings.count()
print("total number of ratings: {0}".format(numRatings))

## Co-rated Items
Firstly join the ratings against itself to get all pairs of co-rated items (movies).

In [None]:
# user_ratings_data = ratings.join(ratings)

Join two dataframes and get only one 'item_id' column 

In [None]:
user_ratingsDF = RatingsDF.join(RatingsDF,'item_id')

In [None]:
user_ratings_data.take(5)

In [None]:
user_ratingsDF.show(5)

In [None]:
user_ratingsDF2 = sqlContext.createDataFrame(user_ratings_data,['item_id','userid_rating'])

In [None]:
user_ratingsDF2.show(5)

# Filter transformation


Remove a rating if a user gives the same value for the same movie

The join function combines two datasets (Key,ValueV) and (Key,ValueW) together to get (Key, (ValueV,ValueW)).  Let's join the movie and ratings file together to get meaningful recommendations

In [34]:
%timeit unique_joined_ratings = user_ratings_data.filter(removeDuplicates)

NameError: global name 'user_ratings_data' is not defined

Map RDDs

In [None]:
movie_pairs = unique_joined_ratings.map(itemItem).partitionBy(numPartitions)

Now group all ratings together for the same movie

In [None]:
movie_pairs_ratings= movie_pairs.groupByKey()

In [None]:
algorithms=["JACCARD,""COSINE", "PEARSON"]

In [None]:
algorithm ="COSINE"

In [None]:
if algorithm == algorithms[0] :
	item_item_similarities = movie_pairs_ratings.mapValues(jaccard_similarity).persist()
elif algorithm == algorithms[1]  :
	item_item_similarities = movie_pairs_ratings.mapValues(cosine_similarity).persist()
elif algorithm == algorithms[2]  :
	item_item_similarities = movie_pairs_ratings.mapValues(pearson_similarity).persist()
else:
	item_item_similarities = movie_pairs_ratings.mapValues(cosine_similarity).persist()

Sort the item pairs (each co-rated movie)

In [None]:
item_item_sorted=item_item_similarities.sortByKey()
item_item_sorted.persist()

Set up your parameters

In [None]:
movie_id = 1

In [None]:
selectedmovie = sqlContext.sql('select  fro  movies limit 1')
print("You have selected {0}".format(movie.filter(movie_id))

In [None]:
threshold = float(0.97)
topN= int(10)

Filter for movies with this similarity that are "good" as defined by our quality thresholds above

In [None]:
filteredResults = item_item_sorted.filter(lambda((item_pair,similarity_occurence)): \
        (item_pair[0] == movie_id or item_pair[1] == movie_id) \
        and similarity_occurence[0] > threshold and similarity_occurence[1] > minOccurence)

In [None]:
results = filteredResults.map(lambda((x,y)): (y,x)).sortByKey(ascending = False)

In [None]:
resultsTopN = sc.parallelize(results.take(topN))
results.coalese(1).saveAsTextFile("movielens")

In [None]:
results.registerTempTable("table_topN")
df2.write.mode("append").saveAsTable("table_topN")

Display data as a heatmap

In [None]:
plotly.offline.init_notebook_mode()
plotly.offline.iplot(resultsTopN, filename='basic-heatmap')

In [None]:
sc.stop()

This program was tested with the following versions:

In [None]:
%reload_ext version_information
%version_information numpy, scipy, matplotlib, pyspark