In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import requests

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Fetch Schema from API") \
    .getOrCreate()

# Load JSON data into PySpark DataFrame
df = spark.read.option("multiline","true").option("inferschema", "true").json("/FileStore/tables/jsonformatter__3___MConverter_eu___1_.json")


df1 = df.select("*",explode_outer("feeds").alias("new_feeds")).select("*",explode_outer("new_feeds.AirBox").alias("new_AirBox")).drop("new_feeds.AirBox").drop("feeds").drop("new_feeds")

df_1 = df1.select("device_id","num_of_records","source","version")


df2 = df1.select("new_AirBox.*")

# Extract the dynamic timestamp keys
keys = df2.schema.fieldNames()

# Flatten the nested structure
flattened_df = df2.selectExpr("inline(array(" + ",".join([f"struct('{k}' as timestamp_key, `{k}`.*)" for k in keys]) + "))")

flattened_df1 = flattened_df.dropna(subset=['Date','time'])

# Show the resulting flattened DataFrame

df_inner = df_1.join(flattened_df1, on="device_id", how="inner").dropDuplicates()

display(df_inner)


# Filter where s_d0 (PM2.5) is greater than 30
#df_filtered = df_inner.filter(col("s_d0") > 30)

# Filter the DataFrame where s_d0 (PM2.5 level) is greater than 30
df_filtered = df_inner.filter(col("s_d0") > 30).select("date","timestamp", "s_d0")
display(df_filtered)

# Extract date from timestamp for daily statistics
df = df_inner.withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd"))

# Compute daily maximum, minimum, and average pollution values
daily_stats = df.groupBy("date").agg(
    max("s_d0").alias("daily_max"),
    min("s_d0").alias("daily_min"),
    avg("s_d0").alias("daily_avg")
)

# Collect the filtered times and daily statistics
times_above_threshold = df_filtered.collect()
daily_stats_report = daily_stats.collect()

# Display the results
print("Times when PM2.5 level went above the threshold of 30:")
for row in times_above_threshold:
    print(f"Timestamp: {row['timestamp']}, PM2.5 Level: {row['s_d0']}")

print("\nDaily PM2.5 Statistics:")
print("Date       | Daily Max | Daily Min | Daily Avg")
for row in daily_stats_report:
    print(f"{row['date']} | {row['daily_max']}     | {row['daily_min']}     | {row['daily_avg']:.2f}")

device_id,num_of_records,source,version,timestamp_key,SiteName,app,area,date,gps_alt,gps_fix,gps_lat,gps_lon,gps_num,name,s_d0,s_d1,s_d2,s_h0,s_t0,time,timestamp
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T06:58:22Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,30,35,20,100,26.37,06:58:22,2024-06-05T06:58:22Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T06:39:53Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,33,40,21,100,26.37,06:39:53,2024-06-05T06:39:53Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T05:50:33Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,36,44,23,100,26.62,05:50:33,2024-06-05T05:50:33Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T05:44:23Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,31,36,20,100,26.62,05:44:23,2024-06-05T05:44:23Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T06:09:04Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,35,43,22,100,26.5,06:09:04,2024-06-05T06:09:04Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T06:27:33Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,33,40,21,100,26.5,06:27:33,2024-06-05T06:27:33Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T05:38:14Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,30,34,20,100,26.75,05:38:14,2024-06-05T05:38:14Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T06:02:55Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,35,43,22,100,26.62,06:02:55,2024-06-05T06:02:55Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T06:33:43Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,33,39,21,100,26.37,06:33:43,2024-06-05T06:33:43Z
08BEAC0AB6D6,1625,history by IIS-NRL,2024-06-03T12:12:20Z,2024-06-05T06:46:05Z,英才路219,AirBox,taichuang,2024-06-05,2,1,24.15584,120.674313,9,英才路219,32,37,20,100,26.37,06:46:05,2024-06-05T06:46:05Z


date,timestamp,s_d0
2024-06-05,2024-06-05T06:39:53Z,33
2024-06-05,2024-06-05T05:50:33Z,36
2024-06-05,2024-06-05T05:44:23Z,31
2024-06-05,2024-06-05T06:09:04Z,35
2024-06-05,2024-06-05T06:27:33Z,33
2024-06-05,2024-06-05T06:02:55Z,35
2024-06-05,2024-06-05T06:33:43Z,33
2024-06-05,2024-06-05T06:46:05Z,32
2024-06-05,2024-06-05T05:56:44Z,37
2024-06-05,2024-06-05T06:15:13Z,34


Times when PM2.5 level went above the threshold of 30:
Timestamp: 2024-06-05T06:39:53Z, PM2.5 Level: 33
Timestamp: 2024-06-05T05:50:33Z, PM2.5 Level: 36
Timestamp: 2024-06-05T05:44:23Z, PM2.5 Level: 31
Timestamp: 2024-06-05T06:09:04Z, PM2.5 Level: 35
Timestamp: 2024-06-05T06:27:33Z, PM2.5 Level: 33
Timestamp: 2024-06-05T06:02:55Z, PM2.5 Level: 35
Timestamp: 2024-06-05T06:33:43Z, PM2.5 Level: 33
Timestamp: 2024-06-05T06:46:05Z, PM2.5 Level: 32
Timestamp: 2024-06-05T05:56:44Z, PM2.5 Level: 37
Timestamp: 2024-06-05T06:15:13Z, PM2.5 Level: 34
Timestamp: 2024-06-05T06:52:13Z, PM2.5 Level: 31
Timestamp: 2024-06-05T06:21:23Z, PM2.5 Level: 34

Daily PM2.5 Statistics:
Date       | Daily Max | Daily Min | Daily Avg
2024-06-05 | 37     | 29     | 32.69
