In [1]:
import pyspark
import pandas as pd
import numpy as np
# create spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *

### Create a spark data frame that contains your favorite programming languages.
- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [2]:
# create pandas dataframe

pd_df = pd.DataFrame({'language': ['python', 'julia', 'ruby', 'R', 'C++', 'Javascript', 'Typescript', 'Swift', 'Rust']})

In [3]:
# create spark dataframe from panadas dataframe
sp_df = spark.createDataFrame(pd_df)
sp_df

DataFrame[language: string]

In [4]:
# View the shema of the dataframe
sp_df.printSchema()

root
 |-- language: string (nullable = true)



In [5]:
# another way to print schema
sp_df.dtypes

[('language', 'string')]

In [6]:
#output the shape of the dataframe
sp_df.describe().show()
print("DataFrame shape: ", sp_df.count(), " x ", len(sp_df.columns))

+-------+--------+
|summary|language|
+-------+--------+
|  count|       9|
|   mean|    null|
| stddev|    null|
|    min|     C++|
|    max|    ruby|
+-------+--------+

DataFrame shape:  9  x  1


In [7]:
sp_df.show(5)

+--------+
|language|
+--------+
|  python|
|   julia|
|    ruby|
|       R|
|     C++|
+--------+
only showing top 5 rows



### Load the mpg dataset as a spark dataframe.
- Create 1 column of output that contains a message like the one below:
>- The 1999 audi a4 has a 4 cylinder engine.
>- For each vehicle.
>- Transform the trans column so that it only contains either manual or auto.

In [8]:
#Spark dataframe
import pydataset

mpg = spark.createDataFrame(pydataset.data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [9]:
mpg.select(
    mpg.year.alias("year"), col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"), mpg.cyl.alias("cylinders"))

DataFrame[year: bigint, highway_mileage: bigint, city_mileage: bigint, cylinders: bigint]

In [10]:
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean

In [11]:
# concatenate string columns to create the desired output column
description = mpg.select(concat(lit("The "), mpg.year, (lit(" ")), mpg.manufacturer, (lit(" ")), mpg.model, (lit(" has a")), mpg.cyl, (lit(" engine"))))
description.show(20, False)

+-------------------------------------------------------------------+
|concat(The , year,  , manufacturer,  , model,  has a, cyl,  engine)|
+-------------------------------------------------------------------+
|The 1999 audi a4 has a4 engine                                     |
|The 1999 audi a4 has a4 engine                                     |
|The 2008 audi a4 has a4 engine                                     |
|The 2008 audi a4 has a4 engine                                     |
|The 1999 audi a4 has a6 engine                                     |
|The 1999 audi a4 has a6 engine                                     |
|The 2008 audi a4 has a6 engine                                     |
|The 1999 audi a4 quattro has a4 engine                             |
|The 1999 audi a4 quattro has a4 engine                             |
|The 2008 audi a4 quattro has a4 engine                             |
|The 2008 audi a4 quattro has a4 engine                             |
|The 1999 audi a4 qu

In [12]:
#Transform the trans column so that it only contains either manual or auto.
mpg.select(
    regexp_extract('trans', r'^(\w+)\(', 1).alias('trans_extract'),
    regexp_replace('trans', r'\(.+$', '').alias('trans_replace'),
    when(mpg.trans.like('auto%'), 'auto').otherwise('manual').alias('trans_when')
).show()

+-------------+-------------+----------+
|trans_extract|trans_replace|trans_when|
+-------------+-------------+----------+
|         auto|         auto|      auto|
|       manual|       manual|    manual|
|       manual|       manual|    manual|
|         auto|         auto|      auto|
|         auto|         auto|      auto|
|       manual|       manual|    manual|
|         auto|         auto|      auto|
|       manual|       manual|    manual|
|         auto|         auto|      auto|
|       manual|       manual|    manual|
|         auto|         auto|      auto|
|         auto|         auto|      auto|
|       manual|       manual|    manual|
|         auto|         auto|      auto|
|       manual|       manual|    manual|
|         auto|         auto|      auto|
|         auto|         auto|      auto|
|         auto|         auto|      auto|
|         auto|         auto|      auto|
|         auto|         auto|      auto|
+-------------+-------------+----------+
only showing top

### Load the tips dataset as a spark dataframe.
- What percentage of observations are smokers?
-  Create a column that contains the tip percentage
- Calculate the average tip percentage for each combination of sex and smoker.

In [13]:
# Load the tips dataset

tips = spark.createDataFrame(pydataset.data('tips'))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [14]:
# tips.groupBy('smoker').count().withColumn('percent',
#                                          concat(round((col('count')/tips.count()*100), 0).cast("int"), 
# lit("%"))).show()

In [15]:
# What percentage of observations are smokers
tips.groupBy('smoker').count().withColumn('percent', 
                                          concat(round((col('count')/tips.count()*100), 0).cast("int"), 
                                                 lit("%"))).show()

+------+-----+-------+
|smoker|count|percent|
+------+-----+-------+
|    No|  151|    62%|
|   Yes|   93|    38%|
+------+-----+-------+



In [16]:
# Create a column that contains the tip percent
tip_percent = (col("tip") / col("total_bill"))*100

tips.select(
    col("total_bill").alias("total"),
    tips.tip.alias("tip_amount"),
    tip_percent.alias("tip_percent"),
).show(5)

+-----+----------+------------------+
|total|tip_amount|       tip_percent|
+-----+----------+------------------+
|16.99|      1.01|5.9446733372572105|
|10.34|      1.66|16.054158607350097|
|21.01|       3.5|16.658733936220845|
|23.68|      3.31| 13.97804054054054|
|24.59|      3.61|14.680764538430255|
+-----+----------+------------------+
only showing top 5 rows



In [17]:
tips = tips.withColumn('tip_percent', round((col('tip')/col('total_bill')*100), 2))

In [18]:
tips.groupBy('sex', 'smoker').agg(mean(col('tip_percent')).alias("avg_tip_percent")).show()

+------+------+------------------+
|   sex|smoker|   avg_tip_percent|
+------+------+------------------+
|  Male|    No| 16.06659793814433|
|  Male|   Yes|15.276666666666667|
|Female|    No| 15.69111111111111|
|Female|   Yes|18.214545454545455|
+------+------+------------------+



In [19]:
avg_tip = ((col("tip") / col("total_bill"))*100)


In [23]:
tips.select(
    col("total_bill").alias("total"),
    tips.tip.alias("tip_amount"),
    tip_percent.alias("tip_percent")).show()


+-----+----------+------------------+
|total|tip_amount|       tip_percent|
+-----+----------+------------------+
|16.99|      1.01|5.9446733372572105|
|10.34|      1.66|16.054158607350097|
|21.01|       3.5|16.658733936220845|
|23.68|      3.31| 13.97804054054054|
|24.59|      3.61|14.680764538430255|
|25.29|      4.71| 18.62396204033215|
| 8.77|       2.0| 22.80501710376283|
|26.88|      3.12|11.607142857142858|
|15.04|      1.96|13.031914893617023|
|14.78|      3.23|21.853856562922868|
|10.27|      1.71| 16.65043816942551|
|35.26|       5.0|14.180374361883155|
|15.42|      1.57|10.181582360570687|
|18.43|       3.0|16.277807921866522|
|14.83|      3.02|20.364126770060686|
|21.58|      3.92|18.164967562557923|
|10.33|      1.67| 16.16650532429816|
|16.29|      3.71|22.774708410067525|
|16.97|       3.5|20.624631703005306|
|20.65|      3.35|16.222760290556902|
+-----+----------+------------------+
only showing top 20 rows



### Use the seattle weather dataset referenced in the lesson to answer the questions below.
- Convert the temperatures to farenheight.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low tempurature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation)

In [24]:
from vega_datasets import data

weather = data.seattle_weather()
weather = spark.createDataFrame(weather)
weather.show(4)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 4 rows



In [25]:
# Convert temperatures to F
weather = (weather
           .withColumn('temp_max', (col('temp_max') * 9/5 + 32))
           .withColumn('temp_min', (col('temp_min') * 9/5 + 32)))
weather.show()

+-------------------+-------------+------------------+------------------+----+-------+
|               date|precipitation|          temp_max|          temp_min|wind|weather|
+-------------------+-------------+------------------+------------------+----+-------+
|2012-01-01 00:00:00|          0.0|             55.04|              41.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|             51.08|             37.04| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|             53.06|             44.96| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|             53.96|             42.08| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|             48.02|             37.04| 6.1|   rain|
|2012-01-06 00:00:00|          2.5|             39.92|             35.96| 2.2|   rain|
|2012-01-07 00:00:00|          0.0|             44.96|             37.04| 2.3|   rain|
|2012-01-08 00:00:00|          0.0|              50.0|             37.04| 2.0|    sun|
|2012-01-09 00:00:00|          4.3|        

In [26]:
# which month had the most rain on average
(weather
 .withColumn('month', month('date')) #separate date column into month and year
 .withColumn('year', year('date'))
 .groupBy('month', 'year') # group each month for each year combo
 .agg(sum('precipitation').alias('total_monthly_precipitation')) # sum the total precipitation for each month
 .groupBy('month') #groupby month
 .agg(mean('total_monthly_precipitation').alias('avg_monthly_rain')) #take the mean
 .sort(col('avg_monthly_rain').desc()) # sort the columns
 .first()
)

Row(month=11, avg_monthly_rain=160.625)

In [27]:
# Which year is the windiest
(weather
 .withColumn('year', year('date'))
 .groupBy('year')
 .agg(sum('wind').alias('total_winds'))
 .sort(col('total_winds').desc())
 .first()
)

Row(year=2012, total_winds=1244.7)

In [28]:
# What is the most frequent type of weather in January?
(weather
 .withColumn('month', month('date'))
 .filter(col('month') == 1)
 .groupBy('weather')
 .count()
 .sort(col('count').desc())
 .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



In [29]:
# What is the average high and low tempurature on sunny days in July in 2013 and 2014?
(weather
 .filter(month('date') == 7)
 .filter(year('date') > 2012)
 .filter(year('date') < 2015)
 .filter(col('weather') == lit('sun'))
 .agg(avg('temp_max').alias('average_high_temp'), avg('temp_min').alias('average_low_temp'))
 .show()
)

+-----------------+-----------------+
|average_high_temp| average_low_temp|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



In [30]:
# What percentage of days were rainy in q3 of 2015?
(weather
 .filter(year('date') == 2015)
 .filter(quarter('date') == 3)
 .select(when(col('weather') == 'rain', 1).otherwise(0).alias('rain'))
 .agg(mean('rain'))
 .show()
)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



In [31]:
# For each year, find what percentage of days it rained (had non-zero precipitation)
(weather
 .withColumn('rain', (when(col('precipitation') > 0, 1).otherwise(0)))
 .groupBy(year('date').alias('year'))
 .agg(mean(col('rain')).alias('pct_days_with_rain'))
 .show()
)

+----+-------------------+
|year| pct_days_with_rain|
+----+-------------------+
|2015|0.39452054794520547|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2012|0.48360655737704916|
+----+-------------------+

