### Input Data: Metric, Value and Timestamp

In [169]:
import csv

header = ['Metric', 'Value', 'Timestamp']
rows = [
    ['temperature', 88, '2022-06-04T12:01:00.000Z'],
    ['temperature', 89, '2022-06-04T12:01:30.000Z'],
    ['precipitation', 0.5, '2022-06-04T14:23:32.000Z'],
    ['temperature', 84, '2022-06-04T13:02:00.000Z'],
    ['temperature', 86, '2022-06-04T13:03:00.000Z'],
    ['precipitation', 0.2, '2022-06-04T14:24:32.000Z'],
    ['temperature', 91, '2022-06-04T15:05:00.000Z'],
    ['precipitation', 0.8, '2022-06-04T15:30:32.000Z'],
    ['temperature', 92, '2022-06-04T16:06:00.000Z'],
    ['temperature', 94, '2022-06-04T17:07:00.000Z'],
    ['precipitation', 0.7, '2022-06-04T17:45:32.000Z'],
    ['temperature', 90, '2022-06-04T18:08:00.000Z'],
    ['precipitation', 0.6, '2022-06-04T18:20:32.000Z']
]

with open('input_data.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(header)
    writer.writerows(rows)

### Setting Up Spark Environment for Batch Aggregation

In [170]:
# import necessary modules
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# create a spark context
sc = SparkContext("local", "Batch Aggregation App")

# create a spark session
spark = SparkSession(sc)
print(sc.version)

3.3.1


### Loading the Source Data into Datframe 

In [171]:
from pyspark.sql.functions import avg, max, min, to_utc_timestamp, floor
from pyspark.sql.functions import to_utc_timestamp

# read input data from csv file
df = spark.read.csv("input_data.csv", header=True)

# convert timestamp column to utc timestamp format
df = df.withColumn("Timestamp", to_utc_timestamp(df["Timestamp"], "UTC"))

# performance optimization as it can reduce the time required to perform subsequent operations 
df = df.orderBy("Metric", "Timestamp")
df.persist()
df.show()

+-------------+-----+-------------------+
|       Metric|Value|          Timestamp|
+-------------+-----+-------------------+
|precipitation|  0.5|2022-06-04 10:23:32|
|precipitation|  0.2|2022-06-04 10:24:32|
|precipitation|  0.8|2022-06-04 11:30:32|
|precipitation|  0.7|2022-06-04 13:45:32|
|precipitation|  0.6|2022-06-04 14:20:32|
|  temperature|   88|2022-06-04 08:01:00|
|  temperature|   89|2022-06-04 08:01:30|
|  temperature|   84|2022-06-04 09:02:00|
|  temperature|   86|2022-06-04 09:03:00|
|  temperature|   91|2022-06-04 11:05:00|
|  temperature|   92|2022-06-04 12:06:00|
|  temperature|   94|2022-06-04 13:07:00|
|  temperature|   90|2022-06-04 14:08:00|
+-------------+-----+-------------------+



### Transformations/Business Loigic : Aggregating Values for Each Day and Metric

In [172]:
from pyspark.sql import functions as F

# Create a window based on the Timestamp column, with a duration of 1 day and a slide duration of 1 day
window = F.window(
    F.col("Timestamp"),
    windowDuration="1 day",
    slideDuration="1 day",
).alias("window")



# aggergated values 
df_aggregated = df.withColumn("window", window)\
                    .groupBy("window", "Metric")\
                    .agg(
                        F.avg("Value").alias("avg_value"),
                        F.min("Value").alias("min_value"),
                        F.max("Value").alias("max_value")
                    )

### Final Output

In [173]:
df_aggregated.show(truncate=False)


+------------------------------------------+-------------+---------+---------+---------+
|window                                    |Metric       |avg_value|min_value|max_value|
+------------------------------------------+-------------+---------+---------+---------+
|{2022-06-03 20:00:00, 2022-06-04 20:00:00}|precipitation|0.56     |0.2      |0.8      |
|{2022-06-03 20:00:00, 2022-06-04 20:00:00}|temperature  |89.25    |84       |94       |
+------------------------------------------+-------------+---------+---------+---------+



### Save Output to CSV

In [174]:
from pyspark.sql.functions import concat, col

# Convert window to string 
df_aggregated = df_aggregated.withColumn("window", concat(
    col("window.start").cast("string"),
    F.lit(" - "),
    col("window.end").cast("string")
))
# save the file 
df_aggregated.write.csv("./GS-assesment/aggregated_data.csv", header=True, mode="overwrite")

In [175]:
sc.stop()