# In [None]:
from airflow import DAG
from airflow.operators.python_operator  import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
from sqlalchemy import create_engine

# Defaults handed to the DAG below: no dependency on previous runs, no
# e-mail notifications, and a single retry five minutes after a failure.
default_args = dict(
    owner='XYZ Telecoms',
    depends_on_past=False,
    start_date=datetime(2023, 3, 19),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# DAG with id 'data_pipeline', created with the '@daily' schedule preset and
# the default_args dict defined above.
dag = DAG('data_pipeline', default_args=default_args, schedule_interval='@daily')

def extract_data():
    """Read the customer, order and payment CSV files into DataFrames.

    The files are addressed by relative path, so they are resolved against
    the working directory of the process that calls this function.

    Returns:
        A ``(customer_df, order_df, payment_df)`` tuple of DataFrames, in
        that order.
    """
    csv_paths = ('customer_data.csv', 'order_data.csv', 'payment_data.csv')
    return tuple(pd.read_csv(csv_path) for csv_path in csv_paths)

def transform_data(frames=None):
    """Join customer, order and payment data and derive per-customer value.

    Args:
        frames: Optional ``(customer_df, order_df, payment_df)`` tuple of
            DataFrames. When omitted, the frames are loaded by calling
            ``extract_data()``. Frames passed in are not modified.

    Returns:
        A DataFrame with one row per distinct combination of ``first_name``,
        ``last_name``, ``email``, ``country``, ``gender`` and
        ``date_of_birth``, plus the columns ``amount`` (sum of payment
        amounts), ``total_order_value`` (sum of order prices) and
        ``customer_ltv``.
    """
    if frames is None:
        frames = extract_data()
    customer_df, order_df, payment_df = frames

    # Parse the date columns; assign() returns new frames so the caller's
    # inputs are left untouched.
    customer_df = customer_df.assign(
        date_of_birth=pd.to_datetime(customer_df['date_of_birth'])
    )
    order_df = order_df.assign(order_date=pd.to_datetime(order_df['order_date']))
    payment_df = payment_df.assign(
        payment_date=pd.to_datetime(payment_df['payment_date'])
    )

    # Inner-join customers -> orders -> payments, then drop the join keys.
    merged_df = pd.merge(customer_df, order_df, on='customer_id')
    merged_df = pd.merge(merged_df, payment_df, on=['order_id', 'customer_id'])
    merged_df = merged_df.drop(['customer_id', 'order_id'], axis=1)

    # One pass over the groups yields both sums; 'price' is exposed under the
    # name 'total_order_value'.
    customer_keys = ['first_name', 'last_name', 'email', 'country', 'gender', 'date_of_birth']
    agg_df = (
        merged_df.groupby(customer_keys)
        .agg({'amount': 'sum', 'price': 'sum'})
        .rename(columns={'price': 'total_order_value'})
    )

    # Days elapsed since each customer's date of birth. Subtracting the
    # DatetimeIndex from a Timestamp gives a TimedeltaIndex, whose .days is
    # taken as a plain array so the arithmetic below is positional.
    today = pd.Timestamp(datetime.today().date())
    birth_dates = agg_df.index.get_level_values('date_of_birth')
    age_in_days = (today - birth_dates).days.to_numpy()

    # NOTE(review): the previous expression could not run ('amount' is a
    # column, not an index level, and an ndarray has no .dt accessor). This
    # keeps its apparent shape:
    #   total_order_value * (365 / age in days) * (amount / distinct amounts)
    # Confirm this matches the intended CLV definition.
    distinct_amounts = agg_df['amount'].nunique()
    agg_df['customer_ltv'] = (
        agg_df['total_order_value']
        * (365 / age_in_days)
        * (agg_df['amount'] / distinct_amounts)
    )

    return agg_df.reset_index()

def load_data():
    """Write the transformed customer data to the Postgres table.

    Calls ``transform_data()`` and stores its result in the ``customer_ltv``
    table, replacing the table if it already exists. The DataFrame index is
    not written.
    """
    transformed_data = transform_data()

    # NOTE(review): the database credentials are hard-coded in this URL;
    # consider sourcing them from configuration instead of the source file.
    engine = create_engine('postgresql+psycopg2://postgres:admin@localhost:5442/customer_lifecycle')
    try:
        transformed_data.to_sql('customer_ltv', engine, if_exists='replace', index=False)
    finally:
        # Release the engine's pooled connections even when the write fails,
        # so repeated calls in one process do not accumulate open connections.
        engine.dispose()

# Tasks are created inside the DAG context manager, which attaches them to
# `dag` without an explicit dag= argument.
#
# do_xcom_push=False: extract_data and transform_data return pandas
# DataFrames, and PythonOperator would otherwise push each return value to
# XCom. No task pulls those values, and DataFrames are not JSON-serializable,
# so the push is disabled.
with dag:
    extract_data_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        do_xcom_push=False,
    )

    transform_data_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        do_xcom_push=False,
    )

    load_data_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
        do_xcom_push=False,
    )

    # Run order: extract, then transform, then load.
    extract_data_task >> transform_data_task >> load_data_task
