In [139]:
import pandas as pd

import pyspark
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import *
spark = pyspark.sql.SparkSession.builder.getOrCreate()

### 1. Create a spark data frame that contains your favorite programming languages.

In [2]:
df = pd.DataFrame(data=['python', 'HTML', 'Java', 'Solidity', 'R'], columns=['language'])

* The name of the column should be language

In [3]:
spark_df = spark.createDataFrame(df)
spark_df

DataFrame[language: string]

* View the schema of the dataframe

In [4]:
spark_df.printSchema()

root
 |-- language: string (nullable = true)



* Output the shape of the dataframe

In [5]:
print((spark_df.count(), len(spark_df.columns)))

(5, 1)


* Show the first 5 records in the dataframe

In [6]:
spark_df.show(5)

+--------+
|language|
+--------+
|  python|
|    HTML|
|    Java|
|Solidity|
|       R|
+--------+



### 2. Load the mpg dataset as a spark dataframe.

In [7]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))

* Create 1 column of output that contains a message like the one below: The 1999 audi a4 has a 4 cylinder engine.  For each vehicle


In [8]:
from pyspark.sql.functions import lit
mpg.show(1)

+------------+-----+-----+----+---+--------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|   trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+--------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|auto(l5)|  f| 18| 29|  p|compact|
+------------+-----+-----+----+---+--------+---+---+---+---+-------+
only showing top 1 row



In [9]:
mpg2 = mpg.select('*', concat(lit('The '), mpg.year,lit(' '), mpg.manufacturer, lit(' '), mpg.model, lit(' has a '), mpg.cyl, lit('cylinder engine.')).alias('desc')).show(truncate=False)

+------------+------------------+-----+----+---+----------+---+---+---+---+-------+-------------------------------------------------------------+
|manufacturer|model             |displ|year|cyl|trans     |drv|cty|hwy|fl |class  |desc                                                         |
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+-------------------------------------------------------------+
|audi        |a4                |1.8  |1999|4  |auto(l5)  |f  |18 |29 |p  |compact|The 1999 audi a4 has a 4cylinder engine.                     |
|audi        |a4                |1.8  |1999|4  |manual(m5)|f  |21 |29 |p  |compact|The 1999 audi a4 has a 4cylinder engine.                     |
|audi        |a4                |2.0  |2008|4  |manual(m6)|f  |20 |31 |p  |compact|The 2008 audi a4 has a 4cylinder engine.                     |
|audi        |a4                |2.0  |2008|4  |auto(av)  |f  |21 |30 |p  |compact|The 2008 audi a4 has a 4cylinder engine. 

* Transform the trans column so that it only contains either manual or auto.

In [10]:
from pyspark.sql.functions import when

In [11]:
mpg.select(mpg.trans, when((mpg.trans == 'manual(m5)') | (mpg.trans == 'manual(m6)'), 'manual').otherwise("auto")).show()

+----------+----------------------------------------------------------------------------------+
|     trans|CASE WHEN ((trans = manual(m5)) OR (trans = manual(m6))) THEN manual ELSE auto END|
+----------+----------------------------------------------------------------------------------+
|  auto(l5)|                                                                              auto|
|manual(m5)|                                                                            manual|
|manual(m6)|                                                                            manual|
|  auto(av)|                                                                              auto|
|  auto(l5)|                                                                              auto|
|manual(m5)|                                                                            manual|
|  auto(av)|                                                                              auto|
|manual(m5)|                            

### Load the tips dataset as a spark dataframe

In [12]:
from pydataset import data

tips = spark.createDataFrame(data("tips"))

Create a column that contains the tip percentage. Load the tips dataset as a spark dataframe.

In [13]:
tip_pct = tips.tip / tips.total_bill

In [14]:
tips = tips.select('*', tip_pct.alias('tip_pct'))
tips.show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



What percentage of observations are smokers?

In [15]:
tips.groupBy("smoker").count().show()

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



In [16]:
tips.groupBy('smoker').count().withColumn('perc', ((col('count'))/(tips.count()))).show()

+------+-----+-------------------+
|smoker|count|               perc|
+------+-----+-------------------+
|    No|  151| 0.6188524590163934|
|   Yes|   93|0.38114754098360654|
+------+-----+-------------------+



Calculate the average tip percentage for each combination of sex and smoker.

In [17]:
tips.groupBy('smoker').pivot('sex').agg(mean('tip_pct')).show()

+------+-------------------+-------------------+
|smoker|             Female|               Male|
+------+-------------------+-------------------+
|    No| 0.1569209707691836| 0.1606687151291298|
|   Yes|0.18215035269941032|0.15277117520248512|
+------+-------------------+-------------------+



### 4.Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [151]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



* Convert the temperatures to fahrenheit.

In [152]:
weather2 = weather.withColumn('temp_max', col('temp_max') * 1.8 + 32).withColumn('temp_min', col('temp_min') * 1.8 +32).show()

+----------+-------------+------------------+------------------+----+-------+
|      date|precipitation|          temp_max|          temp_min|wind|weather|
+----------+-------------+------------------+------------------+----+-------+
|2012-01-01|          0.0|55.040000000000006|              41.0| 4.7|drizzle|
|2012-01-02|         10.9|             51.08|             37.04| 4.5|   rain|
|2012-01-03|          0.8|             53.06|             44.96| 2.3|   rain|
|2012-01-04|         20.3|             53.96|             42.08| 4.7|   rain|
|2012-01-05|          1.3|48.019999999999996|             37.04| 6.1|   rain|
|2012-01-06|          2.5|             39.92|             35.96| 2.2|   rain|
|2012-01-07|          0.0|             44.96|             37.04| 2.3|   rain|
|2012-01-08|          0.0|              50.0|             37.04| 2.0|    sun|
|2012-01-09|          4.3|             48.92|              41.0| 3.4|   rain|
|2012-01-10|          1.0|42.980000000000004|             33.08|

* Which month has the most rain, on average?

In [156]:
weather.withColumn("month", month("date")).groupBy("month").agg(mean("precipitation").alias("avg_rainfall")).sort(desc("avg_rainfall")).show()

+-----+-------------------+
|month|       avg_rainfall|
+-----+-------------------+
|   11|  5.354166666666667|
|   12|  5.021774193548389|
|    3|  4.888709677419355|
|   10|  4.059677419354839|
|    1| 3.7580645161290316|
|    2|  3.734513274336283|
|    4|  3.128333333333333|
|    9| 1.9624999999999997|
|    5| 1.6733870967741935|
|    8| 1.3201612903225806|
|    6| 1.1075000000000002|
|    7|0.38870967741935486|
+-----+-------------------+



* Which year was the windiest?

In [157]:
(
weather.withColumn('year', year('date'))
.groupby('year')
.agg(sum('wind').alias('total_wind'))
.sort(col("total_wind").desc())
.show()
)

+----+------------------+
|year|        total_wind|
+----+------------------+
|2012|            1244.7|
|2014|1236.5000000000007|
|2015|1153.3000000000002|
|2013|1100.8000000000006|
+----+------------------+



* What is the most frequent type of weather in January?

In [158]:
(
weather.filter(month('date') == 1)
.groupby('weather')
.agg(count('weather'))
.sort(col("count(weather)").desc())
.show()
)

+-------+--------------+
|weather|count(weather)|
+-------+--------------+
|    fog|            38|
|   rain|            35|
|    sun|            33|
|drizzle|            10|
|   snow|             8|
+-------+--------------+



* What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [136]:
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean

In [173]:
(
weather.filter(month('date')==7)
.filter(year('date')>2012)
.filter(year('date')<2015)
.filter(col('weather')==lit('sun'))
.agg(avg("temp_max").alias("average_high_temp"), avg("temp_min").alias("average_low_temp"))
.show()
)


+------------------+-----------------+
| average_high_temp| average_low_temp|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



* What percentage of days were rainy in q3 of 2015?

In [191]:
(
weather.filter(year('date')==2015)
.filter(month('date')>6)
.filter(month('date')<10)
.select(when(col("weather") == "rain", 1).otherwise(0).alias('rainy'))
.agg(mean("rainy"))
.show()
)

+--------------------+
|          avg(rainy)|
+--------------------+
|0.021739130434782608|
+--------------------+



* For each year, find what percentage of days it rained (had non-zero precipitation).

In [206]:
(
weather.withColumn('year', year('date'))
.select(when(col("precipitation") > 0, 1).otherwise(0).alias('prec'), 'year')
.groupby('year')
.agg(mean('prec'))
.sort(desc(col('avg(prec)')))
.show()
)

+----+-------------------+
|year|          avg(prec)|
+----+-------------------+
|2012|0.48360655737704916|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2015|0.39452054794520547|
+----+-------------------+

