In [4]:
!pip install apache-airflow


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [5]:
!pip install apache-airflow-providers-postgres

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [20]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    'owner': 'MTN Rwanda',
    'depends_on_past': False,
    'start_date': datetime(2023, 4, 25),
    'email': ['mtnrwanda@123.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


In [9]:
import pandas as pd

def extract_customer_data():
    customer_data = pd.read_csv('/path/to/customer_data.csv')
    return customer_data

def extract_order_data():
    order_data = pd.read_csv('/path/to/order_data.csv')
    return order_data

def extract_payment_data():
    payment_data = pd.read_csv('/path/to/payment_data.csv')
    return payment_data

[[34m2023-04-24 06:44:26,242[0m] {[34mutils.py:[0m160} INFO[0m - NumExpr defaulting to 2 threads.[0m


In [18]:
# Define a function to transform the data
def transform_data(**context):
    # Load the extracted data
    customer_data = context['task_instance'].xcom_pull(task_ids='extract_customer_data')
    order_data = context['task_instance'].xcom_pull(task_ids='extract_order_data')
    payment_data = context['task_instance'].xcom_pull(task_ids='extract_payment_data')
    
    # Merge the data on customer_id and order_id
    merged_data = pd.merge(customer_data, order_data, on='customer_id')
    
    # Calculate the total payment amount for each order
    payment_totals = payment_data.groupby('order_id')['amount'].sum().reset_index()
    
    # Merge the payment totals with the merged data on order_id
    merged_data = pd.merge(merged_data, payment_totals, on='order_id')
    
    # Rename columns
    merged_data = merged_data.rename(columns={'customer_id': 'customer_id', 'order_id': 'order_id', 'payment_id': 'payment_id', 'amount_y': 'amount'})
    
    # Select the columns we want to keep
    transformed_data = merged_data[['customer_id', 'order_id', 'payment_id', 'amount']]
    
    # Store the transformed data in XCom for use in later tasks
    context['task_instance'].xcom_push(key='transformed_data', value=transformed_data)

    return transformed_data


In [21]:
from sqlalchemy import create_engine

def load_data(**kwargs):
    transformed_data = kwargs['ti'].xcom_pull(task_ids='transform_data')

    engine = create_engine('postgresql://user:password@host:port/dbname')

    transformed_data.to_sql('mtn_rwanda_data', engine, if_exists='replace')

In [22]:
dag = DAG(
    'mtn_rwanda_data_pipeline',
    default_args=default_args,
    description='Data pipeline for MTN Rwanda',
    schedule_interval='0 0 * * *'
)

task_extract_customer_data = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_customer_data,
    dag=dag
)

task_extract_order_data = PythonOperator(
    task_id='extract_order_data',
    python_callable=extract_order_data,
    dag=dag
)

task_extract_payment_data = PythonOperator(
    task_id='extract_payment_data',
    python_callable=extract_payment_data,
    dag=dag
)

task_transform_data = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag
)

task_load_data = PostgresOperator(
    task_id='load_data',
    sql='INSERT INTO mtn_rwanda_data (customer_id, order_id, payment_id, amount) SELECT customer_id, order_id, payment_id, amount FROM transformed_data;',
    postgres_conn_id='postgres_conn',
    dag=dag
)

task_extract_customer_data >> task_transform_data
task_extract_order_data >> task_transform_data
task_extract_payment_data >> task_transform_data
task_transform_data >> task_load_data


<Task(PostgresOperator): load_data>

**Best Practices used during the implementation:**

a) I have used default arguments to specify the common parameters for all tasks in the DAG, such as the owner, email addresses, and retry options.

b) I have used operators to define the tasks, which are building blocks of the DAG. This makes it easy to define dependencies between tasks and create a modular and maintainable pipeline.

c) I have also used XCom to share data between tasks. This allows us to pass data between tasks without having to write it to disk or a database.

**Recommendations for deployment and running the pipeline in a cloud-based provider:** 

a) One can use containerization platform like Docker to package the Airflow DAG file and its dependencies.

b) We can also deploy the containerized pipeline to a cloud-based provider like AWS, which provides scalable and reliable infrastructure for running data pipelines.

c) One can use a managed Postgres service like Amazon RDS or Google Cloud SQL for hosting the Postgres database. This allows you to focus on the pipeline and not worry about managing the database infrastructure.