# Distributed ML: Spark API

In [1]:
# Import
import pyspark
from pyspark.sql import functions as F

In [2]:
# create spark session:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/26 09:23:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

## 1. Create a spark data frame that contains your favorite programming languages.
- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [4]:
# create a spark data frame with 5 records and a single column 'language'
df = spark.createDataFrame([("Scala", ), ("Python", ), ("R", ), ("Java", ), ("C++", )], ["language"])

In [5]:
# View the schema of the dataframe
df.printSchema()

root
 |-- language: string (nullable = true)



In [6]:
# Output the shape of the dataframe
df.count(), len(df.columns)

                                                                                

(5, 1)

In [7]:
# Show the first 5 records in the dataframe
df.show(5)

                                                                                

+--------+
|language|
+--------+
|   Scala|
|  Python|
|       R|
|    Java|
|     C++|
+--------+



## 2. Load the mpg dataset as a spark dataframe.
- a. Create 1 column of output that contains a message like the one below for each vehicle: \
The 1999 audi a4 has a 4 cylinder engine.

- b. Transform the trans column so that it only contains either manual or auto.

In [8]:
# from pydataset load the mpg dataset as a spark dataframe
from pydataset import data
mpg = spark.createDataFrame(data('mpg'))

In [9]:
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [10]:
# Create new column with the message 'The {year, manufacturer, model} has a {cyl} cylinder engine.'
mpg.select(
    F.concat(
        F.lit('The '),
        (mpg.year),
        F.lit(' '),
        (mpg.manufacturer),
        F.lit(' '),
        (mpg.model),
        F.lit(' has a '),
        (mpg.cyl),
        F.lit(' cylinder engine.')).alias('description')
).show(truncate=False)

+--------------------------------------------------------------+
|description                                                   |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

In [11]:
# Transform the trans column so that it only contains either manual or auto.
from pyspark.sql.functions import when

mpg = mpg.withColumn(
    'trans',
    when(
        mpg.trans.like('manual%'), 'manual')
    .when(
        mpg.trans.like('auto%'), 'auto'))
mpg.show(5)

                                                                                

+------------+-----+-----+----+---+------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl| trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+------+---+---+---+---+-------+
only showing top 5 rows



## 3. Load the tips dataset as a spark dataframe.
- a. What percentage of observations are smokers?
- b. Create a column that contains the tip percentage
- c. Calculate the average tip percentage for each combination of sex and smoker.

In [12]:
# from pydataset load the tips dataset as a spark dataframe
tips = spark.createDataFrame(data('tips'))

In [13]:
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



                                                                                

In [14]:
# What percentage of observations are smokers?
tips.select(
    F.round(
        F.avg(
            F.when(
                tips.smoker == 'Yes', 1).otherwise(0)
        ) * 100, 2
    ).alias('percent_smokers')
).show()

[Stage 10:>                                                         (0 + 8) / 8]

+---------------+
|percent_smokers|
+---------------+
|          38.11|
+---------------+



                                                                                

In [15]:
# Create a column that contains the tip percentage.
tips = tips.withColumn(
    'tip_percent',
    F.round(tips.tip / tips.total_bill * 100, 2)
)

In [16]:
# Calculate the average tip percentage for each combination of sex and smoker.
tips.groupBy('sex').pivot('smoker').agg(F.round(F.avg(tips.tip_percent), 2)).show()

                                                                                

+------+-----+-----+
|   sex|   No|  Yes|
+------+-----+-----+
|Female|15.69|18.21|
|  Male|16.07|15.28|
+------+-----+-----+



## 4. Use the Seattle weather dataset referenced in the lesson to answer the questions below.
- Convert the temperatures to fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low temperature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in Q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).


In [17]:
# Acquire data from vega_datasets
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

                                                                                

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



In [18]:
# Convert the temperatures to fahrenheit
# Method 1
'''weather = weather.withColumn('temp_max', F.round(weather.temp_max * (9/5) + 32, 2))
weather = weather.withColumn('temp_min', F.round(weather.temp_min * (9/5) + 32, 2))
weather.show()'''

"weather = weather.withColumn('temp_max', F.round(weather.temp_max * (9/5) + 32, 2))\nweather = weather.withColumn('temp_min', F.round(weather.temp_min * (9/5) + 32, 2))\nweather.show()"

In [19]:
# Method 2
weather = weather.withColumns(
{
    'max_temp': F.round(F.col('temp_max') * (9/5) + 32, 2),
    'min_temp': F.round(F.col('temp_min') * (9/5) + 32, 2)  
}).drop('temp_max', 'temp_min')

weather.show()

+----------+-------------+----+-------+--------+--------+
|      date|precipitation|wind|weather|max_temp|min_temp|
+----------+-------------+----+-------+--------+--------+
|2012-01-01|          0.0| 4.7|drizzle|   55.04|    41.0|
|2012-01-02|         10.9| 4.5|   rain|   51.08|   37.04|
|2012-01-03|          0.8| 2.3|   rain|   53.06|   44.96|
|2012-01-04|         20.3| 4.7|   rain|   53.96|   42.08|
|2012-01-05|          1.3| 6.1|   rain|   48.02|   37.04|
|2012-01-06|          2.5| 2.2|   rain|   39.92|   35.96|
|2012-01-07|          0.0| 2.3|   rain|   44.96|   37.04|
|2012-01-08|          0.0| 2.0|    sun|    50.0|   37.04|
|2012-01-09|          4.3| 3.4|   rain|   48.92|    41.0|
|2012-01-10|          1.0| 3.4|   rain|   42.98|   33.08|
|2012-01-11|          0.0| 5.1|    sun|   42.98|   30.02|
|2012-01-12|          0.0| 1.9|    sun|   42.98|   28.94|
|2012-01-13|          0.0| 1.3|    sun|    41.0|   26.96|
|2012-01-14|          4.1| 5.3|   snow|   39.92|   33.08|
|2012-01-15|  

                                                                                

In [20]:
# Which month has the most rain on average?
from pyspark.sql.functions import month, year, quarter

weather.withColumn(
    'month', F.month(F.col('date'))
).groupBy(
    'month'
).agg(
    F.round(F.avg(weather.precipitation), 2).alias('avg_rain')
).sort(F.desc('avg_rain')).show(1)

                                                                                

+-----+--------+
|month|avg_rain|
+-----+--------+
|   11|    5.35|
+-----+--------+
only showing top 1 row



In [21]:
# Which year was the windiest

weather.withColumn(
    'year', F.year(F.col('date'))
).groupBy(
    'year'
).agg(
    F.round(F.sum(weather.wind)).alias('total_wind')
).sort(F.desc('total_wind')).show(1)

[Stage 34:>                                                         (0 + 1) / 1]

+----+----------+
|year|total_wind|
+----+----------+
|2012|    1245.0|
+----+----------+
only showing top 1 row



                                                                                

In [22]:
# What is the most frequent type of weather in January?
weather.filter(
    F.month(F.col('date')) == 1
        ).groupBy('weather'
                  ).count().sort(F.desc('count')).show(1)

[Stage 35:>                                                         (0 + 8) / 8]

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
+-------+-----+
only showing top 1 row



                                                                                

In [23]:
weather.filter(
    F.month(weather.date) == 1
).groupBy(
    F.col('weather')
).count().sort(F.col('count'), ascending=False).first()

                                                                                

Row(weather='fog', count=38)

In [24]:
# What is the average high and low temperature on sunny days in July in 2013 and 2014?
weather.filter(
    (F.col('weather') == 'sun') & 
    (F.month(F.col('date')) == 7) & 
    (F.year(F.col('date')).isin(2013, 2014))
).agg(
    F.round(F.avg(F.col('max_temp')), 2).alias('avg_max_temp'),
    F.round(F.avg(F.col('min_temp')), 2).alias('avg_min_temp')
).show()

[Stage 41:>                                                         (0 + 8) / 8]

+------------+------------+
|avg_max_temp|avg_min_temp|
+------------+------------+
|       80.29|       57.53|
+------------+------------+



                                                                                

In [25]:
# What percentage of days were rainy in q3 of 2015?
weather.filter(
(F.quarter(F.col('date')) == 3) &
(F.year(F.col('date')) == 2015)
).withColumn(
    'rain_happened',
    F.when(F.col('weather') == F.lit('rain'),
           1).otherwise(0)
).agg(F.round(F.mean(F.col('rain_happened'))* 100, 2).alias('avg_rain_days')).show()



+-------------+
|avg_rain_days|
+-------------+
|         2.17|
+-------------+



                                                                                

In [28]:
# For each year, find what percentage of days it rained (had non-zero precipitation).
weather.withColumn(
    'rain_happened',
    F.when(
        F.col('precipitation') > 0,
        1).otherwise(0)
).select('precipitation', 'rain_happened').show(5)

+-------------+-------------+
|precipitation|rain_happened|
+-------------+-------------+
|          0.0|            0|
|         10.9|            1|
|          0.8|            1|
|         20.3|            1|
|          1.3|            1|
+-------------+-------------+
only showing top 5 rows



In [29]:
weather.withColumn(
    'rain_happened',
    F.when(
        F.col('precipitation') > 0,
        1).otherwise(0)
).groupBy(
    F.year(F.col('date')
    )
).agg(
    F.round(F.mean('rain_happened') * 100, 2).alias('percent_rain')
).show()

[Stage 51:>                                                         (0 + 1) / 1]

+----------+------------+
|year(date)|percent_rain|
+----------+------------+
|      2012|       48.36|
|      2013|       41.64|
|      2014|        41.1|
|      2015|       39.45|
+----------+------------+



                                                                                