In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("Local RESTWS ML").master("local[*]").config("spark.ui.port", "11112").getOrCreate()
sc=spark.sparkContext

#  <font color='red'>OR</font>

In [2]:
from pyspark.sql import SparkSession
import pyspark

number_cores = 8
memory_gb = 24
conf = (
    pyspark.SparkConf()
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
        .set("spark.ui.port", "11112")
)
sc = pyspark.SparkContext(conf=conf)

In [3]:
spark

# Ch.4

## Basic actions and transformations

In [33]:
pythonList = [2.3,3.4,4.3,2.4,2.3,4.0]
pythonList

[2.3, 3.4, 4.3, 2.4, 2.3, 4.0]

In [34]:
#parallelization
parPythonData = sc.parallelize(pythonList,2) #the second argument indicates the number of distributed chunks of data we want

* actions

In [35]:
#get all the data on the driver
parPythonData.collect()

[2.3, 3.4, 4.3, 2.4, 2.3, 4.0]

In [37]:
2.3 in parPythonData.collect()

True

In [13]:
#get the first element
parPythonData.first()

2.3

In [14]:
#get the first two elements
parPythonData.take(2)

[2.3, 3.4]

In [15]:
#Getting the Number of Partitions in the RDD
parPythonData.getNumPartitions()

2

In [35]:
spark.stop()

* transformations

In [4]:
#fahrenheit to celcius
tempData = [59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]
parTempData = sc.parallelize(tempData,2)
parTempData.collect()

[59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]

In [5]:
def FahrenheitToCentigrade(temperature):
    centigrade = (temperature-32)*5/9
    return centigrade

In [6]:
parCentigradeData = parTempData.map(FahrenheitToCentigrade)

In [7]:
parCentigradeData.collect()

[15.0, 14.000000000000002, 12.0, 13.0, 10.999999999999998, 12.0, 13.0]

In [8]:
#Filtering 
#Temperatures Greater than 13 C

#define a predicate
def tempMoreThanThirteen(temperature):
    return temperature >=13

filteredTemprature = parCentigradeData.filter(tempMoreThanThirteen)
filteredTemprature.collect()

[15.0, 14.000000000000002, 13.0, 13.0]

In [9]:
#lambda function instead of predicate
filteredTemprature = parCentigradeData.filter(lambda x : x>=13)
filteredTemprature.collect()

[15.0, 14.000000000000002, 13.0, 13.0]

## Basic Data Manipulation (average, ordering, filtering) (by line)

In [10]:
# read in Student Grades

studentMarksData = [["si1","year1",62.08,62.4],
                    ["si1","year2",75.94,76.75],
                    ["si2","year1",68.26,72.95],
                    ["si2","year2",85.49,75.8],
                    ["si3","year1",75.08,79.84],
                    ["si3","year2",54.98,87.72],
                    ["si4","year1",50.03,66.85],
                    ["si4","year2",71.26,69.77],
                    ["si5","year1",52.74,76.27],
                    ["si5","year2",50.39,68.58],
                    ["si6","year1",74.86,60.8],
                    ["si6","year2",58.29,62.38],
                    ["si7","year1",63.95,74.51],
                    ["si7","year2",66.69,56.92]]


In [11]:
studentMarksDataRDD = sc.parallelize(studentMarksData,4)

As we know, the collect() function takes the whole RDD to the driver. If the RDD
size is very large, the driver may face a memory issue. In order to fetch k first elements of
an RDD, we can use the take() function with n as input to take(). As an example, in the
following line of code, we are fetching two elements of our RDD.

In [12]:
studentMarksDataRDD.take(2)

[['si1', 'year1', 62.08, 62.4], ['si1', 'year2', 75.94, 76.75]]

In [None]:
# Average grade over two semesters by student by year
studentMarksMean = studentMarksDataRDD.map(lambda x : [x[0],x[1],(x[2]+x[3])/2])
studentMarksMean.take(2)

In [None]:
# top 3 students by 2nd year average
secondYearMarks = studentMarksMean.filter(lambda x : x[1]=='year2')
#secondYearMarks = studentMarksMean.filter(lambda x : "year2" in x)
secondYearMarks.take(3)

In [None]:
#we can now either sort the averages and keep the first 3 entries:
sortedMarksData = secondYearMarks.sortBy(keyfunc = lambda x : -x[2])
sortedMarksData.take(3)

In [None]:
#In order to get top-three data, we are sorting the whole list. We can optimize this by using the takeOrdered() function
#This function takes two arguments: the number of elements we require, and key, which uses a lambda function to determine how to take the data out.

topThreeStudents = secondYearMarks.takeOrdered(num=3, key = lambda x :-x[2])
topThreeStudents

# !!! In order to print the result, we are not using the collect() function to get the data.
#Remember that transformation creates another RDD, so we require the collect() function
#to collect data. But an action will directly fetch the data to the driver, and collect() is not
#required. So you can conclude that the takeOrdered() function is an action.

In [None]:
#Finding the Bottom Three Students
bottomThreeStudents = secondYearMarks.takeOrdered(num=3, key = lambda x : x[2])
bottomThreeStudents

In [None]:
# students with more than 80% average in 2nd year
moreThan80Marks = secondYearMarks.filter(lambda x : x[2] > 80)
moreThan80Marks.collect()

## Run Set Operations

In [None]:
#list of research projects each year
data2001 = ['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7']
data2002 = ['RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']
data2003 = ['RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']

In [None]:
parData2001 = sc.parallelize(data2001,2)
parData2002 = sc.parallelize(data2002,2)
parData2003 = sc.parallelize(data2003,2)

In [None]:
#get all unique projects
unionOf20012002 = parData2001.union(parData2002)
allResearchs = unionOf20012002.union(parData2003)
allUniqueResearchs = allResearchs.distinct()
allUniqueResearchs.collect()
allResearchs.distinct().count()

In [None]:
#in one go:
parData2001.union(parData2002).union(parData2003).distinct().count()

In [None]:
#Projects Completed the First Year
firstYearCompletion = parData2001.subtract(parData2002)
firstYearCompletion.collect()

In [None]:
#Projects Completed in the First Two Years
parData2001.union(parData2002).subtract(parData2003).distinct().collect()

In [None]:
#Projects Started in 2001 and Continued Through 2003
parData2001.intersection(parData2002).intersection(parData2003).collect()

## Summary Statistics

In [None]:
airVelocityKMPH = [12,13,15,12,11,12,11]
parVelocityKMPH = sc.parallelize(airVelocityKMPH,2)

In [None]:
parVelocityKMPH.count()

In [None]:
parVelocityKMPH.sum()

In [None]:
parVelocityKMPH.mean()

In [None]:
parVelocityKMPH.variance()

In [None]:
parVelocityKMPH.sampleVariance()

In [None]:
parVelocityKMPH.stdev()

In [None]:
parVelocityKMPH.sampleStdev()

In [None]:
type(parVelocityKMPH.stats())

In [None]:
parVelocityKMPH.stats() #all above

In [None]:
parVelocityKMPH.stats().asDict() #dictionary

In [None]:
parVelocityKMPH.stats().mean() #to pick a specific metric

# Ch.5

## Create a Paired RDD

In [13]:
#Create an RDD with Single Elements
pythonList = ['b' , 'd', 'm', 't', 'e', 'u']
RDD1 = sc.parallelize(pythonList,2)
RDD1.collect()

['b', 'd', 'm', 't', 'e', 'u']

In [14]:
#check for consonants
def vowelCheckFunction(data) :
    if data in ['a','e','i','o','u']:
        return 1
    else :
        return 0

In [15]:
#check
vowelCheckFunction("a")

1

In [16]:
RDD1.map(vowelCheckFunction).collect()

[0, 0, 0, 0, 1, 1]

In [17]:
#paired RDD
RDD2 = RDD1.map(lambda data : (data, vowelCheckFunction(data)))
RDD2.collect()

[('b', 0), ('d', 0), ('m', 0), ('t', 0), ('e', 1), ('u', 1)]

In [18]:
a=[1,2,3]
b=[5,6,7]
(a,b)
type((a,b))

tuple

In [19]:
#Fetch Keys from a Paired RDD
RDD2Keys = RDD2.keys()
RDD2Keys.collect()

['b', 'd', 'm', 't', 'e', 'u']

In [None]:
#Fetch Values from a Paired RDD
RDD2Values = RDD2.values()
RDD2Values.collect()

## Aggregate Data

In [None]:
filDataSingle = [['filamentA','100W',605],
                 ['filamentB','100W',683],
                 ['filamentB','100W',691],
                 ['filamentB','200W',561],
                 ['filamentA','200W',530],
                 ['filamentA','100W',619],
                 ['filamentB','100W',686],
                 ['filamentB','200W',600],
                 ['filamentB','100W',696],
                 ['filamentA','200W',579],
                 ['filamentA','200W',520],
                 ['filamentA','100W',622],
                 ['filamentA','100W',668],
                 ['filamentB','200W',569],
                 ['filamentB','200W',555],
                 ['filamentA','200W',541]]

In [None]:
filDataSingleRDD = sc.parallelize(filDataSingle,2)

In [None]:
#to calculate the mean lifetime of bulbs based on their filament type, start by creating a paired RDD:
filDataPairedRDD1 = filDataSingleRDD.map(lambda x : (x[0], x[2]))
filDataPairedRDD1.take(4)

#  <font color='red'>break</font>

# Ch.8

# Dataframes

## Create / Basic Calculations

In [20]:
# nested list
filamentData = [['filamentA','100W',605],
                ['filamentB','100W',683],
                ['filamentB','100W',691],
                ['filamentB','200W',561],
                ['filamentA','200W',530],
                ['filamentA','100W',619],
                ['filamentB','100W',686],
                ['filamentB','200W',600],
                ['filamentB','100W',696],
                ['filamentA','200W',579],
                ['filamentA','200W',520],
                ['filamentA','100W',622],
                ['filamentA','100W',668],
                ['filamentB','200W',569],
                ['filamentB','200W',555],
                ['filamentA','200W',541]]

# RDD
filamentDataRDD = sc.parallelize(filamentData, 4)

In [21]:
# create schema

# define columns
from pyspark.sql.types import *

FilamentTypeColumn = StructField("FilamentType",StringType(),True)
BulbPowerColumn = StructField("BulbPower",StringType(),True)
LifeInHoursColumn = StructField("LifeInHours",StringType(),True)

In [22]:
FilamentDataFrameSchema = StructType([FilamentTypeColumn, BulbPowerColumn, LifeInHoursColumn])
FilamentDataFrameSchema

StructType(List(StructField(FilamentType,StringType,true),StructField(BulbPower,StringType,true),StructField(LifeInHours,StringType,true)))

In [23]:
# Create an RDD of rows
from pyspark.sql import Row
filamentRDDofROWs = filamentDataRDD.map(lambda x:Row(str(x[0]), str(x[1]), str(x[2])))
filamentRDDofROWs.take(4)

[<Row('filamentA', '100W', '605')>,
 <Row('filamentB', '100W', '683')>,
 <Row('filamentB', '100W', '691')>,
 <Row('filamentB', '200W', '561')>]

In [24]:
# Create the DataFrame from schema and RDD
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
filamentDataFrameRaw = sqlContext.createDataFrame(filamentRDDofROWs, FilamentDataFrameSchema)
filamentDataFrameRaw.show(4)

+------------+---------+-----------+
|FilamentType|BulbPower|LifeInHours|
+------------+---------+-----------+
|   filamentA|     100W|        605|
|   filamentB|     100W|        683|
|   filamentB|     100W|        691|
|   filamentB|     200W|        561|
+------------+---------+-----------+
only showing top 4 rows



In [55]:
filamentDataFrame.count()

16

In [48]:
filamentDataFrameRaw.printSchema()

root
 |-- FilamentType: string (nullable = true)
 |-- BulbPower: string (nullable = true)
 |-- LifeInHours: string (nullable = true)



In [49]:
# Change the Data Type of a Column

# The withColumn() function returns a DataFrame by adding a new column to it. But if that column is already in the DataFrame, 
# the withColumn() function will replace the existing column:
filamentDataFrame = filamentDataFrameRaw.withColumn('LifeInHours',filamentDataFrameRaw.LifeInHours.cast(FloatType()))
filamentDataFrame.printSchema()

root
 |-- FilamentType: string (nullable = true)
 |-- BulbPower: string (nullable = true)
 |-- LifeInHours: float (nullable = true)



In [43]:
# column names
filamentDataFrame.columns

['FilamentType', 'BulbPower', 'LifeInHours']

In [57]:
# filtering
wattlist=['100W','200W']
filamentDataFrame100Watt = filamentDataFrame.filter(filamentDataFrame.BulbPower.isin(wattlist))
filamentDataFrame100Watt.show()

+------------+---------+-----------+
|FilamentType|BulbPower|LifeInHours|
+------------+---------+-----------+
|   filamentA|     100W|      605.0|
|   filamentB|     100W|      683.0|
|   filamentB|     100W|      691.0|
|   filamentB|     200W|      561.0|
|   filamentA|     200W|      530.0|
|   filamentA|     100W|      619.0|
|   filamentB|     100W|      686.0|
|   filamentB|     200W|      600.0|
|   filamentB|     100W|      696.0|
|   filamentA|     200W|      579.0|
|   filamentA|     200W|      520.0|
|   filamentA|     100W|      622.0|
|   filamentA|     100W|      668.0|
|   filamentB|     200W|      569.0|
|   filamentB|     200W|      555.0|
|   filamentA|     200W|      541.0|
+------------+---------+-----------+



In [28]:
# Selecting (filtering) Data
filamentData100WGreater650 = filamentDataFrame.filter((filamentDataFrame.BulbPower == '100W') & (filamentDataFrame.LifeInHours > 650.0))
filamentData100WGreater650.show()

+------------+---------+-----------+
|FilamentType|BulbPower|LifeInHours|
+------------+---------+-----------+
|   filamentB|     100W|      683.0|
|   filamentB|     100W|      691.0|
|   filamentB|     100W|      686.0|
|   filamentB|     100W|      696.0|
|   filamentA|     100W|      668.0|
+------------+---------+-----------+



## Exploratory Data Analysis

In [None]:
from pyspark.sql.types import *
FilamentTypeColumn = StructField("FilamentType",StringType(),True)
BulbPowerColumn = StructField("BulbPower",StringType(),True)
LifeInHoursColumn = StructField("LifeInHours",DoubleType(),True)

In [None]:
FilamentDataFrameSchema = StructType([FilamentTypeColumn, BulbPowerColumn, LifeInHoursColumn])

In [None]:
filamentDataFrame = spark.read.csv('filamentData.csv',header=True, sep =',', schema = FilamentDataFrameSchema)
filamentDataFrame.show(10)

In [None]:
filamentDataFrame.printSchema()

In [50]:
filamentDataFrame.select("LifeInHours").describe().show()

+-------+------------------+
|summary|       LifeInHours|
+-------+------------------+
|  count|                16|
|   mean|          607.8125|
| stddev|61.116521225170096|
|    min|             520.0|
|    max|             696.0|
+-------+------------------+



In [51]:
# frequency of Distinct Values in the FilamentType Categorical Column
filamentDataFrame.filter(filamentDataFrame.FilamentType == 'filamentA').count()

8

In [None]:
# frequency of distinct values in combination of FilamentType and Bulbpower
filamentDataFrame.filter((filamentDataFrame.FilamentType == 'filamentB') & (filamentDataFrame.BulbPower == '100W')).count()

## Aggregation Operations

In [25]:
censusDataFrame = spark.read.csv('adultData.csv',header=True, inferSchema = True)
censusDataFrame.show(5)

+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education-num|    marital-status|       occupation| relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|          Divorced|Handlers-cleaners|Not-i

In [26]:
censusDataFrame.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [27]:
# no. of records
censusDataFrame.count()

32561

In [28]:
censusDataFrame.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']

In [29]:
# we've seen:
censusDataFrame.filter((censusDataFrame.income=='>50K')).count()
censusDataFrame.filter((censusDataFrame.income=='<=50K')).count()

24720

In [30]:
# now we try:
groupedByIncome = censusDataFrame.groupBy('income').count()
groupedByIncome.show()

+------+-----+
|income|count|
+------+-----+
| <=50K|24720|
|  >50K| 7841|
+------+-----+



In [31]:
# summary stats
censusDataFrame.describe('age').show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             32561|
|   mean| 38.58164675532078|
| stddev|13.640432553581356|
|    min|                17|
|    max|                90|
+-------+------------------+



In [32]:
#mean age by sex
censusDataFrame.groupBy('sex').mean('age').show()

+------+-----------------+
|   sex|         avg(age)|
+------+-----------------+
|Female|36.85823043357163|
|  Male|39.43354749885268|
+------+-----------------+



In [33]:
# Finding Out Whether High Salaries are More Frequent for Males or Females
censusDataFrame.groupBy(['income', 'sex']).count().show()

+------+------+-----+
|income|   sex|count|
+------+------+-----+
|  >50K|  Male| 6662|
|  >50K|Female| 1179|
| <=50K|  Male|15128|
| <=50K|Female| 9592|
+------+------+-----+



In [34]:
# Finding the Highest-Paid Job
groupedByOccupationIncome = censusDataFrame.groupBy(['occupation','income'])
groupedByOccupationIncome.count().sort(['income','count'], ascending= 0).show(5)

+---------------+------+-----+
|     occupation|income|count|
+---------------+------+-----+
|Exec-managerial|  >50K| 1968|
| Prof-specialty|  >50K| 1859|
|          Sales|  >50K|  983|
|   Craft-repair|  >50K|  929|
|   Adm-clerical|  >50K|  507|
+---------------+------+-----+
only showing top 5 rows



***
# <br> Backup

In [None]:
#from pyspark.sql import SparkSession
#
#spark=SparkSession.builder.master("local").appName("Test Spark").getOrCreate()
#
#sc=spark.sparkContext

In [10]:
import pyspark.sql.functions as F

In [11]:
key = (F.col('id') % 3).alias('key')

In [12]:
value = (F.randn(42) + key*10).alias('value')

In [13]:
df = spark.range(0, 1000, 1, 1).select(key, value)

In [17]:
df.show()

+---+--------------------+
|key|               value|
+---+--------------------+
|  0|   2.384479054241165|
|  1|  10.192093404129352|
|  2|  20.733733653328656|
|  0| -0.5224480195716871|
|  1|  12.060084179317831|
|  2|   20.20963383826634|
|  0|  -0.548526047771831|
|  1|  11.907968332171668|
|  2|   19.58294134171013|
|  0|  1.0872260173387922|
|  1|  10.952245128862655|
|  2|  20.851099321073143|
|  0|-0.14118066384892058|
|  1|  11.747713204892616|
|  2|  20.101499268535783|
|  0|  0.9645306446729486|
|  1|  11.099625011136304|
|  2|  19.121528921881442|
|  0| -0.6581329650674823|
|  1|   9.473145943186166|
+---+--------------------+
only showing top 20 rows



In [15]:
df.select(

    F.percentile_approx("value", [0.25, 0.5, 0.75], 1000000).alias("quantiles")

).printSchema()

AttributeError: module 'pyspark.sql.functions' has no attribute 'magic_percentile'