## PySpark Cheat Sheet

Apache Spark and Apache Hadoop are both open-source frameworks for big data processing. Spark is often cited as being up to 100x faster than Hadoop MapReduce for large-scale, in-memory workloads. Hadoop provides a distributed file system (HDFS), while Spark's core abstraction is the resilient distributed dataset (__RDD__). In practice, Spark is commonly used as the computation engine on top of Hadoop storage.  

In this article, we will mainly cover data manipulation on __RDDs__.

### Dataset
We will use the MovieLens dataset (ml-100k) for the demo.

source: https://grouplens.org/datasets/movielens/

### Reference

More Information: https://www.datacamp.com/community/blog/pyspark-cheat-sheet-python

In [1]:
import numpy as np
import pandas as pd
from pyspark import SparkContext, StorageLevel, SparkConf
from pyspark.sql import SparkSession


### 1. Initializing Spark Environment

Whenever we want to use the Spark Engine, we need to first initialize the Spark environment.  

#### SparkContext

The driver program uses the SparkContext to connect to and communicate with the cluster, and to coordinate jobs with the resource manager.

#### SparkSession

Since Spark 2.0, SparkSession has been the unified entry point to different data sources, such as SQL or Hive. In Spark 1.x, we had to create a SQLContext or HiveContext to communicate with the corresponding sources.

_In short, we need a SparkSession to use Spark DataFrames or Datasets; a SparkContext alone is enough to work with RDDs._

In [2]:
conf = SparkConf()
conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")

sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.getOrCreate()

### 2. Loading Data

We can create an _rdd_ from iterable data (with parallelize) or from an external data source (with textFile).

We will first load the MovieLens data file as lines of text and manipulate it later.

_Note: a SparkSession needs to be created before calling __rdd.toDF()___

In [3]:
data_path = '../data/ml-100k/'
data = [('apple', 10),('orange', 20),('peach', 100)]

rdd = sc.parallelize(data)
rdd_list = sc.parallelize(range(100))
rdd_movie = sc.textFile(data_path+"u.data")

In [15]:
# to create the Dataframe, we can use rdd.toDF()

dfFromRDD1 = rdd.toDF()
dfFromRDD1.show()

+------+---+
|    _1| _2|
+------+---+
| apple| 10|
|orange| 20|
| peach|100|
+------+---+



### 3. Data Retrieval

The following commands retrieve information about the data in an rdd.

- getNumPartitions()
- count(), countByKey(), countByValue()
- collect(), collectAsMap(), take(_num_), top(_num_)
- max(), min(), mean(), stdev(), variance(), histogram(_num of bin_), stats()

In [4]:
rdd_movie.take(3)

['196\t242\t3\t881250949', '186\t302\t3\t891717742', '22\t377\t1\t878887116']
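To make the return shapes of a few of these calls concrete, here is a plain-Python sketch (no Spark required) that mimics them on the small `data` sample from the loading step. It is only an analogy for what the RDD methods return, not how Spark computes them; the variable names are illustrative, and the statistics are taken over the numeric values only.

```python
from collections import Counter
import statistics

# Same sample pairs as the `data` list used earlier in this notebook.
data = [('apple', 10), ('orange', 20), ('peach', 100)]
values = [v for _, v in data]

# countByKey(): number of elements per key (the first item of each pair).
count_by_key = dict(Counter(k for k, _ in data))

# countByValue(): number of occurrences of each whole element.
count_by_value = dict(Counter(data))

# collectAsMap(): the pairs as a dict (a repeated key would be overwritten).
as_map = dict(data)

# top(2) on the numeric values: the two largest, in descending order.
top_two = sorted(values, reverse=True)[:2]

# stats() reports count, mean, stdev, max and min. stdev() on an RDD is the
# population standard deviation, hence pstdev here.
summary = {
    "count": len(values),
    "mean": statistics.mean(values),
    "stdev": statistics.pstdev(values),
    "max": max(values),
    "min": min(values),
}

print(count_by_key)
print(as_map)
print(top_two)
print(summary)
```

Note that collect(), collectAsMap() and the count functions bring their results back to the driver, so on a large rdd it is usually safer to inspect it with take(_num_) first.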

### 4. Data Processing

Processing data in PySpark might remind you of the __Pandas__ Dataframe. Similar to Pandas, PySpark also provides the functionality to _group, aggregate, sort, and reduce_. However, some functions look similar but behave differently. 

#### map vs flatMap

map and flatMap look similar, but they behave differently and are worth extra attention. 

The map function returns exactly one output element per input element, so each split line stays as its own list. The flatMap function applies the same function but then flattens the results, so the fields of all lines end up in one flat sequence. 

In the following cells, we will show the difference by splitting each line of rdd_movie on its tab delimiter.

In [5]:
# map function
rdd_movie.map(lambda x: x.split("\t")).take(3)

[['196', '242', '3', '881250949'],
 ['186', '302', '3', '891717742'],
 ['22', '377', '1', '878887116']]

In [6]:
# flatMap function
rdd_movie.flatMap(lambda x: x.split("\t")).take(8)

['196', '242', '3', '881250949', '186', '302', '3', '891717742']
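The same contrast can be reproduced in plain Python, which may help when reasoning about the shapes without a running cluster. This is only an analogy: a list comprehension stands in for map, and `itertools.chain.from_iterable` stands in for the flattening step of flatMap. The three input lines are the ones returned by rdd_movie.take(3) above.

```python
from itertools import chain

# The first three raw lines of u.data, as shown by rdd_movie.take(3).
lines = [
    '196\t242\t3\t881250949',
    '186\t302\t3\t891717742',
    '22\t377\t1\t878887116',
]

# map analogue: one output element (a list of fields) per input line.
mapped = [line.split("\t") for line in lines]

# flatMap analogue: split each line, then flatten all fields into one list.
flat_mapped = list(chain.from_iterable(line.split("\t") for line in lines))

print(len(mapped))       # 3 elements, each a list of 4 fields
print(len(flat_mapped))  # 12 elements, each a single field
```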

#### groupBy vs groupByKey

The groupBy function groups the elements by the result of the function passed to it, while groupByKey groups the values of an rdd of (K, V) pairs by their existing key.

In [7]:
# we demonstrate groupBy by grouping on userid modulo 10
# for example, userids 6, 16, 26, ... fall into the same group

groupby_sample = rdd_movie.map(lambda x: x.split("\t")) \
                            .groupBy(lambda x: int(x[0]) % 10).take(1)

# print only the first 4 records of the group
for x,y in groupby_sample:
    print(x)
    for val in list(y)[:4]:
        print(val)


6
['196', '242', '3', '881250949']
['186', '302', '3', '891717742']
['166', '346', '1', '886397596']
['6', '86', '3', '883603013']


In [8]:
# to illustrate groupByKey, we first transform the data into (K, V) pairs.
# In this example, the rating is the key, so the records are grouped by rating

groupbykey_sample = rdd_movie.map(lambda x: x.split("\t")) \
                            .keyBy(lambda x: x[2]) \
                            .groupByKey().take(1)

# print only the first 4 records of the group
for x,y in groupbykey_sample:
    print(x)
    for val in list(y)[:4]:
        print(val)


1
['22', '377', '1', '878887116']
['166', '346', '1', '886397596']
['181', '1081', '1', '878962623']
['276', '796', '1', '874791932']
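Conceptually, groupBy with a function behaves like keying each element by that function and then calling groupByKey. The plain-Python sketch below illustrates that relationship on four rows taken from the outputs above. The helper names `group_by_key` and `group_by` are hypothetical and only mimic the grouping semantics; they are not Spark APIs.

```python
from collections import defaultdict

# Four parsed rows from u.data: [userid, itemid, rating, timestamp].
rows = [
    ['196', '242', '3', '881250949'],
    ['186', '302', '3', '891717742'],
    ['22', '377', '1', '878887116'],
    ['166', '346', '1', '886397596'],
]

def group_by_key(pairs):
    """groupByKey analogue: collect the values of (key, value) pairs per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def group_by(items, func):
    """groupBy analogue: key every item by func(item), then group by that key."""
    return group_by_key((func(item), item) for item in items)

# Same grouping function as the groupBy cell: userid modulo 10.
by_last_digit = group_by(rows, lambda r: int(r[0]) % 10)

# Same key as the groupByKey cell: the rating column.
by_rating = group_by_key((r[2], r) for r in rows)

print(sorted(by_last_digit))  # [2, 6]
print(sorted(by_rating))      # ['1', '3']
```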


#### reduce vs reduceByKey

Before we dive into the code, note that _reduce_ is an action, while _reduceByKey_ is a transformation. 

Definitions from the official documentation: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

_reduce: Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel._

_reduceByKey: When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument._

In this section, we will demonstrate some examples of the functions.

In [9]:
# we use the small rdd to demonstrate reduce. Like collect() and collectAsMap(),
# reduce is an action and returns its result to the driver.
# the elements are tuples, so a+b concatenates them

rdd.reduce(lambda a,b: a+b)

('apple', 10, 'orange', 20, 'peach', 100)
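The result above is a concatenation because `+` on two tuples joins them. The sketch below reproduces this with Python's `functools.reduce`, and also shows how to sum only the counts. It is a local analogy, not Spark code. One caveat worth keeping in mind: tuple concatenation is associative but not commutative, so it does not fully meet the requirement quoted from the documentation, and the order of the elements in such a result should not be relied on.

```python
from functools import reduce

# Same sample pairs as the `data` list behind `rdd`.
data = [('apple', 10), ('orange', 20), ('peach', 100)]

# reduce with a+b on tuples: each step concatenates two tuples.
concatenated = reduce(lambda a, b: a + b, data)

# To sum the counts instead, reduce over the numeric values only.
# Integer addition is both commutative and associative.
total = reduce(lambda a, b: a + b, (count for _, count in data))

print(concatenated)
print(total)
```

In PySpark, the second form would correspond to mapping each pair to its value before calling reduce.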

In [10]:
# Next, we use reduceByKey to calculate the average rating per user.
# We first transform the rdd to (userid, (rating, 1)), so that summing the pairs
# later gives (sum of ratings, count of ratings)

user_rating_count_pair = rdd_movie.map(lambda x: x.split("\t")) \
                            .map(lambda x: (x[0], (int(x[2]), 1)))

user_rating_count_pair.take(3)

[('196', (3, 1)), ('186', (3, 1)), ('22', (1, 1))]

In [12]:
user_sum_rating_count_pair = user_rating_count_pair.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

user_sum_rating_count_pair.take(3)

[('22', (429, 128)), ('244', (869, 238)), ('115', (362, 92))]

In [13]:
user_avg_rating = user_sum_rating_count_pair.mapValues(lambda v: v[0] / v[1])
user_avg_rating.take(3)

[('22', 3.3515625), ('244', 3.6512605042016806), ('115', 3.9347826086956523)]
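The reason for carrying a (sum, count) pair rather than a running average is that adding pairs element-wise is commutative and associative, so partial results can be merged in any order, whereas averaging two averages is not. The plain-Python sketch below mimics the three steps. The helper `reduce_by_key` is hypothetical, and the last two input records are invented for illustration so that some users have more than one rating.

```python
def reduce_by_key(pairs, func):
    """reduceByKey analogue: merge the values of each key with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# (userid, (rating, 1)) pairs. The first three match the output above;
# the last two are made-up extra ratings for illustration only.
pairs = [
    ('196', (3, 1)),
    ('186', (3, 1)),
    ('22', (1, 1)),
    ('196', (5, 1)),
    ('22', (4, 1)),
]

# Same merge function as in the reduceByKey cell: element-wise sum.
sums = reduce_by_key(pairs, lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Same final step as the average cell: sum of ratings / count of ratings.
averages = {user: total / count for user, (total, count) in sums.items()}

print(sums)
print(averages)
```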

### 5. Sample usage on MovieLens dataset

In this section, we will use PySpark RDDs to process the movie data and retrieve the genres of each movie.


In [21]:
data_path = '../data/ml-100k/'

# first we load the data into rdd
rdd_movie = sc.textFile(data_path+"u.item")
rdd_genre = sc.textFile(data_path+"u.genre")

rdd_movie = rdd_movie.map(lambda x: x.split("|"))
genre_map = rdd_genre.map(lambda x: x.split("|")) \
                    .map(lambda x: (int(x[1]), x[0])).collectAsMap()

In [22]:
genre_map

{0: 'unknown',
 1: 'Action',
 2: 'Adventure',
 3: 'Animation',
 4: "Children's",
 5: 'Comedy',
 6: 'Crime',
 7: 'Documentary',
 8: 'Drama',
 9: 'Fantasy',
 10: 'Film-Noir',
 11: 'Horror',
 12: 'Musical',
 13: 'Mystery',
 14: 'Romance',
 15: 'Sci-Fi',
 16: 'Thriller',
 17: 'War',
 18: 'Western'}

In [20]:
# From the data definition, the columns in u.item are
# movie id | movie title | release date | video release date |
# IMDb URL | unknown | Action | Adventure | Animation |
# Children's | Comedy | Crime | Documentary | Drama | Fantasy |
# Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi |
# Thriller | War | Western |

rdd_movie.take(1)

[['1',
  'Toy Story (1995)',
  '01-Jan-1995',
  '',
  'http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)',
  '0',
  '0',
  '0',
  '1',
  '1',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0']]

In [23]:
# Next we transform each row into [movie id, movie title, list of genres]
def process_movie_data(data, genre_map):
    list_genre = []
    for i, flag in enumerate(data):
        # the genre flags start at column index 5, so column i maps to genre i - 5
        genre_idx = i - 5
        if genre_idx >= 0 and flag == '1':
            list_genre.append(genre_map[genre_idx])

    return [data[0], data[1], list_genre]

rdd_movie_detail = rdd_movie.map(lambda x: process_movie_data(x, genre_map))

In [24]:
rdd_movie_detail.take(2)

[['1', 'Toy Story (1995)', ['Animation', "Children's", 'Comedy']],
 ['2', 'GoldenEye (1995)', ['Action', 'Adventure', 'Thriller']]]
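As an alternative sketch of the same transformation, the genre flags can be paired with the genre names using `zip`, which avoids the index arithmetic. The function name `extract_genres` and the `first_genre_col` parameter are hypothetical. The genre list is copied from the genre_map output above, and the sample row is the Toy Story record shown earlier. This version runs in plain Python, so it can be checked without Spark.

```python
# Genre names in index order, copied from the genre_map output above.
genre_names = [
    'unknown', 'Action', 'Adventure', 'Animation', "Children's", 'Comedy',
    'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror',
    'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western',
]

def extract_genres(row, genre_names, first_genre_col=5):
    """Return [movie id, movie title, list of genres] for one u.item row."""
    flags = row[first_genre_col:]
    # zip stops at the shorter input, so unexpected extra columns are ignored.
    genres = [name for name, flag in zip(genre_names, flags) if flag == '1']
    return [row[0], row[1], genres]

# The Toy Story row from rdd_movie.take(1): 5 metadata columns + 19 genre flags.
toy_story = [
    '1', 'Toy Story (1995)', '01-Jan-1995', '',
    'http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)',
    '0', '0', '0', '1', '1', '1',
] + ['0'] * 13

print(extract_genres(toy_story, genre_names))
```

With an rdd, the same function could presumably be applied as rdd_movie.map(lambda x: extract_genres(x, genre_names)), in the same way process_movie_data is applied above.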