In [27]:
import pandas as pd
import numpy as np
import pyspark
import pyspark.sql.functions as F
spark = pyspark.sql.SparkSession.builder.getOrCreate()

#### 1. Create a spark data frame that contains your favorite programming languages.

In [2]:
sdf = spark.createDataFrame(pd.DataFrame({'language': ['python', 
                                                       'sql', 
                                                       'javascript', 
                                                       'C++', 
                                                       'java']}))

#### view the schma of the dataframe

In [3]:
sdf.schema

StructType(List(StructField(language,StringType,true)))

#### output the shape of the dataframe

In [28]:
sdf.count(), len(sdf.columns)

(1461, 8)

#### Show the first 5 records in the dataframe

In [5]:
sdf.show(5)

+----------+
|  language|
+----------+
|    python|
|       sql|
|javascript|
|       C++|
|      java|
+----------+



#### 2. Load the mpg dataset as a spark dataframe

In [6]:
from pydataset import data
sdf = spark.createDataFrame(data('mpg'))
sdf.show(3)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 3 rows



#### 2a. Create 1 column of output that contains a message like the one below
> The 1999 audi a4 has a 4 cylinder engine

In [7]:
sdf.message = F.concat(F.lit('The '), 
                       sdf.year, 
                       F.lit(' '), 
                       sdf.model, 
                       F.lit(' has a '),
                       sdf.cyl, 
                       F.lit(' cylinder engine.'))
sdf.select(sdf.message).show(10, truncate=False)

+-------------------------------------------------------------+
|concat(The , year,  , model,  has a , cyl,  cylinder engine.)|
+-------------------------------------------------------------+
|The 1999 a4 has a 4 cylinder engine.                         |
|The 1999 a4 has a 4 cylinder engine.                         |
|The 2008 a4 has a 4 cylinder engine.                         |
|The 2008 a4 has a 4 cylinder engine.                         |
|The 1999 a4 has a 6 cylinder engine.                         |
|The 1999 a4 has a 6 cylinder engine.                         |
|The 2008 a4 has a 6 cylinder engine.                         |
|The 1999 a4 quattro has a 4 cylinder engine.                 |
|The 1999 a4 quattro has a 4 cylinder engine.                 |
|The 2008 a4 quattro has a 4 cylinder engine.                 |
+-------------------------------------------------------------+
only showing top 10 rows



#### 2b. Transform the trans column so that it only contains either manual or auto.


In [8]:
sdf.select(F.regexp_extract('trans', r'(\w+)', 1)).show()

+-------------------------------+
|regexp_extract(trans, (\w+), 1)|
+-------------------------------+
|                           auto|
|                         manual|
|                         manual|
|                           auto|
|                           auto|
|                         manual|
|                           auto|
|                         manual|
|                           auto|
|                         manual|
|                           auto|
|                           auto|
|                         manual|
|                           auto|
|                         manual|
|                           auto|
|                           auto|
|                           auto|
|                           auto|
|                           auto|
+-------------------------------+
only showing top 20 rows



## 3. Load the tips dataset as a spark dataframe

In [9]:
sdf = spark.createDataFrame(data('tips'))
sdf.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



#### 3a. What percentage of observations are smokers?

In [10]:
smoker_num = F.when(sdf.smoker == 'Yes', 1).otherwise(0)
sdf.select(F.mean(smoker_num)).show()

+-----------------------------------------------+
|avg(CASE WHEN (smoker = Yes) THEN 1 ELSE 0 END)|
+-----------------------------------------------+
|                            0.38114754098360654|
+-----------------------------------------------+



#### 3b. Create a column that contains the tip percentage

In [11]:
tip_pct = sdf.tip / sdf.total_bill
sdf.select(tip_pct).show(3)

+-------------------+
| (tip / total_bill)|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
+-------------------+
only showing top 3 rows



#### 3c. Calculate the average tip percentage for each combination of sex and smoker.

In [29]:
sdf.groupby('sex', 'smoker').agg(F.mean(tip_pct)).show()

AnalysisException: cannot resolve 'sex' given input columns: [date, month, precipitation, temp_max, temp_min, weather, wind, year];
'Aggregate ['sex, 'smoker], ['sex, 'smoker, avg((tip#95 / total_bill#94)) AS avg((tip / total_bill))#536]
+- Project [date#189, precipitation#190, temp_max#226, temp_min#233, wind#193, weather#194, month#265, substring(date#189, 1, 4) AS year#370]
   +- Project [date#189, precipitation#190, temp_max#226, temp_min#233, wind#193, weather#194, substring(date#189, 6, 2) AS month#265]
      +- Project [date#189, precipitation#190, temp_max#226, ((temp_min#192 * 1.8) + cast(32 as double)) AS temp_min#233, wind#193, weather#194]
         +- Project [date#189, precipitation#190, ((temp_max#191 * 1.8) + cast(32 as double)) AS temp_max#226, temp_min#192, wind#193, weather#194]
            +- LogicalRDD [date#189, precipitation#190, temp_max#191, temp_min#192, wind#193, weather#194], false


## 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [13]:
from vega_datasets import data
weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
sdf = spark.createDataFrame(weather)
sdf.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



#### 4a. Convert the temperatures to fahrenheit.

(0°C × 9/5) + 32 = 32°F

In [14]:
sdf = sdf.withColumn('temp_max', sdf.temp_max * (9/5) + 32)
sdf = sdf.withColumn('temp_min', sdf.temp_min * (9/5) + 32)
sdf.show(5)

+----------+-------------+------------------+--------+----+-------+
|      date|precipitation|          temp_max|temp_min|wind|weather|
+----------+-------------+------------------+--------+----+-------+
|2012-01-01|          0.0|55.040000000000006|    41.0| 4.7|drizzle|
|2012-01-02|         10.9|             51.08|   37.04| 4.5|   rain|
|2012-01-03|          0.8|             53.06|   44.96| 2.3|   rain|
|2012-01-04|         20.3|             53.96|   42.08| 4.7|   rain|
|2012-01-05|          1.3|48.019999999999996|   37.04| 6.1|   rain|
+----------+-------------+------------------+--------+----+-------+
only showing top 5 rows



#### 4b. Which month has the most rain, on average?

In [None]:
sdf = sdf.withColumn('month', sdf.date.substr(6, 2))
rainy_days = sdf.filter((sdf.weather == 'rain') | (sdf.weather == 'drizzle'))
rain_by_month = rainy_days.groupby('month').agg(F.mean('precipitation'))
rain_by_month = rain_by_month.withColumnRenamed('avg(precipitation)', 'avg_rain')
rain_by_month.show()

In [None]:
max_rain = rain_by_month.select(F.max('avg_rain')).head()[0]
rain_by_month.filter(rain_by_month.avg_rain == max_rain).show()

In [None]:
rain_by_month.filter(rain_by_month.avg_rain == max_rain).select('month').head()[0]

In [18]:
rain_by_month.createOrReplaceTempView('rain_by_month')
spark.sql('''SELECT month, avg_rain
               FROM rain_by_month
               WHERE avg_rain = (SELECT MAX(avg_rain)
                                   FROM rain_by_month)''').show()

+-----+--------+
|month|avg_rain|
+-----+--------+
|   10|  8.0625|
+-----+--------+



#### 4c. Which year was the windiest?

In [19]:
sdf = sdf.withColumn('year', sdf.date.substr(1, 4))

In [20]:
wind_by_year = sdf.groupby('year').agg(F.mean('wind'))
wind_by_year = wind_by_year.withColumnRenamed('avg(wind)', 'avg_wind')
wind_by_year.show()

+----+------------------+
|year|          avg_wind|
+----+------------------+
|2012| 3.400819672131148|
|2013|3.0158904109589058|
|2014| 3.387671232876714|
|2015| 3.159726027397261|
+----+------------------+



In [21]:
max_wind = wind_by_year.select(F.max('avg_wind')).head()[0]
wind_by_year.filter(wind_by_year.avg_wind == max_wind).show()

+----+-----------------+
|year|         avg_wind|
+----+-----------------+
|2012|3.400819672131148|
+----+-----------------+



In [22]:
wind_by_year.filter(wind_by_year.avg_wind == max_wind).select('year').head()[0]

'2012'

#### 4d. What is the most frequent type of weather in January?

In [30]:
jan = sdf.filter(sdf.month == '01').groupby('weather').count()
jan = jan.withColumnRenamed('count', 'n_days')
jan.show()

+-------+------+
|weather|n_days|
+-------+------+
|drizzle|    10|
|   rain|    35|
|    sun|    33|
|   snow|     8|
|    fog|    38|
+-------+------+



In [31]:
max_days = jan.select(F.max('n_days')).head()[0]
jan.filter(jan.n_days == max_days).show()

+-------+------+
|weather|n_days|
+-------+------+
|    fog|    38|
+-------+------+



In [32]:
jan.filter(jan.n_days == max_days).select('weather').head()[0]

'fog'

#### 4e. What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [48]:
sun_13_14 = sdf.filter(((sdf.year == 2013) | (sdf.year == 2014)) & (sdf.weather == 'sun'))
sun_13_14.select(F.mean('temp_max')).show()

+-----------------+
|    avg(temp_max)|
+-----------------+
|65.37874999999998|
+-----------------+



In [50]:
sun_13_14.select(F.mean('temp_min')).show()

+-----------------+
|    avg(temp_min)|
+-----------------+
|48.28913461538461|
+-----------------+



#### 4f. What percentage of days were rainy in q3 of 2015?

In [60]:
sdf = sdf.withColumn('quarter', F.when(sdf.month.isin(['01', '02', '03']), 1)
                                 .when(sdf.month.isin(['04', '05', '06']), 2)
                                 .when(sdf.month.isin(['07', '08', '09']), 3)
                                 .otherwise(4))

In [74]:
q3_15 = sdf.filter((sdf.quarter == 3) & (sdf.year == '2015'))
q3_15 = q3_15.withColumn('rainy', F.when(q3_15.weather == 'rain', 1)
                                   .otherwise(0))
q3_15.select(F.mean(q3_15.rainy)).show()

+--------------------+
|          avg(rainy)|
+--------------------+
|0.021739130434782608|
+--------------------+



#### 4g. For each year, find what percentage of days it rained (had non-zero precipitation).

In [None]:
sdf.withColumn('had_precip', F.when(sdf.precipitation > 0, 1)
                              .otherwise(0))