# In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Custom schema, declared as (column name, Spark type) pairs in column order.
# Every field is created nullable.
_schema_columns = [
    ("WeatherStation", StringType()),
    ("WBAN", StringType()),
    ("ObservationDate", DateType()),
    ("ObservationHour", IntegerType()),
    ("Latitude", FloatType()),
    ("Longitude", FloatType()),
    ("Elevation", IntegerType()),
    ("WindDirection", IntegerType()),
    ("WDQualityCode", IntegerType()),
    ("SkyCeilingHeight", IntegerType()),
    ("SCQualityCode", IntegerType()),
    ("VisibilityDistance", IntegerType()),
    ("VDQualityCode", IntegerType()),
    ("AirTemperature", FloatType()),
    ("ATQualityCode", IntegerType()),
    ("DewPoint", FloatType()),
    ("DPQualityCode", IntegerType()),
    ("AtmosphericPressure", FloatType()),
    ("APQualityCode", IntegerType()),
]
custom_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in _schema_columns]
)

# Load data from 60.json (MinIO) and 50 (MariaDB table)
json_path = "s3a://rharidasu/output/60.json"

# JSON source: read with the explicit schema, keep only the three columns
# used by the analysis, then redistribute the rows into 4 partitions.
_json_reader = spark.read.schema(custom_schema).option("recursiveFileLookup", "true")
_needed_columns = ["WeatherStation", "ObservationDate", "AirTemperature"]
df_60 = _json_reader.json(json_path).select(*_needed_columns).repartition(4)

# JDBC source: the sub-query projects the same three columns and drops rows
# with a NULL temperature on the database side. The read is partitioned on
# ObservationDate into 10 partitions using the 1950-01-01..1970-12-31 bounds.
# NOTE(review): dbusername / dbpassword are not defined in this section and
# must already exist when this runs.
_jdbc_options = {
    "url": "jdbc:mysql://system75.rice.iit.edu:3306/rharidasu",
    "dbtable": "(SELECT WeatherStation, ObservationDate, AirTemperature FROM dbrc50 WHERE AirTemperature IS NOT NULL) AS subquery",
    "user": dbusername,
    "password": dbpassword,
    "driver": "com.mysql.cj.jdbc.Driver",
    "fetchsize": 1000,
    "partitionColumn": "ObservationDate",
    "lowerBound": "1950-01-01",
    "upperBound": "1970-12-31",
    "numPartitions": 10,
}
df_50 = spark.read.format("jdbc").options(**_jdbc_options).load()

# Apply filters and create temp tables
def _keep_valid_february(frame):
    """Return the rows of *frame* that qualify for the February analysis.

    A row is kept when ObservationDate and AirTemperature are both non-null,
    the observation year is within 1950-1970, the month is February, and the
    temperature lies in the inclusive range -100..100.
    """
    obs_date = col("ObservationDate")
    air_temp = col("AirTemperature")
    keep = (
        obs_date.isNotNull()
        & air_temp.isNotNull()
        & year(obs_date).between(1950, 1970)
        & (month(obs_date) == 2)
        & air_temp.between(-100, 100)
    )
    return frame.filter(keep)


# The same predicate is applied to both sources; each result is exposed to
# Spark SQL under its own view name.
df_60_filtered = _keep_valid_february(df_60)
df_60_filtered.createOrReplaceTempView("filtered_60")

df_50_filtered = _keep_valid_february(df_50)
df_50_filtered.createOrReplaceTempView("filtered_50")

# Combine the filtered data using SQL and create a combined temp table.
# Columns are listed explicitly because UNION ALL matches columns by
# position, not by name; naming them keeps the two sides aligned even if a
# source view later gains or reorders columns.
#
# The result is cached: the february_data view is queried by five separate
# aggregations below, and each one is written out as its own action. Without
# the cache every write would re-read the JSON input and re-run the JDBC
# query, and the five outputs could be computed from different database
# snapshots.
combined_df = spark.sql("""
    SELECT WeatherStation, ObservationDate, AirTemperature FROM filtered_60
    UNION ALL
    SELECT WeatherStation, ObservationDate, AirTemperature FROM filtered_50
""").cache()
combined_df.createOrReplaceTempView("february_data")

# All five tasks query the february_data view; the SQL text for each task is
# named first and then handed to spark.sql.

# Task 1: number of records in the February data set
_RECORD_COUNT_SQL = "SELECT COUNT(*) AS record_count FROM february_data"
record_count_df = spark.sql(_RECORD_COUNT_SQL)

# Task 2: mean air temperature for February
_AVG_TEMP_SQL = "SELECT AVG(AirTemperature) AS avg_temp FROM february_data"
avg_temp_df = spark.sql(_AVG_TEMP_SQL)

# Task 3: median air temperature for February (approximate, via
# percentile_approx at the 0.5 quantile)
_MEDIAN_TEMP_SQL = """
    SELECT percentile_approx(AirTemperature, 0.5) AS median_temp
    FROM february_data
"""
median_temp_df = spark.sql(_MEDIAN_TEMP_SQL)

# Task 4: standard deviation of air temperature for February
_STDDEV_TEMP_SQL = """
    SELECT stddev(AirTemperature) AS stddev_temp
    FROM february_data
"""
stddev_temp_df = spark.sql(_STDDEV_TEMP_SQL)

# Task 5: mean air temperature per WeatherStation for each year's February
_AVG_PER_STATION_YEAR_SQL = """
    SELECT 
        WeatherStation,
        YEAR(ObservationDate) AS Year,
        AVG(AirTemperature) AS avg_temp
    FROM february_data
    GROUP BY WeatherStation, YEAR(ObservationDate)
"""
avg_temp_per_station_year = spark.sql(_AVG_PER_STATION_YEAR_SQL)


# Output path
output_path = "s3a://rharidasu/module-11"

# Each result paired with the sub-directory (under output_path) it is
# written to, in write order.
_results_to_write = [
    ("record_count", record_count_df),
    ("avg_temp", avg_temp_df),
    ("median_temp", median_temp_df),
    ("stddev_temp", stddev_temp_df),
    ("avg_temp_per_station_year", avg_temp_per_station_year),
]

try:
    # Write results to CSV files: one part file per result (coalesce(1)),
    # with a header row, replacing any previous output at the same path.
    for _subdir, _result_df in _results_to_write:
        _result_df.coalesce(1).write.csv(
            output_path + "/" + _subdir, mode="overwrite", header=True
        )
finally:
    # Stop Spark session. Done in `finally` because the writes are where the
    # reads and aggregations actually execute; if one of them fails, the
    # session is still shut down before the error propagates.
    spark.stop()
