In [1]:
import json
import os
from datetime import datetime, timedelta, timezone
from io import StringIO

import boto3
import pandas as pd
import requests
from dotenv import load_dotenv


In [2]:
# Load the .env file so the os.getenv() lookups in this notebook can pick up
# values defined there (API key here, AWS credentials in the S3 helpers).
load_dotenv()

# --- CONFIGURATION ---
# City name sent to the OpenWeatherMap geocoding endpoint by fetch_coordinates().
CITY = "Karachi"
# os.getenv returns None when API_KEY is not set in the environment/.env file.
API_KEY = os.getenv("API_KEY")  # OpenWeatherMap API Key
# NOTE(review): not referenced by any code visible in this notebook (data is
# written to S3 instead) — confirm whether this local path is still needed.
OUTPUT_FILE = "./historical_aqi_weather_data.json"
# --- COORDINATES FETCH ---
def fetch_coordinates():
    """Resolve the configured CITY to coordinates via OpenWeatherMap geocoding.

    Reads the module-level ``CITY`` and ``API_KEY`` values.

    Returns:
        tuple: ``(lat, lon)`` taken from the first match in the API response.

    Raises:
        requests.RequestException: On connection failure, timeout, or an HTTP
            error status (via ``raise_for_status``).
        Exception: If the API returns an empty result list for ``CITY``.
    """
    # HTTPS keeps the API key out of clear text on the wire, and ``params``
    # lets requests URL-encode the city name (spaces, non-ASCII characters).
    response = requests.get(
        "https://api.openweathermap.org/geo/1.0/direct",
        params={"q": CITY, "limit": 1, "appid": API_KEY},
        timeout=30,  # never hang the pipeline on an unresponsive server
    )
    response.raise_for_status()
    matches = response.json()
    if not matches:
        raise Exception("City not found.")
    first_match = matches[0]
    return first_match['lat'], first_match['lon']

In [3]:
# --- CURRENT AQI FETCH ---
def fetch_current_aqi(lat, lon):
    """Fetch the current air-pollution reading for a location from OpenWeatherMap.

    Args:
        lat: Latitude sent as the ``lat`` query parameter.
        lon: Longitude sent as the ``lon`` query parameter.

    Returns:
        The decoded JSON response when the API answers with HTTP 200,
        otherwise ``None`` (after printing a failure message).

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    # HTTPS so the API key (read from the module-level API_KEY) is not sent
    # in clear text; the timeout prevents an indefinite hang.
    response = requests.get(
        "https://api.openweathermap.org/data/2.5/air_pollution",
        params={"lat": lat, "lon": lon, "appid": API_KEY},
        timeout=30,
    )
    if response.status_code != 200:
        print("Failed to fetch AQI data")
        return None
    return response.json()

In [4]:
def fetch_current_weather(lat, lon):
    """Fetch current weather conditions for a location from Open-Meteo.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        dict | None: On HTTP 200, a dict with one key per requested variable
        (``cloud_cover`` is exposed as ``cloudcover``) plus ``timestamp``, a
        UNIX epoch float for the observation time. Variables missing from the
        response map to ``None``. On any other status, prints the response
        text and returns ``None``.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    # Variables requested through ``current=``, in the order they appear in
    # the returned dict.
    current_fields = (
        "temperature_2m",
        "relative_humidity_2m",
        "precipitation",
        "wind_speed_10m",
        "wind_direction_10m",
        "surface_pressure",
        "cloud_cover",
        "visibility",
        "dew_point_2m",
        "apparent_temperature",
        "shortwave_radiation",
        "et0_fao_evapotranspiration",
    )

    response = requests.get(
        "https://api.open-meteo.com/v1/forecast",
        params={
            "latitude": lat,
            "longitude": lon,
            "current": ",".join(current_fields),
            "timezone": "auto",
        },
        timeout=30,  # never hang the pipeline on an unresponsive server
    )
    if response.status_code != 200:
        print(f"Failed to fetch weather data: {response.text}")
        return None

    data = response.json()
    current = data.get("current", {})

    current_time_str = current.get("time")
    if current_time_str:
        observed_at = datetime.strptime(current_time_str, "%Y-%m-%dT%H:%M")
        # With timezone=auto the time string is local to the requested
        # location, not to this machine. Attach the offset reported by the
        # API so the epoch value is right wherever the notebook runs.
        utc_offset_seconds = data.get("utc_offset_seconds")
        if utc_offset_seconds is not None:
            observed_at = observed_at.replace(
                tzinfo=timezone(timedelta(seconds=utc_offset_seconds))
            )
        # Without an offset the naive value is interpreted in the machine's
        # local timezone (the previous behaviour).
        timestamp = observed_at.timestamp()
    else:
        # datetime.utcnow() is naive, so .timestamp() would treat it as local
        # time and be off by the machine's UTC offset; use an aware "now".
        timestamp = datetime.now(timezone.utc).timestamp()

    # "cloudcover" (no underscore) is the key downstream code reads.
    weather = {
        ("cloudcover" if field == "cloud_cover" else field): current.get(field)
        for field in current_fields
    }
    weather["timestamp"] = timestamp
    return weather

In [None]:
def _extract_feature_row(entry):
    """Flatten one ``{'aqi': ..., 'weather': ...}`` entry into a flat row dict."""
    aqi_data = entry.get('aqi') or {}
    weather_data = entry.get('weather') or {}

    # ``or [{}]`` also covers an empty 'list', which would otherwise make the
    # [0] lookup raise IndexError.
    reading = (aqi_data.get('list') or [{}])[0]
    main_aqi = (reading.get('main') or {}).get('aqi')
    components = reading.get('components') or {}
    dt_unix = reading.get('dt')

    # Timezone-aware UTC conversion; datetime.utcfromtimestamp is deprecated.
    aqi_dt = datetime.fromtimestamp(dt_unix, tz=timezone.utc) if dt_unix else None

    pollutant_keys = ('co', 'no', 'no2', 'o3', 'so2', 'pm2_5', 'pm10', 'nh3')
    weather_keys = (
        'temperature_2m',
        'relative_humidity_2m',
        'precipitation',
        'wind_speed_10m',
        'wind_direction_10m',
        'surface_pressure',
        'cloudcover',
        'dew_point_2m',
        'apparent_temperature',
        'shortwave_radiation',
        'et0_fao_evapotranspiration',
    )

    # Insertion order defines the CSV column order.
    row = {'aqi_index': main_aqi}
    row.update((key, components.get(key)) for key in pollutant_keys)
    row.update((key, weather_data.get(key)) for key in weather_keys)
    row.update(
        year=aqi_dt.year if aqi_dt else None,
        month=aqi_dt.month if aqi_dt else None,
        day=aqi_dt.day if aqi_dt else None,
        hour=aqi_dt.hour if aqi_dt else None,
    )
    return row


def append_to_csv(bucket_name, data_key, entry):
    """Append one observation as a row to a CSV object stored on S3.

    Extracts features from ``entry``, downloads the existing CSV (if any),
    appends the new row and uploads the result back. Works entirely
    in memory without local file storage.

    Args:
        bucket_name: S3 bucket name.
        data_key: S3 object key of the CSV file.
        entry: Dict with an ``'aqi'`` and a ``'weather'`` sub-dict; missing
            keys produce empty cells.

    Raises:
        Exception: Any error other than "object does not exist" while reading
            the existing CSV is propagated and nothing is uploaded, so a
            transient failure cannot replace the stored history with a
            single-row file.
    """
    new_row = pd.DataFrame([_extract_feature_row(entry)])

    # ---- Connect to S3 ----
    s3 = boto3.client(
        's3',
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        region_name=os.getenv('AWS_DEFAULT_REGION')
    )

    # ---- Download file from S3 directly to memory ----
    try:
        obj = s3.get_object(Bucket=bucket_name, Key=data_key)
    except s3.exceptions.NoSuchKey:
        # Only a genuinely missing object starts a fresh file.
        print("S3 file not found, creating new CSV")
        existing = None
    else:
        csv_content = obj['Body'].read().decode('utf-8')
        try:
            existing = pd.read_csv(StringIO(csv_content))
            print(f"Loaded existing CSV from S3 with {len(existing)} rows")
        except pd.errors.EmptyDataError:
            # A zero-byte object holds no rows, so starting over loses nothing.
            print("S3 file is empty, creating new CSV")
            existing = None

    # ---- Append new row ----
    if existing is None:
        df = new_row
    else:
        df = pd.concat([existing, new_row], ignore_index=True)

    # ---- Upload updated file back to S3 (in-memory) ----
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)

    s3.put_object(
        Bucket=bucket_name,
        Key=data_key,
        Body=csv_buffer.getvalue(),
        ContentType='text/csv'
    )
    print(f"File successfully updated on S3 with {len(df)} total rows.")


In [6]:
# --- APPEND DATA TO S3 ---
def append_to_json_array(bucket_name, data_key, new_entry):
    """
    Appends a new entry to JSON array stored in S3.

    The object is downloaded, decoded, extended with ``new_entry`` and
    uploaded again. If the object does not exist, is not valid JSON, or does
    not hold a list, a new array containing only ``new_entry`` is written.

    Args:
        bucket_name: S3 bucket name
        data_key: S3 object key for the JSON file
        new_entry: New data entry to append (must be JSON-serialisable)
    """
    # Initialize S3 client (same settings as append_to_csv, including region)
    s3 = boto3.client(
        's3',
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        region_name=os.getenv('AWS_DEFAULT_REGION')
    )

    # Try to get existing data from S3
    try:
        obj = s3.get_object(Bucket=bucket_name, Key=data_key)
        data = json.loads(obj['Body'].read().decode('utf-8'))
        if not isinstance(data, list):
            data = []
    except (s3.exceptions.NoSuchKey, json.JSONDecodeError):
        # File doesn't exist or is not valid JSON
        data = []

    # Show the previously stored entry. Guarded because data[-1] raises
    # IndexError on an empty list, which is the state on the very first run.
    if data:
        print(data[-1])

    # Append new entry
    data.append(new_entry)

    # Upload updated data back to S3
    s3.put_object(
        Bucket=bucket_name,
        Key=data_key,
        Body=json.dumps(data, indent=4),
        ContentType='application/json'
    )

    print(f"Successfully appended data to S3 bucket: {bucket_name}/{data_key}")


In [None]:
# --- MAIN --- #
if __name__ == "__main__":
    try:
        # Destination of the raw JSON history
        bucket_name = 'my-feature-store-data'
        data_key = 'raw-data/historical_aqi_weather_data.json'
        lat, lon = fetch_coordinates()
        # Destination of the flattened CSV feature table (same bucket)
        bucket_name2 = 'my-feature-store-data'
        data_key2 = 'raw-data/aqi_weather_data.csv'

        aqi_data = fetch_current_aqi(lat, lon)
        weather_data = fetch_current_weather(lat, lon)

        # Both fetchers return None on failure; only persist complete records.
        if not (aqi_data and weather_data):
            print("Failed to collect complete dataset")
        else:
            entry = dict(aqi=aqi_data, weather=weather_data)
            print(entry)
            append_to_json_array(bucket_name, data_key, entry)
            append_to_csv(bucket_name2, data_key2, entry)
            print("Successfully saved current data with agricultural parameters")
    except Exception as exc:
        # Any failure in fetching or uploading is reported, not re-raised.
        print(f"Critical error: {exc}")


{'aqi': {'coord': {'lon': 67.0207, 'lat': 24.8547}, 'list': [{'main': {'aqi': 4}, 'components': {'co': 421.02, 'no': 0.25, 'no2': 4.78, 'o3': 100.71, 'so2': 8.7, 'pm2_5': 70.64, 'pm10': 168.08, 'nh3': 2.78}, 'dt': 1761106046}]}, 'weather': {'temperature_2m': 27.8, 'relative_humidity_2m': 52, 'precipitation': 0.0, 'wind_speed_10m': 10.8, 'wind_direction_10m': 37, 'surface_pressure': 1013.4, 'cloudcover': 0, 'visibility': 24140.0, 'dew_point_2m': 17.0, 'apparent_temperature': 28.7, 'shortwave_radiation': 449.0, 'et0_fao_evapotranspiration': 0.0, 'timestamp': 1761105600.0}}
{'aqi': {'coord': {'lon': 67.0207, 'lat': 24.8547}, 'list': [{'main': {'aqi': 4}, 'components': {'co': 237.68, 'no': 0, 'no2': 2.03, 'o3': 99.69, 'so2': 1.69, 'pm2_5': 43.95, 'pm10': 106.48, 'nh3': 0.65}, 'dt': 1761078539}]}, 'weather': {'temperature_2m': 25.5, 'relative_humidity_2m': 68, 'precipitation': 0.0, 'wind_speed_10m': 5.2, 'wind_direction_10m': 16, 'surface_pressure': 1011.2, 'cloudcover': 12, 'visibility': 2

  aqi_dt = datetime.utcfromtimestamp(dt_unix) if dt_unix else None


File successfully updated on S3.
Successfully saved current data with agricultural parameters
