In [None]:
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt

def extract():
    """Dump the raw sales rows from Postgres into ``sales_data.csv``.

    Runs ``SELECT sale_date, quantity, price FROM sales`` through the
    ``postgres_local`` Airflow connection and writes the result, without the
    DataFrame index, to ``sales_data.csv`` relative to the current working
    directory.

    Returns:
        The relative path of the CSV file that was written.
    """
    # Imported locally so the function carries its own dependency.
    from contextlib import closing

    hook = PostgresHook(postgres_conn_id="postgres_local")
    # Close the DB-API connection explicitly, even if the query raises;
    # otherwise it stays open until the process happens to collect it.
    with closing(hook.get_conn()) as conn:
        df = pd.read_sql("SELECT sale_date, quantity, price FROM sales", conn)
    df.to_csv("sales_data.csv", index=False)
    return "sales_data.csv"

def transform():
    """Aggregate ``sales_data.csv`` into revenue per ``sale_date``.

    Reads ``sales_data.csv`` from the current working directory, computes
    ``revenue = quantity * price`` for every row, sums it per distinct
    ``sale_date`` value, and writes the two-column result (``sale_date``,
    ``revenue``) to ``daily_revenue.csv`` without the index.

    Returns:
        The relative path of the CSV file that was written.
    """
    output_path = "daily_revenue.csv"

    sales = pd.read_csv("sales_data.csv")
    sales = sales.assign(
        sale_date=pd.to_datetime(sales["sale_date"]),
        revenue=sales["quantity"] * sales["price"],
    )

    # as_index=False keeps sale_date as a regular column in the result.
    revenue_by_day = sales.groupby("sale_date", as_index=False)["revenue"].sum()

    revenue_by_day.to_csv(output_path, index=False)
    return output_path

def visualize():
    """Plot daily revenue from ``daily_revenue.csv`` and save it as a PNG.

    Reads ``daily_revenue.csv`` from the current working directory, parses
    ``sale_date`` as datetimes, and saves a line chart of ``revenue`` against
    ``sale_date`` to ``daily_revenue_plot.png``.
    """
    df = pd.read_csv("daily_revenue.csv")
    df["sale_date"] = pd.to_datetime(df["sale_date"])

    # Use a dedicated figure instead of pyplot's implicit current one:
    # repeated calls in the same process would otherwise stack lines onto
    # the same axes, and the figure would never be released.
    fig, ax = plt.subplots()
    try:
        ax.plot(df["sale_date"], df["revenue"])
        ax.set_xlabel("Date")
        ax.set_ylabel("Revenue")
        ax.set_title("Daily Revenue")
        fig.savefig("daily_revenue_plot.png")
    finally:
        # Always free the figure, even if plotting or saving fails.
        plt.close(fig)

# Three-step sales pipeline: extract -> transform -> visualize.
with DAG(
    dag_id="simple_sales_pipeline",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # One PythonOperator per stage; each task_id matches its callable's name.
    extract_task = PythonOperator(
        python_callable=extract,
        task_id="extract",
    )
    transform_task = PythonOperator(
        python_callable=transform,
        task_id="transform",
    )
    visualize_task = PythonOperator(
        python_callable=visualize,
        task_id="visualize",
    )

    # Strictly linear ordering, since each stage reads the file written by
    # the one before it.
    extract_task >> transform_task
    transform_task >> visualize_task
