# Exercises
following Spark API Lession

In [1]:
# Create the spark session:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

import pandas as pd
import numpy as np

> **1. Create a spark data frame that contains your favorite programming languages.**

- The name of the column should be `language`

In [2]:
pd_df = pd.DataFrame({'lanugage': ['Python', 'SQL', 'Java', 'HTML', 'C++']})
sp_df = spark.createDataFrame(pd_df)
sp_df

DataFrame[lanugage: string]

- View the schema of the dataframe

In [3]:
# like df.info()
sp_df.printSchema()

root
 |-- lanugage: string (nullable = true)



- Output the shape of the dataframe

In [4]:
sp_df.count(), len(sp_df.columns)

(5, 1)

- Show the first 5 records in the dataframe

In [5]:
sp_df.show(5)

+--------+
|lanugage|
+--------+
|  Python|
|     SQL|
|    Java|
|    HTML|
|     C++|
+--------+



> **2. Load the `mpg` dataset as a spark dataframe.**

In [6]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))

- Create 1 column of output that contains a message like the one below:


  ```The 1999 audi a4 has a 4 cylinder engine.```
  for each vehicle.

In [8]:
from pyspark.sql.functions import col, concat, lit

mpg.select(concat(lit('The '), 
                  mpg.year,
                  lit(' '),
                  mpg.manufacturer,
                  lit(' '),
                  mpg.model, 
                  lit(' has a '), 
                  mpg.cyl, 
                  lit(' cylinder engine.'))
           .alias('string')).show(5, truncate=False)

+-----------------------------------------+
|string                                   |
+-----------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.|
|The 1999 audi a4 has a 4 cylinder engine.|
|The 2008 audi a4 has a 4 cylinder engine.|
|The 2008 audi a4 has a 4 cylinder engine.|
|The 1999 audi a4 has a 6 cylinder engine.|
+-----------------------------------------+
only showing top 5 rows



- Transform the `trans` column so that it only contains either `manual` or `auto`.

In [18]:
from pyspark.sql.functions import regexp_extract, regexp_replace

mpg.select(
    regexp_extract('trans', r'^(\w+)', 1).alias('trans')).show(5)

+------+
| trans|
+------+
|  auto|
|manual|
|manual|
|  auto|
|  auto|
+------+
only showing top 5 rows



> **3. Load the `tips` dataset as a spark dataframe.**

In [10]:
tips = spark.createDataFrame(data('tips'))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



- What percentage of observations are smokers?

In [21]:
from pyspark.sql.functions import count, avg

round(tips.filter(tips.smoker == 'Yes').count() / tips.count() * 100, 2)

38.11

- Create a column that contains the tip percentage

In [12]:
tips = tips.select('*', (tips.tip/tips.total_bill * 100).alias('tip_percentage'))
tips.select('tip_percentage').show(5)

+------------------+
|    tip_percentage|
+------------------+
|5.9446733372572105|
|16.054158607350097|
|16.658733936220845|
| 13.97804054054054|
|14.680764538430255|
+------------------+
only showing top 5 rows



- Calculate the average tip percentage for each combination of sex and smoker.

In [13]:
tips.groupBy('sex', 'smoker').agg(avg(tips.tip_percentage)).show(5)

+------+------+-------------------+
|   sex|smoker|avg(tip_percentage)|
+------+------+-------------------+
|  Male|    No|  16.06687151291298|
|  Male|   Yes| 15.277117520248513|
|Female|    No| 15.692097076918358|
|Female|   Yes|  18.21503526994103|
+------+------+-------------------+



> **4. Use the seattle weather dataset referenced in the lesson to answer the questions below.**

In [14]:
from vega_datasets import data
sw = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
sw = spark.createDataFrame(sw)
sw.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



- Convert the temperatures to farenheight.

In [15]:
# Fahrenheit = (Celsius * 9/5) + 32
sw.select((sw.temp_max * (9/5) + 32).alias('temp_max_f'),
          (sw.temp_min * (9/5) + 32).alias('temp_min_f')).show(5)

+------------------+----------+
|        temp_max_f|temp_min_f|
+------------------+----------+
|55.040000000000006|      41.0|
|             51.08|     37.04|
|             53.06|     44.96|
|             53.96|     42.08|
|48.019999999999996|     37.04|
+------------------+----------+
only showing top 5 rows



- Which month has the most rain, on average?

In [32]:
from pyspark.sql.functions import asc, desc, sum, max, when
from pyspark.sql.functions import month, year, quarter

In [22]:
### redo
#(
#    sw.withColumn("month", month("date"))
#    .groupBy("month")
#    .agg(avg("precipitation").alias("avg_rainfall"))
#    .sort(desc("avg_rainfall"))
#    .show()
#)

+-----+-------------------+
|month|       avg_rainfall|
+-----+-------------------+
|   11|  5.354166666666667|
|   12|  5.021774193548388|
|    3|  4.888709677419355|
|   10|  4.059677419354839|
|    1| 3.7580645161290316|
|    2|  3.734513274336283|
|    4|  3.128333333333333|
|    9| 1.9624999999999997|
|    5| 1.6733870967741935|
|    8| 1.3201612903225806|
|    6| 1.1075000000000002|
|    7|0.38870967741935486|
+-----+-------------------+



- Which year was the windiest?

In [24]:
(
    sw.withColumn('year', year('date'))
    .groupBy('year')
    .agg(sum('wind').alias('total_wind'))
    .sort(desc('total_wind'))
    .show()
)

+----+------------------+
|year|        total_wind|
+----+------------------+
|2012|1244.6999999999998|
|2014|1236.5000000000005|
|2015|            1153.3|
|2013|1100.8000000000002|
+----+------------------+



- What is the most frequent type of weather in January?

In [30]:
(
    sw.withColumn('weather', sw.weather)
    .filter(month('date') == 1)
    .groupBy('weather')
    .agg(count('weather').alias('freq_weather'))
    .sort(desc('freq_weather'))
    .show()
)

+-------+------------+
|weather|freq_weather|
+-------+------------+
|    fog|          38|
|   rain|          35|
|    sun|          33|
|drizzle|          10|
|   snow|           8|
+-------+------------+



- What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [26]:
(
    sw.filter(((year('date') == '2013') 
        | (year('date') == '2014')) 
        & (month('date') == '07'))
        .where(sw.weather == 'sun')
        .groupBy(month('date'), year('date'))
        .agg(avg('temp_min'), avg('temp_max'))
        .show()
)

+-----------+----------+------------------+------------------+
|month(date)|year(date)|     avg(temp_min)|     avg(temp_max)|
+-----------+----------+------------------+------------------+
|          7|      2013|13.981481481481483|26.585185185185193|
|          7|      2014|14.400000000000002|            27.092|
+-----------+----------+------------------+------------------+



- What percentage of days were rainy in q3 of 2015?

In [27]:
#q1: 01-03, q2: 04-06, q3: 07-09, q4: 10-12
rainy_days = (
    sw.withColumn('2015', year('date'))
    .filter(year('date') == '2015').alias('2015')
    .filter((month('date') == '07')
           |(month('date') == '08')
           |(month('date') == '09'))
    .groupBy('2015')
    .agg(count(sw.weather == 'rain').alias('num_days'))
)

rainy_days.select((rainy_days.num_days / sw.count() * 100).alias('pct_rainy')).show() 

+------------------+
|         pct_rainy|
+------------------+
|6.2970568104038325|
+------------------+



- For each year, find what percentage of days it rained (had non-zero precipitation).

In [34]:
(
    sw.withColumn('year', year('date'))
    .select(when(col('precipitation') > 0, 1).otherwise(0).alias('rain'), 'year')
    .groupBy('year')
    .agg(avg('rain'))
    .show()
)

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2015|0.39452054794520547|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2012|0.48360655737704916|
+----+-------------------+

