In [1]:
import pyspark
import pandas as pd

1. Create a spark data frame that contains your favorite programming languages.
    * The name of the column should be language
    * View the schema of the dataframe
    * Output the shape of the dataframe
    * Show the first 5 records in the dataframe

In [2]:
pd_df = pd.DataFrame({'language':['Python','C++','JavaScript'], 
                   'rank':[2,1,3]})
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(pd_df)
spark_df.show()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/05 15:07:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+----------+----+
|  language|rank|
+----------+----+
|    Python|   2|
|       C++|   1|
|JavaScript|   3|
+----------+----+



2. Load the mpg dataset as a spark dataframe.
    * Create 1 column of output that contains a message like the one below:
    `The 1999 audi a4 has a 4 cylinder engine.`
    * For each vehicle.
    * Transform the trans column so that it only contains either manual or auto.

In [3]:
# create spark dataframe
from pydataset import data
mpg = spark.createDataFrame(data('mpg'))
mpg.show(3)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 3 rows



In [4]:
# create column with the intended statement
from pyspark.sql.functions import concat, lit
mpg.select(concat(lit('The '), 
                  lit(mpg.year), 
                  lit(' '), 
                  lit(mpg.manufacturer),
                  lit(' '),
                  lit(mpg.model),
                  lit(' has a '),
                  lit(mpg.cyl),
                  lit(' cylinder engine.')
).alias('statement')).head(5)

[Row(statement='The 1999 audi a4 has a 4 cylinder engine.'),
 Row(statement='The 1999 audi a4 has a 4 cylinder engine.'),
 Row(statement='The 2008 audi a4 has a 4 cylinder engine.'),
 Row(statement='The 2008 audi a4 has a 4 cylinder engine.'),
 Row(statement='The 1999 audi a4 has a 6 cylinder engine.')]

3. Load the tips dataset as a spark dataframe.
    * What percentage of observations are smokers?
    * Create a column that contains the tip percentage
    * Calculate the average tip percentage for each combination of sex and smoker.

In [5]:
tips = spark.createDataFrame(data('tips'))
tips.head(3)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3)]

In [6]:
# % who are smokers
from pyspark.sql.functions import mean, when
percent_smoking = tips.select(mean(when(tips.smoker == 'Yes', 1)\
                                   .otherwise(0))\
                              .alias('percent_smokers'))
print(percent_smoking.show())
# tip percentage
tips = tips.select('*', (tips.tip / tips.total_bill).alias('tip_percentage'))
tips.show()


+-------------------+
|    percent_smokers|
+-------------------+
|0.38114754098360654|
+-------------------+

None
+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|


In [7]:
# averate tip percentage per each combo of sex/smoker
from pyspark.sql.functions import col
tips.groupBy('sex').pivot('smoker').agg(mean(col('tip_percentage'))).show()

[Stage 16:>                                                         (0 + 8) / 8]

+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941032|
|  Male|0.1606687151291298|0.15277117520248512|
+------+------------------+-------------------+



                                                                                

4. Use the seattle weather dataset referenced in the lesson to answer the questions below.
    * Convert the temperatures to fahrenheit.
    * Which month has the most rain, on average?
    * Which year was the windiest?
    * What is the most frequent type of weather in January?
    * What is the average high and low temperature on sunny days in July in 2013 and 2014?
    * What percentage of days were rainy in q3 of 2015?
    * For each year, find what percentage of days it rained (had non-zero precipitation).

In [8]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(3)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 3 rows



In [9]:
# convert temps to F
def c_to_f(temp):
    return (temp * (9/5)) + 32

weather.select('temp_max', 
               c_to_f(weather.temp_max).alias('f_temp_max'),
               'temp_min',
               c_to_f(weather.temp_min).alias('f_temp_min')
              ).show(3)

+--------+------------------+--------+----------+
|temp_max|        f_temp_max|temp_min|f_temp_min|
+--------+------------------+--------+----------+
|    12.8|55.040000000000006|     5.0|      41.0|
|    10.6|             51.08|     2.8|     37.04|
|    11.7|             53.06|     7.2|     44.96|
+--------+------------------+--------+----------+
only showing top 3 rows



In [10]:
# most rainy month
from pyspark.sql.functions import month, mean
weather.withColumn("month", month("date"))\
    .groupBy("month")\
    .agg(mean(weather.precipitation).alias("average_rainfall"))\
    .sort(col('average_rainfall').desc())\
    .show(1)



+-----+-----------------+
|month| average_rainfall|
+-----+-----------------+
|   11|5.354166666666667|
+-----+-----------------+
only showing top 1 row



                                                                                

In [11]:
# windiest year
from pyspark.sql.functions import year
weather.withColumn("year", year("date"))\
    .groupBy("year")\
    .agg(mean(weather.wind).alias("average_wind"))\
    .sort(col('average_wind').desc())\
    .show(1)

+----+-----------------+
|year|     average_wind|
+----+-----------------+
|2012|3.400819672131148|
+----+-----------------+
only showing top 1 row



In [12]:
# most frequent weather type in january
from pyspark.sql.functions import count
weather.where(month('date') == '1')\
       .groupBy('weather')\
       .agg(count('weather').alias('count'))\
       .sort(col('count').desc())\
.show(1)

[Stage 30:>                                                         (0 + 8) / 8]

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
+-------+-----+
only showing top 1 row



                                                                                

In [13]:
# What is the average high and low temperature on sunny days in July in 2013 and 2014?
from pyspark.sql.functions import max, min, mean
weird_one = weather.where(((year('date') == '2013') | (year('date') == '2014')))
weird_one = weather.where(month('date') == '7')
weird_one = weather.where(weather.weather == 'sun')
weird_one = weather.select(mean(weather.temp_max).alias('average_high'),
                    mean(weather.temp_min).alias('average_low'))
weird_one.show()

+------------------+-----------------+
|      average_high|      average_low|
+------------------+-----------------+
|16.439082819986314|8.234770704996578|
+------------------+-----------------+



In [14]:
# What percentage of days were rainy in q3 of 2015?
q3 = (month('date') == '7') | (month('date') == '8') | (month('date') == '9')
weather.where((year('date') == '2015') & q3)\
        .select(count(col('weather') == 'rainy').alias('rainy_days')).show()

+----------+
|rainy_days|
+----------+
|        92|
+----------+



In [15]:
# For each year, find what percentage of days it rained (had non-zero precipitation).
from pyspark.sql.functions import sum
weather.withColumn('year', year('date'))\
       .withColumn('rained', when(col('precipitation') > 0, 1).otherwise(0))\
       .groupBy(col('year'))\
       .agg(sum('rained').alias('rainy_days'), 
            count('precipitation').alias('number_of_days'))\
       .withColumn('percent_rained', col('rainy_days') / col('number_of_days'))\
.show()

+----+----------+--------------+-------------------+
|year|rainy_days|number_of_days|     percent_rained|
+----+----------+--------------+-------------------+
|2012|       177|           366|0.48360655737704916|
|2013|       152|           365|0.41643835616438357|
|2014|       150|           365|  0.410958904109589|
|2015|       144|           365|0.39452054794520547|
+----+----------+--------------+-------------------+

