In [1]:
import findspark
import os
import configparser
findspark.init()
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame
import pymongo

### Configure spark context

In [3]:
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

### create RDD's in two methods sc.parallelize and sc.textFile

In [4]:
newRDD = sc.parallelize([1,2,3,4,5])

In [5]:
newRDD.count()

5

In [6]:
newRDD1 = sc.textFile("dataset/auto-data.csv")

In [7]:
newRDD1.count()

198

### Saving data to the local file

In [11]:
autoDataFile = open("output/auto-data-saved.csv", "w")

In [12]:
autoDataFile.write("\n".join(newRDD1.collect()))

11350

In [13]:
autoDataFile.close()

### Transformations

### MAP

In [9]:
# replacing every , with a tab space
tsvData = newRDD1.map(lambda x: x.replace(",","\t"))

In [13]:
tsvData.take(5)

['MAKE\tFUELTYPE\tASPIRE\tDOORS\tBODY\tDRIVE\tCYLINDERS\tHP\tRPM\tMPG-CITY\tMPG-HWY\tPRICE',
 'subaru\tgas\tstd\ttwo\thatchback\tfwd\tfour\t69\t4900\t31\t36\t5118',
 'chevrolet\tgas\tstd\ttwo\thatchback\tfwd\tthree\t48\t5100\t47\t53\t5151',
 'mazda\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5000\t30\t31\t5195',
 'toyota\tgas\tstd\ttwo\thatchback\tfwd\tfour\t62\t4800\t35\t39\t5348']

### FILTER

In [14]:
# filtering the data which contains only subaru cars
subData = newRDD1.filter(lambda x: "subaru" in x)

In [21]:
subData.take(10)

['subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'subaru,gas,std,two,hatchback,fwd,four,73,4400,26,31,7053',
 'subaru,gas,std,four,sedan,fwd,four,82,4800,32,37,7126',
 'subaru,gas,std,four,wagon,fwd,four,82,4800,28,32,7463',
 'subaru,gas,std,two,hatchback,4wd,four,73,4400,26,31,7603',
 'subaru,gas,std,four,sedan,fwd,four,82,4400,28,33,7775',
 'subaru,gas,std,four,wagon,4wd,four,82,4800,23,29,8013',
 'subaru,gas,std,four,sedan,4wd,four,82,4800,24,25,9233',
 'subaru,gas,std,four,sedan,fwd,four,94,5200,26,32,9960',
 'subaru,gas,std,four,wagon,fwd,four,94,5200,25,31,10198']

### flatMap

In [18]:
words = subData.flatMap(lambda line: line.split(","))
words.count()


144

In [24]:
words.take(10)

['subaru', 'gas', 'std', 'two', 'hatchback', 'fwd', 'four', '69', '4900', '31']

### distinct

In [30]:
for numData in newRDD.distinct().collect():
    print(numData)

1
2
3
4
5


### Set operations
#### Union and Intersection

In [22]:
words1 = sc.parallelize(["hello","war","peace","world"])
words2 = sc.parallelize(["war","peace","universe"])

In [32]:
for unions in words1.union(words2).distinct().collect():
    print(unions)

peace
world
universe
hello
war


In [33]:
for intersects in words1.intersection(words2).collect():
    print(intersects)

peace
war


## Function

In [25]:
def cleanseRDD(autoStr):
    if isinstance(autoStr, int):
        return autoStr
    attList = autoStr.split(",")
    if attList[3] == "two":
        attList[3] = "2"
    else:
        attList[3] = "4"
    
    attList[5] = attList[5].upper()
    return ",".join(attList)

In [26]:
cleansedData = newRDD1.map(cleanseRDD)

In [27]:
cleansedData[""]

['MAKE,FUELTYPE,ASPIRE,4,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,2,hatchback,FWD,four,69,4900,31,36,5118',
 'chevrolet,gas,std,2,hatchback,FWD,three,48,5100,47,53,5151',
 'mazda,gas,std,2,hatchback,FWD,four,68,5000,30,31,5195',
 'toyota,gas,std,2,hatchback,FWD,four,62,4800,35,39,5348']

### Actions

In [29]:
a = newRDD.collect()

In [30]:
a[3] ="ahrish"

In [31]:
a

[1, 2, 3, 'ahrish', 5]

In [32]:
newRDD.reduce(lambda x,y : x+y)

15

In [33]:
#returning the shortest line in dataset
newRDD1.reduce(lambda x,y: x if len(x) < len(y) else y)

'bmw,gas,std,two,sedan,rwd,six,182,5400,16,22,41315'

In [42]:
def getMPG(autoStr):
    if isinstance(autoStr, int):
        return autoStr
    attList = autoStr.split(",")
    if attList[9].isdigit():
        return int(attList[9])
    else:
        return 0

In [44]:
newRDD1.reduce(lambda x,y : getMPG(x) + getMPG(y)) / (newRDD1.count()-1)

25.15228426395939

### Pair RDD: Type of RDDs that can store key value pairs.
- Most of the or all of the Tansformations are suppoerted by Pair RDDs
- Actions supported are countBykey, groupByKey, reduceByKey, aggregateByKey

In [45]:
cylData = newRDD1.map(lambda x: (x.split(",")[0], x.split(",")[7]))

In [46]:
cylData.take(5)

[('MAKE', 'HP'),
 ('subaru', '69'),
 ('chevrolet', '48'),
 ('mazda', '68'),
 ('toyota', '62')]

In [None]:
cylData.keys().collect()

### above output contains the Header in the results. To remove that run the following code
#### ('MAKE', "HP")

In [52]:
# stores the first line which is header
header = cylData.first()

In [53]:
#now filter that row
cylHPData = cylData.filter(lambda line: line != header)

### find average HP(horse power) by Brand

In [54]:
addOne = cylHPData.mapValues(lambda x: (x,1))
addOne.collect()

[('subaru', ('69', 1)),
 ('chevrolet', ('48', 1)),
 ('mazda', ('68', 1)),
 ('toyota', ('62', 1)),
 ('mitsubishi', ('68', 1)),
 ('honda', ('60', 1)),
 ('nissan', ('69', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('chevrolet', ('70', 1)),
 ('toyota', ('62', 1)),
 ('dodge', ('68', 1)),
 ('honda', ('58', 1)),
 ('toyota', ('62', 1)),
 ('honda', ('76', 1)),
 ('chevrolet', ('70', 1)),
 ('nissan', ('69', 1)),
 ('mitsubishi', ('68', 1)),
 ('dodge', ('68', 1)),
 ('plymouth', ('68', 1)),
 ('mazda', ('68', 1)),
 ('isuzu', ('78', 1)),
 ('mazda', ('68', 1)),
 ('nissan', ('69', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('62', 1)),
 ('toyota', ('70', 1)),
 ('mitsubishi', ('88', 1)),
 ('subaru', ('73', 1)),
 ('nissan', ('55', 1)),
 ('subaru', ('82', 1)),
 ('honda', ('76', 1)),
 ('toyota', ('70', 1)),
 ('honda', ('76', 1)),
 ('honda', ('76', 1)),
 ('nissan', ('69', 1)),
 ('nissan', ('69', 1)),
 

In [55]:
brandValues = addOne.reduceByKey(lambda x, y: (int(x[0]) + int(y[0]), x[1] + y[1]))

In [56]:
brandValues.collect()

[('subaru', (1035, 12)),
 ('chevrolet', (188, 3)),
 ('mazda', (1390, 16)),
 ('toyota', (2969, 32)),
 ('mitsubishi', (1353, 13)),
 ('honda', (1043, 13)),
 ('nissan', (1846, 18)),
 ('dodge', (675, 8)),
 ('plymouth', (607, 7)),
 ('isuzu', (168, 2)),
 ('volkswagen', (973, 12)),
 ('saab', (760, 6)),
 ('peugot', (1098, 11)),
 ('volvo', (1408, 11)),
 ('alfa-romero', (376, 3)),
 ('audi', (687, 6)),
 ('bmw', (1111, 8)),
 ('mercury', ('175', 1)),
 ('porsche', (764, 4)),
 ('mercedes-benz', (1170, 8)),
 ('jaguar', (614, 3))]

### BroadCast variables and Accumulators

In [57]:
#initialize accumulators
sedanCount = sc.accumulator(0)
hatchbackCount = sc.accumulator(0)

In [58]:
# set broadcast variables
sedanText = sc.broadcast("sedan")
hatchText = sc.broadcast("hatchback")

In [60]:
def splitlines(line):
    global sedanCount
    global hatchbackCount
    
    if sedanText.value in line:
        sedanCount += 1
    
    if hatchText.value in line:
        hatchbackCount += 1
    
    return line.split(",")

In [61]:
splitData = newRDD1.map(splitlines)

In [62]:
splitData.count()

198

In [63]:
print(sedanCount, hatchbackCount)

92 67


### Partitions: By default the no. of cores 

In [65]:
newRDD1.getNumPartitions()

1

In [69]:
# specifiying number of partitions
collData = sc.parallelize([3,5,4,3,2], 4)


In [70]:
collData.cache()

ParallelCollectionRDD[59] at parallelize at PythonRDD.scala:195

In [71]:
collData.getNumPartitions()

4

In [34]:
sc.stop()

In [5]:
# spark = SparkSession \
#     .builder \
#     .appName("Python Spark SQL basic example") \
#     .config("spark.some.config.option", "some-value") \
#     .getOrCreate()

SyntaxError: invalid syntax (<ipython-input-7-939bbb97c795>, line 1)