In [9]:
import pandas as pd
import pydataset
import pyspark
from pyspark.sql.functions import *
spark = pyspark.sql.SparkSession.builder.getOrCreate()


1. Create a spark data frame that contains your favorite programming languages.



In [2]:
df = pd.DataFrame({'language': ['java', 'python', 'swift', 'scala']})
# convert to spark df
df = spark.createDataFrame(df)
df

DataFrame[language: string]

The name of the column should be language


View the schema of the dataframe


In [3]:
df.printSchema()

root
 |-- language: string (nullable = true)



Output the shape of the dataframe


In [4]:
def get_shape(df):
    return df.count(), len(df.columns)


get_shape(df)

                                                                                

(4, 1)

Show the first 5 records in the dataframe


In [5]:
df.head(5)

[Row(language='java'),
 Row(language='python'),
 Row(language='swift'),
 Row(language='scala')]

Load the mpg dataset as a spark dataframe.



In [7]:
mpg= spark.createDataFrame(pydataset.data('mpg'))
mpg.show(5)


+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



22/05/22 15:39:05 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 129731587 ms exceeds timeout 120000 ms
22/05/22 15:39:05 WARN SparkContext: Killing executors is not supported by current scheduler.


2. Create 1 column of output that contains a message like the one below:




The 1999 audi a4 has a 4 cylinder engine.


For each vehicle.



In [12]:
mpg.select(
    concat(
        lit("The "),
        col("year"),
        lit(" "),
        col("manufacturer"),
        lit(" "),
        col("model"),
        lit(" has a "),
        col("cyl"),
        lit(" cylinder engine."),
    ).alias("vehicle_cylinder_desc")
).show(1, truncate=False)

+-----------------------------------------+
|vehicle_cylinder_desc                    |
+-----------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.|
+-----------------------------------------+
only showing top 1 row



Transform the trans column so that it only contains either manual or auto.



In [24]:
mpg2 = mpg.withColumn(
    'trans',
    when(mpg.trans.contains('auto'), 'auto')
    .otherwise('manual')
)

In [25]:
mpg2.toPandas()

Unnamed: 0,manufacturer,model,displ,year,cyl,trans,drv,cty,hwy,fl,class
0,audi,a4,1.8,1999,4,auto,f,18,29,p,compact
1,audi,a4,1.8,1999,4,manual,f,21,29,p,compact
2,audi,a4,2.0,2008,4,manual,f,20,31,p,compact
3,audi,a4,2.0,2008,4,auto,f,21,30,p,compact
4,audi,a4,2.8,1999,6,auto,f,16,26,p,compact
...,...,...,...,...,...,...,...,...,...,...,...
229,volkswagen,passat,2.0,2008,4,auto,f,19,28,p,midsize
230,volkswagen,passat,2.0,2008,4,manual,f,21,29,p,midsize
231,volkswagen,passat,2.8,1999,6,auto,f,16,26,p,midsize
232,volkswagen,passat,2.8,1999,6,manual,f,18,26,p,midsize


3. Load the tips dataset as a spark dataframe.



In [27]:
tips = spark.createDataFrame(pydataset.data('tips'))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



What percentage of observations are smokers?


In [35]:
tips.groupby('smoker').count().withColumn('percent', round(col('count') / tips.count()*100,1)).show()


+------+-----+-------+
|smoker|count|percent|
+------+-----+-------+
|    No|  151|   61.9|
|   Yes|   93|   38.1|
+------+-----+-------+



Create a column that contains the tip percentage


In [43]:
tips.withColumn("tip_percentage", round(col('tip') / tips.total_bill*100,1)).show(5)

+----------+----+------+------+---+------+----+--------------+
|total_bill| tip|   sex|smoker|day|  time|size|tip_percentage|
+----------+----+------+------+---+------+----+--------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|           5.9|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|          16.1|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|          16.7|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|          14.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|          14.7|
+----------+----+------+------+---+------+----+--------------+
only showing top 5 rows



Calculate the average tip percentage for each combination of sex and smoker.


In [44]:
(
    tips.withColumn("tip_percentage", col('tip') / col('total_bill'))
    .groupby("sex")
    .pivot("smoker") # make a pivot table
    .agg(round(mean("tip_percentage"), 4))
    .show()
)

+------+------+------+
|   sex|    No|   Yes|
+------+------+------+
|Female|0.1569|0.1822|
|  Male|0.1607|0.1528|
+------+------+------+



4. Use the seattle weather dataset referenced in the lesson to answer the questions below.



In [45]:

from vega_datasets import data

weather = data.seattle_weather()
weather = spark.createDataFrame(weather)
weather.show(4)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 4 rows



Convert the temperatures to fahrenheit.


In [46]:
weather = weather.withColumn(
    "temp_max", (col("temp_max") * 9 / 5 + 32)
).withColumn("temp_min", (col("temp_min") * 9 / 5 + 32))
weather.show(5)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|   55.04|    41.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|   51.08|   37.04| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|   53.06|   44.96| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|   53.96|   42.08| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|   48.02|   37.04| 6.1|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 5 rows



Which month has the most rain, on average?


In [47]:
row = (
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_monthly_precipitation"))
    .groupBy("month")
    .agg(mean("total_monthly_precipitation").alias("avg_monthly_rain"))
    .sort(col("avg_monthly_rain").desc())
    .first()
)
row

Row(month=11, avg_monthly_rain=160.625)

Which year was the windiest?


In [49]:
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind").alias("total_winds"))
    .sort(col("total_winds").desc())
    .head(1)
)

[Row(year=2012, total_winds=1244.7)]

What is the most frequent type of weather in January?


In [51]:
(
    weather.withColumn("month", month("date"))
    .filter(col("month") == 1)
    .groupBy("weather")
    .count()
    .sort(col("count").desc())
    .show(1)
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
+-------+-----+
only showing top 1 row



What is the average high and low temperature on sunny days in July in 2013 and 2014?


In [52]:
(
    weather.filter(month("date") == 7)
    .filter(year("date") > 2012)
    .filter(year("date") < 2015)
    .filter(col("weather") == lit("sun"))
    .agg(
        avg("temp_max").alias("average_high_temp"),
        avg("temp_min").alias("average_low_temp"),
    )
    .show()
)

+-----------------+-----------------+
|average_high_temp| average_low_temp|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



What percentage of days were rainy in q3 of 2015?


In [53]:
(
    weather.filter(year("date") == 2015)
    .filter(quarter("date") == 3)
    .select(when(col("weather") == "rain", 1).otherwise(0).alias("rain"))
    .agg(mean("rain"))
    .show()
)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



For each year, find what percentage of days it rained (had non-zero precipitation).



In [54]:
(
    weather.withColumn("year", year("date"))
    .select(when(col("precipitation") > 0, 1).otherwise(0).alias("did_rain"), "year")
    .groupby("year")
    .agg(mean("did_rain"))
    .show()
)

+----+-------------------+
|year|      avg(did_rain)|
+----+-------------------+
|2012|0.48360655737704916|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2015|0.39452054794520547|
+----+-------------------+

