# Background Information

Our telecommunications company, MTN Rwanda, has a vast customer base and generates a large amount of data daily. We must process and store this data efficiently to make informed business decisions. We therefore plan to develop a data pipeline that extracts, transforms, and loads data from three CSV files into a Postgres database.

We require a skilled data engineer who can use Airflow to develop this pipeline.


# Problem Statement

The main challenge is that the data generated is in a raw format, and we need to process it efficiently to make it usable for analysis. This requires us to develop a data pipeline that can extract, transform and load the data from multiple CSV files into a single database, which can be used for further analysis.

# Guidelines

The data pipeline should be developed using Airflow, an open-source tool for creating and managing data pipelines. The following steps should be followed to develop the data pipeline:


● The data engineer should start by creating a DAG (Directed Acyclic Graph) that defines the workflow of the data pipeline.

● The DAG should include tasks that extract data from the three CSV files.

● After extraction, the data should be transformed using Python libraries to match the required format.

● Finally, the transformed data should be loaded into a Postgres database.

● The data pipeline should be scheduled to run at a specific time daily using the Airflow scheduler.

● We can use the shared file (mtnrwanda-dag.py) as a starting point.

The following are sample CSV files that will be used in the data pipeline:

● customer_data.csv

● order_data.csv

● payment_data.csv

All files for this project can be downloaded from here (link).



In [None]:
# Install Airflow
!pip install apache-airflow


import pandas as pd
import psycopg2
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

In [None]:
# DAG default arguments
default_args = {
    'owner': 'MTN Rwanda',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 19),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# DAG Definition
dag = DAG('data_pipeline',
          default_args=default_args,
          schedule_interval=timedelta(days=1)
          )

# Data extraction from the CSV files
def extract_data():
    cust_df = pd.read_csv('customer_data.csv')
    order_df = pd.read_csv('order_data.csv')
    payment_df = pd.read_csv('payment_data.csv')

    return cust_df, order_df, payment_df

# Transform the data
def transform_data(cust_df, order_df, payment_df):
    # Convert date of birth to datetime format
    cust_df['date_of_birth'] = pd.to_datetime(cust_df['date_of_birth'])

    # Merge the customer and order dataframes on the customer_id column
    customer_order_df = pd.merge(cust_df, order_df, on='customer_id')

    # Merge the payment dataframe with the merged dataframe on the order_id and customer_id columns
    customer_payment_df = pd.merge(customer_order_df, payment_df, on=['order_id', 'customer_id'])

    # Group by customer_id, which is kept because the cust_ltv table uses it as its primary key.
    # Summing price assumes one payment row per order; otherwise prices are counted more than once.
    customer_grouped_df = customer_payment_df.groupby('customer_id').agg(
        date_of_birth=('date_of_birth', 'first'),
        total_orders=('order_id', 'nunique'),
        total_amount=('amount', 'sum'),
        total_order_value=('price', 'sum')
    ).reset_index()

    # Customer lifetime value: CLV = (average order value) x (number of orders made per year) x (average customer lifespan)
    # The customer's age in years is used as the lifespan; with a per-customer lifespan
    # the last two factors cancel, so ltv equals the total order value.
    lifespan_years = (pd.Timestamp.now() - customer_grouped_df['date_of_birth']).dt.days / 365
    customer_grouped_df['avg_order_value'] = customer_grouped_df['total_order_value'] / customer_grouped_df['total_orders']
    orders_per_year = customer_grouped_df['total_orders'] / lifespan_years
    customer_grouped_df['ltv'] = customer_grouped_df['avg_order_value'] * orders_per_year * lifespan_years

    # Return only the columns that the cust_ltv table expects
    return customer_grouped_df[['customer_id', 'total_orders', 'total_amount', 'avg_order_value', 'ltv']]
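
The CLV formula used in the transform step can be checked by hand on a single customer. The sketch below is a plain-Python illustration only: the function name `customer_lifetime_value`, its parameters, and all the numbers are made up for the example and are not part of the pipeline.

```python
def customer_lifetime_value(order_values, years_active, avg_lifespan_years):
    """CLV = (average order value) x (orders per year) x (average customer lifespan)."""
    # Guard against empty histories and zero-length observation periods
    if not order_values or years_active <= 0:
        return 0.0
    avg_order_value = sum(order_values) / len(order_values)
    orders_per_year = len(order_values) / years_active
    return avg_order_value * orders_per_year * avg_lifespan_years


# Three orders over two years, with an assumed average lifespan of five years:
# average order value = 100, orders per year = 1.5, CLV = 100 * 1.5 * 5
clv = customer_lifetime_value([100.0, 50.0, 150.0], years_active=2, avg_lifespan_years=5)
print(clv)  # 750.0
```

Here the lifespan is passed in as a separate average rather than derived per customer, so it does not cancel against the orders-per-year term.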

In [None]:
# Load transformed data into a PostgreSQL db
def load_data(transformed_df):
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(
            host="34.170.193.146",
            database="MTNRwanda",
            user="root",
            password="root@123"
        )

        # Open a cursor to perform database operations
        cur = conn.cursor()

        # Create the cust_ltv table
        cur.execute("""
            CREATE TABLE IF NOT EXISTS cust_ltv (
                customer_id INTEGER PRIMARY KEY,
                total_orders INTEGER,
                total_amount NUMERIC(10,2),
                avg_order_value NUMERIC(10,2),
                ltv NUMERIC(10,2)
            )
        """)


        # Insert the transformed data into the cust_ltv table
        # Values are cast to built-in Python types so psycopg2 can adapt them
        for index, row in transformed_df.iterrows():
            cur.execute("""
                INSERT INTO cust_ltv (customer_id, total_orders, total_amount, avg_order_value, ltv)
                VALUES (%s, %s, %s, %s, %s)
            """, (int(row['customer_id']), int(row['total_orders']), float(row['total_amount']),
                  float(row['avg_order_value']), float(row['ltv'])))

        conn.commit()

        # Close the cursor and connection
        cur.close()
        conn.close()

        # success message
        logging.info("Data loaded successfully")

    except Exception as e:
        # error message
        logging.error(f"Error loading data: {str(e)}")
        raise e
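
The connection details above are hard-coded in the notebook. One common alternative is to read them from environment variables so that the password does not live in the DAG file. The sketch below is only an illustration under that assumption: the helper `get_db_config` and the variable names `MTN_DB_HOST`, `MTN_DB_NAME`, `MTN_DB_USER`, and `MTN_DB_PASSWORD` are hypothetical.

```python
import os


def get_db_config(env=os.environ):
    """Build keyword arguments for psycopg2.connect() from environment variables.

    The variable names are hypothetical; a missing required variable raises KeyError.
    """
    return {
        "host": env["MTN_DB_HOST"],
        "database": env.get("MTN_DB_NAME", "MTNRwanda"),  # falls back to the notebook's database name
        "user": env["MTN_DB_USER"],
        "password": env["MTN_DB_PASSWORD"],
    }
```

Inside `load_data`, the connection could then be opened with `psycopg2.connect(**get_db_config())`. Airflow also provides its own Connections store, which may be a better fit in a managed deployment.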

In [None]:
# extract data task
extract_data_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

In [None]:
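# transform data task
# transform_data needs the three DataFrames returned by extract_data, so they are pulled from XCom.
# Depending on the Airflow version, passing DataFrames through XCom may require
# enabling XCom pickling or a custom XCom backend.
transform_data_task = PythonOperator(
    task_id='transform_data',
    python_callable=lambda ti: transform_data(*ti.xcom_pull(task_ids='extract_data')),
    dag=dag
)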

In [None]:
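# load data task
# load_data needs the DataFrame returned by transform_data, so it is pulled from XCom
load_data_task = PythonOperator(
    task_id='load_data',
    python_callable=lambda ti: load_data(ti.xcom_pull(task_ids='transform_data')),
    dag=dag
)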

# task dependencies
extract_data_task >> transform_data_task >> load_data_task
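
Because the DAG runs daily and `cust_ltv` uses `customer_id` as its primary key, a plain `INSERT` would raise a duplicate-key error once a customer is already in the table. One possible way around this is an upsert. The sketch below uses Python's built-in `sqlite3` as a stand-in for Postgres so that it runs without a database server; `upsert_ltv` is a hypothetical helper and the sample rows are made up. The `ON CONFLICT ... DO UPDATE` clause is also valid PostgreSQL, but psycopg2 uses `%s` placeholders instead of `?`.

```python
import sqlite3

UPSERT_SQL = """
    INSERT INTO cust_ltv (customer_id, total_orders, total_amount, avg_order_value, ltv)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (customer_id) DO UPDATE SET
        total_orders = excluded.total_orders,
        total_amount = excluded.total_amount,
        avg_order_value = excluded.avg_order_value,
        ltv = excluded.ltv
"""


def upsert_ltv(conn, rows):
    """Insert rows, overwriting any existing row that shares a customer_id."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT_SQL, rows)


# In-memory SQLite database standing in for the Postgres instance
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS cust_ltv (
        customer_id INTEGER PRIMARY KEY,
        total_orders INTEGER,
        total_amount NUMERIC(10,2),
        avg_order_value NUMERIC(10,2),
        ltv NUMERIC(10,2)
    )
""")

# First daily run loads customer 1; the second run updates it and adds customer 2
upsert_ltv(conn, [(1, 2, 300.0, 150.0, 300.0)])
upsert_ltv(conn, [(1, 3, 450.0, 150.0, 450.0), (2, 1, 80.0, 80.0, 80.0)])

rows = conn.execute(
    "SELECT customer_id, total_orders, ltv FROM cust_ltv ORDER BY customer_id"
).fetchall()
```

After the second call the table holds one row per customer, with customer 1 showing the refreshed totals rather than a duplicate.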