## Lab 13: PySpark

Resilient distributed dataset (RDD)

- Dataset contains a collection of elements of any type
- Dataset can be partitioned and distributed across multiple nodes
- RDDs are immutable

Lazy evaluation

Spark does not load or transform data until an action is performed. For example:

- Load file into RDD (nothing is read yet)
- Filter the RDD (nothing is filtered yet)
- Count the number of elements (count is an action, so load and filter happen now)
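The load, filter, count sequence above can be sketched without Spark. The block below is only a plain-Python analogy that uses generators to defer work; it is not how Spark is implemented, and the sample rows and the helper names `load` and `keep_toyota` are hypothetical.

```python
# Plain-Python analogy only: generators defer work the way Spark defers transformations.
log = []  # records when each stage actually runs

def load(lines):
    for line in lines:
        log.append("load")
        yield line

def keep_toyota(lines):
    for line in lines:
        log.append("filter")
        if "toyota" in line:
            yield line

# Hypothetical sample rows, not taken from auto-data.csv
sample = ["toyota,gas", "honda,gas", "toyota,diesel"]

pipeline = keep_toyota(load(sample))  # building the pipeline runs nothing
steps_before = len(log)

count = sum(1 for _ in pipeline)      # the "action": load and filter run now
steps_after = len(log)

print(steps_before, count, steps_after)  # prints: 0 2 6
```

Building the pipeline records no work; only consuming it triggers the load and filter steps, which mirrors how an action such as `count()` triggers the pending transformations on an RDD.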

In [None]:
from pyspark import SparkContext
#Run Spark locally on this machine
sc = SparkContext(master="local")


In [None]:
data = sc.textFile('auto-data.csv')

In [None]:
#Returns the whole RDD as a list on the driver (avoid on large datasets)
data.collect()

In [None]:
data.count()  #Number of lines in rdd

In [None]:
print(data.first()) #Prints first line
print()
print(data.take(5)) #Prints first 5 lines

## Transformations

- Perform operations on an RDD and create a new RDD
- Lazy evaluation
- Can be distributed across multiple nodes

### Map

newRDD = rdd.map(function)

- Elements of the result can have a different type than the input elements

In [None]:
#Map and create a new RDD
tsvdata = data.map(lambda x: x.replace(',','\t'))
tsvdata.take(5)

### Filter

newRDD = rdd.filter(function)

- Filter an RDD to select elements that match a condition
- Result RDD has at most as many elements as the original RDD
- Function should return true/false for each element

In [None]:
#Filter and create a new RDD
toyotadata = data.filter(lambda x: 'toyota' in x)
print(toyotadata.count())

### Set operations


In [None]:
words1 = sc.parallelize(['knees','weak','arms','heavy'])
words2 = sc.parallelize(['new','words','arms','knees'])

In [None]:
#union keeps duplicates, so distinct() is needed to remove them
print(words1.union(words2).distinct().collect())

In [None]:
#intersection returns only the elements present in both RDDs
print(words1.intersection(words2).collect())

### Reduce

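- Perform an operation across all elements of an RDD (sum, count, etc.)
- Operation is a function with two inputs that returns a single value of the same type
- Function should be commutative and associative, because each partition is reduced separately and the partial results are then combined
- Function is applied repeatedly until a single value remains

rdd = [a,b,c,d,e] and function is f(x,y)

   - On a single partition: f(f(f(f(a,b),c),d),e)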
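To see why the choice of function matters once the data is split across partitions, here is a minimal plain-Python sketch. The helper `partitioned_reduce` is hypothetical and models only one possible combine order (reduce each partition, then reduce the partial results); Spark itself does not guarantee a particular order.

```python
from functools import reduce

def partitioned_reduce(partitions, f):
    """Reduce each partition on its own, then reduce the partial results."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

add = lambda x, y: x + y   # commutative and associative
sub = lambda x, y: x - y   # neither commutative nor associative

# The same five elements, split in two different ways
one_partition = [[1, 2, 3, 4, 5]]
two_partitions = [[1, 2, 3], [4, 5]]

add_one = partitioned_reduce(one_partition, add)
add_two = partitioned_reduce(two_partitions, add)
sub_one = partitioned_reduce(one_partition, sub)
sub_two = partitioned_reduce(two_partitions, sub)

print(add_one, add_two)  # prints: 15 15
print(sub_one, sub_two)  # prints: -13 -3
```

Addition gives the same answer however the elements are partitioned, while subtraction does not, so a function like `sub` would give results that depend on the partitioning.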

In [None]:
newdata = sc.parallelize(range(10))
print(newdata.reduce(lambda x,y : x+y))

In [None]:
#Shortest line in RDD
line = data.reduce(lambda x,y: x if len(x) < len(y) else y)
print(line)

### Using functions

In [None]:
#Data Preprocessing
def preprocess(line):
    lis = line.split(',')
    
    #Convert doors from a word to a numeric string
    if lis[3] == 'two':
        lis[3] = '2'
    else:
        lis[3] = '4'
    
    #Convert drive to uppercase
    lis[5] = lis[5].upper()
    return ','.join(lis)

In [None]:
preprocessed_data = data.map(preprocess)
preprocessed_data.collect()

In [None]:
#Compute average miles per gallon (column index 9, i.e. the 10th column)
def mpgavg(line):
    if isinstance(line,int):
        return line
    lis = line.split(',')
    
    if lis[9].isdigit():
        return int(lis[9])
    
    return 0 #Missing values

In [None]:
#Map each line to its MPG value, then sum; count()-1 assumes the first line is a header
data.map(mpgavg).reduce(lambda x,y: x + y) / (data.count()-1)
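One limitation of the average above is that lines with missing values add 0 to the sum but are still counted in the denominator. A possible alternative, sketched here in plain Python with `functools.reduce`, is to reduce over (sum, count) pairs so that non-numeric lines contribute to neither. The rows and the helper `to_pair` are hypothetical and only mimic the column layout assumed by `mpgavg`.

```python
from functools import reduce

# Hypothetical rows: only the value at column index 9 matters here
rows = [
    "h0,h1,h2,h3,h4,h5,h6,h7,h8,MPG",  # header-like line, not numeric
    "a,b,c,d,e,f,g,h,i,30",
    "a,b,c,d,e,f,g,h,i,20",
    "a,b,c,d,e,f,g,h,i,?",             # missing value
]

def to_pair(line):
    """Return (mpg, 1) for a numeric value and (0, 0) otherwise."""
    value = line.split(",")[9]
    return (int(value), 1) if value.isdigit() else (0, 0)

# Add the pairs element-wise: first slot is the running sum, second the running count
total, n = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), map(to_pair, rows))
average = total / n

print(total, n, average)  # prints: 50 2 25.0
```

The pair-wise addition is commutative and associative, so the same idea should carry over to an RDD as `data.map(to_pair).reduce(...)` with the same lambda.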

### Shared variables

- __Broadcast variables__ are distributed to all workers, but are read-only. These variables can be used as lookup tables or stopword lists.
- __Accumulators__ are variables that workers can only “add” to, using an associative and commutative operation. They are typically used as counters, and only the driver can read their value.

In [None]:
#Initialize accumulator
sedans = sc.accumulator(0)
hatchbacks = sc.accumulator(0)

#Initialize broadcast variables
stext = sc.broadcast('sedan')
htext = sc.broadcast('hatchback')

In [None]:
def split_lines(line):
    
    global sedans
    global hatchbacks
    
    if stext.value in line:
        sedans += 1
    if htext.value in line:
        hatchbacks += 1
    
    return line.split(',')

In [None]:
#count() is an action, so it forces the map (and the accumulator updates) to run
line_count = data.map(split_lines).count()

In [None]:
#Read the accumulator values on the driver
print(sedans.value, hatchbacks.value)

### SparkSQL

- Library that supports SQL-like data and operations
- DataFrame - collection of data organized as rows and named columns

Operations supported by DataFrames include:

- filter - filter rows based on a condition
- join - join two DataFrames on a column value
- groupBy - group rows by specific column values

In [None]:
#SQLContext is the older entry point; since Spark 2.0, SparkSession is preferred
from pyspark.sql import SQLContext
sqlcontext = SQLContext(sc)

In [None]:
emdf = sqlcontext.read.json('customerData.json')
emdf.show()

In [None]:
#DataFrame queries: select a single column
emdf.select('name').show()

In [None]:
emdf.filter(emdf['age'] == 40).show()

In [None]:
emdf.groupBy('gender').count().show()