In [66]:
import pandas as pd
import numpy as np


In [67]:
# Load the three raw CSV extracts, one DataFrame per file.
d1, d2, d3 = map(pd.read_csv, ('dataset1.csv', 'dataset2.csv', 'dataset3.csv'))

In [68]:
# Show the column index of each dataset on a single space-separated line.
print(*(frame.columns for frame in (d1, d2, d3)))

Index(['customer_id', 'date_of_purchase', 'total_amount_billed',
       'payment_status', 'payment_method', 'promo_code',
       'country_of_purchase'],
      dtype='object') Index(['customer_id', 'date_of_payment', 'amount_paid', 'payment_method',
       'payment_status', 'late_payment_fee', 'country_of_payment'],
      dtype='object') Index(['customer_id', 'date_of_refund', 'refund_amount', 'reason_for_refund',
       'country_of_refund'],
      dtype='object')


In [69]:
# Discard every row that has at least one missing value, in all three datasets.
d1, d2, d3 = (frame.dropna() for frame in (d1, d2, d3))

In [70]:
# Parse the month/day/year date strings of each dataset into datetime values.
DATE_FORMAT = '%m/%d/%Y'
for frame, date_column in (
    (d1, 'date_of_purchase'),
    (d2, 'date_of_payment'),
    (d3, 'date_of_refund'),
):
    frame[date_column] = pd.to_datetime(frame[date_column], format=DATE_FORMAT)


In [71]:
# Column names of the purchases dataset as a plain Python list.
print(list(d1.columns))

['customer_id', 'date_of_purchase', 'total_amount_billed', 'payment_status', 'payment_method', 'promo_code', 'country_of_purchase']


In [72]:
# Column names of the payments dataset as a plain Python list.
print(list(d2.columns))

['customer_id', 'date_of_payment', 'amount_paid', 'payment_method', 'payment_status', 'late_payment_fee', 'country_of_payment']


In [73]:
# Column names of the refunds dataset as a plain Python list.
print(list(d3.columns))

['customer_id', 'date_of_refund', 'refund_amount', 'reason_for_refund', 'country_of_refund']


In [88]:
# Cast the identifier and amount columns to explicit dtypes. `astype` with a
# mapping returns a new frame, so no column is written into a frame that may
# still be flagged as a slice of an earlier one.
d1 = d1.astype({"customer_id": int, "total_amount_billed": float})
d2 = d2.astype({"customer_id": int, "amount_paid": float})
d3 = d3.astype({"customer_id": int, "refund_amount": float})

# Total the numeric columns per (customer, date, country).
# numeric_only=True keeps the text columns (payment_status, payment_method,
# promo_code) out of the aggregation: summing them either concatenates the
# strings or drops them depending on the pandas version, and the duplicated
# names would otherwise come out of the merge below as *_x / *_y columns.
df1_grouped = (
    d1.groupby(["customer_id", "date_of_purchase", "country_of_purchase"])
    .sum(numeric_only=True)
    .reset_index()
)
df2_grouped = (
    d2.groupby(["customer_id", "date_of_payment", "country_of_payment"])
    .sum(numeric_only=True)
    .reset_index()
)

# Keep only refunds that carry a reason.
df3_filtered = d3.loc[d3["reason_for_refund"].notnull()]

# Merge the dataframes into a single dataframe.
# NOTE(review): the join key is customer_id alone, so a customer with several
# rows in more than one frame yields every purchase x payment x refund
# combination. Confirm this is the intended grain before using the amounts
# for totals.
df_merged = pd.merge(df1_grouped, df2_grouped, how="inner", on=["customer_id"])
df_merged2 = pd.merge(df_merged, df3_filtered, how="inner", on=["customer_id"])
 

In [80]:
# Persist the merged frame as CSV in the working directory, without the row index.
df_merged2.to_csv(path_or_buf='merged_dataset.csv', index=False)

In [83]:

pip install apache-airflow

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [84]:
from airflow.operators.python_operator import PythonOperator

In [85]:
from airflow import DAG

In [86]:
from datetime import datetime, timedelta

In [87]:
# Arguments handed to the DAG as `default_args`: owner and notification
# address, e-mail on failure but not on retry, and two retries spaced five
# minutes apart.
default_args = {
    'owner': 'Telcom Companies',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'email': ['tlc@companies.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

# Daily DAG definition. No tasks are attached to it in this notebook.
dag = DAG(
    'cdr_pipeline',
    default_args=default_args,
    description='Telcom ETL pipeline',
    schedule_interval=timedelta(days=1),
    # start_date lies in 2020; without this the scheduler would create one
    # backfill run for every elapsed day as soon as the DAG is enabled.
    catchup=False,
)


In [None]:
# Round-trip check: the CSV written by this notebook must reproduce the
# in-memory merged frame.
# The file is read from the same relative path it was written to (the former
# '/content/...' path only exists on Colab). The date columns are parsed back
# to datetimes because CSV stores them as text, which would otherwise make the
# comparison fail on dtype alone.
expected_df = pd.read_csv(
    'merged_dataset.csv',
    parse_dates=['date_of_purchase', 'date_of_payment', 'date_of_refund'],
)
# assert_frame_equal names the first differing column/value and tolerates the
# float round-off introduced by text serialisation, unlike DataFrame.equals.
pd.testing.assert_frame_equal(df_merged2, expected_df)