In [123]:
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.functions import input_file_name, regexp_extract

In [124]:
spark = SparkSession.builder \
        .appName("S3ToAuroraPostgres") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.threads.keepalivetime", "60000") \
        .config("spark.hadoop.fs.s3a.multipart.purge.age", "24") \
        .config("spark.hadoop.fs.s3a.connection.establish.timeout", "30000") \
        .config("spark.hadoop.fs.s3a.connection.timeout", "60000") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariableCredentialsProvider") \
        .getOrCreate()

In [86]:
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
iterator = hadoop_conf.iterator()
while iterator.hasNext():
    entry = iterator.next()
    if "fs.s3a" in entry.getKey():
        print(f"{entry.getKey()}: {entry.getValue()}")

fs.s3a.connection.establish.timeout: 60000
fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3a.socket.timeout: 60000
fs.s3a.threads.keepalivetime: 60
fs.s3a.connection.timeout: 60000
fs.s3a.multipart.purge.age: 24
fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider


In [125]:
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.clear()  # Clear all Hadoop configurations
hadoop_conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariableCredentialsProvider")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.connection.timeout", "60000")
hadoop_conf.set("fs.s3a.socket.timeout", "60000")
hadoop_conf.set("fs.s3a.connection.establish.timeout", "60000")
hadoop_conf.set("fs.s3a.threads.keepalivetime", "60")
hadoop_conf.set("fs.s3a.multipart.purge.age", "24")

In [126]:
df = spark.read.json("s3a://af-weather-lake-01/data/weather/")

                                                                                

In [127]:
df.select("current.*").show(5)

+--------------------+-----------+--------+------+-------------+------------+----+--------------------+-------+----------------+--------------+----------------+------------+------------------+--------------+--------------+
|apparent_temperature|cloud_cover|interval|is_day|precipitation|pressure_msl|rain|relative_humidity_2m|showers|surface_pressure|temperature_2m|            time|weather_code|wind_direction_10m|wind_gusts_10m|wind_speed_10m|
+--------------------+-----------+--------+------+-------------+------------+----+--------------------+-------+----------------+--------------+----------------+------------+------------------+--------------+--------------+
|                90.5|        100|     900|     0|          0.1|      1006.0| 0.0|                  83|    0.1|           949.5|          80.3|2025-09-08T14:00|          80|               143|          10.1|           1.8|
|                91.1|        100|     900|     1|          0.1|      1003.0| 0.0|                  83|    0

In [128]:
df.agg(
    F.count("*") \
).show()

[Stage 212:=====>                                                  (1 + 9) / 10]

+--------+
|count(1)|
+--------+
|     295|
+--------+



                                                                                

In [91]:
df.printSchema()

root
 |-- current: struct (nullable = true)
 |    |-- apparent_temperature: double (nullable = true)
 |    |-- cloud_cover: long (nullable = true)
 |    |-- interval: long (nullable = true)
 |    |-- is_day: long (nullable = true)
 |    |-- precipitation: double (nullable = true)
 |    |-- pressure_msl: double (nullable = true)
 |    |-- rain: double (nullable = true)
 |    |-- relative_humidity_2m: long (nullable = true)
 |    |-- showers: double (nullable = true)
 |    |-- surface_pressure: double (nullable = true)
 |    |-- temperature_2m: double (nullable = true)
 |    |-- time: string (nullable = true)
 |    |-- weather_code: long (nullable = true)
 |    |-- wind_direction_10m: long (nullable = true)
 |    |-- wind_gusts_10m: double (nullable = true)
 |    |-- wind_speed_10m: double (nullable = true)
 |-- current_units: struct (nullable = true)
 |    |-- apparent_temperature: string (nullable = true)
 |    |-- cloud_cover: string (nullable = true)
 |    |-- interval: string (nulla

In [129]:
work_df = df.select("current.*", "date", "latitude", "longitude") \
    .withColumn("file_path", input_file_name()) \
    .withColumn("city", regexp_extract("file_path", r"date=[^/]+/([^/]+)/[^/]+\.json$", 1))

In [93]:
work_df.printSchema()

root
 |-- apparent_temperature: double (nullable = true)
 |-- cloud_cover: long (nullable = true)
 |-- interval: long (nullable = true)
 |-- is_day: long (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- pressure_msl: double (nullable = true)
 |-- rain: double (nullable = true)
 |-- relative_humidity_2m: long (nullable = true)
 |-- showers: double (nullable = true)
 |-- surface_pressure: double (nullable = true)
 |-- temperature_2m: double (nullable = true)
 |-- time: string (nullable = true)
 |-- weather_code: long (nullable = true)
 |-- wind_direction_10m: long (nullable = true)
 |-- wind_gusts_10m: double (nullable = true)
 |-- wind_speed_10m: double (nullable = true)
 |-- date: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- file_path: string (nullable = false)
 |-- city: string (nullable = false)



In [94]:
work_df.select("city").show(5)

+------+
|  city|
+------+
|havana|
|havana|
|havana|
|havana|
|havana|
+------+
only showing top 5 rows


In [130]:
work_df.groupBy("city") \
    .agg(
        F.count("*").alias("num_of_cities")
    ) \
    .show()



+------+-------------+
|  city|num_of_cities|
+------+-------------+
|havana|           97|
|nassau|           99|
| miami|           99|
+------+-------------+



                                                                                

In [96]:
work_df.groupBy("date") \
    .agg(
        F.count("*").alias("num_of_dates")
    ) \
    .show(3)

[Stage 139:>                                                      (0 + 10) / 10]

+---------------+------------+
|           date|num_of_dates|
+---------------+------------+
|2025-09-02_2100|           3|
|2025-09-01_1400|           3|
|2025-09-01_2000|           3|
+---------------+------------+
only showing top 3 rows


                                                                                

In [143]:
work_df.groupBy("date") \
    .agg(
        F.count("*").alias("num_of_dates")
    ) \
    .where("num_of_dates < 3") \
    .show()



+---------------+------------+
|           date|num_of_dates|
+---------------+------------+
|2025-09-03_0400|           2|
|2025-09-09_1800|           2|
+---------------+------------+



                                                                                

In [148]:
# clean_df = work_df.select("*")\
#     .where(work_df.date != "2025-09-03_0400")
valid_dates_df = work_df.groupBy("date") \
    .agg(
        F.count("*").alias("num_of_dates")
    ) \
    .where("num_of_dates == 3") \
    .select("date") 
clean_df = work_df.join(valid_dates_df, "date", "inner")
# clean_df.show()

In [149]:
clean_df.groupBy("city") \
    .agg(
        F.count("*").alias("num_of_cities")
    ) \
    .show()



+------+-------------+
|  city|num_of_cities|
+------+-------------+
|havana|           97|
|nassau|           97|
| miami|           97|
+------+-------------+



                                                                                

In [150]:
clean_df.agg(F.count("*")).show()



+--------+
|count(1)|
+--------+
|     291|
+--------+



                                                                                

In [153]:
clean_df.count()

                                                                                

291

In [151]:
clean_df = clean_df.dropna()

In [152]:
clean_df.agg(F.count("*")).show()



+--------+
|count(1)|
+--------+
|     291|
+--------+



                                                                                

In [103]:
clean_df.printSchema()

root
 |-- apparent_temperature: double (nullable = true)
 |-- cloud_cover: long (nullable = true)
 |-- interval: long (nullable = true)
 |-- is_day: long (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- pressure_msl: double (nullable = true)
 |-- rain: double (nullable = true)
 |-- relative_humidity_2m: long (nullable = true)
 |-- showers: double (nullable = true)
 |-- surface_pressure: double (nullable = true)
 |-- temperature_2m: double (nullable = true)
 |-- time: string (nullable = true)
 |-- weather_code: long (nullable = true)
 |-- wind_direction_10m: long (nullable = true)
 |-- wind_gusts_10m: double (nullable = true)
 |-- wind_speed_10m: double (nullable = true)
 |-- date: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- file_path: string (nullable = false)
 |-- city: string (nullable = false)



In [104]:
clean_df.select(clean_df.surface_pressure, clean_df.city).show(60)

[Stage 155:>                                                        (0 + 2) / 2]

+----------------+------+
|surface_pressure|  city|
+----------------+------+
|           949.5|havana|
|           946.8|havana|
|           943.7|havana|
|           944.4|havana|
|           946.4|havana|
|           945.1|havana|
|           946.2|havana|
|           951.6|havana|
|           946.4|havana|
|           944.6|havana|
|           946.1|havana|
|           947.1|havana|
|           946.6|havana|
|           944.0|havana|
|           943.1|havana|
|           945.5|havana|
|           944.9|havana|
|           960.5|nassau|
|           945.9|havana|
|           944.4|havana|
|           951.1|havana|
|           951.1|havana|
|           951.1|havana|
|           945.3|havana|
|           961.5|nassau|
|           943.7|havana|
|           947.4|havana|
|           947.1|havana|
|           951.2|havana|
|           945.6|havana|
|           943.9|havana|
|           959.0|nassau|
|           960.3|nassau|
|           960.0|nassau|
|           949.0|havana|
|           

                                                                                

In [105]:
train_data, test_data = clean_df.randomSplit([0.8, 0.2], seed=42)
print("Train size: ", train_data.count())
print("Test size: ", test_data.count())

                                                                                

Train size:  219


[Stage 159:>                                                      (0 + 10) / 10]

Test size:  48


                                                                                

In [106]:
pipeline_stages = []

In [107]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="city", outputCol="city_index")
# indexer_model = indexer.fit(train_data)
# train_data = indexer_model.transform(train_data)
#
pipeline_stages.append(indexer)

In [108]:
# input_columns = ["apparent_temperature", "cloud_cover", "interval",
#                  "is_day", "precipitation", "pressure_msl",
#                  "rain", "relative_humidity_2m", "showers",
#                  "surface_pressure", "temperature_2m",
#                  "wind_direction_10m", "wind_gusts_10m", "wind_speed_10m"]
input_columns = ["apparent_temperature", "cloud_cover", "interval",
                 "precipitation", "pressure_msl",
                "rain", "relative_humidity_2m", "showers",
                 "temperature_2m",
                  "wind_direction_10m", "wind_gusts_10m", "wind_speed_10m"
                 ]
# input_columns = ["surface_pressure"]

In [109]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=input_columns, outputCol="unscaled_features")
# train_data = assembler.transform(train_data)
#
pipeline_stages.append(assembler)

In [110]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='unscaled_features', outputCol='features', withMean=True, withStd=True)

# scaler_model = scaler.fit(train_data)
# train_data = scaler_model.transform(train_data)
#
pipeline_stages.append(scaler)

In [111]:
# train_data.show(5)

In [112]:
from pyspark.ml.classification import DecisionTreeClassifier

dtc = DecisionTreeClassifier(featuresCol='features', labelCol='city_index')
# model = dtc.fit(train_data)
#
pipeline_stages.append(dtc)

In [113]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=pipeline_stages)

In [115]:
model = pipeline.fit(train_data)

                                                                                

In [None]:
# test_data = indexer_model.transform(test_data)
# test_data = assembler.transform(test_data)
# test_data = scaler_model.transform(test_data)
# test_data.show(1)

In [116]:
predictions = model.transform(test_data)
# predictions.show(3)

In [117]:
predictions.select('features', 'city_index', 'prediction').show(10)

[Stage 186:>                                                        (0 + 2) / 2]

+--------------------+----------+----------+
|            features|city_index|prediction|
+--------------------+----------+----------+
|[-1.5123543444348...|       1.0|       1.0|
|[-1.0609668126716...|       1.0|       2.0|
|[-0.9481199297308...|       1.0|       1.0|
|[-0.3613161384387...|       1.0|       0.0|
|[0.18034889967706...|       1.0|       1.0|
|[0.69944456120470...|       1.0|       0.0|
|[-1.7831868634927...|       1.0|       1.0|
|[-0.9481199297308...|       2.0|       2.0|
|[-0.9481199297308...|       1.0|       2.0|
|[-0.5193017745558...|       2.0|       2.0|
+--------------------+----------+----------+
only showing top 10 rows


                                                                                

In [118]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

accuracty_evaluator = MulticlassClassificationEvaluator(labelCol='city_index', predictionCol='prediction', metricName='accuracy')
accuracy = accuracty_evaluator.evaluate(predictions) * 100
print(f'Accuracy = {accuracy:.2f}%')

[Stage 187:>                                                      (0 + 10) / 10]

Accuracy = 83.33%


                                                                                

In [121]:
from datetime import datetime

# Save with date key
date_str = datetime.now().strftime("%Y-%m-%d")
model_path = f"s3a://af-weather-lake-01/models/date={date_str}/model/"
model_path

's3a://af-weather-lake-01/models/date=2025-09-09/model/'

In [122]:
# model.write().overwrite().save(model_path)

                                                                                

In [155]:
import json
num_rows = clean_df.count()
metadata = {
        "input_columns": input_columns,
        "training_split_used": "None",
        "training_entries_count": num_rows,
        "paramGridBuilderValues": None,
    }
json_string = json.dumps(metadata, indent=2)
print(json_string)



{
  "input_columns": [
    "apparent_temperature",
    "cloud_cover",
    "interval",
    "precipitation",
    "pressure_msl",
    "rain",
    "relative_humidity_2m",
    "showers",
    "temperature_2m",
    "wind_direction_10m",
    "wind_gusts_10m",
    "wind_speed_10m"
  ],
  "training_split_used": "None",
  "training_entries_count": 291,
  "paramGridBuilderValues": null
}


                                                                                