In [27]:
# Imports needed

import pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from pydataset import data
from vega_datasets import data

from pyspark.sql.functions import col, expr, concat, sum, avg, min, max, count, mean, round 
from pyspark.sql.functions import lit, regexp_extract, regexp_replace, when, asc, desc, month, year, quarter

In [None]:
# Start a spark session

spark = pyspark.sql.SparkSession.builder.getOrCreate()

### 1. Create a spark data frame that contains your favorite programming languages.
- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [2]:
# Create pandas dataframe by columns using a dictionary-like object

pd_df = pd.DataFrame({'language': ['Python', 'JavaScript', 'HTML', 'Java', 'C']})
# convert the pandas df to a Spark df
sp_df = spark.createDataFrame(pd_df)

In [3]:
# .printSchema()
sp_df.printSchema()

root
 |-- language: string (nullable = true)



In [47]:
sp_df.describe().show()

+-------+--------+
|summary|language|
+-------+--------+
|  count|       5|
|   mean|    null|
| stddev|    null|
|    min|       C|
|    max|  Python|
+-------+--------+



In [5]:
def spark_shape(self):
    return (self.count(), len(self.columns))
pyspark.sql.dataframe.DataFrame.shape = spark_shape

In [6]:
# First 5 records
sp_df.show()

+----------+
|  language|
+----------+
|    Python|
|JavaScript|
|      HTML|
|      Java|
|         C|
+----------+



### 2. Load the mpg dataset as a spark dataframe.

- Create 1 column of output that contains a message like the one below:
- The 1999 audi a4 has a 4 cylinder engine.
- For each vehicle.
- Transform the trans column so that it only contains either manual or auto.

In [7]:
# Create pandas df

mpg_pd = data("mpg")
mpg_pd.head(5)

Unnamed: 0,manufacturer,model,displ,year,cyl,trans,drv,cty,hwy,fl,class
1,audi,a4,1.8,1999,4,auto(l5),f,18,29,p,compact
2,audi,a4,1.8,1999,4,manual(m5),f,21,29,p,compact
3,audi,a4,2.0,2008,4,manual(m6),f,20,31,p,compact
4,audi,a4,2.0,2008,4,auto(av),f,21,30,p,compact
5,audi,a4,2.8,1999,6,auto(l5),f,16,26,p,compact


In [8]:
# Transform to spark df

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [9]:
# The 1999 audi a4 has a 4 cylinder engine
mpg.select(concat(lit("The "), mpg.year, mpg.manufacturer, mpg.model, 
                  lit("has a "), mpg.cyl, lit(" cylinder engine")).alias("Description")).show(10, truncate=False)

+---------------------------------------------+
|Description                                  |
+---------------------------------------------+
|The 1999audia4has a 4 cylinder engine        |
|The 1999audia4has a 4 cylinder engine        |
|The 2008audia4has a 4 cylinder engine        |
|The 2008audia4has a 4 cylinder engine        |
|The 1999audia4has a 6 cylinder engine        |
|The 1999audia4has a 6 cylinder engine        |
|The 2008audia4has a 6 cylinder engine        |
|The 1999audia4 quattrohas a 4 cylinder engine|
|The 1999audia4 quattrohas a 4 cylinder engine|
|The 2008audia4 quattrohas a 4 cylinder engine|
+---------------------------------------------+
only showing top 10 rows



In [10]:
from pyspark.sql.functions import when

# Transform the trans column so that it only contains either manual or auto.
mpg.select(mpg.trans, when(mpg.trans.startswith("a"), "auto").otherwise("manual")).show(10)

+----------+--------------------------------------------------------+
|     trans|CASE WHEN startswith(trans, a) THEN auto ELSE manual END|
+----------+--------------------------------------------------------+
|  auto(l5)|                                                    auto|
|manual(m5)|                                                  manual|
|manual(m6)|                                                  manual|
|  auto(av)|                                                    auto|
|  auto(l5)|                                                    auto|
|manual(m5)|                                                  manual|
|  auto(av)|                                                    auto|
|manual(m5)|                                                  manual|
|  auto(l5)|                                                    auto|
|manual(m6)|                                                  manual|
+----------+--------------------------------------------------------+
only showing top 10 

In [11]:
# Df "trans" column containing only manual or auto
mpg = mpg.withColumn("trans", when(mpg.trans.startswith("a"), "auto").otherwise("manual")).show(10)

+------------+----------+-----+----+---+------+---+---+---+---+-------+
|manufacturer|     model|displ|year|cyl| trans|drv|cty|hwy| fl|  class|
+------------+----------+-----+----+---+------+---+---+---+---+-------+
|        audi|        a4|  1.8|1999|  4|  auto|  f| 18| 29|  p|compact|
|        audi|        a4|  1.8|1999|  4|manual|  f| 21| 29|  p|compact|
|        audi|        a4|  2.0|2008|  4|manual|  f| 20| 31|  p|compact|
|        audi|        a4|  2.0|2008|  4|  auto|  f| 21| 30|  p|compact|
|        audi|        a4|  2.8|1999|  6|  auto|  f| 16| 26|  p|compact|
|        audi|        a4|  2.8|1999|  6|manual|  f| 18| 26|  p|compact|
|        audi|        a4|  3.1|2008|  6|  auto|  f| 18| 27|  p|compact|
|        audi|a4 quattro|  1.8|1999|  4|manual|  4| 18| 26|  p|compact|
|        audi|a4 quattro|  1.8|1999|  4|  auto|  4| 16| 25|  p|compact|
|        audi|a4 quattro|  2.0|2008|  4|manual|  4| 20| 28|  p|compact|
+------------+----------+-----+----+---+------+---+---+---+---+-

### 3. Load the tips dataset as a spark dataframe.
- What percentage of observations are smokers?
- Create a column that contains the tip percentage
- Calculate the average tip percentage for each combination of sex and smoker

In [12]:
# Create sparks df
tips = spark.createDataFrame(data("tips"))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [13]:

print(tips.count(), "rows", len(tips.columns), "columns")

244 rows 7 columns


In [14]:
# What percentage of observations are smokers?
tips.groupBy("smoker").count().show()

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



In [15]:
# What percentage of observations are smokers?
tips.filter(tips.smoker == "Yes").count()/tips.count()

0.38114754098360654

In [16]:
# Create a column that contains the tip percentage
tips = tips.withColumn('tip_percentage', (tips.tip/tips.total_bill))

In [17]:
# Calculate the average tip percentage for each combination of sex and smoker.
# tips.show()
tips.groupby("sex").pivot("smoker").mean("tip_percentage").show()

+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941032|
|  Male|0.1606687151291298|0.15277117520248512|
+------+------------------+-------------------+



### 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

-  Convert the temperatures to fahrenheit.
-  Which month has the most rain, on average?
-  Which year was the windiest?
-  What is the most frequent type of weather in January?
-  What is the average high and low temperature on sunny days in July in 2013 and 2014?
-  What percentage of days were rainy in q3 of 2015?
-  For each year, find what percentage of days it rained (had non-zero precipitation).

In [44]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [34]:
# formula to convert to fahrenheit = (0°C × 9/5) + 32 = 32°F
weather = weather.withColumn('temp_max', (round(weather.temp_max * (9/5) + 32, 1)))
weather = weather.withColumn('temp_min', (round(weather.temp_min * (9/5) + 32, 1)))

In [35]:
weather.show(10)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    55.0|    41.0| 4.7|drizzle|
|2012-01-02|         10.9|    51.1|    37.0| 4.5|   rain|
|2012-01-03|          0.8|    53.1|    45.0| 2.3|   rain|
|2012-01-04|         20.3|    54.0|    42.1| 4.7|   rain|
|2012-01-05|          1.3|    48.0|    37.0| 6.1|   rain|
|2012-01-06|          2.5|    39.9|    36.0| 2.2|   rain|
|2012-01-07|          0.0|    45.0|    37.0| 2.3|   rain|
|2012-01-08|          0.0|    50.0|    37.0| 2.0|    sun|
|2012-01-09|          4.3|    48.9|    41.0| 3.4|   rain|
|2012-01-10|          1.0|    43.0|    33.1| 3.4|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 10 rows



In [36]:
# Average rain by month
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4|             375.4|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12| 622.7000000000002|
+-----+------------------+



In [37]:
# What year was the windiest
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(avg("wind").alias("year_wind"))
    .sort("year_wind", ascending=False)
).show(1)

+----+-----------------+
|year|        year_wind|
+----+-----------------+
|2012|3.400819672131148|
+----+-----------------+
only showing top 1 row



In [38]:
# The most frequent type of weather in January
(
    weather.filter(month("date") == 1)
    .groupBy("weather")
    .agg(count("weather"))
    .show()
)

+-------+--------------+
|weather|count(weather)|
+-------+--------------+
|    fog|            38|
|drizzle|            10|
|   rain|            35|
|    sun|            33|
|   snow|             8|
+-------+--------------+



In [39]:
# The average high and low temp in July of 2013 and 2014
(
    weather.filter(month("date") == 7)
    .filter(year("date") == 2013 & 2014)
    .filter(weather.weather == 'sun')
    .agg(mean("temp_min"))
    .show()
)

(
    weather.filter(month("date") == 7)
    .filter(year("date") == 2013 % 2014)
    .filter(weather.weather == 'sun')
    .agg(mean("temp_max"))
    .show()
)

+-----------------+
|    avg(temp_min)|
+-----------------+
|55.71666666666667|
+-----------------+

+-----------------+
|    avg(temp_max)|
+-----------------+
|79.85555555555555|
+-----------------+



In [41]:
# What percentage of days were rainy in q3 of 2015
df = (
    weather.filter(year("date") == 2015)
    .filter(weather.precipitation > 0)
    .withColumn('quarter', quarter('date'))
    .where("quarter ==3")
)
df.show()

+----------+-------------+--------+--------+----+-------+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|quarter|
+----------+-------------+--------+--------+----+-------+-------+
|2015-07-24|          0.3|    73.0|    55.9| 3.8|    fog|      3|
|2015-07-26|          2.0|    72.0|    57.0| 2.6|    fog|      3|
|2015-08-12|          7.6|    82.9|    62.1| 2.7|   rain|      3|
|2015-08-14|         30.5|    64.9|    59.0| 5.2|   rain|      3|
|2015-08-20|          2.0|    73.0|    57.9| 4.2|    fog|      3|
|2015-08-28|          0.5|    73.9|    60.1| 2.6|    fog|      3|
|2015-08-29|         32.5|    72.0|    55.9| 5.8|    fog|      3|
|2015-08-30|         10.2|    68.0|    55.0| 4.7|    fog|      3|
|2015-09-01|          5.8|    66.9|    57.0| 5.0|    fog|      3|
|2015-09-05|          0.3|    69.1|    48.0| 3.5|    sun|      3|
|2015-09-06|          5.3|    61.0|    53.1| 2.4|    fog|      3|
|2015-09-07|          0.3|    70.0|    55.9| 1.5|    fog|      3|
|2015-09-1

In [45]:
(weather.withColumn("quarter", quarter("date"))).filter(quarter('date') == '3').filter(year('date') == 2015)\
    .select(when(col('weather') == 'rain', 1)\
    .otherwise(0).alias('rain')).agg(mean('rain')).show()

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



In [46]:
# Find the percentage of days it rained
(weather.withColumn('year', year('date'))).select('year',when(col('precipitation') > 0, 1)\
    .otherwise(0).alias('rain')).groupBy('year')\
     .agg(mean('rain')).show()

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2015|0.39452054794520547|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2012|0.48360655737704916|
+----+-------------------+

