### Create a Jupyter notebook or Python script named `spark101` for this exercise.

#### 1. Create a Spark DataFrame that contains your favorite programming languages.

- The name of the column should be `language`.
- View the schema of the DataFrame.
- Output the shape of the DataFrame.
- Show the first 5 records in the DataFrame.

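In [1]:
```python
import pyspark
import pydataset
import pandas as pd
# Star import brings in col, lit, concat, when, avg, sum, round, udf, ...
# Note that it shadows Python's built-in sum(), round(), max(), etc.
from pyspark.sql.functions import *
from vega_datasets import data as vega_data
```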

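In [2]:
```python
# Start (or reuse) a SparkSession
spark = pyspark.sql.SparkSession.builder.getOrCreate()
# Build the Spark DataFrame from a pandas DataFrame with a single "language" column
df = spark.createDataFrame(pd.DataFrame({'language': ['python', 'java', 'c++', 'r', 'javascript', 'scala', 'html']}))
```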

In [3]:
```python
df.schema
```
```
StructType([StructField('language', StringType(), True)])
```

In [4]:
```python
print(df.count(), 'x', len(df.columns))
```
```
7 x 1
```

In [5]:
```python
df.show(5)
```
```
+----------+
|  language|
+----------+
|    python|
|      java|
|       c++|
|         r|
|javascript|
+----------+
only showing top 5 rows
```



#### 2. Load the mpg dataset as a Spark DataFrame.

a. Create one column of output that contains a message like the one below for each vehicle:

`The 1999 audi a4 has a 4 cylinder engine.`

b. Transform the `trans` column so that it contains only `manual` or `auto`.

In [6]:
```python
df = spark.createDataFrame(pydataset.data('mpg'))
```

In [7]:
```python
df.show(5)
```
```
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows
```



In [8]:
```python
# Build the sentence by concatenating literal text with column values
message = concat(
    lit('The '), 'year', lit(' '), 'manufacturer', lit(' '), 'model',
    lit(' has a '), 'cyl', lit(' cylinder engine.'),
)

df.select('*', message.alias('message')).show(7, truncate=False)
```
```
+------------+-----+-----+----+---+----------+---+---+---+---+-------+-----------------------------------------+
|manufacturer|model|displ|year|cyl|trans     |drv|cty|hwy|fl |class  |message                                  |
+------------+-----+-----+----+---+----------+---+---+---+---+-------+-----------------------------------------+
|audi        |a4   |1.8  |1999|4  |auto(l5)  |f  |18 |29 |p  |compact|The 1999 audi a4 has a 4 cylinder engine.|
|audi        |a4   |1.8  |1999|4  |manual(m5)|f  |21 |29 |p  |compact|The 1999 audi a4 has a 4 cylinder engine.|
|audi        |a4   |2.0  |2008|4  |manual(m6)|f  |20 |31 |p  |compact|The 2008 audi a4 has a 4 cylinder engine.|
|audi        |a4   |2.0  |2008|4  |auto(av)  |f  |21 |30 |p  |compact|The 2008 audi a4 has a 4 cylinder engine.|
|audi        |a4   |2.8  |1999|6  |auto(l5)  |f  |16 |26 |p  |compact|The 1999 audi a4 has a 6 cylinder engine.|
|audi        |a4   |2.8  |1999|6  |manual(m5)|f  |18 |26 |p  |compact|The 1999 audi a4 has a 6 c
```

In [9]:
```python
df.select(
    "trans",
    regexp_extract('trans', r'(\w{4,6})', 1)
).show(5)
```
```
+----------+-----------------------------------+
|     trans|regexp_extract(trans, (\w{4,6}), 1)|
+----------+-----------------------------------+
|  auto(l5)|                               auto|
|manual(m5)|                             manual|
|manual(m6)|                             manual|
|  auto(av)|                               auto|
|  auto(l5)|                               auto|
+----------+-----------------------------------+
only showing top 5 rows
```



#### 3. Load the tips dataset as a Spark DataFrame.

- What percentage of observations are smokers?
- Create a column that contains the tip percentage.
- Calculate the average tip percentage for each combination of sex and smoker.

In [10]:
```python
df = spark.createDataFrame(pydataset.data('tips'))
```

In [11]:
```python
df.show(5)
```
```
+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows
```



In [12]:
```python
# 1 for smokers, 0 otherwise; the mean of this indicator is the fraction of smokers
is_smoker = when(df.smoker == 'Yes', 1).otherwise(0)
df.select(avg(is_smoker).alias('percent_smokers')).show(5)
```
```
+-------------------+
|    percent_smokers|
+-------------------+
|0.38114754098360654|
+-------------------+
```



In [13]:
```python
df.groupby('smoker').count().withColumn(
    'percent', col('count') / df.count()).show()
```
```
+------+-----+-------------------+
|smoker|count|            percent|
+------+-----+-------------------+
|    No|  151| 0.6188524590163934|
|   Yes|   93|0.38114754098360654|
+------+-----+-------------------+
```

In [14]:
```python
tip_percentage = (df.tip / df.total_bill).alias('tip_percentage')
df = df.select('*', tip_percentage)
```

In [15]:
```python
df.show(5)
```
```
+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows
```



In [16]:
```python
df.groupby('sex', 'smoker').agg(avg('tip_percentage')).show()
```
```
+------+------+-------------------+
|   sex|smoker|avg(tip_percentage)|
+------+------+-------------------+
|  Male|    No| 0.1606687151291298|
|Female|    No| 0.1569209707691836|
|  Male|   Yes|0.15277117520248512|
|Female|   Yes|0.18215035269941032|
+------+------+-------------------+
```

#### 4. Use the Seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to Fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low temperature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in Q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).

In [17]:
```python
# Convert the date column to "YYYY-MM-DD" strings before handing the pandas frame to Spark
weather = vega_data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)
```
```
+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows
```



In [18]:
```python
def convert_c_to_f(temp):
    """Converts Celsius to Fahrenheit."""
    return temp * (9/5) + 32

# Wrap the Python function as a PySpark UDF. Without an explicit return type
# a UDF returns strings, so declare 'double'.
udf_convert = udf(convert_c_to_f, 'double')
```

In [19]:
```python
weather = (
    weather
    .withColumn('temp_max_f', udf_convert(col('temp_max')))
    .withColumn('temp_min_f', udf_convert(col('temp_min')))
)
```

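In [20]:
```python
# Pull the month ("01"-"12") and the 4-digit year out of the "YYYY-MM-DD" date string.
# These variable names shadow the month()/year() functions from the star import.
month = regexp_extract(weather.date, r'-(\d+)-', 1).alias('month')
year = regexp_extract(weather.date, r'(\d{4})', 1).alias('year')
```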

In [21]:
```python
weather = weather.select('*', month, year)
weather.show(5)
```
```
+----------+-------------+--------+--------+----+-------+------------------+----------+-----+----+
|      date|precipitation|temp_max|temp_min|wind|weather|        temp_max_f|temp_min_f|month|year|
+----------+-------------+--------+--------+----+-------+------------------+----------+-----+----+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|55.040000000000006|      41.0|   01|2012|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|             51.08|     37.04|   01|2012|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|             53.06|     44.96|   01|2012|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|             53.96|     42.08|   01|2012|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|48.019999999999996|     37.04|   01|2012|
+----------+-------------+--------+--------+----+-------+------------------+----------+-----+----+
only showing top 5 rows
```



Month with the most rain, on average (mean of each month's total precipitation across years)

In [23]:
```python
(
    weather
    .groupby('month', 'year')
    .agg(sum('precipitation').alias('total_monthly_precipitation'))
    .groupby('month')
    .agg(mean('total_monthly_precipitation').alias('avg_monthly_precipitation'))
    .sort(desc('avg_monthly_precipitation'))
    .show(1)
)
```
```
+-----+-------------------------+
|month|avg_monthly_precipitation|
+-----+-------------------------+
|   11|                  160.625|
+-----+-------------------------+
only showing top 1 row
```

Windiest year (by average wind)

In [24]:
```python
(
    weather.groupby('year')
    .agg(avg('wind').alias('average_wind'))
    .sort(desc('average_wind'))
    .show(1)
)
```
```
+----+-----------------+
|year|     average_wind|
+----+-----------------+
|2012|3.400819672131148|
+----+-----------------+
only showing top 1 row
```

Most frequent type of weather in January

In [25]:
```python
(
    weather.filter(weather.month == '01')
    .groupby('weather')
    .count()
    .sort(desc('count'))
    .show(1)
)
```
```
+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
+-------+-----+
only showing top 1 row
```



Average high and low temperature on sunny days in July in 2013 and 2014

In [26]:
```python
# Group the year test explicitly: & binds more tightly than |, so
# `... & (year == '2013') | (year == '2014')` would also match every day of 2014
sunny_july = weather.filter(
    (weather.weather == 'sun')
    & (weather.month == '07')
    & weather.year.isin('2013', '2014')
)
sunny_july.select(
    avg('temp_max_f'),
    avg('temp_min_f')
).show()
```

In [27]:
```python
weather = weather.withColumn('quarter', quarter(weather.date))
```

In [28]:
```python
# Days in July-September (Q3) of 2015
subset = weather.filter((weather.quarter == 3) & (weather.year == '2015'))
```

In [29]:
```python
# Share of Q3 2015 days labelled "rain", rounded to 2 decimals
subset.groupby('weather').agg(
    round(count('weather') / subset.count(), 2).alias('ratio')
).filter(subset.weather == 'rain').show()
```
```
+-------+-----+
|weather|ratio|
+-------+-----+
|   rain| 0.02|
+-------+-----+
```



For each year, find what percentage of days it rained (had non-zero precipitation).

In [30]:
```python
# Create a column for whether there is non-zero precipitation (1) or not (0)
weather = weather.withColumn('non_zero_precip', when(weather.precipitation > 0, 1).otherwise(0))
```

In [31]:
```python
weather.show(5)
```
```
+----------+-------------+--------+--------+----+-------+------------------+----------+-----+----+-------+---------------+
|      date|precipitation|temp_max|temp_min|wind|weather|        temp_max_f|temp_min_f|month|year|quarter|non_zero_precip|
+----------+-------------+--------+--------+----+-------+------------------+----------+-----+----+-------+---------------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|55.040000000000006|      41.0|   01|2012|      1|              0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|             51.08|     37.04|   01|2012|      1|              1|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|             53.06|     44.96|   01|2012|      1|              1|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|             53.96|     42.08|   01|2012|      1|              1|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|48.019999999999996|     37.04|   01|2012|      1|              1|
+----------+----
```

In [32]:
```python
weather.groupby('year').agg(
    round(sum('non_zero_precip') / count('non_zero_precip'), 2).alias('ratio')
).show()
```
```
+----+-----+
|year|ratio|
+----+-----+
|2012| 0.48|
|2013| 0.42|
|2014| 0.41|
|2015| 0.39|
+----+-----+
```