# Weather Data Pipeline Project

This notebook demonstrates an end-to-end data pipeline using Jupyter for collecting, cleaning, transforming, loading, and analyzing weather data.


# Step 1: Configuration

Create a config/settings.py file to hold the API key, city, and database connection details.

In [27]:
# OpenWeatherMap API Key
API_KEY = "58baf0add2f0a3c410f41ba99ef81e62"

# City for which you want to collect weather data
CITY = "Nairobi"

# Database URI for PostgreSQL connection
DATABASE_URI = "postgresql+psycopg2://user:hellen@localhost:5432/weather"

# Step 2: Data Ingestion

This cell fetches weather data from the OpenWeatherMap API and saves it as a CSV file.

In [28]:
import requests
import pandas as pd
from datetime import datetime
from config.settings import API_KEY, CITY

def fetch_weather_data():
    url = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"
    response = requests.get(url)
    
    # Check if the request was successful
    if response.status_code != 200:
        print(f"Failed to fetch data. Status code: {response.status_code}")
        print("Response:", response.json())
        return pd.DataFrame()  # Return an empty DataFrame if there was an error
    
    data = response.json()
    
    # Verify that the required fields are in the data
    if "main" not in data or "weather" not in data:
        print("Expected data fields are missing in the response.")
        print("Response data:", data)
        return pd.DataFrame()  # Return an empty DataFrame if data is incomplete

    # Extract relevant fields
    weather = {
        "city": CITY,
        "timestamp": datetime.now(),
        "temperature": data["main"]["temp"],
        "humidity": data["main"]["humidity"],
        "weather": data["weather"][0]["description"]
    }
    return pd.DataFrame([weather])

# Ingest data and save to CSV for temporary storage
df = fetch_weather_data()

# Only save if data was successfully fetched
if not df.empty:
    df.to_csv("weather_data.csv", mode='a', header=False, index=False)
    print("Weather data fetched and saved.")
else:
    print("No data to save.")
df.head()


Failed to fetch data. Status code: 401
Response: {'cod': 401, 'message': 'Invalid API key. Please see https://openweathermap.org/faq#error401 for more info.'}
No data to save.


In [31]:
print(f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric")


http://api.openweathermap.org/data/2.5/weather?q=Nairobi&appid=YOUR_OPENWEATHERMAP_API_KEY&units=metric


# Step 3: Data Cleaning

This cell loads the ingested data, removes duplicates, and handles missing values.

In [None]:
def clean_data(file_path):
    df = pd.read_csv(file_path, names=["city", "timestamp", "temperature", "humidity", "weather"])
    df.drop_duplicates(inplace=True)
    df.dropna(inplace=True)
    return df

cleaned_df = clean_data("weather_data.csv")
print("Data cleaned.")
cleaned_df.head()


# Step 4: Data Transformation

This cell adds a derived column for temperature in Fahrenheit and converts timestamps to datetime format.

In [None]:
def transform_data(df):
    df['temp_fahrenheit'] = df['temperature'] * 9/5 + 32
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    return df

transformed_df = transform_data(cleaned_df)
print("Data transformed.")
transformed_df.head()


# Step 5: Load Data into Database

This cell loads the transformed data into a PostgreSQL database.

In [None]:
from sqlalchemy import create_engine
from config.settings import DATABASE_URI

def load_data_to_db(df):
    engine = create_engine(DATABASE_URI)
    df.to_sql("weather_data", engine, if_exists="append", index=False)
    print("Data loaded to database.")

load_data_to_db(transformed_df)


# Step 6: Data Analysis 

Use SQL queries for analysis on the database (you can do this in Python with pandas if preferred).

In [None]:
import sqlalchemy as db

engine = create_engine(DATABASE_URI)
connection = engine.connect()

# Monthly average temperature
monthly_avg_query = """
SELECT
    EXTRACT(MONTH FROM timestamp) AS month,
    AVG(temperature) AS avg_temperature
FROM weather_data
GROUP BY month
ORDER BY month;
"""
monthly_avg = pd.read_sql(monthly_avg_query, connection)
monthly_avg.plot(kind='bar', x='month', y='avg_temperature', title='Monthly Average Temperature')


# Step 7: Visualize Data in Jupyter 

Create visualizations directly within the notebook to analyze trends.

In [None]:
import matplotlib.pyplot as plt

# Daily temperature fluctuations
daily_fluctuations = transformed_df.resample('D', on='timestamp').agg({
    'temperature': ['min', 'max']
})
daily_fluctuations.columns = ['temp_min', 'temp_max']
daily_fluctuations['temp_variation'] = daily_fluctuations['temp_max'] - daily_fluctuations['temp_min']

plt.figure(figsize=(10, 5))
plt.plot(daily_fluctuations.index, daily_fluctuations['temp_variation'], marker='o')
plt.title('Daily Temperature Fluctuations')
plt.xlabel('Date')
plt.ylabel('Temperature Variation (°C)')
plt.grid(True)
plt.show()


# Summary
This notebook demonstrates an end-to-end data pipeline for weather data analysis, covering data ingestion, cleaning, transformation, loading to a database, and simple visualizations. 