Create a jupyter notebook or python script named spark101 for this exercise.

In [1]:
import pyspark
import pandas as pd
import numpy as np



from pyspark.sql.functions import col, expr, concat, sum, avg, min, max, count, mean, round 
from pyspark.sql.functions import lit, regexp_extract, regexp_replace, when, asc, desc, month, year, quarter
pd.set_option('display.max_colwidth', 10000)

spark = pyspark.sql.SparkSession.builder.getOrCreate()

Create a spark data frame that contains your favorite programming languages.

- The name of the column should be language

In [2]:
# Create Dataframe
lang_df = pd.DataFrame([['yes', 'no', 'yes', 'no', 'no', 'yes'], 
                        ['yes', 'no', 'yes', 'yes', 'no', 'yes'],
                       ], 
                        index = ['csv', 'json'], 
                        columns = ['python', 'javascript', 'c#', 'java', 'css', 'html'])
lang_df

Unnamed: 0,python,javascript,c#,java,css,html
csv,yes,no,yes,no,no,yes
json,yes,no,yes,yes,no,yes


In [3]:
# Convert to spark
df = spark.createDataFrame(lang_df)
df.show()

+------+----------+---+----+---+----+
|python|javascript| c#|java|css|html|
+------+----------+---+----+---+----+
|   yes|        no|yes|  no| no| yes|
|   yes|        no|yes| yes| no| yes|
+------+----------+---+----+---+----+



- View the schema of the dataframe

In [4]:
df.printSchema()

root
 |-- python: string (nullable = true)
 |-- javascript: string (nullable = true)
 |-- c#: string (nullable = true)
 |-- java: string (nullable = true)
 |-- css: string (nullable = true)
 |-- html: string (nullable = true)



- Output the shape of the dataframe

In [5]:
print((df.count(), len(df.columns)))

(2, 6)


- Show the first 5 records in the dataframe

In [6]:
df.show(5)

+------+----------+---+----+---+----+
|python|javascript| c#|java|css|html|
+------+----------+---+----+---+----+
|   yes|        no|yes|  no| no| yes|
|   yes|        no|yes| yes| no| yes|
+------+----------+---+----+---+----+



Load the mpg dataset as a spark dataframe.

In [7]:
from pydataset import data
mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



- Create 1 column of output that contains a message like the one below:
        The 1999 audi a4 has a 4 cylinder engine.

In [8]:
mpg.select(concat(lit('The '), mpg.year, lit(' '), mpg['manufacturer'], lit(' '), mpg.model, lit(' has a '), mpg.cyl, lit(" cylinder"), lit(' engine')).alias("Discription")).show(30, False)

+-------------------------------------------------------------+
|Discription                                                  |
+-------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine                     |
|The 1999 audi a4 has a 4 cylinder engine                     |
|The 2008 audi a4 has a 4 cylinder engine                     |
|The 2008 audi a4 has a 4 cylinder engine                     |
|The 1999 audi a4 has a 6 cylinder engine                     |
|The 1999 audi a4 has a 6 cylinder engine                     |
|The 2008 audi a4 has a 6 cylinder engine                     |
|The 1999 audi a4 quattro has a 4 cylinder engine             |
|The 1999 audi a4 quattro has a 4 cylinder engine             |
|The 2008 audi a4 quattro has a 4 cylinder engine             |
|The 2008 audi a4 quattro has a 4 cylinder engine             |
|The 1999 audi a4 quattro has a 6 cylinder engine             |
|The 1999 audi a4 quattro has a 6 cylind

- For each vehicle:
    - Transform the trans column so that it only contains either manual or auto.

In [9]:
mpg = mpg.withColumn("trans", when(mpg.trans.startswith("a"), "auto").otherwise("manual")).show(5)

+------------+-----+-----+----+---+------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl| trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+------+---+---+---+---+-------+
only showing top 5 rows



Load the tips dataset as a spark dataframe.

In [10]:
tips = spark.createDataFrame(data("tips"))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



- What percentage of observations are smokers?

In [11]:
tips.groupby(tips.smoker).agg(count(tips.smoker)).show()


tips.groupBy('smoker').agg(round(count(tips.smoker)/ tips.count(),2)).show()

+------+-------------+
|smoker|count(smoker)|
+------+-------------+
|    No|          151|
|   Yes|           93|
+------+-------------+

+------+-------------------------------+
|smoker|round((count(smoker) / 244), 2)|
+------+-------------------------------+
|    No|                           0.62|
|   Yes|                           0.38|
+------+-------------------------------+



- Create a column that contains the tip percentage

In [13]:
tips = tips.withColumn('tip_percentage', expr('Round((tip/total_bill) * 100)'))
tips.show()

+----------+----+------+------+---+------+----+--------------+
|total_bill| tip|   sex|smoker|day|  time|size|tip_percentage|
+----------+----+------+------+---+------+----+--------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|           6.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|          16.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|          17.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|          14.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|          15.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|          19.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|          23.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|          12.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|          13.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|          22.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|          17.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|          14.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        

- Calculate the average tip percentage for each combination of sex and smoker.

In [14]:
tips.groupBy(tips.smoker, tips.sex).agg(round(avg(tips.tip/tips.total_bill * 100), 2)).show()

+------+------+-----------------------------------------+
|smoker|   sex|round(avg(((tip / total_bill) * 100)), 2)|
+------+------+-----------------------------------------+
|    No|Female|                                    15.69|
|    No|  Male|                                    16.07|
|   Yes|  Male|                                    15.28|
|   Yes|Female|                                    18.22|
+------+------+-----------------------------------------+



Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [15]:
from vega_datasets import data
weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



- Convert the temperatures to farenheight.

In [16]:
weather = weather.withColumn("f_temp_max", round(weather.temp_max*9/5 + 32, 1))
weater = weather.withColumn("f_temp_min", round(weather.temp_min*9/5 + 32, 1))
weather.show()

+----------+-------------+--------+--------+----+-------+----------+
|      date|precipitation|temp_max|temp_min|wind|weather|f_temp_max|
+----------+-------------+--------+--------+----+-------+----------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|      55.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|      51.1|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|      53.1|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|      54.0|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|      48.0|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|      39.9|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|      45.0|
|2012-01-08|          0.0|    10.0|     2.8| 2.0|    sun|      50.0|
|2012-01-09|          4.3|     9.4|     5.0| 3.4|   rain|      48.9|
|2012-01-10|          1.0|     6.1|     0.6| 3.4|   rain|      43.0|
|2012-01-11|          0.0|     6.1|    -1.1| 5.1|    sun|      43.0|
|2012-01-12|          0.0|     6.1

- Which month has the most rain, on average?

In [17]:
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4|             375.4|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12|             622.7|
+-----+------------------+



- Which year was the windiest?

In [26]:

(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(avg("wind").alias("windiest_year"))
    .sort("windiest_year", ascending=False)
).show(1)

+----+-----------------+
|year|    windiest_year|
+----+-----------------+
|2012|3.400819672131147|
+----+-----------------+
only showing top 1 row



- What is the most frequent type of weather in January?

In [19]:
(
    weather.filter(month("date") == 1)
    .groupBy("weather")
    .agg(count("weather"))
    .show()
)

+-------+--------------+
|weather|count(weather)|
+-------+--------------+
|    fog|            38|
|drizzle|            10|
|   rain|            35|
|    sun|            33|
|   snow|             8|
+-------+--------------+



- What is the average high and low temperature on sunny days in July in 2013 and 2014?


In [20]:
df= (
    weather.filter(year("date") == 2015)
    .filter(weather.precipitation > 0)
    .withColumn('quarter', quarter('date'))
    .where("quarter ==3")
)
df.show()

+----------+-------------+--------+--------+----+-------+----------+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|f_temp_max|quarter|
+----------+-------------+--------+--------+----+-------+----------+-------+
|2015-07-24|          0.3|    22.8|    13.3| 3.8|    fog|      73.0|      3|
|2015-07-26|          2.0|    22.2|    13.9| 2.6|    fog|      72.0|      3|
|2015-08-12|          7.6|    28.3|    16.7| 2.7|   rain|      82.9|      3|
|2015-08-14|         30.5|    18.3|    15.0| 5.2|   rain|      64.9|      3|
|2015-08-20|          2.0|    22.8|    14.4| 4.2|    fog|      73.0|      3|
|2015-08-28|          0.5|    23.3|    15.6| 2.6|    fog|      73.9|      3|
|2015-08-29|         32.5|    22.2|    13.3| 5.8|    fog|      72.0|      3|
|2015-08-30|         10.2|    20.0|    12.8| 4.7|    fog|      68.0|      3|
|2015-09-01|          5.8|    19.4|    13.9| 5.0|    fog|      66.9|      3|
|2015-09-05|          0.3|    20.6|     8.9| 3.5|    sun|      69.1|      3|

- What percentage of days were rainy in q3 of 2015?

In [21]:
(weather.withColumn("quarter", quarter("date"))).filter(quarter('date') == '3').filter(year('date') == 2015)\
.select(when(col('weather') == 'rain', 1)\
    .otherwise(0).alias('rain')).agg(mean('rain')).show()

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



- For each year, find what percentage of days it rained (had non-zero precipitation).

In [30]:
(weather.withColumn('year', year('date'))).select('year',when(col('precipitation') > 0, 1)\
    .otherwise(0).alias('rain')).groupBy('year')\
        .agg(mean('rain')*100).show()

+----+-----------------+
|year|(avg(rain) * 100)|
+----+-----------------+
|2015|39.45205479452055|
|2013|41.64383561643836|
|2014| 41.0958904109589|
|2012|48.36065573770492|
+----+-----------------+

