# 🌦️ Weather Data Analytics Pipeline with AWS

## 📌 Step 1: Fetch Raw Weather Data from Open-Meteo API
This script:
- Installs required packages
- Requests hourly and daily data for multiple cities
- Processes and merges the data
- Formats the data (adds an `is_day` flag and splits each timestamp into `date_only` and `time_only` columns)
- Uploads two CSVs (hourly and daily) to `s3://weather-data-bucket-test1/rawData/`

In [None]:
import sys
import subprocess

# Install required dependencies at runtime through the running interpreter's
# own pip (sys.executable -m pip). check=True makes a failed install raise
# subprocess.CalledProcessError here instead of being ignored and surfacing
# later as an ImportError when the packages are imported.
subprocess.run(
    [
        sys.executable, "-m", "pip", "install",
        "boto3", "openmeteo_requests", "requests_cache", "pandas", "retry_requests",
    ],
    check=True,
)

In [None]:
import openmeteo_requests
import requests_cache
import pandas as pd
import boto3
from retry_requests import retry
from datetime import datetime
from io import StringIO

# Open-Meteo API client, layered as: cached session -> retrying session -> client.
# The cache is named ".cache" with expire_after=3600; failed requests are
# retried up to 5 times with a backoff factor of 0.2.
cache_session = requests_cache.CachedSession(".cache", expire_after=3600)
retry_session = retry(
    cache_session,
    retries=5,
    backoff_factor=0.2,
)
openmeteo = openmeteo_requests.Client(session=retry_session)

# Forecast endpoint that every location is queried against.
url = "https://api.open-meteo.com/v1/forecast"

# City name -> [latitude, longitude]; index 0 is sent as "latitude" and
# index 1 as "longitude" in the request parameters.
location_map = {
    "New Haven": [41.3081, -72.9282],
    "New York": [40.7143, -74.006],
    "Boston": [42.3584, -71.0598],
    "Los Angeles": [34.0522, -118.2437],
    "Las Vegas": [36.175, -115.1372],
    "San Francisco": [37.7749, -122.4194],
    "Washington D.C": [38.8951, -77.0364],
}

# Variables requested from the API. The response section exposes each series
# by position (Variables(i)), matching the position in the request list, so
# these tuples are the single source of truth for both the request and the
# resulting DataFrame column names.
HOURLY_VARIABLES = ("temperature_2m", "rain", "snow_depth", "cloud_cover")
DAILY_VARIABLES = ("temperature_2m_max", "temperature_2m_min", "uv_index_max")


def _section_to_frame(section, variables, location):
    """Convert one response section (hourly or daily) into a DataFrame.

    Args:
        section: Object returned by ``response.Hourly()`` / ``response.Daily()``;
            must provide ``Time()``, ``TimeEnd()``, ``Interval()`` and
            ``Variables(i).ValuesAsNumpy()``.
        variables: Column names, in the same order the variables were requested.
        location: Label written to the ``location`` column of every row.

    Returns:
        DataFrame with a tz-aware UTC ``date`` column, one column per
        variable, and a ``location`` column.
    """
    data = {
        # Time() / TimeEnd() are epoch seconds; the end bound is excluded.
        "date": pd.date_range(
            start=pd.to_datetime(section.Time(), unit="s", utc=True),
            end=pd.to_datetime(section.TimeEnd(), unit="s", utc=True),
            freq=pd.Timedelta(seconds=section.Interval()),
            inclusive="left",
        ),
    }
    for index, name in enumerate(variables):
        data[name] = section.Variables(index).ValuesAsNumpy()

    frame = pd.DataFrame(data=data)
    frame["location"] = location
    return frame


# Collect data: one hourly and one daily frame per location.
hourly_data_list = []
daily_data_list = []

for location, coords in location_map.items():
    params = {
        "latitude": coords[0],
        "longitude": coords[1],
        "hourly": list(HOURLY_VARIABLES),
        "daily": list(DAILY_VARIABLES),
        "timezone": "America/New_York",
    }
    responses = openmeteo.weather_api(url, params=params)
    # A single coordinate pair is requested, so only the first response is used.
    response = responses[0]

    hourly_data_list.append(
        _section_to_frame(response.Hourly(), HOURLY_VARIABLES, location)
    )
    daily_data_list.append(
        _section_to_frame(response.Daily(), DAILY_VARIABLES, location)
    )

# Merge and format
# The API is queried with timezone="America/New_York", but the "date" column is
# built as UTC (utc=True). Hourly timestamps are converted to that zone before
# the calendar fields are derived, so date_only / time_only / is_day describe
# local wall-clock time rather than UTC.
LOCAL_TIMEZONE = "America/New_York"

# Local hours in [DAY_START_HOUR, DAY_END_HOUR) are flagged as daytime. This is
# a fixed approximation that ignores seasonal sunrise/sunset changes.
# NOTE(review): Open-Meteo documents an hourly `is_day` variable that could be
# requested instead if exact daylight is needed — confirm before switching.
DAY_START_HOUR = 6
DAY_END_HOUR = 18

hourly_df = pd.concat(hourly_data_list, ignore_index=True)
daily_df = pd.concat(daily_data_list, ignore_index=True)

hourly_local = hourly_df["date"].dt.tz_convert(LOCAL_TIMEZONE)
local_hour = hourly_local.dt.hour
hourly_df["date_only"] = hourly_local.dt.date
hourly_df["time_only"] = hourly_local.dt.time
# 1 for daytime rows, 0 otherwise (stored as an integer column).
hourly_df["is_day"] = (
    (local_hour >= DAY_START_HOUR) & (local_hour < DAY_END_HOUR)
).astype("int64")
hourly_df = hourly_df.drop(columns=["date"])

# Daily timestamps are deliberately left in UTC.
# NOTE(review): with the America/New_York request each daily step is presumably
# local midnight expressed in UTC, so its UTC calendar date already equals the
# local date, while a fixed 24 h step converted to local time could slip to the
# previous day across a DST change — confirm before changing this.
daily_df["date_only"] = daily_df["date"].dt.date
daily_df["time_only"] = daily_df["date"].dt.time
daily_df = daily_df.drop(columns=["date"])

# Upload to S3
# Both frames are rendered to CSV text first (no index column); only then is
# the S3 client created and the two objects written under the rawData/ prefix.
csv_payloads = []
for file_name, frame in (("hourly_data.csv", hourly_df), ("daily_data.csv", daily_df)):
    buffer = StringIO()
    frame.to_csv(buffer, index=False)
    csv_payloads.append((file_name, buffer.getvalue()))

s3 = boto3.client("s3")
bucket = "weather-data-bucket-test1"
folder = "rawData/"
for file_name, payload in csv_payloads:
    s3.put_object(Bucket=bucket, Key=f"{folder}{file_name}", Body=payload)
print("Uploaded both CSVs to S3.")


## 🛠️ Step 2: Convert CSVs to Parquet using PySpark
This script reads the uploaded raw CSV files, transforms them into efficient Parquet format, and saves the results in `s3://weather-data-bucket-test1/processedData/`.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV_to_Parquet_Converter").getOrCreate()

# Source prefix holding the raw CSVs and destination prefix for the Parquet output.
S3_RAW_PATH = "s3://weather-data-bucket-test1/rawData/"
S3_PROCESSED_PATH = "s3://weather-data-bucket-test1/processedData/"


def _read_raw_csv(file_name):
    """Read one raw CSV from S3_RAW_PATH into a Spark DataFrame.

    The first line is treated as the header. inferSchema is enabled so column
    types are inferred from the data; without it the CSV reader loads every
    column as a string and the Parquet files would carry string-typed
    measurements.

    Args:
        file_name: Object name relative to S3_RAW_PATH, e.g. "hourly_data.csv".

    Returns:
        Spark DataFrame with the CSV contents.
    """
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(S3_RAW_PATH + file_name)
    )


hourly_df = _read_raw_csv("hourly_data.csv")
daily_df = _read_raw_csv("daily_data.csv")

# "overwrite" replaces any previous output under each destination prefix.
hourly_df.write.mode("overwrite").parquet(S3_PROCESSED_PATH + "hourly/")
daily_df.write.mode("overwrite").parquet(S3_PROCESSED_PATH + "daily/")

print("CSV to Parquet conversion completed successfully!")

## 🔄 Step 3: Crawler & Athena
- An AWS Glue Crawler scans the `processedData/` Parquet files and maps schema into a Glue Data Catalog.
- Amazon Athena then queries the tables `plshourly` and `plsdaily` from the catalog.
- Before running queries, make sure the Athena workgroup has a valid S3 location configured for query results.

## 📊 Step 4: Power BI Integration
- Connect Power BI to Athena using the Athena ODBC driver.
- Load the `plsdaily` and `plshourly` tables.
- Build Power BI dashboards on this data with drill-downs, trend charts, and daily/hourly comparisons.