In [0]:
import os
import requests
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor

# OpenWeatherMap API key, read from the environment so the secret never lives
# in the notebook source or its revision history.
# NOTE(review): a key was previously hardcoded here; treat it as exposed and rotate it.
API_KEY = os.environ.get("OPENWEATHER_API_KEY")
if not API_KEY:
    # Fail fast: without a key every request would just come back 401.
    raise RuntimeError(
        "Set the OPENWEATHER_API_KEY environment variable to your OpenWeatherMap API key."
    )

# Nigeria's 36 states plus the Federal Capital Territory, each mapped to a single
# {"lat": ..., "lon": ...} point in decimal degrees (north latitude, east longitude).
# Weather is fetched at exactly this one point per state, so a reading describes
# that location, not the whole state.
# NOTE(review): the points look like state capitals rather than centroids — TODO confirm.
states_coords = {
    "Abia": {"lat": 5.5200, "lon": 7.4900},
    "Adamawa": {"lat": 9.3000, "lon": 12.4600},
    "Akwa Ibom": {"lat": 5.0000, "lon": 7.9000},
    "Anambra": {"lat": 6.2100, "lon": 7.0700},
    "Bauchi": {"lat": 10.3100, "lon": 9.8300},
    "Bayelsa": {"lat": 4.9220, "lon": 6.2580},
    "Benue": {"lat": 7.7361, "lon": 8.5395},
    "Borno": {"lat": 11.8460, "lon": 13.1570},
    "Cross River": {"lat": 4.9580, "lon": 8.3260},
    "Delta": {"lat": 6.2056, "lon": 6.7259},
    "Ebonyi": {"lat": 6.3220, "lon": 8.1120},
    "Edo": {"lat": 6.3320, "lon": 5.6030},
    "Ekiti": {"lat": 7.6170, "lon": 5.2170},
    "Enugu": {"lat": 6.4433, "lon": 7.5130},
    "Gombe": {"lat": 10.2892, "lon": 11.1593},
    "Imo": {"lat": 5.4833, "lon": 7.0333},
    "Jigawa": {"lat": 12.2620, "lon": 9.3280},
    "Kaduna": {"lat": 10.5200, "lon": 7.4400},
    "Kano": {"lat": 12.0022, "lon": 8.5919},
    "Katsina": {"lat": 12.9933, "lon": 7.6111},
    "Kebbi": {"lat": 12.4536, "lon": 4.2023},
    "Kogi": {"lat": 7.8036, "lon": 6.7362},
    "Kwara": {"lat": 8.4900, "lon": 4.5500},
    "Lagos": {"lat": 6.5244, "lon": 3.3792},
    "Nasarawa": {"lat": 8.2043, "lon": 8.5069},
    "Niger": {"lat": 9.6167, "lon": 6.5500},
    "Ogun": {"lat": 7.1403, "lon": 3.3500},
    "Ondo": {"lat": 7.2500, "lon": 5.1900},
    "Osun": {"lat": 7.7750, "lon": 4.5600},
    "Oyo": {"lat": 7.3775, "lon": 3.8950},
    "Plateau": {"lat": 9.9200, "lon": 9.3200},
    "Rivers": {"lat": 4.8152, "lon": 7.0498},
    "Sokoto": {"lat": 13.0700, "lon": 5.2400},
    "Taraba": {"lat": 9.3200, "lon": 11.3800},
    "Yobe": {"lat": 11.9644, "lon": 11.8444},
    "Zamfara": {"lat": 12.1642, "lon": 6.6613},
    "FCT (Abuja)": {"lat": 9.0579, "lon": 7.4951}
}

# Fetch current weather and air quality for one location from OpenWeatherMap.


def _first_or_empty(items):
    """Return ``items[0]`` when ``items`` is a non-empty sequence, otherwise ``{}``.

    The ``weather`` and ``list`` fields are read through their first element.
    Indexing ``[0]`` directly raises IndexError when such a field is present
    but empty. That error would escape the except clause below and abort the
    whole parallel batch.
    """
    return items[0] if items else {}


def _redact_api_key(message):
    """Replace every occurrence of API_KEY in ``message`` with ``***``.

    The key travels in the ``appid`` query parameter. requests error messages
    (e.g. the HTTPError from ``raise_for_status``) include the request URL, so
    an unredacted message exposes the key wherever the ``error`` field is shown
    or stored.
    """
    return message.replace(API_KEY, "***") if API_KEY else message


def _get_json(endpoint, params, timeout, base_url):
    """GET ``{base_url}/{endpoint}`` with ``params``; raise on HTTP errors, return the decoded JSON."""
    response = requests.get(f"{base_url}/{endpoint}", params=params, timeout=timeout)
    response.raise_for_status()
    return response.json()


def _flatten_readings(weather_data, aqi_data):
    """Flatten the ``/weather`` and ``/air_pollution`` JSON payloads into one flat dict.

    Missing numeric fields become ``None`` rather than the string "N/A".
    Mixing "N/A" with numbers leaves an object column that
    ``spark.createDataFrame`` cannot map to a single Spark type. The text
    ``weather`` description still falls back to "N/A", and precipitation falls
    back to 0 when the payload has no ``rain`` block.
    """
    main = weather_data.get("main") or {}
    wind = weather_data.get("wind") or {}
    aqi_entry = _first_or_empty(aqi_data.get("list"))
    pollutants = aqi_entry.get("components") or {}
    return {
        "weather": _first_or_empty(weather_data.get("weather")).get("description", "N/A"),
        "temperature": main.get("temp"),
        "feels_like": main.get("feels_like"),
        "humidity": main.get("humidity"),
        "pressure": main.get("pressure"),
        "visibility": weather_data.get("visibility"),
        "wind_speed": wind.get("speed"),
        "wind_direction": wind.get("deg"),
        "cloudiness": (weather_data.get("clouds") or {}).get("all"),
        # Rainfall in mm from the "rain"."1h" field; no "rain" block is taken as no rain.
        "precipitation": (weather_data.get("rain") or {}).get("1h", 0),
        "air_quality_index": (aqi_entry.get("main") or {}).get("aqi"),
        "pm2_5": pollutants.get("pm2_5"),
        "pm10": pollutants.get("pm10"),
        "co": pollutants.get("co"),
        "no2": pollutants.get("no2"),
        "so2": pollutants.get("so2"),
        "o3": pollutants.get("o3"),
    }


def fetch_weather_data(state, lat, lon, timeout=10,
                       base_url="https://api.openweathermap.org/data/2.5"):
    """Fetch current weather and air-pollution readings for one location.

    Args:
        state: Label copied into the ``state`` field of the returned record.
        lat: Latitude sent to the API.
        lon: Longitude sent to the API.
        timeout: Per-request timeout in seconds.
        base_url: OpenWeatherMap API root. The default uses HTTPS, so the API
            key in the query string is not sent in clear text.

    Returns:
        dict: On success: ``state``, ``lat``, ``lon``, the flattened readings,
        and ``timestamp`` (local wall-clock time of the fetch, no timezone).
        On failure: only ``state``, ``lat``, ``lon``, ``error`` (with the API
        key masked) and ``timestamp``. Failures are returned rather than raised
        so that one bad state does not abort a batch run via ``executor.map``.
    """
    base_params = {"lat": lat, "lon": lon, "appid": API_KEY}
    try:
        weather_data = _get_json("weather", {**base_params, "units": "metric"}, timeout, base_url)
        aqi_data = _get_json("air_pollution", base_params, timeout, base_url)
        readings = _flatten_readings(weather_data, aqi_data)
    except (requests.exceptions.RequestException, ValueError, KeyError) as exc:
        # ValueError covers malformed JSON on requests versions where the
        # decode error is not a RequestException subclass.
        return {
            "state": state,
            "lat": lat,
            "lon": lon,
            "error": _redact_api_key(str(exc)),
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    return {
        "state": state,
        "lat": lat,
        "lon": lon,
        **readings,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

# Fetch data for every state in parallel. The work is network-bound, so a small
# thread pool overlaps the HTTP round-trips. The pool size is kept modest,
# presumably to stay within the API's rate limits (TODO confirm).
MAX_WORKERS = 5

state_names = list(states_coords)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # executor.map yields results in input order, so rows follow states_coords.
    weather_data_list = list(
        executor.map(
            fetch_weather_data,
            state_names,
            [states_coords[name]["lat"] for name in state_names],
            [states_coords[name]["lon"] for name in state_names],
        )
    )

# One row per state; failed fetches carry an "error" value instead of readings.
weather_df = pd.DataFrame(weather_data_list)
print(weather_df.head())

# fetch_weather_data reports failures as records rather than raising. List the
# affected states explicitly so failures do not pass unnoticed into the table.
if "error" in weather_df.columns:
    failed_states = weather_df.loc[weather_df["error"].notna(), "state"].tolist()
    if failed_states:
        print(f"Fetch failed for {len(failed_states)} state(s): {failed_states}")

# Reuse the active Spark session if there is one, then mirror the data in Spark.
spark = SparkSession.builder.appName("WeatherData").getOrCreate()
spark_df = spark.createDataFrame(weather_df)
spark_df.show()



In [0]:

# Persist this run's rows to the Delta table, appending to earlier runs.
WEATHER_TABLE = "nigeria_complete_weather_data"

spark_weather_df = spark.createDataFrame(weather_df)

# An "error" column exists only when at least one state failed to fetch, so the
# column set can differ between runs. mergeSchema lets the append add that
# column instead of failing when the table was created by an error-free run.
(
    spark_weather_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(WEATHER_TABLE)
)