<a href="https://colab.research.google.com/github/Shazizan/portfolio/blob/master/etl_vault_ps_realtime_openweathermap.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Real-Time Weather Data ETL Pipeline Using PySpark and OpenWeather API**

# **Getting Data Ready**

I'm going to fetch real-time weather data from the OpenWeather API. First, get an API key here: https://home.openweathermap.org/api_keys

## **Test The Key / Fetch the data using API key**

The response comes back in JSON format, which .json() parses into a Python dictionary.

In [36]:
import requests

API_KEY = "REPLACE_WITH_YOUR_OPENWEATHER_API_KEY"  # keep the real key out of saved notebooks
CITY = "Kuala Lumpur"
URL = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"

response = requests.get(URL).json()
print(response)

{'coord': {'lon': 101.6865, 'lat': 3.1431}, 'weather': [{'id': 801, 'main': 'Clouds', 'description': 'few clouds', 'icon': '02n'}], 'base': 'stations', 'main': {'temp': 26.75, 'feels_like': 29.94, 'temp_min': 26.54, 'temp_max': 26.75, 'pressure': 1011, 'humidity': 89, 'sea_level': 1011, 'grnd_level': 998}, 'visibility': 9000, 'wind': {'speed': 0.51, 'deg': 0}, 'clouds': {'all': 20}, 'dt': 1759411432, 'sys': {'type': 1, 'id': 9446, 'country': 'MY', 'sunrise': 1759359601, 'sunset': 1759403108}, 'timezone': 28800, 'id': 1733046, 'name': 'Kuala Lumpur', 'cod': 200}
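As a small variation on the request above, the query string can be built from a dictionary so that values such as the space in the city name are URL-encoded explicitly. This is only a sketch using the standard library: `build_weather_url` is a hypothetical helper name, the key is a placeholder, and the `https` scheme is an assumption (the notebook itself uses `http`).

```python
from urllib.parse import urlencode

# Assumption: https instead of the http scheme used in the notebook cell
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

def build_weather_url(city, api_key, units="metric"):
    """Return the current-weather URL with URL-encoded query parameters."""
    query = urlencode({"q": city, "appid": api_key, "units": units})
    return f"{BASE_URL}?{query}"

url = build_weather_url("Kuala Lumpur", "REPLACE_WITH_YOUR_API_KEY")
print(url)
```

Passing the same dictionary as `params=` to `requests.get(BASE_URL, params=...)` should give an equivalent request without building the string by hand.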


In [37]:
response

{'coord': {'lon': 101.6865, 'lat': 3.1431},
 'weather': [{'id': 801,
   'main': 'Clouds',
   'description': 'few clouds',
   'icon': '02n'}],
 'base': 'stations',
 'main': {'temp': 26.75,
  'feels_like': 29.94,
  'temp_min': 26.54,
  'temp_max': 26.75,
  'pressure': 1011,
  'humidity': 89,
  'sea_level': 1011,
  'grnd_level': 998},
 'visibility': 9000,
 'wind': {'speed': 0.51, 'deg': 0},
 'clouds': {'all': 20},
 'dt': 1759411432,
 'sys': {'type': 1,
  'id': 9446,
  'country': 'MY',
  'sunrise': 1759359601,
  'sunset': 1759403108},
 'timezone': 28800,
 'id': 1733046,
 'name': 'Kuala Lumpur',
 'cod': 200}
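The `dt`, `sunrise` and `sunset` fields in this response are Unix timestamps in UTC, and `timezone` is the city's offset from UTC in seconds. A minimal sketch of turning them into local times, using values copied from the output above (`to_local_time` is a hypothetical helper name):

```python
from datetime import datetime, timedelta, timezone

# Values copied from the sample response above
sample = {
    "dt": 1759411432,
    "timezone": 28800,
    "sys": {"sunrise": 1759359601, "sunset": 1759403108},
}

def to_local_time(unix_seconds, offset_seconds):
    """Convert a UTC Unix timestamp to a datetime in the city's fixed UTC offset."""
    tz = timezone(timedelta(seconds=offset_seconds))
    return datetime.fromtimestamp(unix_seconds, tz=tz)

observed = to_local_time(sample["dt"], sample["timezone"])
sunrise = to_local_time(sample["sys"]["sunrise"], sample["timezone"])
sunset = to_local_time(sample["sys"]["sunset"], sample["timezone"])

print("observed:", observed.isoformat())
print("sunrise: ", sunrise.isoformat())
print("sunset:  ", sunset.isoformat())
```

Here the reading was taken after local sunset, which matches the night icon `02n` in the response.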

# **Set Up Spark Configuration**

## **Install Required Libraries**

In [38]:
!pip install requests
!pip install pyspark



Start the Spark session, the entry point to the Spark engine:

In [39]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WeatherStreamETL").getOrCreate()

## **Define the Schema**

Insight:
- A schema is like a blueprint for the data: it states what each column is called and what type of value it holds.
- Why do we need this? PySpark is built to handle massive data (like millions of rows), so it helps to tell it ahead of time the column names and the data type of each column (number, text, etc.) instead of leaving it to guess.

The schema below tells Spark:
- city, country, weather, description → text (String)
- temperature, feels_like, wind_speed → decimal number (Double)
- humidity → whole number (Integer)
- True → means “this column can have empty (null) values”

This helps Spark:
- 🏎️ Process faster (it knows how to handle each column properly)
- 🚫 Avoid confusion (e.g., not mix numbers with text)
- ✅ Validate data (catch errors if a value doesn’t match the type)

In [40]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

schema = StructType([
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("feels_like", DoubleType(), True),
    StructField("humidity", IntegerType(), True),
    StructField("weather", StringType(), True),
    StructField("description", StringType(), True),
    StructField("wind_speed", DoubleType(), True)
])

# **Extract / Fetch Data from API**

In [41]:
import requests
import pandas as pd

API_KEY = "REPLACE_WITH_YOUR_OPENWEATHER_API_KEY"  # replace with your own key
CITY = "Kuala Lumpur"
URL = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"

def get_weather():
    response = requests.get(URL).json()
    data = {
        "city": [response.get("name", "")],
        "country": [response.get("sys", {}).get("country", "")],
        "temperature": [response.get("main", {}).get("temp", None)],
        "feels_like": [response.get("main", {}).get("feels_like", None)],
        "humidity": [response.get("main", {}).get("humidity", None)],
        "weather": [response.get("weather", [{}])[0].get("main", "")],
        "description": [response.get("weather", [{}])[0].get("description", "")],
        "wind_speed": [response.get("wind", {}).get("speed", None)]
    }
    return pd.DataFrame(data)

Insight:
- units=metric gives temperature in Celsius.
- The function returns a Pandas DataFrame ready to convert to PySpark.
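One fragile spot in `get_weather()` is the `[0]` index: if the response contains a `weather` key with an empty list, it raises an `IndexError`. The sketch below shows one possible way to flatten the response defensively and can be tried offline. `flatten_weather` is a hypothetical helper name, the sample is trimmed from the output shown earlier, and the error payload is a made-up example of a failed request.

```python
def flatten_weather(response):
    """Flatten an OpenWeather-style response dict into one flat row (a plain dict)."""
    main = response.get("main") or {}
    sys_info = response.get("sys") or {}
    wind = response.get("wind") or {}
    # Fall back to one empty entry when "weather" is missing or an empty list
    first_weather = (response.get("weather") or [{}])[0]
    return {
        "city": response.get("name", ""),
        "country": sys_info.get("country", ""),
        "temperature": main.get("temp"),
        "feels_like": main.get("feels_like"),
        "humidity": main.get("humidity"),
        "weather": first_weather.get("main", ""),
        "description": first_weather.get("description", ""),
        "wind_speed": wind.get("speed"),
    }

# Trimmed copy of the response printed earlier in the notebook
sample = {
    "weather": [{"main": "Clouds", "description": "few clouds"}],
    "main": {"temp": 26.75, "feels_like": 29.94, "humidity": 89},
    "wind": {"speed": 0.51},
    "sys": {"country": "MY"},
    "name": "Kuala Lumpur",
}
error_payload = {"cod": 401, "message": "Invalid API key"}  # hypothetical failed response

print(flatten_weather(sample))
print(flatten_weather(error_payload))
```

Wrapping the returned dict in a list, as in `pd.DataFrame([row])`, gives the same one-row DataFrame shape that `get_weather()` returns.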

# **Stream Data in Micro-Batches + Apply Transformation inside Batch**

Simulate streaming by fetching data every few seconds:

In [45]:
from pyspark.sql.functions import col
import time

for batch_num in range(5):  # 5 micro-batches
    print(f"Batch {batch_num+1}")
    pdf = get_weather()
    df = spark.createDataFrame(pdf, schema=schema)

    # transformation: only show temperatures below 30°C
    transformed = df.filter(col("temperature") < 30)
    transformed.show()

    time.sleep(5)  # wait 5 seconds to simulate streaming


Batch 1
+------------+-------+-----------+----------+--------+-------+-----------+----------+
|        city|country|temperature|feels_like|humidity|weather|description|wind_speed|
+------------+-------+-----------+----------+--------+-------+-----------+----------+
|Kuala Lumpur|     MY|      26.75|     29.94|      89| Clouds| few clouds|      0.51|
+------------+-------+-----------+----------+--------+-------+-----------+----------+

Batch 2
+------------+-------+-----------+----------+--------+-------+-----------+----------+
|        city|country|temperature|feels_like|humidity|weather|description|wind_speed|
+------------+-------+-----------+----------+--------+-------+-----------+----------+
|Kuala Lumpur|     MY|      26.75|     29.94|      89| Clouds| few clouds|      0.51|
+------------+-------+-----------+----------+--------+-------+-----------+----------+

Batch 3
(output truncated)

Insight:
- Here I'm fetching the data repeatedly in small batches (micro-batches).
- Each batch is treated like a “mini-stream” of data.
- time.sleep(5) pauses 5 seconds between fetches to simulate real-time streaming.

Where does micro-batching fit?
- Micro-batching: just the mechanism that feeds data in chunks. It is part of the streaming/ETL workflow, not the transformation itself.
- Transformation: happens inside each batch, when the data is filtered, selected, or otherwise manipulated.
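The split between the batching mechanism and the per-batch transformation can be sketched in plain Python, without Spark or the network. Everything here is illustrative: `run_micro_batches` and `fake_fetch` are hypothetical names, and the temperatures are made-up stand-in values rather than real API readings.

```python
import time

def run_micro_batches(fetch, n_batches, interval_s, max_temp):
    """Poll fetch() n_batches times and keep only rows cooler than max_temp."""
    kept = []
    for batch_num in range(1, n_batches + 1):
        batch = fetch()  # extract: one micro-batch (a list of row dicts)
        # transform: the filter runs inside the batch, as in the Spark loop above
        rows = [
            r for r in batch
            if r["temperature"] is not None and r["temperature"] < max_temp
        ]
        kept.extend({"batch": batch_num, **r} for r in rows)
        if batch_num < n_batches:
            time.sleep(interval_s)  # the batching mechanism: wait before the next fetch
    return kept

# Stand-in for get_weather(): made-up temperatures, one reading per call
readings = iter([26.75, 31.2, 29.9])

def fake_fetch():
    return [{"city": "Kuala Lumpur", "temperature": next(readings)}]

result = run_micro_batches(fake_fetch, n_batches=3, interval_s=0, max_temp=30)
for row in result:
    print(row)
```

Unlike the notebook loop, this version accumulates the rows from every batch, which is one option if all batches should be available to the load step.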

# **Load**

Here, I want to load the processed weather data into the target GitHub repo.

## **1 - Install PyGithub**

In [46]:
!pip install PyGithub

Collecting PyGithub
  Downloading pygithub-2.8.1-py3-none-any.whl.metadata (3.9 kB)
Collecting pynacl>=1.4.0 (from PyGithub)
  Downloading pynacl-1.6.0-cp38-abi3-manylinux_2_34_x86_64.whl.metadata (9.4 kB)
Downloading pygithub-2.8.1-py3-none-any.whl (432 kB)
Downloading pynacl-1.6.0-cp38-abi3-manylinux_2_34_x86_64.whl (1.4 MB)
Installing collected packages: pynacl, PyGithub
Successfully installed PyGithub-2.8.1 pynacl-1.6.0


## **2 - Generate a GitHub Personal Access Token (PAT)**

- In GitHub, I'm going to generate a personal access token.
- This token is confidential, so I must delete it from the notebook before saving or sharing it.
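One way to avoid having to delete the token by hand is to never type it into a cell at all. The sketch below reads a secret from an environment variable and falls back to a hidden prompt; `load_secret` is a hypothetical helper and `GITHUB_TOKEN` is an assumed variable name, not something the notebook defines.

```python
import os
from getpass import getpass

def load_secret(env_var, prompt=getpass):
    """Return the secret from the environment, or ask for it without echoing it."""
    value = os.environ.get(env_var)
    if value:
        return value
    # getpass hides the typed value, so it does not end up in the cell output
    return prompt(f"Enter value for {env_var}: ")

# Example usage (commented out so the block does not stop to prompt):
# token = load_secret("GITHUB_TOKEN")
# auth = Auth.Token(token)
```

The same helper could hold the OpenWeather key, so that neither secret is stored in the saved notebook.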

## **3 - Import and Authenticate**

In [53]:
from github import Github, Auth

# Create authentication using Auth
auth = Auth.Token("REPLACE_WITH_YOUR_GITHUB_TOKEN")

# Pass auth object to Github
g = Github(auth=auth)

# Access your repo
repo = g.get_user().get_repo("pipeline-vault")


Insight:
- The Auth.Token("XXXXX") line uses my Personal Access Token (PAT) to log in to GitHub.
- This tells GitHub: “I’m Shazizan, and I allow this notebook to act on my behalf.”
- repo = g.get_user().get_repo("pipeline-vault") → connects to my repo named pipeline-vault, so I can now interact with that repo programmatically.
- 👉 Nothing is uploaded yet — it’s just like signing in.


## **4 - Create a temporary folder & Convert PySpark DataFrame to CSV**

- GitHub cannot accept a PySpark DataFrame directly — it only accepts files (CSV, JSON, etc.)
- This step creates a real CSV file that you can upload programmatically.

In [54]:
import os

# Make a temporary folder
os.makedirs("temp_data", exist_ok=True)

# Save as CSV locally (one file for simplicity)
transformed.toPandas().to_csv("temp_data/weather_data.csv", index=False)

Insight:
- os.makedirs("temp_data") → creates a folder named temp_data in your Colab environment.
- exist_ok=True → if the folder already exists, don’t throw an error.
- ✅ Purpose: to have a place to save your CSV file temporarily before uploading to GitHub.
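- transformed → your PySpark DataFrame (the output after your “transformation” step). After the loop it holds only the last micro-batch, so that is what gets saved.
- .toPandas() → converts the PySpark DataFrame into a Pandas DataFrame. Spark can write CSV itself, but it writes a folder of part files; for a tiny result like this, Pandas gives a single file more simply.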
- .to_csv("temp_data/weather_data.csv", index=False) → saves the Pandas DataFrame as a CSV file inside the folder temp_data.
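Because the cell above saves only the last micro-batch, one alternative is to append each batch's rows to the same CSV as they arrive. The sketch below swaps in the standard-library `csv` module instead of Pandas. `append_rows` is a hypothetical helper, the column list mirrors the schema, and the row is copied from the batch output shown earlier.

```python
import csv
import os
import tempfile

FIELDS = ["city", "country", "temperature", "feels_like",
          "humidity", "weather", "description", "wind_speed"]

def append_rows(path, rows, fieldnames=FIELDS):
    """Append row dicts to a CSV file, writing the header only for a new file."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    is_new = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if is_new:
            writer.writeheader()
        writer.writerows(rows)

# Row copied from the batch output shown earlier in the notebook
row = {"city": "Kuala Lumpur", "country": "MY", "temperature": 26.75,
       "feels_like": 29.94, "humidity": 89, "weather": "Clouds",
       "description": "few clouds", "wind_speed": 0.51}

# Demo in a throwaway folder: two appends give one header plus two data rows
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "temp_data", "weather_data.csv")
    append_rows(demo_path, [row])
    append_rows(demo_path, [row])
    with open(demo_path, encoding="utf-8") as f:
        print(f.read())
```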

## **5 - Push the CSV to GitHub**

In [55]:
# Read the CSV content
with open("temp_data/weather_data.csv", "r") as file:
    content = file.read()

# File path inside repo
github_path = "weather_data.csv"  # this will appear in the repo root

# Check if file exists in repo
from github import UnknownObjectException

try:
    existing_file = repo.get_contents(github_path)
    # Update the file if it exists
    repo.update_file(existing_file.path, "Update weather data", content, existing_file.sha)
    print("✅ File updated in GitHub repo")
except UnknownObjectException:
    # Create a new file only when GitHub reports the path as missing (404);
    # other errors (bad token, network) are no longer silently swallowed
    repo.create_file(github_path, "Add weather data", content)
    print("✅ File created in GitHub repo")

✅ File created in GitHub repo


Insight:
- The CSV in temp_data is like a staging area.
- The GitHub code here transfers that staged CSV into my target repo — effectively “pushing” my transformed ETL output.
- GitHub itself does not see or know about temp_data — it only sees the file after it is uploaded/committed.
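The create-or-update logic can also be wrapped in a small function so the `try` block covers only the existence check. This is a sketch, not PyGithub's own API: `upsert_text_file` is a hypothetical helper, and the exception class is passed in as a parameter (with PyGithub that would be `UnknownObjectException`) so the function can be exercised without a real repo.

```python
def upsert_text_file(repo, path, message, content, missing_error):
    """Update `path` in the repo if it exists, otherwise create it.

    `repo` is expected to offer get_contents, create_file and update_file
    with the same arguments used in the cell above.
    `missing_error` is the exception class raised when the path is absent.
    """
    try:
        existing = repo.get_contents(path)
    except missing_error:
        # Path not found: create the file
        repo.create_file(path, message, content)
        return "created"
    # Path found: update it, passing the current blob SHA as GitHub requires
    repo.update_file(existing.path, message, content, existing.sha)
    return "updated"

# Example usage with PyGithub (commented out: needs a token and network access):
# from github import UnknownObjectException
# status = upsert_text_file(repo, "weather_data.csv", "Update weather data",
#                           content, UnknownObjectException)
```

Returning a status string instead of printing makes it easy to log which path was taken on each run.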

✅ **Key points to remember for this ETL experiment:**

- **Extract:** `pdf = get_weather()` → fetching raw API data  
- **Transform:** `transformed = df.filter(col("temperature") < 30)` → filtering rows  
- **Load:** saving `transformed` as a CSV and pushing it to GitHub  