# Getting started with Apache Spark

The following sections show how to read a data file into Apache Spark and apply the basic operations __map__ and __countByValue__.

## 1 Download dataset

The code below downloads the MovieLens 100K movie ratings dataset from [GroupLens](https://grouplens.org/datasets/movielens/100k/) and extracts it into the `ml-100k` data folder. The dataset contains:
- 100,000 ratings
- from 1000 users
- on 1700 movies
- Released 4/1998

```python
import wget
import zipfile
import os

data_url = "http://files.grouplens.org/datasets/movielens/ml-100k.zip"
folder_name = "ml-100k"
file_name_zip = folder_name + ".zip"
```

```python
# Download and extract the archive only if the data folder does not exist yet
if not os.path.isdir(folder_name):
    wget.download(data_url, file_name_zip)
    with zipfile.ZipFile(file_name_zip, "r") as zip_ref:
        zip_ref.extractall()
```

## 2 Create a Spark context and load the file

As a first step, let's create a Spark context that runs locally instead of on a cluster: the master URL `local` runs Spark with a single worker thread on this machine.

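```python
import pyspark
from pyspark import SparkConf, SparkContext

# Create the context only once: only one SparkContext may be active per process,
# so re-running this cell must not create a second one
if "sc" not in globals():
    conf = SparkConf().setMaster("local").setAppName("00_Spark_basics")
    sc = SparkContext(conf=conf)
```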

Next, load the downloaded ratings file `u.data` through the Spark context to create an RDD, and split each line into its fields.

```python
# Read u.data (one rating per line) and split each line into its fields
lines = sc.textFile(folder_name + "/u.data")
rdd = lines.map(lambda x: x.split())
type(rdd)
```

Output: `pyspark.rdd.PipelinedRDD`
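To see what the function passed to __map__ does to each record, the same `str.split()` call can be tried on a single line in plain Python. The sample line below is an assumption built from the first record of `u.data` (user 196, item 242, rating 3); the MovieLens README describes the file as tab-separated user id, item id, rating and timestamp. Note that every field is still a string after splitting.

```python
# Plain-Python illustration of the per-line function passed to rdd.map().
# Hypothetical sample line in the u.data format:
# user id, item id, rating, timestamp, separated by tabs
sample_line = "196\t242\t3\t881250949"

# split() without an argument splits on any whitespace, including tabs
fields = sample_line.split()
print(fields)  # ['196', '242', '3', '881250949']

# All fields are strings; convert explicitly before doing arithmetic
user_id, item_id, rating, timestamp = (int(f) for f in fields)
print(rating + 1)  # 4
```

This is also why the rating counts in the following sections are keyed by strings such as `'3'` rather than integers.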

The code below gives a basic overview of the RDD. To do so, the execution graph built so far has to be computed, in this case by the __collect()__ action, which returns all elements of the RDD to the driver. The code also defines a helper function, used throughout this notebook, that converts the collected RDD into a pandas DataFrame and returns its first rows for display.

```python
import pandas as pd

def print_rdd_overview(rdd: pyspark.rdd.RDD, columns=None, head_size=10):
    """Collect the RDD to the driver and return its first rows as a pandas DataFrame."""
    return pd.DataFrame(rdd.collect(), columns=columns).head(head_size)

print_rdd_overview(rdd, columns=("user id", "item id", "rating", "timestamp"))
```

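|   | user id | item id | rating | timestamp |
|---|---------|---------|--------|-----------|
| 0 | 196 | 242 | 3 | 881250949 |
| 1 | 186 | 302 | 3 | 891717742 |
| 2 | 22 | 377 | 1 | 878887116 |
| 3 | 244 | 51 | 2 | 880606923 |
| 4 | 166 | 346 | 1 | 886397596 |
| 5 | 298 | 474 | 4 | 884182806 |
| 6 | 115 | 265 | 2 | 881171488 |
| 7 | 253 | 465 | 5 | 891628467 |
| 8 | 305 | 451 | 3 | 886324817 |
| 9 | 6 | 86 | 3 | 883603013 |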
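The idea that transformations only build an execution graph, which is computed when an action such as __collect()__ runs, can be illustrated with a toy model in plain Python. The `LazyPipeline` class below is hypothetical and is not how Spark implements RDDs; it only mimics the principle that `map` records a step while `collect` executes the recorded steps. The variable names are chosen so they do not overwrite the notebook's `lines` and `rdd`.

```python
# Toy model of lazy evaluation (NOT Spark's implementation):
# map() only records a function, collect() applies all recorded functions.
class LazyPipeline:
    def __init__(self, data, steps=None):
        self._data = data
        self._steps = steps or []  # recorded transformations, the "execution graph"

    def map(self, func):
        # Return a new pipeline with one more step; no data is processed here
        return LazyPipeline(self._data, self._steps + [func])

    def collect(self):
        # Only now are the recorded steps applied to every element
        result = []
        for item in self._data:
            for func in self._steps:
                item = func(item)
            result.append(item)
        return result


calls = []  # records every time the mapped function actually runs

def tracked_split(line):
    calls.append(line)
    return line.split()

# Hypothetical input: the first two records of u.data
toy_lines = LazyPipeline(["196\t242\t3\t881250949", "186\t302\t3\t891717742"])
toy_parsed = toy_lines.map(tracked_split)
print(len(calls))  # 0: map() only recorded the step

rows = toy_parsed.collect()
print(len(calls))  # 2: collect() ran tracked_split once per line
```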


## 3 Show the distribution of all ratings

Let's compute the distribution of star ratings in the movie ratings file using __countByValue__.

```python
%matplotlib inline
import matplotlib.pyplot as plt
```

___NOTE:___ The method __countByValue__ is an action: it triggers the execution of the created execution graph and returns the count of each unique value in the RDD as a dictionary of (value, count) pairs. A full list of the actions that trigger graph execution can be found [here](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions).

```python
# Extract the rating field (index 2) and count how often each value occurs
ratings = rdd.map(lambda x: x[2])
movie_ratings = ratings.countByValue()
print(type(movie_ratings))

sorted_ratings = sorted(movie_ratings.items())
pd.DataFrame(sorted_ratings, columns=("star rating", "# of ratings"))
```

Output: `<class 'collections.defaultdict'>`

|   | star rating | # of ratings |
|---|-------------|--------------|
| 0 | 1 | 6110 |
| 1 | 2 | 11370 |
| 2 | 3 | 27145 |
| 3 | 4 | 34174 |
| 4 | 5 | 21201 |
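On a small local list, the result of __countByValue__ corresponds to what Python's `collections.Counter` computes. The sketch below is a plain-Python analogue, not Spark code; it uses the first ten rating values from the overview table in section 2, kept as strings like in the RDD.

```python
from collections import Counter

# Plain-Python analogue of ratings.countByValue() on a local list:
# the first ten ratings shown in the overview table, as strings
sample_ratings = ["3", "3", "1", "2", "1", "4", "2", "5", "3", "3"]

counts = Counter(sample_ratings)  # maps each unique value to its number of occurrences
print(sorted(counts.items()))
# [('1', 2), ('2', 2), ('3', 4), ('4', 1), ('5', 1)]
```

Because the keys are strings, `sorted` orders them lexicographically. This matches numeric order here only because every rating is a single digit.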


Alternatively, we can chain the transformations __map__, __reduceByKey__ and __sortByKey__ to obtain the same counts, this time as an RDD.

___NOTE:___ In this example, movie_ratings is still an RDD that only describes the execution graph; it is not computed until the __collect()__ action is executed inside the print_rdd_overview function.

```python
# Build (rating, 1) pairs, sum the ones per rating and sort by rating
movie_ratings = ratings.map(lambda x: (x, 1)) \
                       .reduceByKey(lambda x, y: x + y) \
                       .sortByKey()
print(type(movie_ratings))

df = print_rdd_overview(movie_ratings, columns=("star rating", "# of ratings"))
df
```

Output: `<class 'pyspark.rdd.PipelinedRDD'>`

|   | star rating | # of ratings |
|---|-------------|--------------|
| 0 | 1 | 6110 |
| 1 | 2 | 11370 |
| 2 | 3 | 27145 |
| 3 | 4 | 34174 |
| 4 | 5 | 21201 |
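The semantics of the three steps can be sketched in plain Python on a local list. The `reduce_by_key` helper below is hypothetical and only mirrors what __reduceByKey__ computes; Spark additionally combines values within each partition before shuffling them across the cluster. The sample data are again the first ten ratings from the overview table.

```python
from functools import reduce

# First ten ratings from the overview table, kept as strings
sample_ratings = ["3", "3", "1", "2", "1", "4", "2", "5", "3", "3"]

# map: emit one (key, 1) pair per rating
pairs = [(r, 1) for r in sample_ratings]

# reduceByKey (hypothetical helper): combine all values sharing a key with func
def reduce_by_key(func, kv_pairs):
    grouped = {}
    for key, value in kv_pairs:
        grouped.setdefault(key, []).append(value)
    return [(key, reduce(func, values)) for key, values in grouped.items()]

reduced = reduce_by_key(lambda x, y: x + y, pairs)

# sortByKey: order the (key, count) pairs by key, ascending
sorted_counts = sorted(reduced, key=lambda kv: kv[0])
print(sorted_counts)
# [('1', 2), ('2', 2), ('3', 4), ('4', 1), ('5', 1)]
```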


## 4 Which user created the most ratings?

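Let's use the same approach to count the ratings per user ID. Swapping each (user id, count) pair to (count, user id) lets __sortByKey__ sort the users by their number of ratings, in descending order.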

```python
# Count ratings per user, swap to (count, user id) and sort by count, descending
ratings_by_user = rdd.map(lambda x: (x[0], 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(ascending=False)

print_rdd_overview(ratings_by_user, columns=("# of ratings", "user id"), head_size=1)
```

|   | # of ratings | user id |
|---|--------------|---------|
| 0 | 737 | 405 |
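The count, swap and sort pipeline can be sketched in plain Python as well. The `(user id, item id)` records below are hypothetical sample data, not taken from `u.data`. Since only the top user is needed, sorting everything is not strictly necessary: `Counter.most_common(1)` returns it directly, and PySpark's RDD API offers actions such as `takeOrdered` and `max` with a `key` argument that can serve the same purpose.

```python
from collections import Counter

# Hypothetical sample records: (user id, item id), ids as strings like in the RDD
sample_records = [("196", "242"), ("186", "302"), ("196", "377"),
                  ("22", "51"), ("196", "346"), ("22", "474")]

# map + reduceByKey: number of ratings per user id
per_user = Counter(user for user, _ in sample_records)

# map + sortByKey(ascending=False): swap to (count, user) and sort by count, descending
by_count = sorted(((count, user) for user, count in per_user.items()),
                  key=lambda kv: kv[0], reverse=True)
print(by_count[0])  # (3, '196')

# Shortcut when only the top entry is needed
print(per_user.most_common(1))  # [('196', 3)]
```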
