# RDD object

In [1]:
from pyspark import SparkConf, SparkContext
# Reuse the SparkContext if one is already running (e.g., in the PySpark shell);
# calling SparkContext(...) a second time raises
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setMaster('local'))

from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .appName("Python Spark SQL basic example") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()

The class `pyspark.SparkContext` creates a client which connects to a Spark cluster. This client can be used to create an RDD object. There are two methods from this class for directly creating RDD objects:
* `parallelize()`
* `textFile()`

## `parallelize()`

`parallelize()` distributes a local **Python collection** to form an RDD. Common built-in Python collections include `dict`, `list`, `tuple` and `set`.

Examples:

In [2]:
# from a list
rdd = sc.parallelize([1,2,3])
rdd.collect()

[1, 2, 3]

In [3]:
# from a tuple
rdd = sc.parallelize(('cat', 'dog', 'fish'))
rdd.collect()

['cat', 'dog', 'fish']

In [4]:
# from a list of tuples
list_t = [('cat', 'dog', 'fish'), ('orange', 'apple')]
rdd = sc.parallelize(list_t)
rdd.collect()

[('cat', 'dog', 'fish'), ('orange', 'apple')]

In [5]:
# from a set
s = {'cat', 'dog', 'fish', 'cat', 'dog', 'dog'}
rdd = sc.parallelize(s)
rdd.collect()

['fish', 'dog', 'cat']

When it is a `dict`, only the keys are used to form the RDD.

In [6]:
# from a dict
d = {
    'a': 100,
    'b': 200,
    'c': 300
}
rdd = sc.parallelize(d)
rdd.collect()

['a', 'b', 'c']
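
`parallelize()` builds its elements by iterating over the collection, and iterating over a `dict` yields only its keys. The plain-Python sketch below shows the same behavior without Spark. Passing `d.items()` to `sc.parallelize()` instead is one way to keep the values, assuming the `sc` object created above.

```python
# Plain-Python illustration; no Spark needed.
d = {'a': 100, 'b': 200, 'c': 300}

# Iterating over a dict yields only the keys,
# which is why the RDD above holds ['a', 'b', 'c'].
keys_only = list(d)

# Iterating over d.items() yields (key, value) tuples.
pairs = list(d.items())

print(keys_only)  # ['a', 'b', 'c']
print(pairs)      # [('a', 100), ('b', 200), ('c', 300)]
```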

## `textFile()`

The `textFile()` function reads a text file and returns it as an **RDD of strings**. Usually, you will need to apply some **map** functions to transform each element of the RDD into a data structure/type that is suitable for data analysis.

**When using `textFile()`, each line of the text file becomes an element in the resulting RDD.**
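
Because every element is a raw line, a small parser function is usually the first thing passed to `map()`. The sketch below parses one line in the tab-separated layout of `data/twitter.txt`. The helper `parse_tweet` and the field names (text, id, score) are assumptions made for illustration; the source does not name the columns.

```python
def parse_tweet(line):
    """Split one tab-separated line into (text, tweet_id, score)."""
    # Split from the right so that any tab inside the text itself is kept.
    text, tweet_id, score = line.rsplit('\t', 2)
    return text, int(tweet_id), float(score)

# One line taken from data/twitter.txt.
line = "'Cheap Eats in SLP' - http://t.co/4w8gRp7\t109642968603963392\t1.0"
parsed = parse_tweet(line)
print(parsed)
# ("'Cheap Eats in SLP' - http://t.co/4w8gRp7", 109642968603963392, 1.0)
```

With Spark, the same function could be applied as `rdd.map(parse_tweet)`.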

Examples:

In [7]:
# read a csv file
rdd = sc.textFile('data/mtcars.csv')
rdd.take(5)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

In [8]:
# read a txt file
rdd = sc.textFile('data/twitter.txt')
rdd.take(5)

['Fresh install of XP on new computer. Sweet relief! fuck vista\t1018769417\t1.0',
 'Well. Now I know where to go when I want my knives. #ChiChevySXSW http://post.ly/RvDl\t10284216536\t1.0',
 '"Literally six weeks before I can take off ""SSC Chair"" off my email. Its like the torturous 4th mile before everything stops hurting."\t10298589026\t1.0',
 'Mitsubishi i MiEV - Wikipedia, the free encyclopedia - http://goo.gl/xipe Cutest car ever!\t109017669432377344\t1.0',
 "'Cheap Eats in SLP' - http://t.co/4w8gRp7\t109642968603963392\t1.0"]

# Map functions
These are probably the most commonly used functions when working with an RDD object.

* `map()`
* `mapValues()`
* `flatMap()`
* `flatMapValues()`

## `map()`

The `map()` function applies a function to each element of the RDD.

In [9]:
rdd = sc.textFile('data/mtcars.csv')
rdd.take(5)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

Each element in the above RDD is a row of the *mtcars.csv* data set. We want to convert each element to a tuple so that the first element in the tuple is the car model, and the second element in the tuple is a list of values from the corresponding car model.

Step 1: before we apply the `map()` function, each element in the RDD is a string. We can first split the string by ',' to get a list of string values.

In [10]:
rdd_1 = rdd.map(lambda x: x.split(','))
rdd_1.take(2)

[['',
  'mpg',
  'cyl',
  'disp',
  'hp',
  'drat',
  'wt',
  'qsec',
  'vs',
  'am',
  'gear',
  'carb'],
 ['Mazda RX4',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.62',
  '16.46',
  '0',
  '1',
  '4',
  '4']]

Step 2: the first string value in each list is the car model and will be the first element in our tuple. The rest of the string values need to be put together to form the second element of our tuple.

In [11]:
rdd_2 = rdd_1.map(lambda x: (x[0], x[1:]))
rdd_2.take(2)

[('',
  ['mpg',
   'cyl',
   'disp',
   'hp',
   'drat',
   'wt',
   'qsec',
   'vs',
   'am',
   'gear',
   'carb']),
 ('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4'])]

Step 3: all the numbers are still strings, and we want to convert them to a numeric type. Before we do that, we need to remove the first element, which is the header of the mtcars.csv file.

In [12]:
rdd_temp = rdd_2.filter(lambda x: x[0] != '')
rdd_temp.take(2)

[('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4'])]

Each element in the RDD is a tuple consisting of two elements: the first is a string and the second is a list of strings. We need to convert all strings in the list to numeric values. We use the **Python `map`** function to do this: `map(float, x[1])`. In Python 3, `map` returns an iterator, so the star (`*`) operator inside a pair of `[]` unpacks the results into a list. This keeps each element a tuple of two items: one string and one list of numeric values.

In [13]:
rdd_3 = rdd_temp.map(lambda x: (x[0], [*map(float, x[1])]))
rdd_3.take(2)

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0])]
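
The `[*map(float, ...)]` idiom used above is one of several equivalent spellings. The plain-Python sketch below compares it with `list(map(...))` and a list comprehension on a few of the values shown; which one to use is a matter of taste.

```python
values = ['21', '6', '160', '110', '3.9']

unpacked = [*map(float, values)]       # star-unpack the map iterator into a list
via_list = list(map(float, values))    # same result with the list() constructor
via_comp = [float(v) for v in values]  # same result with a list comprehension

print(unpacked)  # [21.0, 6.0, 160.0, 110.0, 3.9]
```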

## `mapValues()`

This map function requires that each element in the RDD has a **key/value** pair structure, for example, a tuple of 2 items, or a list of 2 items.

The RDD objects **rdd_temp** and **rdd_3** belong to this category. If we only want to operate on the values, we can use the `mapValues()` function.

In [14]:
rdd_mapValues = rdd_temp.mapValues(lambda x: [*map(float, x)])
rdd_mapValues.take(2)

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0])]

When using `mapValues()`, the **x** in the above lambda function refers to the second item (the list) of each element in the **rdd_temp** RDD object. The key is passed through unchanged.
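
To make the semantics concrete, here is a plain-Python model of what `mapValues()` does to a list of key/value pairs. The helper `map_values` is hypothetical and only mimics the behavior; it is not part of PySpark.

```python
def map_values(f, pairs):
    """Plain-Python model of mapValues(): apply f to each value, keep each key."""
    return [(key, f(value)) for key, value in pairs]

pairs = [('Mazda RX4', ['21', '6']), ('Datsun 710', ['22.8', '4'])]
result = map_values(lambda x: [float(v) for v in x], pairs)
print(result)  # [('Mazda RX4', [21.0, 6.0]), ('Datsun 710', [22.8, 4.0])]
```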

## `flatMap()`

This function **first** applies a function to each element of an RDD and **then** flattens the results. With the identity function, it simply flattens the elements of an RDD without any extra operation on each element.

Example:

In [15]:
rdd = sc.parallelize([('a', 'a', 'a'), ('b', 'b'), ('c', 'c')])
rdd.collect()

[('a', 'a', 'a'), ('b', 'b'), ('c', 'c')]

In [16]:
rdd.flatMap(lambda x: x).collect()

['a', 'a', 'a', 'b', 'b', 'c', 'c']
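
The difference between `map()` and `flatMap()` is easier to see when the function does some work. The sketch below models both in plain Python on a few made-up lines; `flat_map` is a hypothetical helper that mimics the behavior, not a PySpark function.

```python
from itertools import chain

def flat_map(f, elements):
    """Plain-Python model of flatMap(): apply f, then concatenate the results."""
    return list(chain.from_iterable(f(x) for x in elements))

lines = ['a b c', 'd e', 'f']  # made-up input
mapped = [line.split(' ') for line in lines]               # what map() would give
flattened = flat_map(lambda line: line.split(' '), lines)  # what flatMap() would give

print(mapped)     # [['a', 'b', 'c'], ['d', 'e'], ['f']]
print(flattened)  # ['a', 'b', 'c', 'd', 'e', 'f']
```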

## `flatMapValues()`

This function applies `flatMap` to the value of each **key/value** pair: it applies a function only to the value and then flattens the result, pairing each resulting item with the original key.

A good use case is to use this function to **"melt"** a data frame, like the `melt()` function from the R package `reshape2`. To better explain this idea, we create a data frame with the **SparkSession** class.

In [17]:
df = spark.read.csv('data/airquality.csv', inferSchema=True, header=True, nullValue='NA')
df.toPandas().iloc[:5,]

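|   | ozone | solar.r | wind | temp | month | day |
|---|---|---|---|---|---|---|
| 0 | 41.0 | 190.0 | 7.4 | 67 | 5 | 1 |
| 1 | 36.0 | 118.0 | 8.0 | 72 | 5 | 2 |
| 2 | 12.0 | 149.0 | 12.6 | 74 | 5 | 3 |
| 3 | 18.0 | 313.0 | 11.5 | 62 | 5 | 4 |
| 4 | NaN | NaN | 14.3 | 56 | 5 | 5 |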


We want to combine the first 4 columns into one column, but keep the columns 'month' and 'day'.

First, let's create an RDD object.

In [18]:
airquality = df.rdd
airquality.take(5)

[Row(ozone=41, solar.r=190, wind=7.4, temp=67, month=5, day=1),
 Row(ozone=36, solar.r=118, wind=8.0, temp=72, month=5, day=2),
 Row(ozone=12, solar.r=149, wind=12.6, temp=74, month=5, day=3),
 Row(ozone=18, solar.r=313, wind=11.5, temp=62, month=5, day=4),
 Row(ozone=None, solar.r=None, wind=14.3, temp=56, month=5, day=5)]

Step 1: convert the RDD to a **key/value** pair RDD so that the key holds the **month and day** info and the value holds the **air quality** info.

In [19]:
aq_1 = airquality.map(lambda x: [x[4:], x[:4]])
aq_1.take(5)

[[(5, 1), (41, 190, 7.4, 67)],
 [(5, 2), (36, 118, 8.0, 72)],
 [(5, 3), (12, 149, 12.6, 74)],
 [(5, 4), (18, 313, 11.5, 62)],
 [(5, 5), (None, None, 14.3, 56)]]

Step 2: apply the `mapValues()` function to associate the values with the air quality variable names.

In [20]:
aq_2 = aq_1.mapValues(lambda x: [('ozone', x[0]), ('solar.r', x[1]), ('wind', x[2]), ('temp', x[3])])
aq_2.take(5)

[((5, 1), [('ozone', 41), ('solar.r', 190), ('wind', 7.4), ('temp', 67)]),
 ((5, 2), [('ozone', 36), ('solar.r', 118), ('wind', 8.0), ('temp', 72)]),
 ((5, 3), [('ozone', 12), ('solar.r', 149), ('wind', 12.6), ('temp', 74)]),
 ((5, 4), [('ozone', 18), ('solar.r', 313), ('wind', 11.5), ('temp', 62)]),
 ((5, 5), [('ozone', None), ('solar.r', None), ('wind', 14.3), ('temp', 56)])]

Step 3: apply `flatMapValues()` function to flatten the air quality values.

In [21]:
aq_3 = aq_2.flatMapValues(lambda x: x)
aq_3.take(10)

[((5, 1), ('ozone', 41)),
 ((5, 1), ('solar.r', 190)),
 ((5, 1), ('wind', 7.4)),
 ((5, 1), ('temp', 67)),
 ((5, 2), ('ozone', 36)),
 ((5, 2), ('solar.r', 118)),
 ((5, 2), ('wind', 8.0)),
 ((5, 2), ('temp', 72)),
 ((5, 3), ('ozone', 12)),
 ((5, 3), ('solar.r', 149))]
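
The step above can be modeled in plain Python as follows. The helper `flat_map_values` is hypothetical and only mimics `flatMapValues()`: every item produced from a value becomes its own pair, repeated under the original key.

```python
def flat_map_values(f, pairs):
    """Plain-Python model of flatMapValues(): one output pair per item of f(value)."""
    return [(key, item) for key, value in pairs for item in f(value)]

# A shortened version of the aq_2 elements shown above.
pairs = [((5, 1), [('ozone', 41), ('wind', 7.4)]),
         ((5, 2), [('ozone', 36), ('wind', 8.0)])]
melted = flat_map_values(lambda x: x, pairs)
print(melted)
# [((5, 1), ('ozone', 41)), ((5, 1), ('wind', 7.4)),
#  ((5, 2), ('ozone', 36)), ((5, 2), ('wind', 8.0))]
```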

Step 4: restructure the data so that it has 4 columns.

In [22]:
aq_4 = aq_3.map(lambda x: (x[0][0], x[0][1], x[1][0], x[1][1]))
aq_4.take(10)

[(5, 1, 'ozone', 41),
 (5, 1, 'solar.r', 190),
 (5, 1, 'wind', 7.4),
 (5, 1, 'temp', 67),
 (5, 2, 'ozone', 36),
 (5, 2, 'solar.r', 118),
 (5, 2, 'wind', 8.0),
 (5, 2, 'temp', 72),
 (5, 3, 'ozone', 12),
 (5, 3, 'solar.r', 149)]

Step 5: convert the RDD to a data frame to better display the result.

**Note that the 4th column contains both integers (e.g., 41, 190) and floats (e.g., 7.4), so we can't simply use the `toDF()` function to convert the RDD to a data frame. We need to set the schema of that field to `StringType` and then use the `cast()` function.**

In [23]:
from pyspark.sql.types import *
schema = StructType([
    StructField('month', IntegerType(), True),
    StructField('day', IntegerType(), True),
    StructField('variables', StringType(), True),
    StructField('value', StringType(), True)
])
aq_5 = aq_4.toDF(schema=schema)
aq_5.take(5)

[Row(month=5, day=1, variables='ozone', value='41'),
 Row(month=5, day=1, variables='solar.r', value='190'),
 Row(month=5, day=1, variables='wind', value='7.4'),
 Row(month=5, day=1, variables='temp', value='67'),
 Row(month=5, day=2, variables='ozone', value='36')]

Step 6: cast the 4th column to a double (floating-point) column.

In [24]:
aq_6 = aq_5.select(aq_5.month, aq_5.day, aq_5.variables, aq_5.value.cast('double').alias('value'))
aq_6.toPandas().iloc[:10]

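|   | month | day | variables | value |
|---|---|---|---|---|
| 0 | 5 | 1 | ozone | 41.0 |
| 1 | 5 | 1 | solar.r | 190.0 |
| 2 | 5 | 1 | wind | 7.4 |
| 3 | 5 | 1 | temp | 67.0 |
| 4 | 5 | 2 | ozone | 36.0 |
| 5 | 5 | 2 | solar.r | 118.0 |
| 6 | 5 | 2 | wind | 8.0 |
| 7 | 5 | 2 | temp | 72.0 |
| 8 | 5 | 3 | ozone | 12.0 |
| 9 | 5 | 3 | solar.r | 149.0 |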
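
An alternative to the string-then-cast route is to make the value column numerically uniform before building the data frame. The sketch below shows a None-safe conversion in plain Python. `to_float` is a hypothetical helper; applying it with `aq_4.map(...)` and declaring the `value` field as `DoubleType()` is an assumption that has not been run against this data set.

```python
def to_float(value):
    """Convert a number to float, leaving missing values (None) untouched."""
    return None if value is None else float(value)

# A few tuples in the same layout as the aq_4 elements.
rows = [(5, 1, 'ozone', 41), (5, 1, 'wind', 7.4), (5, 5, 'ozone', None)]
uniform = [(m, d, name, to_float(v)) for m, d, name, v in rows]
print(uniform)
# [(5, 1, 'ozone', 41.0), (5, 1, 'wind', 7.4), (5, 5, 'ozone', None)]
```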


# Aggregate functions
Two aggregate functions:

* `aggregate()`
* `aggregateByKey()`

## `aggregate(zeroValue, seqOp, combOp)`

* **zeroValue** is like a data container that holds the initial value of the result. Its structure should match the structure of the values returned by the seqOp function.
* **seqOp** is a function that takes two arguments: the first is the current zeroValue (the running result) and the second is an element from the RDD. The zeroValue is replaced by the returned value after every call.
* **combOp** is a function that takes two arguments: the final zeroValue from one partition and the final zeroValue from another partition. It merges the two into one.
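
The interplay of the three arguments can be sketched in plain Python. The helper `aggregate` below is a simplified model of the behavior, not PySpark's implementation, and the data and its split into two partitions are made up for illustration.

```python
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of RDD.aggregate() over a list of partitions."""
    # seq_op folds the elements of each partition, starting from zero_value.
    per_partition = [reduce(seq_op, part, zero_value) for part in partitions]
    # comb_op merges the per-partition results into a single value.
    return reduce(comb_op, per_partition, zero_value)

# Hypothetical data split into two partitions.
partitions = [[1, 2, 3], [4, 5]]

# The accumulator is (running sum, running count).
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

total, count = aggregate(partitions, (0, 0), seq_op, comb_op)
print(total, count, total / count)  # 15 5 3.0
```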

The code below calculates the total sum of squares for **mpg** and **disp** in data set **mtcars**.

Step 1: get some data.

In [25]:
mtcars_df = spark.read.csv('data/mtcars.csv', inferSchema=True, header=True).select(['mpg', 'disp'])
mtcars_df.take(5)

[Row(mpg=21.0, disp=160.0),
 Row(mpg=21.0, disp=160.0),
 Row(mpg=22.8, disp=108.0),
 Row(mpg=21.4, disp=258.0),
 Row(mpg=18.7, disp=360.0)]

Step 2: calculate the averages of mpg and disp.

In [26]:
mpg_mean = mtcars_df.select('mpg').rdd.map(lambda x: x[0]).mean()
disp_mean = mtcars_df.select('disp').rdd.map(lambda x: x[0]).mean()
print('mpg mean = ', mpg_mean, '; ' 
      'disp mean = ', disp_mean)

mpg mean =  20.090625000000003 ; disp mean =  230.721875
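
The means are needed because the total sum of squares is the sum of squared deviations from the mean, which costs an extra pass over the data. As an aside, the identity TSS = Σx² − (Σx)²/n lets the same quantity be accumulated in a single fold. The plain-Python sketch below checks this on the first five mpg values only; note that this shortcut can lose floating-point precision when the mean is large relative to the spread.

```python
from functools import reduce

mpg = [21.0, 21.0, 22.8, 21.4, 18.7]  # first five mpg values shown above

# Two-pass version: compute the mean first, then sum the squared deviations.
mean = sum(mpg) / len(mpg)
tss_two_pass = sum((x - mean) ** 2 for x in mpg)

# One-pass version: accumulate (count, sum, sum of squares) in a single fold.
seq_op = lambda z, x: (z[0] + 1, z[1] + x, z[2] + x ** 2)
n, s, ss = reduce(seq_op, mpg, (0, 0.0, 0.0))
tss_one_pass = ss - s ** 2 / n

print(tss_two_pass, tss_one_pass)  # both are approximately 8.688
```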


Step 3: build **zeroValue, seqOp** and **combOp**

We are calculating two total sums of squares (TSS), so we create a tuple to store the two values.

In [27]:
zeroValue = (0, 0) 

The **z** below refers to `zeroValue`. Its values get updated after every run. The **x** refers to an element in an RDD partition. In this case, both **z** and **x** have two values.

In [28]:
seqOp = lambda z, x: (z[0] + (x[0] - mpg_mean)**2, z[1] + (x[1] - disp_mean)**2)

The `combOp` function simply aggregates all `zeroValues` into one.

In [29]:
combOp = lambda px, py: ( px[0] + py[0], px[1] + py[1] )

Apply the `aggregate()` function.

In [30]:
mtcars_df.rdd.aggregate(zeroValue, seqOp, combOp)

(1126.0471874999998, 476184.7946875)

The results above match the results from R:
><code>
sum((mtcars$mpg - mean(mtcars$mpg))^2) 
[1] 1126.047 
sum((mtcars$disp - mean(mtcars$disp))^2) 
[1] 476184.8 
></code>

## `aggregateByKey(zeroValue, seqOp, combOp)`

This function does similar things to `aggregate()`. While `aggregate()` combines all elements into a single result, `aggregateByKey()` combines the values separately for each key.
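
The per-key behavior can be sketched in plain Python. The helper `aggregate_by_key` is a simplified model, not PySpark's implementation, and the keys, values and partitioning are made up for illustration.

```python
def aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of aggregateByKey() over a list of partitions."""
    merged = {}
    for part in partitions:
        # Within a partition, fold the values of each key with seq_op.
        local = {}
        for key, value in part:
            local[key] = seq_op(local.get(key, zero_value), value)
        # Across partitions, merge the per-key results with comb_op.
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged

# Hypothetical key/value data split into two partitions.
partitions = [[('a', 1), ('b', 2), ('a', 3)], [('a', 4), ('b', 5)]]

# Sum of squares per key.
result = aggregate_by_key(partitions, 0, lambda z, x: z + x ** 2, lambda p, q: p + q)
print(result)  # {'a': 26, 'b': 29}
```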

### Import data

In [31]:
iris_rdd = sc.textFile('data/iris.csv', use_unicode=True)
iris_rdd.take(2)

['sepal_length,sepal_width,petal_length,petal_width,species',
 '5.1,3.5,1.4,0.2,setosa']

### Transform data to a tuple RDD

In [32]:
iris_rdd_2 = iris_rdd.map(lambda x: x.split(',')).\
    filter(lambda x: x[0] != 'sepal_length').\
    map(lambda x: (x[-1], [*map(float, x[:-1])]))
iris_rdd_2.take(5)

[('setosa', [5.1, 3.5, 1.4, 0.2]),
 ('setosa', [4.9, 3.0, 1.4, 0.2]),
 ('setosa', [4.7, 3.2, 1.3, 0.2]),
 ('setosa', [4.6, 3.1, 1.5, 0.2]),
 ('setosa', [5.0, 3.6, 1.4, 0.2])]

### Define initial values, seqOp and combOp

In [33]:
zero_value = (0, 0)
seqOp = (lambda x, y: (x[0] + (y[0])**2, x[1] + (y[1])**2))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

### Implement `aggregateByKey()`

In [34]:
iris_rdd_2.aggregateByKey(zero_value, seqOp, combOp).collect()

[('versicolor', (1774.8600000000001, 388.47)),
 ('setosa', (1259.0899999999997, 591.2500000000002)),
 ('virginica', (2189.9000000000005, 447.33))]

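### Results from R

The R output below agrees with the Spark result except for the setosa sepal width (594.60 in R versus 591.25 above), which suggests that `data/iris.csv` differs slightly from R's built-in `iris` data set.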

><code>
ddply(iris, .(Species), summarise, 
      sst_sepal_length=sum((Sepal.Length)^2),
      sst_sepal_width=sum((Sepal.Width)^2))
></code>


><code>
     Species sst_sepal_length sst_sepal_width
1     setosa          1259.09          594.60
2 versicolor          1774.86          388.47
3  virginica          2189.90          447.33
></code>
