# Real-Time Weather Data ETL Pipeline Using Airflow and PySpark, Deployed on AWS (EC2 and S3)


## 🌤️ Project Overview

This project implements a **real-time weather data ETL pipeline** that automatically extracts current weather information for Indian cities (e.g., Pune), processes it, and stores it in a scalable cloud architecture. The pipeline uses Apache Airflow for orchestration, AWS (EC2 and S3) for hosting and storage, and Apache Spark for data transformation.

### Data Transformation (Databricks Notebook)

The transformation layer (`OpenWeather-ETL-project.ipynb`) includes:

#### Data Processing:
- **Type Casting**: Converting string columns to appropriate data types
- **Temperature Conversion**: Kelvin to Celsius transformation
- **Feature Engineering**: Creating derived metrics (temperature ranges, daylight hours)
- **Time Extraction**: Year, month, day extraction from timestamps

#### Data Analysis:
- **Aggregations**: Average temperature by day, weather condition counts
- **Grouping**: Statistics by weather type and time periods
- **Categorization**: Temperature binning (Cold, Moderate, Hot)
- **Quality Checks**: Null value handling and data validation


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round  # note: shadows Python's built-in round() in this notebook
from pyspark.sql.functions import to_timestamp, year, month, dayofmonth

In [0]:
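# Databricks creates a SparkSession automatically and exposes it as `spark`.
# Building one explicitly keeps the notebook portable; getOrCreate() reuses the existing session if there is one.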
spark = SparkSession.builder \
    .appName("WeatherDataProcessing") \
    .getOrCreate()


In [0]:
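import os
# Never hardcode AWS keys in a notebook; read them from the environment (or a Databricks secret scope) instead.
ACCESS_KEY = os.environ["AWS_ACCESS_KEY_ID"]
SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]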
BUCKET_NAME = "openweather-etl-extracted-data"
FILE_PATH = "s3a://" + BUCKET_NAME + "/*.csv"


spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")  # parse and display timestamps in IST; the default is the JVM time zone (typically UTC on Databricks)
# Alternative: set the S3A options on the Hadoop configuration directly
# spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", ACCESS_KEY)
# spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET_KEY)
# spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com")
# spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")

spark.conf.set("fs.s3a.access.key", ACCESS_KEY) 
spark.conf.set("fs.s3a.secret.key", SECRET_KEY)
spark.conf.set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com")
spark.conf.set("com.amazonaws.services.s3.enableV4", "true") 
df_raw = spark.read.option("header", True).csv(FILE_PATH)
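Hardcoded keys end up in every copy and export of the notebook. A minimal sketch, assuming the keys are exported as the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables (the names the AWS CLI and SDKs also read); the helper `s3a_conf_from_env` is hypothetical. On Databricks, a secret scope read with `dbutils.secrets.get(scope=..., key=...)` is another option.

```python
import os


def s3a_conf_from_env(endpoint: str = "s3.ap-south-1.amazonaws.com") -> dict:
    """fs.s3a.* options for Spark, with credentials read from the environment.

    Assumes AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set; a missing
    variable raises KeyError instead of silently running without credentials.
    """
    return {
        "fs.s3a.access.key": os.environ["AWS_ACCESS_KEY_ID"],
        "fs.s3a.secret.key": os.environ["AWS_SECRET_ACCESS_KEY"],
        "fs.s3a.endpoint": endpoint,
    }


# Hypothetical notebook usage, replacing the hardcoded keys:
# for key, value in s3a_conf_from_env().items():
#     spark.conf.set(key, value)
```

Failing loudly on a missing variable is a deliberate choice: an unset key otherwise tends to surface later as a less obvious S3 access error.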

In [0]:
df = df_raw.select(
    col("City"),
    col("Country"),
    col("Weather_main"),
    col("Weather_subtype"),
    col("Temperature").cast("double"),
    col("Feels_Like").cast("double"),
    col("Min_Temp").cast("double"),
    col("Max_Temp").cast("double"),
    col("Pressure").cast("int"),
    col("Humidity").cast("int"),
    col("Visibility").cast("int"),
    col("Wind_speed").cast("double"),
    col("cloudiness_percent").cast("int"),
    col("Rain_mm_hour").cast("double"),
    col("Snow_mm_hour").cast("double"),
    col("Time").cast("long"),
    col("Timezone_offset").cast("int"),
    to_timestamp("Time_Recorded_local").alias("Time_Recorded_local"),
    to_timestamp("Sunrise_local").alias("Sunrise_local"),
    to_timestamp("Sunset_local").alias("Sunset_local")
)

# Alternative: cast column by column with withColumn (reusing the column name replaces it in place)
# df = df_raw.withColumn("ColA", col("ColA").cast("type")) \
#     .withColumn("ColB", col("ColB").cast("type"))

df.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Weather_main: string (nullable = true)
 |-- Weather_subtype: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Feels_Like: double (nullable = true)
 |-- Min_Temp: double (nullable = true)
 |-- Max_Temp: double (nullable = true)
 |-- Pressure: integer (nullable = true)
 |-- Humidity: integer (nullable = true)
 |-- Visibility: integer (nullable = true)
 |-- Wind_speed: double (nullable = true)
 |-- cloudiness_percent: integer (nullable = true)
 |-- Rain_mm_hour: double (nullable = true)
 |-- Snow_mm_hour: double (nullable = true)
 |-- Time: long (nullable = true)
 |-- Timezone_offset: integer (nullable = true)
 |-- Time_Recorded_local: timestamp (nullable = true)
 |-- Sunrise_local: timestamp (nullable = true)
 |-- Sunset_local: timestamp (nullable = true)
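The schema stores the observation time twice: `Time` is a Unix epoch in seconds (UTC), and `Time_Recorded_local` is the same instant shifted by `Timezone_offset` seconds (19800 s = 5 h 30 min for IST). As a plain-Python sanity check (the helper `local_time_from_epoch` is hypothetical), the first sample row displayed below converts back to its recorded local time:

```python
from datetime import datetime, timedelta, timezone


def local_time_from_epoch(epoch_seconds: int, offset_seconds: int) -> datetime:
    """Convert a UTC Unix epoch plus a fixed UTC offset into an aware local datetime."""
    tz = timezone(timedelta(seconds=offset_seconds))
    return datetime.fromtimestamp(epoch_seconds, tz=tz)


# Time and Timezone_offset copied from the first row of display(df.limit(5)).
local = local_time_from_epoch(1754557183, 19800)
print(local.isoformat())  # 2025-08-07T14:29:43+05:30
```

This matches the row's `Time_Recorded_local` value of `2025-08-07T14:29:43.000+0530`.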



In [0]:

# df_raw.columns
num_rows = df.count()
num_cols = len(df.columns)
df_shape = (num_rows, num_cols)  # similar to the .shape attribute of a pandas DataFrame
print(df_shape)

(220, 20)


In [0]:
display(df.describe()) 

summary,City,Country,Weather_main,Weather_subtype,Temperature,Feels_Like,Min_Temp,Max_Temp,Pressure,Humidity,Visibility,Wind_speed,cloudiness_percent,Rain_mm_hour,Snow_mm_hour,Time,Timezone_offset
count,220,220,220,220,220.0,220.0,220.0,220.0,220.0,220.0,220.0,220.0,220.0,220.0,0.0,220.0,220.0
mean,,,,,299.3771818181818,301.1885909090909,299.3771818181818,299.3771818181818,1007.5818181818182,79.44545454545455,10000.0,3.474545454545455,99.35,0.0682727272727272,,1754574061.2863636,19800.0
stddev,,,,,1.459965053574498,2.8295158783532286,1.459965053574498,1.459965053574498,1.215439625309676,3.9101468835444657,0.0,0.6886531575767526,1.3208029936160814,0.1280971313179524,,9941.138019737327,0.0
min,Pune,IN,Clouds,light rain,297.68,298.41,297.68,297.68,1006.0,71.0,10000.0,2.41,96.0,0.0,,1754557183.0,19800.0
max,Pune,IN,Rain,overcast clouds,301.83,307.33,301.83,301.83,1009.0,85.0,10000.0,4.46,100.0,0.5,,1754589448.0,19800.0


#### Counting rows with Spark SQL

In [0]:
df.createOrReplaceTempView("my_table")

In [0]:
%sql 
Select count(*) from my_table

count(1)
175


In [0]:
# df.show(5, truncate=False)  # plain-text alternative that also works outside Databricks
display(df.limit(5))  # Databricks-only; equivalent to df.limit(5).display()


City,Country,Weather_main,Weather_subtype,Temperature,Feels_Like,Min_Temp,Max_Temp,Pressure,Humidity,Visibility,Wind_speed,cloudiness_percent,Rain_mm_hour,Snow_mm_hour,Time,Timezone_offset,Time_Recorded_local,Sunrise_local,Sunset_local
Pune,IN,Clouds,overcast clouds,301.83,306.44,301.83,301.83,1006,77,10000,3.75,100,0.0,,1754557183,19800,2025-08-07T14:29:43.000+0530,2025-08-07T06:14:02.000+0530,2025-08-07T19:06:44.000+0530
Pune,IN,Clouds,overcast clouds,301.83,306.44,301.83,301.83,1006,77,10000,3.75,100,0.0,,1754557183,19800,2025-08-07T14:29:43.000+0530,2025-08-07T06:14:02.000+0530,2025-08-07T19:06:44.000+0530
Pune,IN,Clouds,overcast clouds,301.83,306.44,301.83,301.83,1006,77,10000,3.75,100,0.0,,1754557183,19800,2025-08-07T14:29:43.000+0530,2025-08-07T06:14:02.000+0530,2025-08-07T19:06:44.000+0530
Pune,IN,Clouds,overcast clouds,301.83,306.44,301.83,301.83,1006,77,10000,3.75,100,0.0,,1754557183,19800,2025-08-07T14:29:43.000+0530,2025-08-07T06:14:02.000+0530,2025-08-07T19:06:44.000+0530
Pune,IN,Clouds,overcast clouds,301.83,306.44,301.83,301.83,1006,77,10000,3.75,100,0.0,,1754557183,19800,2025-08-07T14:29:43.000+0530,2025-08-07T06:14:02.000+0530,2025-08-07T19:06:44.000+0530


In [0]:
display(df.describe())

summary,City,Country,Weather_main,Weather_subtype,Temperature,Feels_Like,Min_Temp,Max_Temp,Pressure,Humidity,Visibility,Wind_speed,cloudiness_percent,Rain_mm_hour,Snow_mm_hour,Time,Timezone_offset
count,105,105,105,105,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,105.0,0.0,105.0,105.0
mean,,,,,300.7225714285714,303.74514285714287,300.7225714285714,300.7225714285714,1006.4666666666668,76.27619047619048,10000.0,4.112666666666667,98.63809523809525,0.0545714285714285,,1754565055.9142857,19800.0
stddev,,,,,0.9317033909947828,2.026543625470248,0.9317033909947828,0.9317033909947828,0.666025332543535,3.163783148308966,0.0,0.2922139188316913,1.6415506829484894,0.1366215193900271,,5537.501594261263,0.0
min,Pune,IN,Clouds,light rain,298.6,299.34,298.6,298.6,1006.0,71.0,10000.0,3.49,96.0,0.0,,1754557183.0,19800.0
max,Pune,IN,Rain,overcast clouds,301.83,307.33,301.83,301.83,1008.0,83.0,10000.0,4.46,100.0,0.5,,1754574731.0,19800.0


In [0]:
df.select("Temperature", "Wind_speed","Time_Recorded_local").printSchema()

root
 |-- Temperature: double (nullable = true)
 |-- Wind_speed: double (nullable = true)
 |-- Time_Recorded_local: timestamp (nullable = true)



### Dealing with Nulls

##### Counting nulls per column and dropping mostly-null columns

In [0]:
from pyspark.sql.functions import sum as spark_sum  # alias avoids shadowing Python's built-in sum()
df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).display()


In [0]:
# Note: to drop rows (rather than columns) with missing values in specific columns, use
# df = df.dropna(subset=["col1", "col2"])
df = df.drop("Snow_mm_hour")  # every value is null (count 0 in describe() above)
len(df.columns)

Out[99]: 19
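Dropping `Snow_mm_hour` by name works here; a more general option is to drop every column whose share of nulls exceeds a threshold. A minimal sketch: the helper `columns_to_drop` and its 0.5 default threshold are assumptions, and the null counts would come from the per-column null-count query above, collected into a dict. The example counts mirror the `describe()` output (220 rows, `Snow_mm_hour` count 0).

```python
def columns_to_drop(null_counts: dict, total_rows: int, max_null_fraction: float = 0.5) -> list:
    """Names of columns whose share of nulls is above max_null_fraction (assumed default 0.5)."""
    if total_rows == 0:
        return []
    return [name for name, nulls in null_counts.items() if nulls / total_rows > max_null_fraction]


# Hypothetical notebook usage:
# import pyspark.sql.functions as F
# counts = df.select(
#     [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]
# ).first().asDict()
# df = df.drop(*columns_to_drop(counts, df.count()))

print(columns_to_drop({"Temperature": 0, "Rain_mm_hour": 0, "Snow_mm_hour": 220}, 220))
# ['Snow_mm_hour']
```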

In [0]:
df.select("Temperature", "Feels_Like", "Min_Temp", "Max_Temp").show(3)
# df.select("Temperature", "Feels_Like", "Min_Temp", "Max_Temp").limit(3).display()


### Column Transformations


##### Convert temperature from Kelvin to Celsius

In [0]:
df = df.withColumn("Temp_C", round(col("Temperature") - 273.15, 2))\
       .withColumn("Feels_Like_C", round(col("Feels_Like") - 273.15, 2))\
       .withColumn("Min_Temp_C", round(col("Min_Temp") - 273.15, 2))\
       .withColumn("Max_Temp_C", round(col("Max_Temp") - 273.15, 2))
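The conversion is C = K - 273.15, rounded to two decimals. A plain-Python reference version (the names `kelvin_to_celsius`, `KELVIN_OFFSET` and `TEMP_COLUMNS` are hypothetical) is handy for checking a few values by hand, and the same column mapping could drive the four `withColumn` calls in a loop instead of repeating them:

```python
import builtins

KELVIN_OFFSET = 273.15

# Kelvin source column -> Celsius column, as named in the cell above.
TEMP_COLUMNS = {
    "Temperature": "Temp_C",
    "Feels_Like": "Feels_Like_C",
    "Min_Temp": "Min_Temp_C",
    "Max_Temp": "Max_Temp_C",
}


def kelvin_to_celsius(kelvin: float, ndigits: int = 2) -> float:
    """C = K - 273.15, rounded to ndigits decimal places."""
    # builtins.round: this notebook imports round from pyspark.sql.functions,
    # which shadows Python's built-in round().
    return builtins.round(kelvin - KELVIN_OFFSET, ndigits)


print(kelvin_to_celsius(301.83))  # 28.68

# Hypothetical PySpark refactor of the cell above, driven by the same mapping:
# import pyspark.sql.functions as F
# for src, dst in TEMP_COLUMNS.items():
#     df = df.withColumn(dst, F.round(F.col(src) - KELVIN_OFFSET, 2))
```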

##### Create a new column for temperature range

In [0]:
df = df.withColumn("Temp_Range", col("Max_Temp") - col("Min_Temp"))


### Date/Time Transformations
##### Calculate daylight duration in hours

In [0]:
from pyspark.sql.functions import unix_timestamp

df = df.withColumn(
    "Daylight_Hours",
    (unix_timestamp("Sunset_local") - unix_timestamp("Sunrise_local")) / 3600
)
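`unix_timestamp` turns each timestamp into epoch seconds, so the difference divided by 3600 is the daylight length in hours (divide by 60 instead for minutes). The same arithmetic in plain Python, using the first sample row's sunrise and sunset rewritten in the `+05:30` ISO form that `datetime.fromisoformat` accepts on Python 3.7+; `daylight_hours` is a hypothetical helper:

```python
from datetime import datetime


def daylight_hours(sunrise_iso: str, sunset_iso: str) -> float:
    """(sunset - sunrise) in hours, mirroring (unix_timestamp(sunset) - unix_timestamp(sunrise)) / 3600."""
    sunrise = datetime.fromisoformat(sunrise_iso)
    sunset = datetime.fromisoformat(sunset_iso)
    return (sunset - sunrise).total_seconds() / 3600


hours = daylight_hours("2025-08-07T06:14:02+05:30", "2025-08-07T19:06:44+05:30")
print(f"{hours:.4f}")  # 12.8783
```

That row therefore has 12 h 52 min 42 s (46362 s) of daylight.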

##### Extract year, month, and day from the timestamp

In [0]:

df = df.withColumn("Year", year("Time_Recorded_local")) \
       .withColumn("Month", month("Time_Recorded_local")) \
       .withColumn("Day", dayofmonth("Time_Recorded_local"))
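`year()`, `month()` and `dayofmonth()` read the timestamp in the session time zone (set to `Asia/Kolkata` earlier), so a reading taken shortly after midnight IST lands on a different calendar day than it would in UTC. A plain-Python illustration with a hypothetical reading at 20:00 UTC; `calendar_parts` is a hypothetical helper:

```python
from datetime import datetime, timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30))


def calendar_parts(epoch_seconds: int, tz: timezone) -> tuple:
    """(year, month, day) of an instant as seen in the given time zone."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=tz)
    return dt.year, dt.month, dt.day


# Hypothetical reading at 2025-08-07 20:00 UTC, which is 01:30 on 8 August in IST.
instant = int(datetime(2025, 8, 7, 20, 0, tzinfo=timezone.utc).timestamp())
print(calendar_parts(instant, timezone.utc))  # (2025, 8, 7)
print(calendar_parts(instant, IST))           # (2025, 8, 8)
```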

### Grouping and Aggregations

##### Average Temperature by Day



In [0]:
df.groupBy("Month","Day").avg("Temp_C").orderBy("Month","Day").show()
# df.groupBy("Month").avg("Temp_C").orderBy("Month").show()  # for Average temp by Month


##### Average temperature per city (Kelvin)

In [0]:
df.groupBy("City").avg("Temperature").show()

+----+-----------------+
|City| avg(Temperature)|
+----+-----------------+
|Pune|299.7942857142857|
+----+-----------------+



##### Count by Weather Conditions 

In [0]:
df.groupBy("Weather_main").count().show()

+------------+-----+
|Weather_main|count|
+------------+-----+
|      Clouds|  118|
|        Rain|   57|
+------------+-----+



##### Maximum wind speed per weather type

In [0]:
df.groupBy("Weather_main").max("Wind_speed").show()


+------------+---------------+
|Weather_main|max(Wind_speed)|
+------------+---------------+
|      Clouds|           4.46|
|        Rain|           4.18|
+------------+---------------+
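The aggregations above each run as a separate job; `groupBy(...).agg(...)` can compute count, average and maximum in one pass. Below, the PySpark form appears as a comment, followed by a plain-Python reference of the same grouping run on toy rows (illustrative values, not project data). `group_stats` is hypothetical and, unlike Spark's `avg`/`max`, does not skip nulls.

```python
from collections import defaultdict
from statistics import mean

# PySpark equivalent (single pass over the data):
# import pyspark.sql.functions as F
# df.groupBy("Weather_main").agg(
#     F.count("*").alias("count"),
#     F.avg("Wind_speed").alias("avg_wind_speed"),
#     F.max("Wind_speed").alias("max_wind_speed"),
# ).show()


def group_stats(rows, key, value):
    """Per-group count, mean and max of `value`; assumes no None values."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[value])
    return {k: {"count": len(vs), "avg": mean(vs), "max": max(vs)} for k, vs in groups.items()}


toy_rows = [
    {"Weather_main": "Clouds", "Wind_speed": 4.0},
    {"Weather_main": "Clouds", "Wind_speed": 3.0},
    {"Weather_main": "Rain", "Wind_speed": 2.5},
]
print(group_stats(toy_rows, "Weather_main", "Wind_speed"))
```

`statistics.mean` is used instead of `sum(...) / len(...)` because this notebook imports a PySpark `sum` that shadows the built-in.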



### Categorization / Binning

##### Categorize temperature into ranges

In [0]:
from pyspark.sql.functions import when

# Thresholds are in °C and apply to the Temp_C column created above.
df = df.withColumn(
    "Temp_Category",
    when(col("Temp_C") < 10, "Cold")
    .when(col("Temp_C") < 25, "Moderate")
    .otherwise("Hot")
)
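A plain-Python mirror of the binning rule (hypothetical helper `temp_category`) makes the boundaries explicit: 10 °C falls in Moderate and 25 °C in Hot, because both comparisons are strict. One difference to keep in mind: in Spark, a null `Temp_C` makes every `when` condition non-true, so the chain labels it `Hot`; adding `when(col("Temp_C").isNull(), lit(None))` as the first branch (with `lit` imported from `pyspark.sql.functions`) would keep nulls as null. The sketch returns `None` for missing input to make that choice visible.

```python
from typing import Optional


def temp_category(temp_c: Optional[float]) -> Optional[str]:
    """Same thresholds as the when/otherwise chain: < 10 Cold, < 25 Moderate, else Hot."""
    if temp_c is None:
        return None  # explicit choice; the Spark chain would label a null as "Hot"
    if temp_c < 10:
        return "Cold"
    if temp_c < 25:
        return "Moderate"
    return "Hot"


for t in (9.99, 10.0, 24.53, 25.0, 28.68, None):
    print(t, temp_category(t))
```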


In [0]:
# Save the final DataFrame back to S3 as Parquet and as CSV.
# coalesce(1) makes each output directory contain a single part file.
# df.coalesce(1).write.mode("overwrite").parquet("s3a://your-public-bucket-name/output1/")
# df.coalesce(1).write.mode("overwrite").option("header", True).csv("s3a://your-public-bucket-name/output2/")