## Spark 101 Exercises

In [1]:
# imports
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

import pandas as pd
import numpy as np

from pyspark.sql.functions import col, expr
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
from pyspark.sql.functions import lit
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import when
from pyspark.sql.functions import asc, desc
from pyspark.sql.functions import month, year, quarter
from pyspark.sql.functions import *
import pydataset

1. Create a spark data frame that contains your favorite programming languages.

In [2]:
# First we will create a pandas DataFrame
pandas_dataframe = pd.DataFrame(
    dict(language = ['Python', 'R', 'Java', 'C++', 'JavaScript', 'HTML'])
)
pandas_dataframe

Unnamed: 0,language
0,Python
1,R
2,Java
3,C++
4,JavaScript
5,HTML


1a. The name of the column should be language

In [3]:
df = spark.createDataFrame(pandas_dataframe)
df.show()

+----------+
|  language|
+----------+
|    Python|
|         R|
|      Java|
|       C++|
|JavaScript|
|      HTML|
+----------+



1b. View the schema of the dataframe

In [4]:
df.printSchema()

root
 |-- language: string (nullable = true)



1c. Output the shape of the dataframe

In [5]:
print(df.count(), len(df.columns))

6 1


In [6]:
# A better way...

print('DataFrame shape: ', df.count(), ' x ', len(df.columns))

DataFrame shape:  6  x  1


1d. Show the first 5 records in the dataframe

In [7]:
df.show(5)

+----------+
|  language|
+----------+
|    Python|
|         R|
|      Java|
|       C++|
|JavaScript|
+----------+
only showing top 5 rows



2. Load the mpg dataset as a spark dataframe.

In [8]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))

In [9]:
# what does it look like?
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



2a. Create 1 column of output that contains a message like the one below: The 1999 audi a4 has a 4 cylinder engine. For each vehicle.


In [10]:
col = concat(lit('The '), 
             mpg.year, 
             lit(' '), 
             mpg.manufacturer, 
             lit(' '), 
             mpg.model, 
             lit(' has a '), 
             mpg.cyl,
             lit(' cylinder engine.'))

mpg_2 = mpg.select(col.alias('summary'))

mpg_2.show(truncate=False)

+--------------------------------------------------------------+
|summary                                                       |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

In [11]:
# Review notes...

mpg.select(
    concat(
        lit('The '),
        col('year'),
        lit(' '),
        col('manufacturer'),
        lit(' '),
        col('model'),
        lit(' has a '),
        col('cyl'),
        lit(' cylinder engine.'),
    ).alias('vehicle_cylinder_desc')
).show(truncate=False)

TypeError: 'Column' object is not callable

2b. Transform the trans column so that it only contains either manual or auto.

In [None]:
# using regex
col = mpg.select("trans",
    regexp_replace("trans", r"(\(\w+\))", "").alias("trans"),
).show(truncate=False)

In [None]:
new_mpg = mpg.select('manufacturer', 
                 'model',
                 'displ', 
                 'year', 
                 'cyl', 
                 regexp_replace("trans", r"(\(\w+\))", "").alias("trans"), 
                 'drv', 
                 'cty', 
                 'hwy', 
                 'fl', 
                 'class')
new_mpg.show(5)

In [None]:
# using SQL
mpg.select(mpg.trans,
          when((mpg.trans.contains('auto')), 'auto')
          .otherwise('manual')
          .alias('trans')).show(20)

3. Load the tips dataset as a spark dataframe.
    1. What percentage of observations are smokers?
    1. Create a column that contains the tip percentage
    1. Calculate the average tip percentage for each combination of sex and smoker.

In [None]:
tips = spark.createDataFrame(data('tips'))
tips.show(5)

3a. What percentage of observations are smokers?

In [None]:
smoker_percent = (tips.where(tips.smoker == "Yes").count() / tips.select('smoker').count() * 100)

In [None]:
print(f'The percentage of smokers in the tips dataset is: {smoker_percent:.2f}%')

In [None]:
# let's see how Zach did it...
tips.groupBy('smoker').count().show()

In [None]:
tips.groupBy('smoker').count().withColumn(
    'percent',
    concat(round((col('count') / tips.count() * 100), 0).cast('int'), lit('%')),
).show()

3b. Create a column that contains the tip percentage

In [None]:
tips.withColumn('percent_tip', col('tip') / col('total_bill')*100).show()

3c. Calculate the average tip percentage for each combination of sex and smoker.

In [None]:
(
    tips.withColumn('percent_tip', col('tip') / col('total_bill'))
    .groupby('sex')
    .pivot('smoker')
    .agg(round(mean('percent_tip')*100, 2))
    .show()
)

4. Use the seattle weather dataset referenced in the lesson to answer the questions below.
    1. Convert the temperatures to fahrenheit.
    1. Which month has the most rain, on average?
    1. Which year was the windiest?
    1. What is the most frequent type of weather in January?
    1. What is the average high and low temperature on sunny days in July in 2013 and 2014?
    1. What percentage of days were rainy in q3 of 2015?
    1. For each year, find what percentage of days it rained (had non-zero precipitation).

In [None]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)