In [None]:
#import necessary libraries
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd 
import matplotlib.pyplot as plt 

# Airflow connection id used by the tasks to reach the Postgres database
# (this is only an id string; no connection is opened here).
postg_conn = 'postgress_conn'

# Default arguments applied to every task in the DAG: owned by 'kiwilytics',
# retried once, with a two-minute wait before the retry.
default_args = dict(
    owner='kiwilytics',
    retries=1,
    retry_delay=timedelta(minutes=2),
)


In [None]:

#fetch sales data from postgres database
def fetech_sales():
    """Extract order-line sales rows from Postgres and stage them as a CSV.

    Joins ``orders`` -> ``order_details`` -> ``products`` so that each row
    carries the order date (cast to ``date``), the product id, the quantity
    sold and the product's unit price. The rows are written, without the
    DataFrame index, to
    ``/home/kiwilytics/airflow_output/daily_sales_capstone.csv`` for the
    aggregation task to read.

    Postgres folds the unquoted alias ``Sales_Date`` to lower case, so the
    staged CSV column is named ``sales_date``.
    """
    import os

    output_path = '/home/kiwilytics/airflow_output/daily_sales_capstone.csv'
    query="""
     select o.orderdate::date as Sales_Date,
     od.productid,
     od.quantity,
     p.price 
     from orders as o 
     join order_details as od on o.orderid =od.orderid 
     join products as p on od.productid=p.productid
    """
    # The hook's keyword is ``postgres_conn_id``. The previous misspelling
    # (``postgress_conn_id``) was not recognised, so the hook used its default
    # connection instead of ``postg_conn``.
    hook = PostgresHook(postgres_conn_id=postg_conn)
    conn = hook.get_conn()
    try:
        df = pd.read_sql(query, conn)
    finally:
        # Always release the database connection, even if the query fails.
        conn.close()

    # to_csv does not create missing directories; make sure the staging dir exists.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_csv(output_path, index=False)




In [None]:
#transform sales data to get daily revenue
def aggergate_sales():
    """Collapse the staged order lines into one revenue total per day.

    Reads the CSV staged by the fetch task and computes each line's revenue
    as ``quantity * price``. It then sums that revenue per ``sales_date`` and
    writes the per-day totals (columns ``sales_date`` and ``total_rev``,
    no index) to ``Daily_reven_capstone.csv``.
    """
    source_csv = '/home/kiwilytics/airflow_output/daily_sales_capstone.csv'
    target_csv = '/home/kiwilytics/airflow_output/Daily_reven_capstone.csv'

    daily_totals = (
        pd.read_csv(source_csv)
        .assign(Total_Revenue=lambda lines: lines['quantity'] * lines['price'])
        .groupby('sales_date')
        .agg(total_rev=('Total_Revenue', 'sum'))
        .reset_index()
    )
    daily_totals.to_csv(target_csv, index=False)

In [None]:
#plot daily sales revenue
def plot_sales():
    """Chart daily revenue and save it as a PNG.

    Reads ``/home/kiwilytics/airflow_output/Daily_reven_capstone.csv``, which
    holds the per-day totals written by ``aggergate_sales`` (columns
    ``sales_date`` and ``total_rev``). It draws a line chart with one marker
    per day and saves it to
    ``/home/kiwilytics/airflow_output/daily_sales_revenue_plot.png``.
    """
    df = pd.read_csv('/home/kiwilytics/airflow_output/Daily_reven_capstone.csv')
    # aggergate_sales groups by the lower-case ``sales_date`` column, so that
    # is the header present in the CSV; indexing ``'Sales_Date'`` raised KeyError.
    df['sales_date'] = pd.to_datetime(df['sales_date'])

    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        ax.plot(df['sales_date'], df['total_rev'], marker='o')
        ax.set_title('Daily Sales Revenue')
        ax.set_xlabel('Date')
        ax.set_ylabel('Total Revenue')
        ax.tick_params(axis='x', labelrotation=45)
        ax.grid()
        # Keep the rotated date labels inside the saved image.
        fig.tight_layout()
        fig.savefig('/home/kiwilytics/airflow_output/daily_sales_revenue_plot.png')
    finally:
        # pyplot keeps every open figure alive until closed; release it so
        # repeated runs in the same worker process do not accumulate figures.
        plt.close(fig)
    print("Plot saved successfully.")


In [None]:
# Define the pipeline DAG: fetch -> aggregate -> plot, run once per day.
# NOTE(review): days_ago() is deprecated in Airflow 2.x and a dynamic
# start_date is discouraged by the Airflow docs; consider a fixed datetime.
with DAG(
    dag_id='daily_sales_capstone',
    description='A simple daily sales data pipeline',
    tags=['sales', 'daily_sales'],
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval=timedelta(days=1),
) as dag:

    # One PythonOperator per pipeline step, created in execution order while
    # the DAG context is active (the generator is consumed by the unpacking).
    task_1, task_2, task_3 = (
        PythonOperator(task_id=step_id, python_callable=step_fn)
        for step_id, step_fn in (
            ('fetch_sales_data', fetech_sales),
            ('aggregate_daily_sales', aggergate_sales),
            ('plot_daily_sales', plot_sales),
        )
    )

    # Linear dependency chain: fetch before aggregate, aggregate before plot.
    task_1.set_downstream(task_2)
    task_2.set_downstream(task_3)