# Distributed ML and Spark

## Doing Work in Parallel

- Spark parallelizes its work wherever it can: multiple tasks run at the same time, as opposed to one after another.

- There are two levels to how work is parallelized in Spark:

    - All of the executors work together at the same time.

    - Within each executor, the data is divided into partitions that can be processed at the same time. Generally speaking, the number of partitions defaults to the number of available CPU cores on the executor, although this can be configured.

## Transformations and Actions

- Spark dataframe manipulation can be broken down into two categories:

    - transformations: functions that select a subset of the data, transform each value, change the order of the records, or perform some sort of aggregation.

    - actions: operations that require the specified transformations to actually be applied in order to produce a result. For example, counting the number of rows, or viewing the first 10 records.

- You will often hear Spark referred to as lazy. This means that we can specify many different transformations, but none of them will be applied until we specify an action.

## Shuffling

- A shuffle occurs when a transformation requires looking at data that is in another partition or on another executor. Let's take a look at a few examples:

    - Performing arithmetic on each number in a column does not require a shuffle, as each number can be processed independently of the others.

    - Sorting the dataframe by the numbers in a single column does require shuffling, as the overall order is determined by all of the data within all of the partitions.

    - Selecting a subset of the data, for example, selecting only the rows where a condition matches, does not require a shuffle, as each row can be processed independently.

    - Calculating the overall average for a numeric column does require shuffling, as the overall average depends on data from all the partitions.

- Shuffles get more expensive as the size of the data grows, and when a shuffle is performed is one of the largest considerations in optimizing Spark code for performance.

In [1]:
import pandas as pd
import numpy as np
from pydataset import data

import pyspark
import pyspark.sql.functions as F


In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.range(3).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+



# Create a pandas DataFrame

In [3]:
np.random.seed(456)

pandas_dataframe = pd.DataFrame(
    dict(n=np.arange(20), group=np.random.choice(list("abc"), 20))
)
pandas_dataframe

   n group
0  0     b
1  1     b
2  2     c
3  3     a
4  4     c
5  5     c
6  6     a
7  7     b
8  8     a
9  9     b


# Convert pandas dataset to a Spark object or a lazy dataframe

In [4]:
df = spark.createDataFrame(pandas_dataframe)
df

DataFrame[n: bigint, group: string]

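## Use .show() to see the first few rows of the Spark df

- .show() is an action, not a transformation: it forces Spark to compute the rows that it prints

- Spark dataframes are immutable: a transformation returns a new dataframe and leaves the original unchanged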

In [5]:
df.show(5)

+---+-----+
|  n|group|
+---+-----+
|  0|    b|
|  1|    b|
|  2|    c|
|  3|    a|
|  4|    c|
+---+-----+
only showing top 5 rows



In [6]:
df.select(df.n).show(5)

+---+
|  n|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows



In [7]:
n_incremented = df.n + 1

In [8]:
df.select(n_incremented).show(5)

+-------+
|(n + 1)|
+-------+
|      1|
|      2|
|      3|
|      4|
|      5|
+-------+
only showing top 5 rows



## .describe() with .show() to get info about spark df

In [9]:
df.describe()

DataFrame[summary: string, n: string, group: string]

In [10]:
df.describe().show()

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



In [11]:
df.describe().select('n').show()

+-----------------+
|                n|
+-----------------+
|               20|
|              9.5|
|5.916079783099616|
|                0|
|               19|
+-----------------+



In [12]:
# .describe() creates the summary column here

df.describe().select('n', 'summary').show()

+-----------------+-------+
|                n|summary|
+-----------------+-------+
|               20|  count|
|              9.5|   mean|
|5.916079783099616| stddev|
|                0|    min|
|               19|    max|
+-----------------+-------+



## mpg pydataset and spark

In [13]:
mpg = spark.createDataFrame(data('mpg'))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



## Create a spark Column object

In [14]:
mpg.hwy

Column<b'hwy'>

## .select() similar to a SQL SELECT

In [15]:
mpg.select(mpg.hwy, mpg.cty, mpg.model)

DataFrame[hwy: bigint, cty: bigint, model: string]

In [16]:
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(10)

+---+---+----------+
|hwy|cty|     model|
+---+---+----------+
| 29| 18|        a4|
| 29| 21|        a4|
| 31| 20|        a4|
| 30| 21|        a4|
| 26| 16|        a4|
| 26| 18|        a4|
| 27| 18|        a4|
| 26| 18|a4 quattro|
| 25| 16|a4 quattro|
| 28| 20|a4 quattro|
+---+---+----------+
only showing top 10 rows



In [17]:
mpg.select(mpg.model, mpg.manufacturer, mpg.hwy).show()

+------------------+------------+---+
|             model|manufacturer|hwy|
+------------------+------------+---+
|                a4|        audi| 29|
|                a4|        audi| 29|
|                a4|        audi| 31|
|                a4|        audi| 30|
|                a4|        audi| 26|
|                a4|        audi| 26|
|                a4|        audi| 27|
|        a4 quattro|        audi| 26|
|        a4 quattro|        audi| 25|
|        a4 quattro|        audi| 28|
|        a4 quattro|        audi| 27|
|        a4 quattro|        audi| 25|
|        a4 quattro|        audi| 25|
|        a4 quattro|        audi| 25|
|        a4 quattro|        audi| 25|
|        a6 quattro|        audi| 24|
|        a6 quattro|        audi| 25|
|        a6 quattro|        audi| 23|
|c1500 suburban 2wd|   chevrolet| 20|
|c1500 suburban 2wd|   chevrolet| 15|
+------------------+------------+---+
only showing top 20 rows



In [18]:
# you can also use the strings for cols in .select()

mpg.select('model', 'manufacturer', 'hwy').show()

+------------------+------------+---+
|             model|manufacturer|hwy|
+------------------+------------+---+
|                a4|        audi| 29|
|                a4|        audi| 29|
|                a4|        audi| 31|
|                a4|        audi| 30|
|                a4|        audi| 26|
|                a4|        audi| 26|
|                a4|        audi| 27|
|        a4 quattro|        audi| 26|
|        a4 quattro|        audi| 25|
|        a4 quattro|        audi| 28|
|        a4 quattro|        audi| 27|
|        a4 quattro|        audi| 25|
|        a4 quattro|        audi| 25|
|        a4 quattro|        audi| 25|
|        a4 quattro|        audi| 25|
|        a6 quattro|        audi| 24|
|        a6 quattro|        audi| 25|
|        a6 quattro|        audi| 23|
|c1500 suburban 2wd|   chevrolet| 20|
|c1500 suburban 2wd|   chevrolet| 15|
+------------------+------------+---+
only showing top 20 rows



## use a .alias('')

In [20]:
mpg.select(mpg.model, mpg.manufacturer, mpg.hwy.alias('highway_mileage')).show()

+------------------+------------+---------------+
|             model|manufacturer|highway_mileage|
+------------------+------------+---------------+
|                a4|        audi|             29|
|                a4|        audi|             29|
|                a4|        audi|             31|
|                a4|        audi|             30|
|                a4|        audi|             26|
|                a4|        audi|             26|
|                a4|        audi|             27|
|        a4 quattro|        audi|             26|
|        a4 quattro|        audi|             25|
|        a4 quattro|        audi|             28|
|        a4 quattro|        audi|             27|
|        a4 quattro|        audi|             25|
|        a4 quattro|        audi|             25|
|        a4 quattro|        audi|             25|
|        a4 quattro|        audi|             25|
|        a6 quattro|        audi|             24|
|        a6 quattro|        audi|             25|


In [21]:
avg_mileage_column = ((mpg.cty + mpg.hwy) / 2).alias('avg_mileage')
mpg.select(avg_mileage_column)

DataFrame[avg_mileage: double]

In [22]:
# select all '*' and the new calculated col we just made above and stored in a variable

mpg.select('*', avg_mileage_column).show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-------+-----------+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|avg_mileage|
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+-----------+
|        audi|                a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|       23.5|
|        audi|                a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|       25.0|
|        audi|                a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|       25.5|
|        audi|                a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|       25.5|
|        audi|                a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|       21.0|
|        audi|                a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|       22.0|
|        audi|                a4|  3.1|2008|  6|  auto(av)|  f| 18| 27|  p|compact|       22.5|
|        audi|        a4 quattro|  1.8|1

## .printSchema()

In [23]:
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



## Create a column

- create a column object that is independent of the dataframe

In [24]:
from pyspark.sql.functions import col, expr

col

<function pyspark.sql.functions._create_function.<locals>._(col)>

In [25]:
col('hwy')

Column<b'hwy'>

In [26]:
mpg.select(col('hwy'))

DataFrame[hwy: bigint]

In [27]:
mpg.select(mpg.hwy.alias('highway_mileage')).show(5)

+---------------+
|highway_mileage|
+---------------+
|             29|
|             29|
|             31|
|             30|
|             26|
+---------------+
only showing top 5 rows



In [28]:
col1 = mpg.hwy.alias('highway_mileage')
col2 = (mpg.hwy / 2).alias('highway_mileage_halved')
mpg.select(col1, col2).show(5)

+---------------+----------------------+
|highway_mileage|highway_mileage_halved|
+---------------+----------------------+
|             29|                  14.5|
|             29|                  14.5|
|             31|                  15.5|
|             30|                  15.0|
|             26|                  13.0|
+---------------+----------------------+
only showing top 5 rows



In [29]:
avg_column = (col("hwy") + col("cty")) / 2

mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_column.alias("avg_mileage"),
).show(5)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg_mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
|             31|          20|       25.5|
|             30|          21|       25.5|
|             26|          16|       21.0|
+---------------+------------+-----------+
only showing top 5 rows



# Exercises 1 and 2

## Create a pandas df of your fav languages

In [30]:
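# named `languages` so that the pydataset `data` function imported earlier is not shadowed
languages = {'languages': ['python', 'go', 'java', 'r']}

df = pd.DataFrame(languages)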

## Convert a pandas df to a spark df object

In [31]:
df = spark.createDataFrame(df)
df

DataFrame[languages: string]

## View the schema of a spark df

In [60]:
df.printSchema()

root
 |-- languages: string (nullable = true)



In [33]:
df.dtypes

[('languages', 'string')]

## Output the shape of a spark df

In [34]:
df.count(), len(df.columns)

(4, 1)

## Show the first 5 records in the df

In [35]:
df.show(5)

+---------+
|languages|
+---------+
|   python|
|       go|
|     java|
|        r|
+---------+



## mpg spark df

In [47]:
from pydataset import data

spark = pyspark.sql.SparkSession.builder.getOrCreate()
mpg = spark.createDataFrame(data('mpg'))

mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [58]:
mpg.show(5, vertical=True)

-RECORD 0------------------
 manufacturer | audi       
 model        | a4         
 displ        | 1.8        
 year         | 1999       
 cyl          | 4          
 trans        | auto(l5)   
 drv          | f          
 cty          | 18         
 hwy          | 29         
 fl           | p          
 class        | compact    
-RECORD 1------------------
 manufacturer | audi       
 model        | a4         
 displ        | 1.8        
 year         | 1999       
 cyl          | 4          
 trans        | manual(m5) 
 drv          | f          
 cty          | 21         
 hwy          | 29         
 fl           | p          
 class        | compact    
-RECORD 2------------------
 manufacturer | audi       
 model        | a4         
 displ        | 2.0        
 year         | 2008       
 cyl          | 4          
 trans        | manual(m6) 
 drv          | f          
 cty          | 20         
 hwy          | 31         
 fl           | p          
 class        | comp

## Exercise 2

- Create 1 column of output that contains a message like the one below for each vehicle:

        The 1999 audi a4 has a 4 cylinder engine.



- Transform the trans column so that it only contains either manual or auto.

## Using expr 

- expr allows us to express manipulations to the column within the string that defines the column.

In [57]:
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression
    expr("hwy AS highway_mileage"),  # using an alias
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above
).show(5)

In [49]:
mpg.show(2)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 2 rows



In [55]:
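from pyspark.sql.functions import concat, lit

# build the sentence from literal strings and columns; numeric columns are cast to strings
message = concat(lit('The '), mpg.year.cast('string'), lit(' '), mpg.manufacturer, lit(' '),
                 mpg.model, lit(' has a '), mpg.cyl.cast('string'), lit(' cylinder engine.'))
mpg.select(message.alias('message')).show(5, truncate=False)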

## Transform the trans column so it is only manual or auto

In [54]:
mpg.show(2)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 2 rows



# Exercise 3

In [48]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
tips = spark.createDataFrame(data('tips'))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows

