---
# Spark API Exercises
---

In [1]:
import pandas as pd
import numpy as np
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *
# import pyspark.sql.functions as F

## 1.

Create a spark data frame that contains your favorite programming languages.
- The name of the column should be language

In [2]:
pandas_df = pd.DataFrame({'language' : ['Python', 'SQL']})

In [3]:
df = spark.createDataFrame(pandas_df)

- View the schema of the dataframe

In [4]:
df.printSchema()

root
 |-- language: string (nullable = true)



- Output the shape of the dataframe

In [5]:
pandas_df.shape

(2, 1)

In [6]:
df.count(), len(df.columns)

(2, 1)

- Show the first 5 records in the dataframe

In [7]:
df.show(5)

+--------+
|language|
+--------+
|  Python|
|     SQL|
+--------+



## 2.

Load the mpg dataset as a spark dataframe.

In [8]:
from pydataset import data

In [9]:
mpg_pandas = data('mpg')

In [10]:
mpg = spark.createDataFrame(mpg_pandas)

- Create 1 column of output that contains a message like the one below for each vehicle:
    - `The 1999 audi a4 has a 4 cylinder engine.`

In [11]:
mpg.show(2)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 2 rows



In [12]:
mpg.select(concat(lit('The '), 'year', lit(' '), 'manufacturer', lit(' has a '), 'cyl', lit(' cylinder engine.'))).show(5, truncate=False)

+--------------------------------------------------------------------+
|concat(The , year,  , manufacturer,  has a , cyl,  cylinder engine.)|
+--------------------------------------------------------------------+
|The 1999 audi has a 4 cylinder engine.                              |
|The 1999 audi has a 4 cylinder engine.                              |
|The 2008 audi has a 4 cylinder engine.                              |
|The 2008 audi has a 4 cylinder engine.                              |
|The 1999 audi has a 6 cylinder engine.                              |
+--------------------------------------------------------------------+
only showing top 5 rows



- Transform the trans column so that it only contains either manual or auto.

In [13]:
mpg.select(regexp_extract('trans', r"^(\w+?)\(", 1).alias('trans_type')).show()

+----------+
|trans_type|
+----------+
|      auto|
|    manual|
|    manual|
|      auto|
|      auto|
|    manual|
|      auto|
|    manual|
|      auto|
|    manual|
|      auto|
|      auto|
|    manual|
|      auto|
|    manual|
|      auto|
|      auto|
|      auto|
|      auto|
|      auto|
+----------+
only showing top 20 rows



## 3.

Load the tips dataset as a spark dataframe.

In [14]:
tips_pandas = data('tips')

In [15]:
tips = spark.createDataFrame(tips_pandas)

In [16]:
tips.show(2)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 2 rows



- What percentage of observations are smokers?

In [17]:
tips.groupBy('smoker').count().show()

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



In [18]:
tips.rollup('smoker').count().show()

+------+-----+
|smoker|count|
+------+-----+
|   Yes|   93|
|    No|  151|
|  null|  244|
+------+-----+



In [19]:
(tips.filter(col('smoker') == 'Yes').count() / (tips.count())) * 100

38.114754098360656

- Create a column that contains the tip percentage

In [20]:
tips.withColumn('tip_percentage', tips.tip / tips.total_bill).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

- Calculate the average tip percentage for each combination of sex and smoker.

In [21]:
(
    tips.withColumn('tip_percentage', tips.tip / tips.total_bill)
    .groupBy('sex', 'smoker')
    .agg(mean(col('tip_percentage')).alias('mean_tip_percentage'))
    .show()
)

+------+------+-------------------+
|   sex|smoker|mean_tip_percentage|
+------+------+-------------------+
|  Male|    No| 0.1606687151291298|
|  Male|   Yes| 0.1527711752024851|
|Female|    No| 0.1569209707691836|
|Female|   Yes|0.18215035269941035|
+------+------+-------------------+



## 4.

Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [22]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



- Convert the temperatures to fahrenheit.

In [23]:
def T_to_F(temp):
    return (temp * (9/5) + 32)

In [24]:
(
    weather.withColumn('temp_max_F', T_to_F(weather.temp_max))
    .withColumn('temp_min_F', T_to_F(weather.temp_min))
    .show()
)

+----------+-------------+--------+--------+----+-------+------------------+------------------+
|      date|precipitation|temp_max|temp_min|wind|weather|        temp_max_F|        temp_min_F|
+----------+-------------+--------+--------+----+-------+------------------+------------------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|55.040000000000006|              41.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|             51.08|             37.04|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|             53.06|             44.96|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|             53.96|             42.08|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|48.019999999999996|             37.04|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|             39.92|             35.96|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|             44.96|             37.04|
|2012-01-08|          0.0|    10.0|     

- Which month has the most rain, on average?

In [25]:
(
    weather.withColumn('month', month('date'))
    .groupBy('month').
    agg(mean('precipitation').alias('mean_precipitation'))
    .sort(desc('mean_precipitation'))
    .show()
)

+-----+-------------------+
|month| mean_precipitation|
+-----+-------------------+
|   11|  5.354166666666667|
|   12|  5.021774193548388|
|    3|  4.888709677419355|
|   10|  4.059677419354839|
|    1| 3.7580645161290316|
|    2|  3.734513274336283|
|    4|  3.128333333333333|
|    9| 1.9624999999999997|
|    5| 1.6733870967741935|
|    8| 1.3201612903225806|
|    6| 1.1075000000000002|
|    7|0.38870967741935486|
+-----+-------------------+



- Which year was the windiest?

In [26]:
(
    weather.withColumn('year', year('date'))
    .groupBy('year')
    .agg(sum('wind').alias('Total Wind'))
    .show()
)

+----+------------------+
|year|        Total Wind|
+----+------------------+
|2012|1244.6999999999998|
|2013|1100.8000000000002|
|2014|1236.5000000000005|
|2015|            1153.3|
+----+------------------+



- What is the most frequent type of weather in January?

In [27]:
(
    weather.withColumn('month', month('date'))
    .filter(col('month') == 1)
    .groupBy('weather')
    .count()
    .sort(desc('count'))
    .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



- What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [28]:
(
    weather.withColumn('month', month('date'))
    .withColumn('year', year('date'))
    .filter(((col('year') == 2013) | (col('year') == 2014)) & (col('month') == 7))
    .filter(weather.weather == 'sun')
    .select(mean('temp_max').alias('average_high_temp_C'))
    .show()
)

+-------------------+
|average_high_temp_C|
+-------------------+
| 26.828846153846158|
+-------------------+



In [29]:
(
    weather.withColumn('month', month('date'))
    .withColumn('year', year('date'))
    .filter(((col('year') == 2013) | (col('year') == 2014)) & (col('month') == 7))
    .filter(weather.weather == 'sun')
    .select(mean('temp_min').alias('average_low_temp_C'))
    .show()
)

+------------------+
|average_low_temp_C|
+------------------+
| 14.18269230769231|
+------------------+



In [30]:
(
    weather.withColumn('temp_max_F', T_to_F(weather.temp_max))
    .withColumn('temp_min_F', T_to_F(weather.temp_min))
    .withColumn('month', month('date'))
    .withColumn('year', year('date'))
    .filter(((col('year') == 2013) | (col('year') == 2014)) & (col('month') == 7))
    .filter(weather.weather == 'sun')
    .select(mean('temp_max_F').alias('average_high_temp_F'))
    .show()
)

+-------------------+
|average_high_temp_F|
+-------------------+
|  80.29192307692308|
+-------------------+



In [31]:
(
    weather.withColumn('temp_max_F', T_to_F(weather.temp_max))
    .withColumn('temp_min_F', T_to_F(weather.temp_min))
    .withColumn('month', month('date'))
    .withColumn('year', year('date'))
    .filter(((col('year') == 2013) | (col('year') == 2014)) & (col('month') == 7))
    .filter(weather.weather == 'sun')
    .select(mean('temp_min_F').alias('average_low_temp_F'))
    .show()
)

+------------------+
|average_low_temp_F|
+------------------+
| 57.52884615384615|
+------------------+



- What percentage of days were rainy in q3 of 2015?

In [32]:
(
    weather.withColumn('month', month('date'))
    .withColumn('year', year('date'))
    .filter(((col('month') == 7) | (col('month') == 8) | (col('month') == 9)) & (col('year') == 2015))
    .rollup('weather')
    .count()
    .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   21|
|    sun|   64|
|drizzle|    5|
|   rain|    2|
|   null|   92|
+-------+-----+



In [33]:
(2 / 92) * 100

2.1739130434782608

- For each year, find what percentage of days it rained (had non-zero precipitation).

In [34]:
(
    weather.withColumn('year', year('date'))
    .groupBy('year')
    .pivot('weather')
    .count()
    .show()
)

+----+-------+---+----+----+---+
|year|drizzle|fog|rain|snow|sun|
+----+-------+---+----+----+---+
|2015|      7|173|   5|null|180|
|2013|     16| 82|  60|   2|205|
|2014|   null|151|   3|null|211|
|2012|     31|  5| 191|  21|118|
+----+-------+---+----+----+---+



In [35]:
(
    weather.withColumn('year', year('date'))
    .crosstab('year', 'weather')
    .show()
)

+------------+-------+---+----+----+---+
|year_weather|drizzle|fog|rain|snow|sun|
+------------+-------+---+----+----+---+
|        2012|     31|  5| 191|  21|118|
|        2013|     16| 82|  60|   2|205|
|        2014|      0|151|   3|   0|211|
|        2015|      7|173|   5|   0|180|
+------------+-------+---+----+----+---+



In [36]:
rainy_days = (
    weather.withColumn('year', year('date'))
    .select('*', when(weather.precipitation > 0, 1).otherwise(0).alias('rained'))
    .filter(col('rained') == 1)
    .groupBy('year')
    .agg(count('date').alias('rain_count'))
)

In [37]:
total_days = (
    weather.withColumn('year', year('date'))
    .groupBy('year')
    .agg(count('date').alias('total_count'))
)

In [38]:
(
    rainy_days.join(total_days, on=rainy_days.year == total_days.year)
    .withColumn('proportion', col('rain_count') / col('total_count'))
    .show()
)

+----+----------+----+-----------+-------------------+
|year|rain_count|year|total_count|         proportion|
+----+----------+----+-----------+-------------------+
|2012|       177|2012|        366|0.48360655737704916|
|2013|       152|2013|        365|0.41643835616438357|
|2014|       150|2014|        365|  0.410958904109589|
|2015|       144|2015|        365|0.39452054794520547|
+----+----------+----+-----------+-------------------+



In [39]:
(
    rainy_days.join(total_days, on=rainy_days.year == total_days.year)
    .withColumn('percentage', (col('rain_count') / col('total_count')*100))
    .show()
)

+----+----------+----+-----------+-----------------+
|year|rain_count|year|total_count|       percentage|
+----+----------+----+-----------+-----------------+
|2012|       177|2012|        366|48.36065573770492|
|2013|       152|2013|        365|41.64383561643836|
|2014|       150|2014|        365| 41.0958904109589|
|2015|       144|2015|        365|39.45205479452055|
+----+----------+----+-----------+-----------------+

