# Exercise 1

## Setup

In [114]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, FloatType

In [115]:
temp_file_path = "../data/tempm.txt"
hum_file_path = "../data/hum.txt"

In [116]:
spark = SparkSession.builder.appName("Exercise 1").getOrCreate()

spark

## Data Loading

In [117]:
# Define schema for the JSON data
schema = MapType(StringType(), StringType())

# Load temperature data
temp_df = spark.read.text(temp_file_path) \
    .select(F.from_json(F.col("value"), schema).alias("json")) \
    .selectExpr("explode(json) as (Timestamp, Temperature)") \
    .withColumn("Timestamp", F.to_timestamp("Timestamp", "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("Temperature", F.col("Temperature").cast("float"))

# Load humidity data
hum_df = spark.read.text(hum_file_path) \
    .select(F.from_json(F.col("value"), schema).alias("json")) \
    .selectExpr("explode(json) as (Timestamp, Humidity)") \
    .withColumn("Timestamp", F.to_timestamp("Timestamp", "yyyy-MM-dd'T'HH:mm:ss")) \
    .withColumn("Humidity", F.col("Humidity").cast("float"))

In [118]:
temp_df.show(5)

+-------------------+-----------+
|          Timestamp|Temperature|
+-------------------+-----------+
|2014-02-13 06:20:00|        3.0|
|2014-02-13 13:50:00|        7.0|
|2014-02-13 06:00:00|        2.0|
|2014-02-13 03:00:00|        3.0|
|2014-02-13 13:00:00|        6.0|
+-------------------+-----------+
only showing top 5 rows



In [119]:
hum_df.show(5)

+-------------------+--------+
|          Timestamp|Humidity|
+-------------------+--------+
|2014-02-13 06:20:00|    93.0|
|2014-02-13 13:50:00|    66.0|
|2014-02-13 06:00:00|    91.0|
|2014-02-13 03:00:00|    84.0|
|2014-02-13 13:00:00|    62.0|
+-------------------+--------+
only showing top 5 rows



## Queries

### Q1 - The number of days with temperature between 18°C and 22°C

We'll count the days that at some point had a temperature between 18°C and 22°C.

In [120]:
any_days_count = temp_df.filter((F.col("Temperature") >= 18) & (F.col("Temperature") <= 22)) \
    .groupBy(F.col("Timestamp").cast("date").alias("date")) \
    .agg(F.first("Temperature").alias("Temperature")) \
    .count()

print(f"The number of days that at some point had a temperature between 18°C and 22°C was {any_days_count}.")

The number of days that at some point had a temperature between 18°C and 22°C was 26.


As an alternative approach, we'll count the days that had an average temperature between 18°C and 22°C.

In [121]:
avg_days_count = temp_df.groupBy(F.col("Timestamp").cast("date").alias("date")) \
    .agg(F.avg("Temperature").alias("avg_temp")) \
    .filter((F.col("avg_temp") >= 18) & (F.col("avg_temp") <= 22)) \
    .count()

print(f"The number of days that had an average temperature between 18°C and 22°C was {avg_days_count}.")

The number of days that had an average temperature between 18°C and 22°C was 1.


### Q2 - The 10 coldest and 10 hottest days

To find the coldest days, we'll get the 10 unique ones with the lowest temperatures.

In [122]:
coldest_days = temp_df.filter(F.col("Temperature").isNotNull()) \
    .groupBy(F.col("Timestamp").cast("date").alias("Date")) \
    .agg(F.min("Temperature").alias("Lowest Temperature")) \
    .orderBy(F.col("Lowest Temperature").asc()) \
    .limit(10)

coldest_days.show()

+----------+------------------+
|      Date|Lowest Temperature|
+----------+------------------+
|2014-03-27|              -3.0|
|2014-03-25|              -3.0|
|2014-03-11|              -3.0|
|2014-03-24|              -2.0|
|2014-04-16|              -1.0|
|2014-03-30|              -1.0|
|2014-03-12|              -1.0|
|2014-03-13|               0.0|
|2014-03-26|               0.0|
|2014-03-31|               0.0|
+----------+------------------+



Similarly, to find the hottest days, we'll get the 10 unique ones with the highest temperatures.

In [123]:
hottest_days = temp_df.filter(F.col("Temperature").isNotNull()) \
    .groupBy(F.col("Timestamp").cast("date").alias("Date")) \
    .agg(F.max("Temperature").alias("Highest Temperature")) \
    .orderBy(F.col("Highest Temperature").desc()) \
    .limit(10)

hottest_days.show()

+----------+-------------------+
|      Date|Highest Temperature|
+----------+-------------------+
|2014-05-22|               25.0|
|2014-05-21|               24.0|
|2014-05-30|               22.0|
|2014-06-02|               22.0|
|2014-06-08|               22.0|
|2014-05-25|               21.0|
|2014-04-29|               20.0|
|2014-05-24|               20.0|
|2014-05-26|               20.0|
|2014-05-17|               20.0|
+----------+-------------------+



### Q3 - The month with the highest humidity standard deviation

We will calculate the standard deviation of the humidity for each month and then get the month with the highest standard deviation.

In [124]:
from calendar import month_name

hum_std_df = hum_df \
    .groupBy(F.month("Timestamp").alias("Month")) \
    .agg(F.stddev("Humidity").alias("Standard Deviation")) \
    .orderBy(F.col("Standard Deviation").desc()) \
    .limit(1)

result_row = hum_std_df.first()
month_num = result_row['Month']
std_dev = result_row['Standard Deviation']
month = month_name[month_num]

print(f"The month with the highest standard deviation in humidity was {month} with {std_dev:.2f}.")

The month with the highest standard deviation in humidity was April with 17.73.


### Q4 - The minimum and maximum discomfort index

We'll calculate the discomfort index for all timestamps recorded and get the minimum and maximum.

In [125]:
di_expr = F.col('Temperature') - 0.55 * (1 - 0.01 * F.col('Humidity')) * (F.col('Temperature') - 14.5)

discomfort_df = temp_df.join(hum_df, on='Timestamp').withColumn('Discomfort Index', F.round(di_expr, 2))
discomfort_df = discomfort_df.na.drop(how="any")

min_di_row = discomfort_df.orderBy('Discomfort Index').first()
max_di_row = discomfort_df.orderBy(F.desc('Discomfort Index')).first()

print(f"The minimum DI was {min_di_row['Discomfort Index']} on {min_di_row['Timestamp']}.")
print(f"The maximum DI was {max_di_row['Discomfort Index']} on {max_di_row['Timestamp']}.")

The minimum DI was -2.33 on 2014-03-11 07:20:00.
The maximum DI was 22.11 on 2014-05-22 14:20:00.


## Stop

In [126]:
spark.stop()