# Exercises

Create a directory named `spark` within your `ds-methodologies` repository. This is where you will do the exercises for this module.

Create a jupyter notebook or python script named `spark101` for this exercise.

1. Create a spark data frame that contains your favorite programming languages.
    - The name of the column should be language
    - View the schema of the dataframe
    - Output the shape of the dataframe
    - Show the first 5 records in the dataframe.

In [1]:
import pyspark

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark

In [3]:
import numpy as np
import pandas as pd

In [4]:
def language_spark_df(language_list):
    if type(language_list) != list:
        language_list = list(language_list)
    language_dict = dict(language = language_list)
    language_df = pd.DataFrame(language_dict)
    df = spark.createDataFrame(language_df)
    return df

In [5]:
df = language_spark_df(['Python', 'R', 'Julia', 'Scala'])

In [6]:
df.show(5)

+--------+
|language|
+--------+
|  Python|
|       R|
|   Julia|
|   Scala|
+--------+



In [7]:
df.printSchema()

root
 |-- language: string (nullable = true)



In [8]:
df_shape = (df.count(), df.columns)
df_shape

(4, ['language'])

2. Load the `mpg` dataset as a spark dataframe.

    a. Create 1 column of output that contains a message like the one below:
    
        `The 1999 audi a4 has a 4 cylinder engine`
    
      for each vehicle

In [9]:
from pydataset import data

In [10]:
data('mpg')

Unnamed: 0,manufacturer,model,displ,year,cyl,trans,drv,cty,hwy,fl,class
1,audi,a4,1.8,1999,4,auto(l5),f,18,29,p,compact
2,audi,a4,1.8,1999,4,manual(m5),f,21,29,p,compact
3,audi,a4,2.0,2008,4,manual(m6),f,20,31,p,compact
4,audi,a4,2.0,2008,4,auto(av),f,21,30,p,compact
5,audi,a4,2.8,1999,6,auto(l5),f,16,26,p,compact
6,audi,a4,2.8,1999,6,manual(m5),f,18,26,p,compact
7,audi,a4,3.1,2008,6,auto(av),f,18,27,p,compact
8,audi,a4 quattro,1.8,1999,4,manual(m5),4,18,26,p,compact
9,audi,a4 quattro,1.8,1999,4,auto(l5),4,16,25,p,compact
10,audi,a4 quattro,2.0,2008,4,manual(m6),4,20,28,p,compact


In [11]:
def create_spark_df(dataset_string):
    df = spark.createDataFrame(data(dataset_string))
    return df

In [12]:
mpg = create_spark_df('mpg')
mpg

DataFrame[manufacturer: string, model: string, displ: double, year: bigint, cyl: bigint, trans: string, drv: string, cty: bigint, hwy: bigint, fl: string, class: string]

In [13]:
from pyspark.sql.functions import concat

In [14]:
from pyspark.sql.functions import lit

In [15]:
(
    mpg.select(concat(lit('The '), 
                      mpg.year, 
                      lit(' '), 
                      mpg.manufacturer,
                      lit(' '),
                      mpg.model,
                      lit(' has a '),
                      mpg.cyl,
                      lit(' cylinder engine.')))
    .show(truncate=False)
)

+------------------------------------------------------------------------------+
|concat(The , year,  , manufacturer,  , model,  has a , cyl,  cylinder engine.)|
+------------------------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                                     |
|The 1999 audi a4 has a 4 cylinder engine.                                     |
|The 2008 audi a4 has a 4 cylinder engine.                                     |
|The 2008 audi a4 has a 4 cylinder engine.                                     |
|The 1999 audi a4 has a 6 cylinder engine.                                     |
|The 1999 audi a4 has a 6 cylinder engine.                                     |
|The 2008 audi a4 has a 6 cylinder engine.                                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.                             |
|The 1999 audi a4 quattro has a 4 cylinder engine.                             |
|The 2008 audi a4 quattro ha

b. Transform the `trans` column so that the only container is either `manual` or `auto`.

In [16]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [17]:
(
    mpg.select(
        'trans',
        regexp_extract('trans', r'^(\w+)',1)
        .alias('trans_type'))
    .show()
)

+----------+----------+
|     trans|trans_type|
+----------+----------+
|  auto(l5)|      auto|
|manual(m5)|    manual|
|manual(m6)|    manual|
|  auto(av)|      auto|
|  auto(l5)|      auto|
|manual(m5)|    manual|
|  auto(av)|      auto|
|manual(m5)|    manual|
|  auto(l5)|      auto|
|manual(m6)|    manual|
|  auto(s6)|      auto|
|  auto(l5)|      auto|
|manual(m5)|    manual|
|  auto(s6)|      auto|
|manual(m6)|    manual|
|  auto(l5)|      auto|
|  auto(s6)|      auto|
|  auto(s6)|      auto|
|  auto(l4)|      auto|
|  auto(l4)|      auto|
+----------+----------+
only showing top 20 rows



3. Load the `tips` dataset as a spark dataframe.

In [18]:
tips = create_spark_df('tips')
tips

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint]

In [19]:
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [20]:
tips.describe()

DataFrame[summary: string, total_bill: string, tip: string, sex: string, smoker: string, day: string, time: string, size: string]

In [21]:
tips.describe().show()

+-------+------------------+-----------------+------+------+----+------+------------------+
|summary|        total_bill|              tip|   sex|smoker| day|  time|              size|
+-------+------------------+-----------------+------+------+----+------+------------------+
|  count|               244|              244|   244|   244| 244|   244|               244|
|   mean|19.785942622950813| 2.99827868852459|  null|  null|null|  null| 2.569672131147541|
| stddev| 8.902411954856856|1.383638189001182|  null|  null|null|  null|0.9510998047322344|
|    min|              3.07|              1.0|Female|    No| Fri|Dinner|                 1|
|    max|             50.81|             10.0|  Male|   Yes|Thur| Lunch|                 6|
+-------+------------------+-----------------+------+------+----+------+------------------+



a. what percentage of the observations are smokers?

In [22]:
from pyspark.sql.functions import sum, count

In [23]:
tips.select(count(tips.smoker))

DataFrame[count(smoker): bigint]

In [24]:
smokers = tips.filter(tips.smoker == 'Yes')
smokers

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint]

In [25]:
smokers.show()

+----------+----+------+------+----+------+----+
|total_bill| tip|   sex|smoker| day|  time|size|
+----------+----+------+------+----+------+----+
|     38.01| 3.0|  Male|   Yes| Sat|Dinner|   4|
|     11.24|1.76|  Male|   Yes| Sat|Dinner|   2|
|     20.29|3.21|  Male|   Yes| Sat|Dinner|   2|
|     13.81| 2.0|  Male|   Yes| Sat|Dinner|   2|
|     11.02|1.98|  Male|   Yes| Sat|Dinner|   2|
|     18.29|3.76|  Male|   Yes| Sat|Dinner|   4|
|      3.07| 1.0|Female|   Yes| Sat|Dinner|   1|
|     15.01|2.09|  Male|   Yes| Sat|Dinner|   2|
|     26.86|3.14|Female|   Yes| Sat|Dinner|   2|
|     25.28| 5.0|Female|   Yes| Sat|Dinner|   2|
|     17.92|3.08|  Male|   Yes| Sat|Dinner|   2|
|     19.44| 3.0|  Male|   Yes|Thur| Lunch|   2|
|     32.68| 5.0|  Male|   Yes|Thur| Lunch|   2|
|     28.97| 3.0|  Male|   Yes| Fri|Dinner|   2|
|      5.75| 1.0|Female|   Yes| Fri|Dinner|   2|
|     16.32| 4.3|Female|   Yes| Fri|Dinner|   2|
|     40.17|4.73|  Male|   Yes| Fri|Dinner|   4|
|     27.28| 4.0|  M

In [26]:
smokers.select(count(smokers.smoker)).show()

+-------------+
|count(smoker)|
+-------------+
|           93|
+-------------+



In [27]:
from pyspark.sql.functions import when

In [28]:
smoker_df = tips.select(tips.smoker, when(tips.smoker == 'No', 0).when(tips.smoker == 'Yes', 1).alias('smoker_mask'))

In [29]:
smoker_df.show()

+------+-----------+
|smoker|smoker_mask|
+------+-----------+
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
|    No|          0|
+------+-----------+
only showing top 20 rows



In [30]:
smoker_df.describe().show()

+-------+------+-------------------+
|summary|smoker|        smoker_mask|
+-------+------+-------------------+
|  count|   244|                244|
|   mean|  null|0.38114754098360654|
| stddev|  null|0.48666699935945246|
|    min|    No|                  0|
|    max|   Yes|                  1|
+-------+------+-------------------+



In [31]:
from pyspark.sql.functions import avg

In [32]:
smoker_df.select((avg('smoker_mask')*100).alias('%_of_smokers')).show()

+------------------+
|      %_of_smokers|
+------------------+
|38.114754098360656|
+------------------+



b. Create a column that contains the tip percentage

In [33]:
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [34]:
from pyspark.sql.functions import expr

In [35]:
tip_percent_col = tips.select(expr('(tip/total_bill)*100').alias('tip_percent'))

In [36]:
tips = tips.select('total_bill', 'tip', expr('(tip/total_bill)*100').alias('tip_percent'),'sex', 'smoker','day', 'time', 'size')

In [37]:
tips.describe().show()

+-------+------------------+-----------------+-----------------+------+------+----+------+------------------+
|summary|        total_bill|              tip|      tip_percent|   sex|smoker| day|  time|              size|
+-------+------------------+-----------------+-----------------+------+------+----+------+------------------+
|  count|               244|              244|              244|   244|   244| 244|   244|               244|
|   mean|19.785942622950813| 2.99827868852459|16.08025817225047|  null|  null|null|  null| 2.569672131147541|
| stddev| 8.902411954856856|1.383638189001182|6.107220419157192|  null|  null|null|  null|0.9510998047322344|
|    min|              3.07|              1.0|3.563813585135547|Female|    No| Fri|Dinner|                 1|
|    max|             50.81|             10.0| 71.0344827586207|  Male|   Yes|Thur| Lunch|                 6|
+-------+------------------+-----------------+-----------------+------+------+----+------+------------------+



c. Calculate the average tip percentage for each combination of sex and smoker.

In [38]:
tips.show()

+----------+----+------------------+------+------+---+------+----+
|total_bill| tip|       tip_percent|   sex|smoker|day|  time|size|
+----------+----+------------------+------+------+---+------+----+
|     16.99|1.01|5.9446733372572105|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|16.054158607350097|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|16.658733936220845|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31| 13.97804054054054|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|14.680764538430255|Female|    No|Sun|Dinner|   4|
|     25.29|4.71| 18.62396204033215|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0| 22.80501710376283|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|11.607142857142858|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|13.031914893617023|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|21.853856562922868|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71| 16.65043816942551|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|14.180374361883155|Female|    No|Sun|Dinner| 

In [39]:
tips.groupBy(tips.sex, tips.smoker).agg(avg('tip_percent')).show()

+------+------+------------------+
|   sex|smoker|  avg(tip_percent)|
+------+------+------------------+
|  Male|    No| 16.06687151291298|
|  Male|   Yes|15.277117520248513|
|Female|    No|15.692097076918358|
|Female|   Yes| 18.21503526994103|
+------+------+------------------+



In [40]:
tips.crosstab('smoker', 'sex').show()

+----------+------+----+
|smoker_sex|Female|Male|
+----------+------+----+
|        No|    54|  97|
|       Yes|    33|  60|
+----------+------+----+



In [41]:
tips.groupBy('smoker').pivot('sex').avg('tip_percent').show()

+------+------------------+------------------+
|smoker|            Female|              Male|
+------+------------------+------------------+
|    No|15.692097076918358| 16.06687151291298|
|   Yes| 18.21503526994103|15.277117520248513|
+------+------------------+------------------+



4. Use the Seattle weather dataset referenced in the lesson to answer the questions below.

In [45]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



- Convert the temperatures to farenheight

In [55]:
weather = weather.select('date', 
               'precipitation',
               expr('(temp_max *9/5)+100').alias('temp_max_F'), 
               expr('(temp_min * 9/5) + 100').alias('temp_min_F'),
               'wind',
               'weather')



In [56]:
weather.show()

+----------+-------------+------------------+----------+----+-------+
|      date|precipitation|        temp_max_F|temp_min_F|wind|weather|
+----------+-------------+------------------+----------+----+-------+
|2012-01-01|          0.0|123.03999999999999|     109.0| 4.7|drizzle|
|2012-01-02|         10.9|            119.08|    105.04| 4.5|   rain|
|2012-01-03|          0.8|            121.06|    112.96| 2.3|   rain|
|2012-01-04|         20.3|121.96000000000001|    110.08| 4.7|   rain|
|2012-01-05|          1.3|116.02000000000001|    105.04| 6.1|   rain|
|2012-01-06|          2.5|            107.92|    103.96| 2.2|   rain|
|2012-01-07|          0.0|            112.96|    105.04| 2.3|   rain|
|2012-01-08|          0.0|             118.0|    105.04| 2.0|    sun|
|2012-01-09|          4.3|            116.92|     109.0| 3.4|   rain|
|2012-01-10|          1.0|            110.98|    101.08| 3.4|   rain|
|2012-01-11|          0.0|            110.98|     98.02| 5.1|    sun|
|2012-01-12|        

- Which month has the most rain on average?

In [65]:
from pyspark.sql.functions import month, year, quarter, max, min

In [95]:
rainfall = (
    weather.withColumn('month', month('date'))
    .groupBy('month')
    .agg(avg('precipitation')
    .alias('avg_rainfall'))
    .sort('avg_rainfall')
)

In [96]:
rainfall.show()

+-----+-------------------+
|month|       avg_rainfall|
+-----+-------------------+
|    7|0.38870967741935486|
|    6| 1.1075000000000002|
|    8| 1.3201612903225806|
|    5| 1.6733870967741935|
|    9| 1.9624999999999997|
|    4|  3.128333333333333|
|    2|  3.734513274336283|
|    1| 3.7580645161290316|
|   10|  4.059677419354839|
|    3|  4.888709677419355|
|   12|  5.021774193548388|
|   11|  5.354166666666667|
+-----+-------------------+



In [108]:
rainfall.createOrReplaceTempView('rainfall')

In [117]:
spark.sql(
    """
    SELECT * 
    FROM rainfall
    WHERE avg_rainfall = (SELECT MAX(avg_rainfall) FROM rainfall)
    """).show()

+-----+-----------------+
|month|     avg_rainfall|
+-----+-----------------+
|   11|5.354166666666667|
+-----+-----------------+



 - Which year was the windiest?

In [129]:
wind = (weather
 .withColumn('year', year('date'))
 .groupBy('year')
 .agg(sum('wind').alias('sum_of_wind'))
)

In [130]:
wind.show()

+----+------------------+
|year|       sum_of_wind|
+----+------------------+
|2015|            1153.3|
|2013|1100.8000000000002|
|2014|1236.5000000000005|
|2012|1244.6999999999998|
+----+------------------+



In [131]:
wind.createOrReplaceTempView('wind')

In [133]:
spark.sql(
    """
    SELECT *
    FROM wind
    WHERE sum_of_wind = (SELECT MAX(sum_of_wind) from wind)
    """).show()

+----+------------------+
|year|       sum_of_wind|
+----+------------------+
|2012|1244.6999999999998|
+----+------------------+



- What is the most frequent type of weather in January?

In [164]:
jan_weather = (
    weather
    .filter(month('date') == 1)
    .groupBy('weather')
    .agg(count('weather')
    .alias('days')))

In [165]:
jan_weather.createOrReplaceTempView('jan_weather')
jan_weather.show()

+-------+----+
|weather|days|
+-------+----+
|    fog|  38|
|drizzle|  10|
|   rain|  35|
|    sun|  33|
|   snow|   8|
+-------+----+



In [166]:
spark.sql(
    """
    SELECT *
    FROM jan_weather
    WHERE days = (SELECT MAX(days) FROM jan_weather)
    """).show()

+-------+----+
|weather|days|
+-------+----+
|    fog|  38|
+-------+----+



- What percentage of days were rainy in Q3 of 2015

In [199]:
(
    weather
    .withColumn('quarter', quarter('date'))
    .withColumn('year', year('date'))
    .filter(expr('year = 2015'))
    .filter(expr('quarter = 3'))
    .select(weather.precipitation, when(weather.precipitation > 0, 1).otherwise(0).alias('was_rainy'))
    .agg(avg('was_rainy')*100)
).show()

+----------------------+
|(avg(was_rainy) * 100)|
+----------------------+
|    18.478260869565215|
+----------------------+



- For each year, find what percentage of days it rained (had non-zero percipitation).

In [221]:
(
    weather
    .withColumn('year', year('date'))
#     .groupBy('year')
#     .agg(sum('precipitation'))
    .select(weather.precipitation, when(weather.precipitation > 0, 1).otherwise(0).alias('was_rainy'),'year')
    .groupBy('year')
    .agg((avg('was_rainy')*100).alias('percentage_days_rainy'))
    .sort('year')
    
).show()

+----+---------------------+
|year|percentage_days_rainy|
+----+---------------------+
|2012|    48.36065573770492|
|2013|    41.64383561643836|
|2014|     41.0958904109589|
|2015|    39.45205479452055|
+----+---------------------+

