### Initialization Script

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys
import os 

# getting the directory where Spark was installed
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/opt/spark'
 
# python variable to store the root path for later reference
SPARK_HOME = os.environ['SPARK_HOME']

# adding pyspark and py4j packages paths to python path env variable
sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))
sys.path.insert(0, os.path.join(SPARK_HOME, 'python', 'lib'))
sys.path.insert(0, os.path.join(SPARK_HOME, 'python', 'lib', 'py4j-0.10.9-src.zip'))
sys.path.insert(0, os.path.join(SPARK_HOME, 'python', 'lib', 'pyspark.zip'))

from pyspark import SparkContext
from pyspark import SparkConf

# configure spark settings
conf = SparkConf()
conf.set("spark.executer.memory","1g")
conf.set("spark.cores.max",'2')

# give name to your spark application
conf.setAppName("FirstSparkApp")

# create a spark context object 
# note: Execute only once otherwise results in Context Errors
sc = SparkContext('local',conf=conf)

### Once the above script is executed you can view the Spark instance info here http://localhost:4040

### Reading a csv file for analysis using Spark

In [2]:
# specifying the file path to read
autodata = sc.textFile('/home/ateeb/Downloads/automobile.csv')

# cache the autodata as soon as first action is called
autodata.cache()

# count the number of lines in autodata
# cache is also performed after this call (lazy evaluation)
autodata.count()

202

In [3]:
# print first row of data
autodata.first()

'symboling,normalized-losses,make,fuel-type,aspiration,num-of-doors,body-style,drive-wheels,engine-location,wheel-base,length,width,height,curb-weight,engine-type,num-of-cylinders,engine-size,fuel-system,bore,stroke,compression-ratio,horsepower,peak-rpm,city-mpg,highway-mpg,price'

In [38]:
# returns first n rows of data
autodata.take(2)

['symboling,normalized-losses,make,fuel-type,aspiration,num-of-doors,body-style,drive-wheels,engine-location,wheel-base,length,width,height,curb-weight,engine-type,num-of-cylinders,engine-size,fuel-system,bore,stroke,compression-ratio,horsepower,peak-rpm,city-mpg,highway-mpg,price',
 '3,122,alfa-romero,gas,std,two,convertible,rwd,front,88.6,168.8,64.1,48.8,2548,dohc,four,130,mpfi,3.47,2.68,9.0,111,5000.0,21,27,13495.0']

In [36]:
# loop to operate on each row/line of autodate
for line in autodata.collect():
    print(line[0:20])
    break

symboling,normalized


In [6]:
# creating RDD from python collection
collData = sc.parallelize([3,5,4,7,4])
collData.cache()
collData.count()

5

### Transformations

In [7]:
tsvData = autodata.map(lambda x: x.replace(",","\t"))
tsvData.take(5)

['symboling\tnormalized-losses\tmake\tfuel-type\taspiration\tnum-of-doors\tbody-style\tdrive-wheels\tengine-location\twheel-base\tlength\twidth\theight\tcurb-weight\tengine-type\tnum-of-cylinders\tengine-size\tfuel-system\tbore\tstroke\tcompression-ratio\thorsepower\tpeak-rpm\tcity-mpg\thighway-mpg\tprice',
 '3\t122\talfa-romero\tgas\tstd\ttwo\tconvertible\trwd\tfront\t88.6\t168.8\t64.1\t48.8\t2548\tdohc\tfour\t130\tmpfi\t3.47\t2.68\t9.0\t111\t5000.0\t21\t27\t13495.0',
 '3\t122\talfa-romero\tgas\tstd\ttwo\tconvertible\trwd\tfront\t88.6\t168.8\t64.1\t48.8\t2548\tdohc\tfour\t130\tmpfi\t3.47\t2.68\t9.0\t111\t5000.0\t21\t27\t16500.0',
 '1\t122\talfa-romero\tgas\tstd\ttwo\thatchback\trwd\tfront\t94.5\t171.2\t65.5\t52.4\t2823\tohcv\tsix\t152\tmpfi\t2.68\t3.47\t9.0\t154\t5000.0\t19\t26\t16500.0',
 '2\t164\taudi\tgas\tstd\tfour\tsedan\tfwd\tfront\t99.8\t176.6\t66.2\t54.3\t2337\tohc\tfour\t109\tmpfi\t3.19\t3.4\t10.0\t102\t5500.0\t24\t30\t13950.0']

In [8]:
toyotaData = autodata.filter(lambda x: 'toyota' in x)
toyotaData.take(5)

['1,87,toyota,gas,std,two,hatchback,fwd,front,95.7,158.7,63.6,54.5,1985,ohc,four,92,2bbl,3.05,3.03,9.0,62,4800.0,35,39,5348.0',
 '1,87,toyota,gas,std,two,hatchback,fwd,front,95.7,158.7,63.6,54.5,2040,ohc,four,92,2bbl,3.05,3.03,9.0,62,4800.0,31,38,6338.0',
 '1,74,toyota,gas,std,four,hatchback,fwd,front,95.7,158.7,63.6,54.5,2015,ohc,four,92,2bbl,3.05,3.03,9.0,62,4800.0,31,38,6488.0',
 '0,77,toyota,gas,std,four,wagon,fwd,front,95.7,169.7,63.6,59.1,2280,ohc,four,92,2bbl,3.05,3.03,9.0,62,4800.0,31,37,6918.0',
 '0,81,toyota,gas,std,four,wagon,4wd,front,95.7,169.7,63.6,59.1,2290,ohc,four,92,2bbl,3.05,3.03,9.0,62,4800.0,27,32,7898.0']

In [9]:
toyotaData.count()

32

In [10]:
words = autodata.flatMap(lambda x: x.split(","))
words.take(4)

['symboling', 'normalized-losses', 'make', 'fuel-type']

In [11]:
word1 =sc.parallelize(['hello','war','peace','world'])
word2 =sc.parallelize(['war','peace','universe'])

for word in word1.union(word2).distinct().collect():
    print(word)

peace
world
universe
hello
war


In [12]:
for word in word1.intersection(word2).collect():
    print(word)

peace
war


In [13]:
# using reduce operation to find sum operating across elements
collData.reduce(lambda x,y: x+y)

23

In [14]:
# finding the shortest line in the auto data
# this clearly require cross element comparision
# so we perform a reduce operation
autodata.reduce(lambda x,y: x if len(x)<len(y) else y)

'0,78,honda,gas,std,four,wagon,fwd,front,96.5,157.1,63.9,58.3,2024,ohc,four,92,1bbl,2.92,3.41,9.2,76,6000.0,30,34,7295.0'

### Aggreagations

In [15]:
# here this agg perform same operation as reduce but when there are
# more the one partition of the rdd SeqOp can be performed in parallel
# across the partition and the CombOp would have been performed in sequential
# manner with no of sequential opp equal to no. of partitions.
seqOp = (lambda x,y: x+y)
combOp = (lambda x,y: x+y)

collData.aggregate((0),seqOp,combOp)

23

In [16]:
# performing multiplication and addition at the same time
seqOp = (lambda x,y: (x[0]+y,x[1]*y))
combOp = (lambda x,y: (x[0]+y[0],x[1]*y[1]))
collData.aggregate((0,1),seqOp,combOp)

(23, 1680)

### Spark Functions

In [17]:
def cleanseRdd(autoStr):
    if isinstance(autoStr,int):
        return autoStr
    attlist = autoStr.split(",")
    # convert no of doors from word to a numbers
    attlist[5] = '2' if attlist[5] == 'two' else '4'
    
    # convert drive wheel to upper
    attlist[7] = attlist[7].upper()
    
    return ','.join(attlist)

cleanedData = autodata.map(cleanseRdd)
cleanedData.take(2)

['symboling,normalized-losses,make,fuel-type,aspiration,4,body-style,DRIVE-WHEELS,engine-location,wheel-base,length,width,height,curb-weight,engine-type,num-of-cylinders,engine-size,fuel-system,bore,stroke,compression-ratio,horsepower,peak-rpm,city-mpg,highway-mpg,price',
 '3,122,alfa-romero,gas,std,2,convertible,RWD,front,88.6,168.8,64.1,48.8,2548,dohc,four,130,mpfi,3.47,2.68,9.0,111,5000.0,21,27,13495.0']

In [18]:
def getMPG(autoStr):
    if isinstance(autoStr, int):
        return autoStr
    
    attlist = autoStr.split(",")
    
    if attlist[23].isdigit():
        return int(attlist[23])
    else:
        return 0

autodata.reduce(lambda x,y : getMPG(x) + getMPG(y))/(autodata.count() -1)


25.17910447761194

### Dataset Columns
0) symboling
1) normalized-losses
2) make
3) fuel-type
4) aspiration
5) num-of-doors
6) body-style
7) drive-wheels
8) engine-location
9) wheel-base
10) length
11) width
12) height
13) curb-weight
14) engine-type
15) num-of-cylinders
16) engine-size
17) fuel-system
18) bore
19) stroke
20) compression-ratio
21) horsepower
22) peak-rpm
23) city-mpg
24) highway-mpg
25) price

### Working with (key,value) RDDs

making a pair RDD with auto brand and horspower

In [20]:
cylData = autodata.map(lambda x: (x.split(",")[2],x.split(",")[21]))
cylData.take(5)

[('make', 'horsepower'),
 ('alfa-romero', '111'),
 ('alfa-romero', '111'),
 ('alfa-romero', '154'),
 ('audi', '102')]

In [21]:
cylData.keys().take(5)

['make', 'alfa-romero', 'alfa-romero', 'alfa-romero', 'audi']

In [22]:
# Removing Header Rows
header = cylData.first()
cylHpData = cylData.filter(lambda line: line!=header)
brandValues  = cylHpData.mapValues(lambda x: (x,1))

# Adding count to each record and then reducin to find totals
brandValues  = cylHpData.mapValues(lambda x: (x,1)).reduceByKey(lambda x,y: (float(x[0]) + float(y[0]), x[1]+y[1]))
brandValues.take(5)

[('alfa-romero', (376.0, 3)),
 ('audi', (687.0, 6)),
 ('bmw', (1111.0, 8)),
 ('chevrolet', (188.0, 3)),
 ('dodge', (777.0, 9))]

In [23]:
# finding the average horsepower for a particular brand
brandValues.mapValues(lambda x: (int(x[0])/int(x[1]))).take(6)

[('alfa-romero', 125.33333333333333),
 ('audi', 114.5),
 ('bmw', 138.875),
 ('chevrolet', 62.666666666666664),
 ('dodge', 86.33333333333333),
 ('honda', 80.23076923076923)]

### Accumulator and Broadcast Variables

In [26]:
# initialaize Accumulator object
sedanCount = sc.accumulator(0)
hatchbackCount = sc.accumulator(0)

# set the Broadcast Variable
sedanText = sc.broadcast("sedan")
hatchbackText = sc.broadcast("hatchback")

def splitLines(line):
    global sedanCount
    global hatchbackCount
    
    # we will use broadcast variable for comparision
    # and incrmenent the accumulator accordingly
    if sedanText.value in line:
        sedanCount += 1
    elif hatchbackText.value in line:
        hatchbackCount += 1
    # not directly related to the accumulation but shows
    # that we can also perform side computation
    return line.split(",")

# performing a map
splitData = autodata.map(splitLines)

# to execute the map (lazy evaluation)
splitData.count()
print ("No. of Sedans: ", sedanCount," No of hatchbacks: ", hatchbackCount)

No. of Sedans:  94  No of hatchbacks:  68


### Advanced Spark: Partitions

In [29]:
collData.getNumPartitions()

1

Creating RDD with two partitions

In [30]:
collData = sc.parallelize([3,5,4,7,4],2)
collData.cache()
collData.count()

5

In [32]:
collData.getNumPartitions()

2