# In [None]:
import csv
import os
from datetime import datetime, timedelta, timezone

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

# In [None]:


def fetch_actual_weather():
    """Download yesterday's max/min temperature and save it as a CSV file.

    Requests a single day (yesterday, by the UTC calendar) from the Open-Meteo
    forecast endpoint for the hard-coded coordinates, then overwrites
    ``actual_weather.csv`` in the current working directory with a header row
    and one data row (``date``, ``temp_max``, ``temp_min``).

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-success HTTP status.
        KeyError, IndexError: if the response JSON does not contain the
            expected ``daily`` series.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    # NOTE(review): the date is computed in UTC while the API is queried with
    # timezone=Europe/London; the two calendars can differ around midnight.
    yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).strftime('%Y-%m-%d')
    lat, lon = 52.2053, 0.1218
    url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}&start_date={yesterday}&end_date={yesterday}&daily=temperature_2m_max,temperature_2m_min&timezone=Europe%2FLondon"

    # Without a timeout a stalled connection would block the task forever.
    response = requests.get(url, timeout=30)
    # Fail loudly on HTTP errors instead of tripping over a missing key later.
    response.raise_for_status()
    daily = response.json()['daily']

    # Extract the values before opening the file so that a malformed payload
    # does not leave a truncated, header-only CSV behind.
    row = [
        daily['time'][0],
        daily['temperature_2m_max'][0],
        daily['temperature_2m_min'][0],
    ]

    with open('actual_weather.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['date', 'temp_max', 'temp_min'])
        writer.writerow(row)

def load_forecast():
    """Stand-in for the forecast-loading step; currently performs no work."""
    return None

def compare_weather():
    """Write a text summary comparing forecast and actual temperatures.

    Reads the first data row of ``actual_weather.csv`` and
    ``forecast_weather.csv`` from the current working directory and overwrites
    ``weather_summary.txt`` with a short comparison. The actual file must
    provide ``date``, ``temp_max`` and ``temp_min`` columns; the forecast file
    must provide ``temp_max`` and ``temp_min``.

    Raises:
        FileNotFoundError: if either input CSV is missing.
        IndexError: if either CSV contains no data row.
        KeyError: if a required column is absent.
    """
    # newline='' lets the csv module handle line endings itself.
    with open('actual_weather.csv', newline='') as f1, \
            open('forecast_weather.csv', newline='') as f2:
        actual = list(csv.DictReader(f1))[0]
        forecast = list(csv.DictReader(f2))[0]

    # \u00b0 is the degree sign; it is written as an escape so the literal
    # cannot be corrupted again by a source-file encoding mix-up.
    summary = f"""Weather Comparison for {actual['date']}:
    Forecast Max: {forecast['temp_max']}\u00b0C | Actual Max: {actual['temp_max']}\u00b0C
    Forecast Min: {forecast['temp_min']}\u00b0C | Actual Min: {actual['temp_min']}\u00b0C
    """

    # Explicit UTF-8 so the degree sign is written identically on every
    # platform, regardless of the locale's default encoding.
    with open('weather_summary.txt', 'w', encoding='utf-8') as f:
        f.write(summary)

def delete_forecast():
    """Remove ``forecast_weather.csv`` from the current working directory.

    A file that is already absent is treated as successfully deleted, so
    running this step again (for example after the task instance is cleared
    and re-run) does not fail.
    """
    try:
        os.remove('forecast_weather.csv')
    except FileNotFoundError:
        # Already gone: the desired end state is reached, nothing to clean up.
        pass

# Shared settings handed to the DAG below as its ``default_args``.
default_args = dict(
    owner='joe',
    start_date=datetime(2025, 10, 4),
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# DAG wiring: four Python tasks executed one after another.
with DAG(
    dag_id='weather_truth_vs_forecast',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # cron expression: minute 0, hour 2, every day
    catchup=False,
    tags=['weather', 'comparison'],
) as dag:

    # Each task wraps one of the module-level functions defined in this file.
    t1 = PythonOperator(
        task_id='fetch_actual_weather',
        python_callable=fetch_actual_weather,
    )
    t2 = PythonOperator(
        task_id='load_forecast',
        python_callable=load_forecast,
    )
    t3 = PythonOperator(
        task_id='compare_weather',
        python_callable=compare_weather,
    )
    t4 = PythonOperator(
        task_id='delete_forecast',
        python_callable=delete_forecast,
    )

    # Linear dependency chain: fetch -> load -> compare -> delete.
    t1 >> t2
    t2 >> t3
    t3 >> t4