In [1]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

import pandas as pd
import numpy as np
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import lit
from pyspark.sql.functions import *

print("Intalized")

Intalized


Create a spark data frame that contains your favorite programming languages.

* The name of the column should be language
* View the schema of the dataframe
* Output the shape of the dataframe
* Show the first 5 records in the dataframe


In [2]:
favorite_languages = ['python', 'c#', 'java', 'php', 'html']
rank = [1, 5, 3, 4, 2]
df = pd.DataFrame(favorite_languages, rank).reset_index()
df.columns=['rank', 'language']
df = spark.createDataFrame(df)
df.show()

+----+--------+
|rank|language|
+----+--------+
|   1|  python|
|   5|      c#|
|   3|    java|
|   4|     php|
|   2|    html|
+----+--------+



In [3]:
df.printSchema()

root
 |-- rank: long (nullable = true)
 |-- language: string (nullable = true)



In [4]:
df.describe().show()

+-------+------------------+--------+
|summary|              rank|language|
+-------+------------------+--------+
|  count|                 5|       5|
|   mean|               3.0|    null|
| stddev|1.5811388300841898|    null|
|    min|                 1|      c#|
|    max|                 5|  python|
+-------+------------------+--------+



Load the mpg dataset as a spark dataframe For each vehicle..

* Create 1 column of output that contains a message like the one below:
>The 1999 audi a4 has a 4 cylinder engine.
* Transform the trans column so that it only contains either manual or auto.

In [5]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [6]:
mpg.select(
    concat(lit("The "), col("year"), lit(" "), 
    col("manufacturer"), lit(" "),
    col("model"), lit(" has a "),
    col("cyl"), lit(" cylinder engine"))
).show(truncate=False)

+-----------------------------------------------------------------------------+
|concat(The , year,  , manufacturer,  , model,  has a , cyl,  cylinder engine)|
+-----------------------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine                                     |
|The 1999 audi a4 has a 4 cylinder engine                                     |
|The 2008 audi a4 has a 4 cylinder engine                                     |
|The 2008 audi a4 has a 4 cylinder engine                                     |
|The 1999 audi a4 has a 6 cylinder engine                                     |
|The 1999 audi a4 has a 6 cylinder engine                                     |
|The 2008 audi a4 has a 6 cylinder engine                                     |
|The 1999 audi a4 quattro has a 4 cylinder engine                             |
|The 1999 audi a4 quattro has a 4 cylinder engine                             |
|The 2008 audi a4 quattro has a 4 cylind

In [7]:
mpg.select(
    concat(lit("The "), "year", lit(" "), 
    "manufacturer", lit(" "),
    "model", lit(" has a "),
    "cyl", lit(" cylinder engine"))
).show(truncate=False)

+-----------------------------------------------------------------------------+
|concat(The , year,  , manufacturer,  , model,  has a , cyl,  cylinder engine)|
+-----------------------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine                                     |
|The 1999 audi a4 has a 4 cylinder engine                                     |
|The 2008 audi a4 has a 4 cylinder engine                                     |
|The 2008 audi a4 has a 4 cylinder engine                                     |
|The 1999 audi a4 has a 6 cylinder engine                                     |
|The 1999 audi a4 has a 6 cylinder engine                                     |
|The 2008 audi a4 has a 6 cylinder engine                                     |
|The 1999 audi a4 quattro has a 4 cylinder engine                             |
|The 1999 audi a4 quattro has a 4 cylinder engine                             |
|The 2008 audi a4 quattro has a 4 cylind

In [8]:
mpg.select(regexp_replace("trans", r"\([^)]*\)", "").alias("transonly")).show()

+---------+
|transonly|
+---------+
|     auto|
|   manual|
|   manual|
|     auto|
|     auto|
|   manual|
|     auto|
|   manual|
|     auto|
|   manual|
|     auto|
|     auto|
|   manual|
|     auto|
|   manual|
|     auto|
|     auto|
|     auto|
|     auto|
|     auto|
+---------+
only showing top 20 rows



In [9]:
mpg.withColumn("trans", when(mpg.trans.startswith("a"), "auto").otherwise("manual")).show(6)

+------------+-----+-----+----+---+------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl| trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto|  f| 16| 26|  p|compact|
|        audi|   a4|  2.8|1999|  6|manual|  f| 18| 26|  p|compact|
+------------+-----+-----+----+---+------+---+---+---+---+-------+
only showing top 6 rows



Load the tips dataset as a spark dataframe.


* Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [10]:
tips = spark.createDataFrame(data("tips"))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



What percentage of observations are smokers?

In [11]:
tips.filter(tips.smoker == "Yes").count()/tips.count()

0.38114754098360654

Create a column that contains the tip percentage


In [12]:
tips.createOrReplaceTempView("tips")

In [13]:
spark.sql(
    """
SELECT tip, total_bill, round((tip/total_bill) * 100, 2) AS tip_percent
FROM tips
"""
).show(5)

+----+----------+-----------+
| tip|total_bill|tip_percent|
+----+----------+-----------+
|1.01|     16.99|       5.94|
|1.66|     10.34|      16.05|
| 3.5|     21.01|      16.66|
|3.31|     23.68|      13.98|
|3.61|     24.59|      14.68|
+----+----------+-----------+
only showing top 5 rows



In [14]:
tips.select(tips.tip/tips.total_bill).show()

+-------------------+
| (tip / total_bill)|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
|0.18623962040332148|
|0.22805017103762829|
|0.11607142857142858|
|0.13031914893617022|
| 0.2185385656292287|
| 0.1665043816942551|
|0.14180374361883155|
|0.10181582360570687|
|0.16277807921866522|
|0.20364126770060686|
|0.18164967562557924|
| 0.1616650532429816|
|0.22774708410067526|
|0.20624631703005306|
|0.16222760290556903|
+-------------------+
only showing top 20 rows



* Calculate the average tip percentage for each combination of sex and smoker.

In [15]:
tips.groupBy(tips.smoker, tips.sex).agg(round(avg(tips.tip/tips.total_bill * 100), 2)).show()

+------+------+-----------------------------------------+
|smoker|   sex|round(avg(((tip / total_bill) * 100)), 2)|
+------+------+-----------------------------------------+
|    No|Female|                                    15.69|
|    No|  Male|                                    16.07|
|   Yes|  Male|                                    15.28|
|   Yes|Female|                                    18.22|
+------+------+-----------------------------------------+



In [16]:
spark.sql(
"""
SELECT smoker, sex, avg(round((tip/total_bill) * 100, 2)) AS avg_tip_percent
FROM tips
group by smoker, sex
"""
).show(5)

+------+------+------------------+
|smoker|   sex|   avg_tip_percent|
+------+------+------------------+
|    No|Female|15.691111111111113|
|    No|  Male| 16.06659793814433|
|   Yes|  Male|15.276666666666666|
|   Yes|Female| 18.21454545454545|
+------+------+------------------+



# Seattle


In [17]:
from vega_datasets import data
data.seattle_weather()

Unnamed: 0,date,precipitation,temp_max,temp_min,wind,weather
0,2012-01-01,0.0,12.8,5.0,4.7,drizzle
1,2012-01-02,10.9,10.6,2.8,4.5,rain
2,2012-01-03,0.8,11.7,7.2,2.3,rain
3,2012-01-04,20.3,12.2,5.6,4.7,rain
4,2012-01-05,1.3,8.9,2.8,6.1,rain
...,...,...,...,...,...,...
1456,2015-12-27,8.6,4.4,1.7,2.9,fog
1457,2015-12-28,1.5,5.0,1.7,1.3,fog
1458,2015-12-29,0.0,7.2,0.6,2.6,fog
1459,2015-12-30,0.0,5.6,-1.0,3.4,sun


In [18]:
temp = spark.createDataFrame(data("seattle_weather"))
temp.show(5)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|     8.9|     2.8| 6.1|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 5 rows



Convert the temperatures to farenheight.


In [19]:
temp = temp.withColumn("temp_max", round(temp.temp_max*9/5 + 32, 2))
temp = temp.withColumn("temp_min", round(temp.temp_min*9/5 + 32, 2))
temp.show()

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|   55.04|    41.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|   51.08|   37.04| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|   53.06|   44.96| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|   53.96|   42.08| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|   48.02|   37.04| 6.1|   rain|
|2012-01-06 00:00:00|          2.5|   39.92|   35.96| 2.2|   rain|
|2012-01-07 00:00:00|          0.0|   44.96|   37.04| 2.3|   rain|
|2012-01-08 00:00:00|          0.0|    50.0|   37.04| 2.0|    sun|
|2012-01-09 00:00:00|          4.3|   48.92|    41.0| 3.4|   rain|
|2012-01-10 00:00:00|          1.0|   42.98|   33.08| 3.4|   rain|
|2012-01-11 00:00:00|          0.0|   42.98|   30.02| 5.1|    sun|
|2012-01-12 00:00:00|          0.0|   42.98|   28.94| 1.9|    

In [20]:
temp = spark.createDataFrame(data("seattle_weather"))
temp.show(5)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|     8.9|     2.8| 6.1|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 5 rows



Which month has the most rain, on average?


In [21]:
temp.createOrReplaceTempView("temp")

In [37]:
# Avg across month
(
    temp.withColumn("month", month("date"))
    .groupBy("month")
    .agg(avg("precipitation").alias("avg_rainfall"))
    .sort("avg_rainfall", ascending=False)
).show(1)

+-----+-----------------+
|month|     avg_rainfall|
+-----+-----------------+
|   11|5.354166666666667|
+-----+-----------------+
only showing top 1 row



In [36]:
# sumed than averaged
(
    temp.withColumn("month", month("date"))
    .groupBy("month")
    .agg(avg("precipitation").alias("avg_rainfall"))
    .sort("avg_rainfall", ascending=False)
).show(1)

+-----+------------+
|month|avg_rainfall|
+-----+------------+
|   11|     160.625|
+-----+------------+
only showing top 1 row



Which year was the windiest?

In [23]:
(
    temp.withColumn("year", year("date"))
    .groupBy("year")
    .agg(avg("wind").alias("year_wind"))
    .sort("year_wind", ascending=False)
).show(1)

+----+-----------------+
|year|        year_wind|
+----+-----------------+
|2012|3.400819672131147|
+----+-----------------+
only showing top 1 row



In [24]:
# Using sum
(
    temp.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind").alias("year_wind"))
    .sort("year_wind", ascending=False)
).show(1)

+----+------------------+
|year|         year_wind|
+----+------------------+
|2012|1244.6999999999998|
+----+------------------+
only showing top 1 row



What is the most frequent type of weather in January?

In [25]:
# Using sum
(
    temp.withColumn("month", month("date"))
    .groupBy("month")
    .count(temp.weather)
    .sort("year_wind", ascending=False)
).show(1)

TypeError: _api() takes 1 positional argument but 2 were given

In [26]:
spark.sql(
"""
SELECT weather, count(weather)
FROM temp
group by weather
"""
).show(5)

+-------+--------------+
|weather|count(weather)|
+-------+--------------+
|    fog|           411|
|drizzle|            54|
|   rain|           259|
|    sun|           714|
|   snow|            23|
+-------+--------------+



What is the average high and low tempurature on sunny days in July in 2013 and 2014?


In [27]:
spark.sql(
"""
SELECT year(date) as year, month(date) as month, round(avg(temp_max), 2) as avg_max, round(avg(temp_min), 2) as avg_min
FROM temp
where month(date) = 7 and (year(date) = 2013 or year(date) = 2014)
group by year(date), month(date)

"""
).show(5)

+----+-----+-------+-------+
|year|month|avg_max|avg_min|
+----+-----+-------+-------+
|2013|    7|  26.09|  13.93|
|2014|    7|   26.9|  14.43|
+----+-----+-------+-------+



What percentage of days were rainy in q3 of 2015?


In [28]:
spark.sql(
"""
SELECT year(date) as year, 
quarter(date) as quarter, 
(select count(*) from temp where where  quarter(date) == 3 and year(date) == 2015 and weather == 'rain')/count(*) as percent_of_rain
FROM temp
where  quarter(date) == 3 and year(date) == 2015
group by year(date), quarter(date)
"""
).show(5)

+----+-------+--------------------+
|year|quarter|     percent_of_rain|
+----+-------+--------------------+
|2015|      3|0.021739130434782608|
+----+-------+--------------------+



For each year, find what percentage of days it rained (had non-zero precipitation).

In [34]:
spark.sql(
"""
select year(date) as year, round(sum(case when precipitation > 0 then 1 else 0 end)/count(*) * 100, 2) as percent_rainy_days
from temp
group by year(date)
order by year(date)
"""
).show(5)

+----+------------------+
|year|percent_rainy_days|
+----+------------------+
|2012|             48.36|
|2013|             41.64|
|2014|              41.1|
|2015|             39.45|
+----+------------------+

