In [7]:
import keras

# RDD 

Spark uses Resilient Distributed Datasets (RDDs) to perform parallel processing across a cluster or across the processors of a single computer.


1. What is RDD?
 - core abstraction of Spark
 - immutable collection of objects
 - data is distributed across many nodes in the cluster => parallelization

2. Basics of RDD

- can be created by loading an external dataset
- can be created with the `textFile` or `parallelize` methods

collect() method - gives us all the values in the RDD

In [1]:
from pyspark import SparkContext

# Only one SparkContext may be active at a time, so calling SparkContext()
# again (e.g. when this cell is re-run) raises an error. getOrCreate()
# returns the already-running context if there is one, otherwise creates it.
sc = SparkContext.getOrCreate()

In [2]:
pythonList = [2.3,3.4,4.3,2.4,2.3,4.0]

In [3]:
# Create an RDD from the Python list, split into 2 partitions
# (confirmed by the getNumPartitions() call further down).
# Note: these are 2 partitions of ONE RDD, not separate RDDs or clusters;
# we do not choose which elements land in which partition.
parPythonData = sc.parallelize(pythonList,2)

In [4]:
# retrieve all the elements in the original list
parPythonData.collect()

[2.3, 3.4, 4.3, 2.4, 2.3, 4.0]

In [5]:
type(parPythonData)

pyspark.rdd.RDD

In [8]:
# get the first element in the RDD - use this, NOT an index!
parPythonData.first()

2.3

In [9]:
# take the first two elements
parPythonData.take(2)

[2.3, 3.4]

In [10]:
# tell us the number of partitions in the RDD
parPythonData.getNumPartitions()

2

In [11]:
# Sample temperatures, treated as degrees Fahrenheit (they are converted
# with fahrenheitToCentigrade further down).
tempData = [59,57.2,53.6,55.4,51.8,53.6,55.4]

In [12]:
parTempData = sc.parallelize(tempData,2)

In [14]:
parTempData.collect()  # return the elements in order

[59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]

In [15]:
def fahrenheitToCentigrade(temperature):
    """Convert a temperature from degrees Fahrenheit to degrees Celsius.

    A plain one-argument function, so it can be passed to ``RDD.map`` and
    applied to each element.

    Args:
        temperature: Temperature in degrees Fahrenheit (int or float).

    Returns:
        The equivalent temperature in degrees Celsius.
    """
    # Shift so that freezing point is zero, then rescale 9 F-degrees -> 5 C-degrees.
    offsetFromFreezing = temperature - 32
    return offsetFromFreezing * 5 / 9

In [17]:
fahrenheitToCentigrade(59)  # test input

15.0

## Comparing map in Pyspark, vs. vanilla Python

Python - map is functional

```
# Python

map(function, sequence)

Other functions include
map(f, list),
reduce(f, list),  # functools.reduce in Python 3
filter(f, list)
```

Pyspark - map is object-oriented, for the RDD class

```
# Pyspark 
new RDD = RDD.map(function)

Other functions include
RDD.map(f),
RDD.reduce(f),
RDD.filter(f)
```

In [21]:
parCentigradeData = parTempData.map(fahrenheitToCentigrade)

In [22]:
parCentigradeData.collect()

[15.0, 14.000000000000002, 12.0, 13.0, 10.999999999999998, 12.0, 13.0]

In [23]:
def tempMoreThanThirteen(temperature):
    """Predicate for ``RDD.filter``: is the temperature at least 13?

    Despite the name, the comparison is inclusive, so a value of exactly
    13 passes.

    Args:
        temperature: Numeric temperature value to test.

    Returns:
        True when ``temperature`` is greater than or equal to 13, else False.
    """
    threshold = 13
    return temperature >= threshold

In [24]:
# filter takes out all elements from the RDD that return False, when passed into the function used as the argument

filteredTemprature = parCentigradeData.filter(tempMoreThanThirteen)

In [25]:
filteredTemprature.collect()

[15.0, 14.000000000000002, 13.0, 13.0]

In [26]:
# can also use lambda functions
filteredTemprature = parCentigradeData.filter(lambda x : x>=13)

In [20]:
filteredTemprature.collect()

[15.0, 14.000000000000002, 13.0, 13.0]

## Work with text data

In [35]:
# textFile() builds an RDD straight from a file, one element per line; the
# data is distributed implicitly, with no explicit parallelize() call.
rdd = sc.textFile("for_pyspark.txt")
lines = sc.textFile("data.txt")
# Length of every line, in characters.
lineLengths = lines.map(len)
# Add up the per-line lengths to get the total character count over all lines.
totalLength = lineLengths.reduce(lambda runningTotal, lineLength: runningTotal + lineLength)

In [36]:
lineLengths.collect()

[27, 14, 21, 28, 16, 0, 2, 50, 7]

In [37]:
totalLength 

165

In [41]:
# Making a histogram in Pyspark
# Split every line on single spaces and flatten the pieces into one RDD of words.
# Note: split(" ") emits '' for consecutive or trailing spaces, which is why
# '' shows up as a "word" in the collected results below.
words = lines.flatMap(lambda x: x.split(" "))
# this is a words RDD

In [42]:
# Word counts: pair each word with a 1, then let reduceByKey add up the 1s
# of identical words (the word is the key, the count is the value).
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

In [43]:
result.collect()

[('"asdfg:Foef', 1),
 ('eF', 1),
 ('', 8),
 ('deflfjdlfdk', 1),
 ('efelfjelmfeefefm', 1),
 ('fkk', 1),
 ('kk', 2),
 ('fkjfkfk', 1),
 ('jeF', 1),
 ("w'fdpk", 1),
 ('m', 1),
 ('wfefljgme', 2),
 ('efelfjelmfe;fkefkefke;fkdmv', 1),
 ('ew', 1),
 ('fefkfkfkkfkfkfkfk', 1),
 ('k', 2),
 ('fk', 1),
 ('fkfk', 2),
 ('fdpk', 1)]

In [44]:
result

PythonRDD[33] at collect at <ipython-input-43-e43ab5090625>:1

In [45]:
words.collect()

['"asdfg:Foef',
 'eF',
 'jeF',
 '',
 "w'fdpk",
 '',
 'deflfjdlfdk',
 'm',
 '',
 'wfefljgme',
 '',
 'wfefljgme',
 '',
 'efelfjelmfe;fkefkefke;fkdmv',
 '',
 'efelfjelmfeefefm',
 '',
 'ew',
 'fefkfkfkkfkfkfkfk',
 '',
 'k',
 'fkk',
 'fk',
 'kk',
 'fkfk',
 'kk',
 'k',
 'fdpk',
 'fkfk',
 'fkjfkfk']

In [48]:
# another histogram!
# NOTE: this only pairs each whole line (not each word) with a 1; there is no
# reduceByKey step, so nothing is aggregated and every count shown is 1.

lines.map(lambda x: (x, 1)).collect()

[('"asdfg:Foef eF jeF  w\'fdpk ', 1),
 ('deflfjdlfdk m ', 1),
 ('wfefljgme  wfefljgme ', 1),
 ('efelfjelmfe;fkefkefke;fkdmv ', 1),
 ('efelfjelmfeefefm', 1),
 ('', 1),
 ('ew', 1),
 ('fefkfkfkkfkfkfkfk  k fkk fk kk fkfk kk k fdpk fkfk', 1),
 ('fkjfkfk', 1)]

In [50]:
# Example 2
# map() produces one output element per input line, so each line becomes its
# own list of word tokens.
# NOTE: this rebinds `words`, which earlier held the flattened words RDD built from data.txt.
new_rdd = sc.textFile("for_pyspark.txt")
words = new_rdd.map(lambda x: x.split(" "))
words.collect()  # gives us a list of lists: one inner list of word tokens per line

[['this', 'is', 'a', 'book'], ['this', 'book', 'is', 'about', 'DS']]

### flatMap vs. map

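`map` returns one result per input element (here, one list of words per line). `flatMap` applies the same function but flattens those per-line lists into a single collection of words.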

In [54]:
# Same split as the map() example, but flatMap() flattens the per-line lists
# into a single RDD of words.
new_rdd = sc.textFile("for_pyspark.txt")
flat_words = new_rdd.flatMap(lambda x: x.split(" "))
flat_words.collect()  # gives us one flat list of word tokens (not a 2D array)

['this', 'is', 'a', 'book', 'this', 'book', 'is', 'about', 'DS']

In [52]:
# Note: reduceByKey only works on an RDD of (key, value) tuples, e.g. the output of words.map(lambda x: (x, 1)) in the histogram cells above.


## Map Running Time

In [2]:
# NOTE: rebinds parPythonData (earlier the 6-element float RDD) to an RDD of the integers 0..999.
# No partition count is passed here, unlike the earlier parallelize(..., 2) calls.
parPythonData = sc.parallelize(range(1000))

In [3]:
# Square every element; `a` is a new RDD derived from parPythonData.
a = parPythonData.map(lambda x: x*x)

In [4]:
# Bring all of the squared values back as a Python list.
b = a.collect()

# Display only the first 100 results to keep the cell output short.
b[0:100]

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400,
 441,
 484,
 529,
 576,
 625,
 676,
 729,
 784,
 841,
 900,
 961,
 1024,
 1089,
 1156,
 1225,
 1296,
 1369,
 1444,
 1521,
 1600,
 1681,
 1764,
 1849,
 1936,
 2025,
 2116,
 2209,
 2304,
 2401,
 2500,
 2601,
 2704,
 2809,
 2916,
 3025,
 3136,
 3249,
 3364,
 3481,
 3600,
 3721,
 3844,
 3969,
 4096,
 4225,
 4356,
 4489,
 4624,
 4761,
 4900,
 5041,
 5184,
 5329,
 5476,
 5625,
 5776,
 5929,
 6084,
 6241,
 6400,
 6561,
 6724,
 6889,
 7056,
 7225,
 7396,
 7569,
 7744,
 7921,
 8100,
 8281,
 8464,
 8649,
 8836,
 9025,
 9216,
 9409,
 9604,
 9801]