In [6]:
import subprocess
import sys

# Make sure the Dask DataFrame API used by the later cells is importable.
# Probing `dask.dataframe` (instead of bare `dask`) also catches a core-only
# Dask install where `import dask` works but `import dask.dataframe` fails.
try:
    import dask.dataframe  # noqa: F401 -- availability probe; still binds `dask`
except ImportError:
    import importlib

    # Use the kernel's own interpreter so the package lands in the right environment.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "dask[complete]"])
    # Packages installed while the process is running may be missed by the
    # import system's cached directory listings; refresh them so the next
    # cell's import succeeds without a kernel restart.
    importlib.invalidate_caches()

In [12]:
import dask.dataframe as dd
import os
from datetime import datetime, timedelta
import logging

# Logging: module logger plus INFO-level root configuration (for Airflow task logs)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Use Jupyter defaults unless a `parameters` mapping already exists in the namespace
# (Airflow is expected to provide one)
if "parameters" not in globals():
    parameters = {"cwd": "."}

# === Previous calendar month boundaries ===
today = datetime.today()
first_of_this_month = datetime(today.year, today.month, 1)
# One day before the 1st is always the last day of the previous month.
last_of_prev_month = first_of_this_month - timedelta(days=1)
first_of_prev_month = last_of_prev_month.replace(day=1)

# String forms: ISO dates for the log line, YYYY_MM for the month tag
start_date = f"{first_of_prev_month:%Y-%m-%d}"
end_date = f"{last_of_prev_month:%Y-%m-%d}"
month_str = f"{first_of_prev_month:%Y_%m}"

logger.info(f"📅 Loading weather data for: {start_date} to {end_date}")

# Load raw daily weather data for the month identified by `month_str`.
# The input is expected at <cwd>/raw_weather_data_<YYYY_MM>.csv.
cwd = parameters.get("cwd", ".")
file_path = os.path.join(cwd, f"raw_weather_data_{month_str}.csv")

# Fail fast with a clear message when the month's file is missing
if not os.path.exists(file_path):
    logger.error(f"Input file not found: {file_path}")
    raise FileNotFoundError(f"Input file not found: {file_path}")

try:
    # `date` is parsed as a datetime so later cells can use the `.dt` accessor on it.
    daily_df = dd.read_csv(file_path, parse_dates=["date"])
    logger.info(f"Successfully loaded {file_path}")
except Exception as e:
    # Log for the task log, then re-raise unchanged so the run is marked failed.
    logger.error(f"Failed to load {file_path}: {str(e)}")
    raise

# Preview (optional for Jupyter, skipped in Airflow)
# `parameters` is a dict, so the marker has to be looked up as a key:
# hasattr() on a dict never finds "airflow" and the preview would always run.
# The attribute check is kept for parameter objects that expose it as an attribute.
# NOTE(review): assumes Airflow runs put an "airflow" key into `parameters` -- confirm.
if not (hasattr(parameters, "airflow") or "airflow" in parameters):
    logger.info("Preview of loaded data:")
    print(daily_df.head())

Unnamed: 0,district,date,temp_max,temp_min,precipitation,humidity,sunshine,month
0,Abim,2025-06-01,28.1,16.7,0.1,67,40393.03,2025_06
1,Abim,2025-06-02,30.4,17.5,0.0,55,40393.94,2025_06
2,Abim,2025-06-03,29.4,18.1,5.9,67,38380.78,2025_06
3,Abim,2025-06-04,28.5,17.2,1.0,72,40509.11,2025_06
4,Abim,2025-06-05,27.4,17.5,14.0,76,33168.99,2025_06


In [14]:
# Show the dtype of every column in the loaded daily frame.
print(daily_df.dtypes)


district                 object
date             datetime64[ns]
temp_max                float64
temp_min                float64
precipitation           float64
humidity                  int64
sunshine                float64
month                    object
dtype: object


In [16]:
import dask.dataframe as dd
import os
import logging

# Logging: module logger plus INFO-level root configuration (for Airflow task logs)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Use Jupyter defaults unless a `parameters` mapping already exists in the namespace
# (Airflow is expected to provide one)
if "parameters" not in globals():
    parameters = {"cwd": "."}

# Reuse `month_str` when an earlier step defined it; otherwise derive the
# previous calendar month from today's date
if "month_str" not in globals():
    from datetime import datetime, timedelta

    today = datetime.today()
    first_of_this_month = datetime(today.year, today.month, 1)
    # One day before the 1st is always the last day of the previous month.
    last_of_prev_month = first_of_this_month - timedelta(days=1)
    first_of_prev_month = last_of_prev_month.replace(day=1)
    month_str = f"{first_of_prev_month:%Y_%m}"
    logger.info(f"📅 Using month_str: {month_str}")

# Guard: this cell needs `daily_df` to exist in the namespace already
try:
    daily_df
except NameError:
    logger.error("daily_df is not defined. Please provide a Dask DataFrame with daily weather data.")
    raise ValueError("daily_df is required.")

# Schema check: every column used by the aggregation must be present
required_columns = ["district", "date", "temp_max", "temp_min", "humidity", "precipitation", "sunshine"]
if not set(required_columns).issubset(daily_df.columns):
    logger.error(f"Missing required columns in daily_df. Found: {list(daily_df.columns)}, Required: {required_columns}")
    raise ValueError("Invalid DataFrame schema")

# Derive the grouping keys from the date column.
# Note: this writes `year` and `month` onto `daily_df` itself.
_date_parts = daily_df["date"].dt
daily_df["year"] = _date_parts.year
daily_df["month"] = _date_parts.month

# Per-column aggregation: temperatures and humidity are averaged,
# precipitation and sunshine are totalled
_district_agg_spec = {
    "temp_max": "mean",
    "temp_min": "mean",
    "humidity": "mean",
    "precipitation": "sum",
    "sunshine": "sum",
}
_district_output_names = {
    "temp_max": "avg_temp_max",
    "temp_min": "avg_temp_min",
    "humidity": "avg_humidity",
    "precipitation": "total_precipitation",
    "sunshine": "total_sunshine_seconds",
}

# Aggregate per district and calendar month
try:
    agg_df = daily_df.groupby(["district", "year", "month"]).agg(_district_agg_spec)
    agg_df = agg_df.rename(columns=_district_output_names)

    # Replace the seconds total with hours (3600 seconds per hour)
    agg_df = agg_df.assign(
        total_sunshine_hours=(agg_df["total_sunshine_seconds"] / 3600).round(2)
    )
    agg_df = agg_df.drop(columns="total_sunshine_seconds")

    # Two-decimal rounding applied partition by partition
    agg_df = agg_df.map_partitions(lambda part: part.round(2))

    # Move the group keys back into columns and materialise the result
    agg_df = agg_df.reset_index()
    agg_df = agg_df.compute()
    logger.info("Aggregation and computation completed successfully")
except Exception as e:
    # Log for the task log, then re-raise unchanged
    logger.error(f"Failed during aggregation or computation: {str(e)}")
    raise

# Save the district-level monthly aggregate to
# <cwd>/weather_district_monthly_<YYYY_MM>.csv (row index is not written).
cwd = parameters.get("cwd", ".")
output_path = os.path.join(cwd, f"weather_district_monthly_{month_str}.csv")
try:
    agg_df.to_csv(output_path, index=False)
    logger.info(f"Data saved to {output_path}")
except Exception as e:
    # Log for the task log, then re-raise unchanged so the run is marked failed.
    logger.error(f"Failed to save CSV to {output_path}: {str(e)}")
    raise

# Preview (optional for Jupyter, skipped in Airflow)
# `parameters` is a dict, so the marker has to be looked up as a key:
# hasattr() on a dict never finds "airflow" and the preview would always run.
# The attribute check is kept for parameter objects that expose it as an attribute.
# NOTE(review): assumes Airflow runs put an "airflow" key into `parameters` -- confirm.
if not (hasattr(parameters, "airflow") or "airflow" in parameters):
    logger.info("Preview of aggregated data:")
    print(agg_df.head())

   district  year  month  avg_temp_max  avg_temp_min  avg_humidity  \
0      Abim  2025      6         28.39         17.58         67.70   
1  Adjumani  2025      6         30.94         19.57         69.17   
2     Agago  2025      6         30.82         18.78         66.00   
3  Alebtong  2025      6         29.41         18.54         71.87   
4  Amolatar  2025      6         28.23         19.78         74.17   

   total_precipitation  total_sunshine_hours  
0                 92.1                315.87  
1                 82.7                324.84  
2                 40.3                314.71  
3                106.3                312.32  
4                 76.0                315.23  


In [21]:
import dask.dataframe as dd
import os
import logging
from datetime import datetime, timedelta

# Logging: module logger plus INFO-level root configuration (for Airflow task logs)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Use Jupyter defaults unless a `parameters` mapping already exists in the namespace
# (Airflow is expected to provide one)
if "parameters" not in globals():
    parameters = {"cwd": "."}

# Reuse `month_str` when it is already defined; otherwise derive the
# previous calendar month from today's date
if "month_str" not in globals():
    today = datetime.today()
    first_of_this_month = datetime(today.year, today.month, 1)
    # One day before the 1st is always the last day of the previous month.
    last_of_prev_month = first_of_this_month - timedelta(days=1)
    first_of_prev_month = last_of_prev_month.replace(day=1)
    month_str = f"{first_of_prev_month:%Y_%m}"
    logger.info(f"📅 Using month_str: {month_str}")

# Check if daily_df is defined (this cell relies on an earlier cell having created it)
try:
    daily_df
except NameError:
    logger.error("daily_df is not defined. Please provide a Dask DataFrame with daily weather data.")
    raise ValueError("daily_df is required.")

# Validate required columns (no `district` here: the aggregation is country-wide)
required_columns = ["date", "temp_max", "temp_min", "humidity", "precipitation", "sunshine"]
if not all(col in daily_df.columns for col in required_columns):
    logger.error(f"Missing required columns in daily_df. Found: {list(daily_df.columns)}, Required: {required_columns}")
    raise ValueError("Invalid DataFrame schema")

# Validate date column type.
# Check the dtype *kind* ("M" = datetime) instead of comparing against the exact
# string "datetime64[ns]": columns with another resolution (s/ms/us) or with a
# timezone are valid datetimes and support `.dt.year` / `.dt.month`, but would
# be rejected by an exact-string comparison.
date_dtype = daily_df["date"].dtype
if getattr(date_dtype, "kind", None) != "M":
    logger.error(f"date column must be a datetime64 dtype, found: {date_dtype}")
    raise ValueError("Invalid date column type")

# Extract year and month (written onto daily_df itself; `month` becomes the numeric month)
daily_df["year"] = daily_df["date"].dt.year
daily_df["month"] = daily_df["date"].dt.month

# National-level aggregation: one row per (year, month) over all rows of daily_df.
# NOTE(review): precipitation and sunshine are summed over every row in the month,
# i.e. over all districts as well as all days -- confirm that a cross-district sum
# (rather than an average of district totals) is the intended national figure.
try:
    national_df = daily_df.groupby(["year", "month"]).agg({
        "temp_max": "mean",
        "temp_min": "mean",
        "humidity": "mean",
        "precipitation": "sum",
        "sunshine": "sum"
    }).rename(columns={
        "temp_max": "avg_temp_max",
        "temp_min": "avg_temp_min",
        "humidity": "avg_humidity",
        "precipitation": "total_precipitation",
        "sunshine": "total_sunshine_seconds"
    })

    # Compute sunshine hours (3600 seconds per hour) and drop the seconds column
    national_df = national_df.assign(
        total_sunshine_hours=(national_df["total_sunshine_seconds"] / 3600).round(2)
    ).drop(columns="total_sunshine_seconds")

    # Round numeric columns to two decimals, partition by partition
    national_df = national_df.map_partitions(lambda df: df.round(2))

    # Compute to Pandas DataFrame, with year/month restored as ordinary columns
    national_df = national_df.reset_index().compute()
    logger.info("National aggregation and computation completed successfully")
except Exception as e:
    # Log for the task log, then re-raise unchanged so the run is marked failed.
    logger.error(f"Failed during aggregation or computation: {str(e)}")
    raise

# Save the national monthly aggregate to
# <cwd>/weather_national_monthly_<YYYY_MM>.csv (row index is not written).
cwd = parameters.get("cwd", ".")
output_path = os.path.join(cwd, f"weather_national_monthly_{month_str}.csv")
try:
    national_df.to_csv(output_path, index=False)
    logger.info(f"Data saved to {output_path}")
except Exception as e:
    # Log for the task log, then re-raise unchanged so the run is marked failed.
    logger.error(f"Failed to save CSV to {output_path}: {str(e)}")
    raise

# Preview (optional for Jupyter, skipped in Airflow)
# `parameters` is a dict, so the marker has to be looked up as a key:
# hasattr() on a dict never finds "airflow" and the preview would always run.
# The attribute check is kept for parameter objects that expose it as an attribute.
# NOTE(review): assumes Airflow runs put an "airflow" key into `parameters` -- confirm.
if not (hasattr(parameters, "airflow") or "airflow" in parameters):
    logger.info("Preview of national aggregated data:")
    print(national_df.head())