In [40]:
import env
import pandas as pd
import pyspark

from pyspark.sql.functions import concat, sum, avg, min, max, count, mean, lit, when, asc, desc, expr

spark = pyspark.sql.SparkSession.builder.getOrCreate()

1. Create a spark data frame that contains your favorite programming languages.
* The name of the column should be language

In [14]:
languages_pandas_df = pd.DataFrame(["java", "python", "javascript", "html", "css", "sql"])
languages_pandas_df.columns = ['language']
languages_pandas_df

Unnamed: 0,language
0,java
1,python
2,javascript
3,html
4,css
5,sql


In [16]:
langauges_df = spark.createDataFrame(languages_pandas_df)

In [16]:
langauges_df.select("*").show()

+----------+
|  language|
+----------+
|      java|
|    python|
|javascript|
|      html|
|       css|
|       sql|
+----------+



* View the schema of the dataframe

In [17]:
langauges_df.printSchema()

root
 |-- language: string (nullable = true)



* Output the shape of the dataframe

In [13]:
langauges_df.select(langauges_df.language).describe().show()

+-------+--------+
|summary|language|
+-------+--------+
|  count|       6|
|   mean|    null|
| stddev|    null|
|    min|     css|
|    max|     sql|
+-------+--------+



* Show the first 5 records in the dataframe

In [19]:
langauges_df.select("*").show(5)

+----------+
|  language|
+----------+
|      java|
|    python|
|javascript|
|      html|
|       css|
+----------+
only showing top 5 rows



2. Load the mpg dataset as a spark dataframe.

In [20]:
from pydataset import data

In [21]:
mpg_pandas_df = data('mpg')

In [22]:
mpg_df = spark.createDataFrame(mpg_pandas_df)
mpg_df.select("*").show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



* Create 1 column of output that contains a message for each vehicle like the one below:
> `The 1999 audi a4 has a 4 cylinder engine.`

In [39]:
mpg_df = mpg_df.select( "*",
    concat(
        lit("The "), mpg_df.year, lit(" "), mpg_df.manufacturer, lit(" "), mpg_df.model, lit(" has a "), mpg_df.cyl, lit(" cylinder engine.")
    ).alias("engine_description")
)

mpg_df.show(5, truncate=False)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+-----------------------------------------+
|manufacturer|model|displ|year|cyl|trans     |drv|cty|hwy|fl |class  |engine_description                       |
+------------+-----+-----+----+---+----------+---+---+---+---+-------+-----------------------------------------+
|audi        |a4   |1.8  |1999|4  |auto(l5)  |f  |18 |29 |p  |compact|The 1999 audi a4 has a 4 cylinder engine.|
|audi        |a4   |1.8  |1999|4  |manual(m5)|f  |21 |29 |p  |compact|The 1999 audi a4 has a 4 cylinder engine.|
|audi        |a4   |2.0  |2008|4  |manual(m6)|f  |20 |31 |p  |compact|The 2008 audi a4 has a 4 cylinder engine.|
|audi        |a4   |2.0  |2008|4  |auto(av)  |f  |21 |30 |p  |compact|The 2008 audi a4 has a 4 cylinder engine.|
|audi        |a4   |2.8  |1999|6  |auto(l5)  |f  |16 |26 |p  |compact|The 1999 audi a4 has a 6 cylinder engine.|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+--------------------------

* Transform the trans column so that it only contains either manual or auto.

In [50]:
mpg_df.select(when(expr("trans LIKE 'auto%'"), "auto").otherwise("manual").alias("trans")).show()

+------+
| trans|
+------+
|  auto|
|manual|
|manual|
|  auto|
|  auto|
|manual|
|  auto|
|manual|
|  auto|
|manual|
|  auto|
|  auto|
|manual|
|  auto|
|manual|
|  auto|
|  auto|
|  auto|
|  auto|
|  auto|
+------+
only showing top 20 rows



3. Load the tips dataset as a spark dataframe.

In [51]:
tips_pandas_df = data("tips")
tips_df = spark.createDataFrame(tips_pandas_df)

* What percentage of observations are smokers?

In [70]:
print(f'{round((tips_df.where(tips_df.smoker == "Yes").count() / tips_df.count()) * 100, 2)}% are smokers')

38.11% are smokers


* Create a column that contains the tip percentage

In [78]:
tips_df = tips_df.select("*",
    (tips_df.tip / tips_df.total_bill).alias('tip_pct')
)

tips_df.select("*").show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

* Calculate the average tip percentage for each combination of sex and smoker.

In [85]:
tips_df.groupBy('sex', 'smoker').mean('tip_pct').alias('avg_tip_pct').show()

+------+------+-------------------+
|   sex|smoker|       avg(tip_pct)|
+------+------+-------------------+
|  Male|    No| 0.1606687151291298|
|  Male|   Yes| 0.1527711752024851|
|Female|    No| 0.1569209707691836|
|Female|   Yes|0.18215035269941035|
+------+------+-------------------+



4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [91]:
from vega_datasets import data as vdata

weather_pandas_df = vdata.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather_df = spark.createDataFrame(weather_pandas_df)

* Convert the temperatures to fahrenheight.

In [96]:
weather_df = weather_df.select("*",
    (weather_df.temp_max * 1.8 + 32).alias('temp_max_F'),
    (weather_df.temp_min * 1.8 + 32).alias('temp_min_F')
)

weather_df.show()

+----------+-------------+--------+--------+----+-------+------------------+------------------+
|      date|precipitation|temp_max|temp_min|wind|weather|        temp_max_F|        temp_min_F|
+----------+-------------+--------+--------+----+-------+------------------+------------------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|55.040000000000006|              41.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|             51.08|             37.04|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|             53.06|             44.96|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|             53.96|             42.08|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|48.019999999999996|             37.04|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|             39.92|             35.96|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|             44.96|             37.04|
|2012-01-08|          0.0|    10.0|     

* Which month has the most rain, on average?

In [103]:
from pyspark.sql.functions import month, year, quarter

weather_df = weather_df.withColumn('month', month('date'))
weather_df.show()

+----------+-------------+--------+--------+----+-------+------------------+------------------+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|        temp_max_F|        temp_min_F|month|
+----------+-------------+--------+--------+----+-------+------------------+------------------+-----+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|55.040000000000006|              41.0|    1|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|             51.08|             37.04|    1|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|             53.06|             44.96|    1|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|             53.96|             42.08|    1|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|48.019999999999996|             37.04|    1|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|             39.92|             35.96|    1|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|             44.96|      

In [116]:
weather_df.groupby('month').avg('precipitation').sort(desc('avg(precipitation)')).show()

+-----+-------------------+
|month| avg(precipitation)|
+-----+-------------------+
|   11|  5.354166666666667|
|   12|  5.021774193548388|
|    3|  4.888709677419355|
|   10|  4.059677419354839|
|    1| 3.7580645161290316|
|    2|  3.734513274336283|
|    4|  3.128333333333333|
|    9| 1.9624999999999997|
|    5| 1.6733870967741935|
|    8| 1.3201612903225806|
|    6| 1.1075000000000002|
|    7|0.38870967741935486|
+-----+-------------------+



November is the rainiest month on average.

* Which year was the windiest?

In [118]:
weather_df = weather_df.withColumn('year', year('date'))

In [126]:
weather_df.groupby('year').avg('wind').sort(desc('avg(wind)')).show()

+----+------------------+
|year|         avg(wind)|
+----+------------------+
|2012| 3.400819672131147|
|2014|3.3876712328767136|
|2015|  3.15972602739726|
|2013|3.0158904109589044|
+----+------------------+



2012 was the windiest year on average

* What is the most frequent type of weather in January?

In [131]:
weather_df.where(weather_df.month == 1).crosstab('month', 'weather').show()

+-------------+-------+---+----+----+---+
|month_weather|drizzle|fog|rain|snow|sun|
+-------------+-------+---+----+----+---+
|            1|     10| 38|  35|   8| 33|
+-------------+-------+---+----+----+---+



Fog is the most frequent weather in January

* What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [141]:
sunny_july_13_14_df = weather_df\
    .where(weather_df.weather == "sun")\
    .where(weather_df.month == 7)\
    .where((weather_df.year == 2013) | (weather_df.year == 2014))

sunny_july_13_14_df.show()

+----------+-------------+--------+--------+----+-------+-----------------+------------------+-----+----+
|      date|precipitation|temp_max|temp_min|wind|weather|       temp_max_F|        temp_min_F|month|year|
+----------+-------------+--------+--------+----+-------+-----------------+------------------+-----+----+
|2013-07-01|          0.0|    31.7|    18.3| 2.3|    sun|            89.06|             64.94|    7|2013|
|2013-07-02|          0.0|    28.3|    15.6| 3.0|    sun|            82.94|             60.08|    7|2013|
|2013-07-03|          0.0|    26.1|    16.7| 3.2|    sun|            78.98|             62.06|    7|2013|
|2013-07-05|          0.0|    23.3|    13.9| 2.6|    sun|            73.94|57.019999999999996|    7|2013|
|2013-07-06|          0.0|    26.1|    13.3| 2.2|    sun|            78.98|             55.94|    7|2013|
|2013-07-07|          0.0|    23.9|    13.9| 2.9|    sun|            75.02|57.019999999999996|    7|2013|
|2013-07-08|          0.0|    26.7|    13.3| 2

In [143]:
sunny_july_13_14_df.select(avg('temp_max_F').alias('avg_max_temp_F'), avg('temp_min_F').alias('avg_min_temp_F')).show()

+-----------------+-----------------+
|   avg_max_temp_F|   avg_min_temp_F|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



* What percentage of days were rainy in q3 of 2015?

In [145]:
q3_2015_df = weather_df.where(weather_df.year == 2015).where((weather_df.month == 7) | (weather_df.month == 8) | (weather_df.month == 9))

In [147]:
q3_2015_df.where(q3_2015_df.weather == "rain").count() / q3_2015_df.count()

0.021739130434782608

* For each year, find what percentage of days it rained (had non-zero precipitation).

In [168]:
days_of_rain_df = weather_df.where(weather_df.precipitation > 0).groupby('year').count()

In [174]:
days_of_rain_df.select((days_of_rain_df['count'] / 365).alias('pct_year_with_rain')).show()

+-------------------+
| pct_year_with_rain|
+-------------------+
|0.39452054794520547|
|0.41643835616438357|
|  0.410958904109589|
| 0.4849315068493151|
+-------------------+

