In [28]:
!pip install apache-airflow


In [29]:
from airflow import DAG
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator is the deprecated path
from datetime import datetime, timedelta
import pandas as pd
from sqlalchemy import create_engine
import logging

In [30]:
default_args = {
    'owner': 'MTN Rwanda',
    'depends_on_past': False,
    'start_date': datetime(2023, 4, 18),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('data_pipeline', default_args=default_args, schedule_interval='@daily')

In [31]:
def extract_data():
    # Read each CSV file into a pandas dataframe for later transformation.
    # Relative paths resolve against the working directory of the process running the task.
    customer_df = pd.read_csv('customer_data.csv')
    order_df = pd.read_csv('order_data.csv')
    payment_df = pd.read_csv('payment_data.csv')

    return customer_df, order_df, payment_df

In [32]:
def transform_data(customer_df, order_df, payment_df):
    # Parse the date columns
    customer_df['date_of_birth'] = pd.to_datetime(customer_df['date_of_birth'])
    order_df['order_date'] = pd.to_datetime(order_df['order_date'])
    payment_df['payment_date'] = pd.to_datetime(payment_df['payment_date'])

    # Join customers to their orders, then attach the payments
    customer_order_data = pd.merge(customer_df, order_df, on='customer_id')
    payment_customer_order_data = pd.merge(payment_df, customer_order_data, on=['order_id', 'customer_id'])

    # The join keys are no longer needed after merging
    payment_customer_order_data = payment_customer_order_data.drop(['customer_id', 'order_id'], axis=1)

    # Aggregate per customer: total amount paid and number of orders
    customer_lifetime_value = (
        payment_customer_order_data
        .groupby(['email', 'country', 'gender', 'date_of_birth'])
        .agg(total_amount_paid=('amount', 'sum'), number_of_orders=('product', 'count'))
        .reset_index()
    )

    # Despite its name, total_order_value holds the average amount paid per order
    customer_lifetime_value['total_order_value'] = customer_lifetime_value['total_amount_paid'] / customer_lifetime_value['number_of_orders']

    # lifespan is the customer's age in years (from date_of_birth), used here as a proxy for customer lifespan
    customer_lifetime_value['lifespan'] = (pd.Timestamp.today() - customer_lifetime_value['date_of_birth']).dt.days / 365.25
    customer_lifetime_value['average_order_value'] = customer_lifetime_value['total_order_value']
    # CLV = average order value * number of orders * lifespan
    customer_lifetime_value['clv'] = customer_lifetime_value['average_order_value'] * customer_lifetime_value['number_of_orders'] * customer_lifetime_value['lifespan']

    return customer_lifetime_value

In [33]:
# Load the transformed data into Postgres; errors are logged and re-raised
def load_data(transformed_data):
    try:
        # create_engine builds the database connection; replace the placeholder credentials
        engine = create_engine('postgresql://username:password@localhost:5432/dbname')
        table_name = 'subscriber_data'

        # to_sql with if_exists='append' creates the table if it doesn't exist and
        # then inserts the rows. engine.begin() commits on success, rolls back on
        # error, and always releases the connection.
        # (engine.has_table is no longer available in SQLAlchemy 2.0.)
        with engine.begin() as connection:
            transformed_data.to_sql(table_name, connection, if_exists='append', index=False)

        logging.info('Data loaded into Postgres database')

    except Exception as e:
        logging.error(f'Error loading data into Postgres database: {e}')
        raise


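# PythonOperator calls its callable without positional arguments, so these
# wrappers pull the upstream task's return value from XCom and pass it on.
# Note: passing dataframes through XCom requires an XCom backend that can
# serialize them (for example, enable_xcom_pickling = True in Airflow 2).
def run_transform(ti):
    customer_df, order_df, payment_df = ti.xcom_pull(task_ids='extract_data_id1')
    return transform_data(customer_df, order_df, payment_df)


def run_load(ti):
    load_data(ti.xcom_pull(task_ids='transform_data'))


with dag:

    extract_data_task = PythonOperator(
        task_id='extract_data_id1',
        python_callable=extract_data
    )

    transform_data_task = PythonOperator(
        task_id='transform_data',
        python_callable=run_transform
    )

    load_data_task = PythonOperator(
        task_id='load_data',
        python_callable=run_load
    )

    # define dependencies
    extract_data_task >> transform_data_task >> load_data_task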


**Documentation of the pipeline**:

The data pipeline consists of three tasks:

**extract_data_task**: reads the CSV files containing customer, order, and payment data and returns them as pandas dataframes.

**transform_data_task**: takes the three dataframes as inputs, parses their date columns, merges them, and aggregates per customer to compute the customer lifetime value.

**load_data_task**: takes the transformed data as input and loads it into a Postgres database. The data is appended to a table named "subscriber_data".

The three tasks are defined as PythonOperator instances in an Airflow DAG named "data_pipeline". The DAG is scheduled to run daily.
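
The tasks above hand dataframes from one step to the next. One common way to do this between separate Airflow tasks is to write each dataframe to a staging file and pass only the file path downstream. The sketch below illustrates that idea with plain functions; the names `extract_to_staging` and `load_from_staging`, the pickle format, and the sample data are assumptions for illustration and are not part of the pipeline above.

```python
import os
import tempfile

import pandas as pd


def extract_to_staging(csv_paths, staging_dir):
    """Read each CSV, write it to a staging file, and return only the file paths."""
    staged = {}
    for name, csv_path in csv_paths.items():
        df = pd.read_csv(csv_path)
        out_path = os.path.join(staging_dir, f"{name}.pkl")
        df.to_pickle(out_path)
        staged[name] = out_path
    # A small dict of strings is cheap to pass between tasks
    return staged


def load_from_staging(staged):
    """Rebuild the dataframes from the staged paths in a downstream step."""
    return {name: pd.read_pickle(path) for name, path in staged.items()}


# Demonstration with a throwaway CSV (hypothetical data)
with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "customer_data.csv")
    pd.DataFrame(
        {"customer_id": [1, 2], "email": ["a@example.com", "b@example.com"]}
    ).to_csv(csv_path, index=False)

    staged = extract_to_staging({"customer": csv_path}, tmp)
    frames = load_from_staging(staged)
    row_count = len(frames["customer"])
    columns = list(frames["customer"].columns)
```

In a real deployment the staging directory would need to be storage that every worker can reach, such as a shared volume or an object store.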

**Best practices**:

**Modularization**: The pipeline is split into three tasks, each responsible for a specific part of the data processing. This approach makes the code easier to understand, test, and maintain.
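
To illustrate the testing benefit, here is a minimal sketch of checking a transformation step against a few hand-built rows. `summarize_payments` is a simplified, hypothetical stand-in for the transform step (it groups by email only and skips the date and lifespan logic), and the sample data is invented.

```python
import pandas as pd


def summarize_payments(customer_df, order_df, payment_df):
    """Simplified stand-in for the transform step: merge, then aggregate per customer."""
    orders = pd.merge(customer_df, order_df, on="customer_id")
    paid = pd.merge(payment_df, orders, on=["order_id", "customer_id"])
    summary = (
        paid.groupby("email")
        .agg(total_amount_paid=("amount", "sum"), number_of_orders=("product", "count"))
        .reset_index()
    )
    summary["average_order_value"] = summary["total_amount_paid"] / summary["number_of_orders"]
    return summary


# Hand-built inputs: customer 1 has two orders, customer 2 has one
customers = pd.DataFrame({"customer_id": [1, 2], "email": ["a@example.com", "b@example.com"]})
orders = pd.DataFrame({"order_id": [10, 11, 12], "customer_id": [1, 1, 2], "product": ["x", "y", "x"]})
payments = pd.DataFrame({"order_id": [10, 11, 12], "customer_id": [1, 1, 2], "amount": [100.0, 50.0, 30.0]})

summary = summarize_payments(customers, orders, payments)

# Checks a unit test could make without Airflow or a database
assert summary["email"].tolist() == ["a@example.com", "b@example.com"]
assert summary["total_amount_paid"].tolist() == [150.0, 30.0]
assert summary["number_of_orders"].tolist() == [2, 1]
assert summary["average_order_value"].tolist() == [75.0, 30.0]
```

Because the function takes dataframes and returns a dataframe, it can be exercised this way without starting the scheduler.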

**Error handling**: The body of load_data is wrapped in a try/except block that logs any exception raised while loading and then re-raises it. Re-raising lets Airflow mark the task as failed and retry it (up to two retries, five minutes apart, per default_args) instead of silently continuing with missing data.
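
The log-and-re-raise pattern can be sketched on its own, without a database. `load_with_logging` and `failing_loader` are hypothetical names used only for this illustration; the point is that the caller still sees the exception after it has been logged.

```python
import logging

logger = logging.getLogger("pipeline_sketch")


def load_with_logging(load_fn, data):
    """Run a loader, log the outcome, and re-raise on failure."""
    try:
        load_fn(data)
        logger.info("Load succeeded")
    except Exception:
        # logger.exception records the message together with the traceback
        logger.exception("Load failed")
        raise


def failing_loader(data):
    # Simulates a database that cannot be reached
    raise ConnectionError("database unreachable")


try:
    load_with_logging(failing_loader, data=[1, 2, 3])
    task_failed = False
except ConnectionError:
    # The exception propagates to the caller; in Airflow this would fail the task
    task_failed = True
```

If the `raise` were omitted, the caller would see a normal return and the failure would be visible only in the logs.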

**Logging**: The load_data function logs a message when the load succeeds and an error message when it fails. These messages appear in the Airflow task logs, which makes the pipeline easier to debug and monitor.
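
Logging could also be extended to the extract and transform steps. The sketch below logs the row and column counts of a dataframe at a named stage; `log_frame`, the logger name, and the `ListHandler` class (used here only to capture messages for the demonstration) are assumptions and not part of the pipeline above.

```python
import logging

import pandas as pd

logger = logging.getLogger("data_pipeline_sketch")


def log_frame(stage, df):
    """Log the size of a dataframe at a given pipeline stage and return it unchanged."""
    logger.info("%s: %d rows, %d columns", stage, len(df), df.shape[1])
    return df


class ListHandler(logging.Handler):
    """Collects formatted messages in a list so the demonstration can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

log_frame("extract", pd.DataFrame({"customer_id": [1, 2, 3]}))
```

Comparing row counts between stages is a quick way to spot a merge that dropped or duplicated rows.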

**Recommendations for deployment and running the pipeline in a cloud-based provider:**

**Use a managed service**: Consider a managed service for your data storage and processing needs. To keep the Postgres target used here, a managed Postgres offering such as Amazon RDS or Google Cloud SQL avoids running the database yourself; for large analytical workloads, a data warehouse such as Amazon Redshift or Google BigQuery scales further. These services handle scaling and fault tolerance for you and include built-in security and data governance features.

**Containerize your application**: Use containers to package your application and its dependencies so that it can be deployed on any cloud-based provider. Because the container carries its own dependencies, moving the application between environments is much less likely to cause compatibility issues.

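**Use a managed workflow service**: Use a managed workflow service to run the pipeline. Because this pipeline is an Airflow DAG, a managed Airflow service such as Google Cloud Composer or Amazon MWAA can run it with little change. AWS Step Functions is an alternative orchestrator, but the pipeline would have to be rewritten as a state machine. These services provide an interface for scheduling and monitoring workflows and can automatically retry failed tasks.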
