**Project Deliverable**

Telecom companies often need to extract billing data from multiple CSV files generated by
various systems and transform it into a structured format for analysis and revenue reporting.
Done manually, this process is time-consuming and error-prone, and it slows decision-making:
reconciling billing data from different sources by hand is tedious and often delays revenue
reports. There is therefore a need for an automated data pipeline that extracts billing data
from multiple sources and transforms it into a structured format for efficient analysis and
revenue reporting.

We have three datasets that contain missing values and outliers. Source: *https://bit.ly/416WE1X*

**Notes:**
1. The datasets can be joined using Customer ID, Date of purchase/payment/refund, and
country of purchase/payment/refund as keys.
2. The datasets may contain missing values and outliers for some fields, such as the total
amount billed or refund amount.
3. The payment status may be missing or incomplete for some of the transactions.
4. The promo code field may be empty for some of the purchases.
5. The reason for the refund may be missing for some of the refund transactions.
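
Before cleaning, it can help to measure how much of each field is actually missing. The following is a minimal sketch on a hypothetical four-row miniature of the purchases dataset; the values are made up for illustration and are not taken from the real files.

```python
import pandas as pd

# Hypothetical miniature of the purchases dataset (illustrative values only).
purchases = pd.DataFrame({
    "customer_id": [101, 102, 103, 104],
    "total_amount_billed": [100.0, None, 50.0, 75.0],
    "payment_status": ["paid", "paid", None, "disputed"],
    "promo_code": ["PROMO1", None, None, "PROMO3"],
})

# Count missing entries per column to see which of the notes above apply.
missing_per_column = purchases.isna().sum()
print(missing_per_column)

# Share of rows that would survive a blanket dropna().
complete_share = len(purchases.dropna()) / len(purchases)
print(f"Rows with no missing fields: {complete_share:.0%}")
```

A check like this shows up front how many rows a blanket `dropna()` would discard, which matters when an optional field such as the promo code is frequently empty.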


In [2]:
#We install apache-airflow
!pip install apache-airflow

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache-airflow
  Downloading apache_airflow-2.5.2-py3-none-any.whl (11.6 MB)
Collecting python-daemon>=3.0.0
  Downloading python_daemon-3.0.1-py3-none-any.whl (31 kB)
Collecting mdit-py-plugins>=0.3.0
  Downloading mdit_py_plugins-0.3.5-py3-none-any.whl (52 kB)
Collecting cron-descriptor>=1.2.24
  Downloading cron_descriptor-1.2.35.tar.gz (29 kB)
  Preparing metadata (setup.py) ... done
Collecting apache-airflow-providers-http
  Downloading apache_airflow_providers_http-4.2.0-py3-none-any.whl (22 kB)
Collecting colorlog<5.0,>=4.0.2
  Downloading colorlog-4.8.0-py2.py3-none-any.whl (10 kB)
Collecting deprecated>=1.2.13
  Do

In [3]:
#We import Libraries
import pandas as pd
from datetime import datetime, timedelta
from google.colab import drive
import os.path
from os import path
from airflow import DAG
# Airflow 2.x import path (airflow.operators.python_operator is deprecated)
from airflow.operators.python import PythonOperator
import psycopg2

In [4]:
#Import the data to dataframe
df1 = pd.read_csv('dataset1.csv')
df2 = pd.read_csv('dataset2.csv')
df3 = pd.read_csv('dataset3.csv')

In [5]:
#Preview the data
df1.head(5)

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase
0,101,04/01/2021,100,paid,credit card,PROMO1,USA
1,102,04/02/2021,200,paid,bank transfer,PROMO2,USA
2,103,04/02/2021,50,overdue,credit card,,UK
3,104,04/03/2021,75,disputed,e-wallet,PROMO3,UK
4,105,04/04/2021,125,paid,credit card,PROMO4,USA


In [6]:
df2.head(5)

Unnamed: 0,customer_id,date_of_payment,amount_paid,payment_method,payment_status,late_payment_fee,country_of_payment
0,101,04/01/2021,100,credit card,paid,0,USA
1,102,04/03/2021,200,bank transfer,paid,0,USA
2,103,04/03/2021,75,credit card,paid,10,UK
3,104,04/04/2021,50,e-wallet,overdue,0,UK
4,105,04/05/2021,125,credit card,paid,0,USA


In [7]:
df3.head(5)

Unnamed: 0,customer_id,date_of_refund,refund_amount,reason_for_refund,country_of_refund
0,101,04/03/2021,100,product not as described,USA
1,102,04/06/2021,200,defective product,USA
2,103,04/07/2021,75,change of mind,UK
3,104,04/08/2021,50,product not received,UK
4,105,04/09/2021,25,product not as described,USA


**Cleaning the Data**

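Remove rows with missing values and rename columns so that the three datasets share consistent names. Outliers are not removed in this step.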

In [14]:
#Remove rows with any missing value
#Note: this also drops purchases whose promo_code is empty (e.g. customer 103)
clean_df1 = df1.dropna()
clean_df2 = df2.dropna()
clean_df3 = df3.dropna()

#Rename some columns in the dataset
clean_df1 = clean_df1.rename(columns={"date_of_purchase": "date", "country_of_purchase": "country","total_amount_billed":"bill"})
clean_df2 = clean_df2.rename(columns={"date_of_payment": "date", "country_of_payment": "country","late_payment_fee":"lateness_fee"})
clean_df3 = clean_df3.rename(columns={"date_of_refund": "date", "country_of_refund": "country"})
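
The cell above handles missing values only. One common way to also filter outliers is the interquartile-range (IQR) rule, sketched below. The helper name `drop_iqr_outliers`, the multiplier `k=1.5`, and the sample values are assumptions chosen for illustration; the right threshold depends on the real billing data.

```python
import pandas as pd

def drop_iqr_outliers(df, column, k=1.5):
    """Keep rows whose `column` value lies within [Q1 - k*IQR, Q3 + k*IQR]."""
    q1 = df[column].quantile(0.25)
    q3 = df[column].quantile(0.75)
    iqr = q3 - q1
    # between() is inclusive on both ends and evaluates to False for NaN.
    within = df[column].between(q1 - k * iqr, q3 + k * iqr)
    return df[within]

# Hypothetical bills with one implausibly large amount.
bills = pd.DataFrame({
    "customer_id": [101, 102, 103, 104, 105, 106],
    "bill": [100, 200, 50, 75, 125, 10000],
})

filtered = drop_iqr_outliers(bills, "bill")
print(filtered)
```

The same helper could be applied to `amount_paid` and `refund_amount`. Because it filters on a single column, it should be called once per numeric field.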

Preview the transformed data

In [12]:
clean_df1.sample()

Unnamed: 0,customer_id,date,bill,payment_status,payment_method,promo_code,country
11,112,04/08/2021,200,paid,bank transfer,PROMO9,USA


In [15]:
clean_df2.sample()

Unnamed: 0,customer_id,date,amount_paid,payment_method,payment_status,lateness_fee,country
14,115,04/12/2021,75,e-wallet,overdue,15,UK


In [16]:
clean_df3.sample()

Unnamed: 0,customer_id,date,refund_amount,reason_for_refund,country
13,114,04/17/2021,100,defective product,USA


**Merge the data**

In [17]:
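# Join on customer_id only: the purchase, payment and refund dates differ per customer.
# Columns shared by the first two frames receive the default _x/_y suffixes.
merged_df = clean_df1.merge(clean_df2, on="customer_id").merge(clean_df3, on="customer_id")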

merged_df

Unnamed: 0,customer_id,date_x,bill,payment_status_x,payment_method_x,promo_code,country_x,date_y,amount_paid,payment_method_y,payment_status_y,lateness_fee,country_y,date,refund_amount,reason_for_refund,country
0,101,04/01/2021,100,paid,credit card,PROMO1,USA,04/01/2021,100,credit card,paid,0,USA,04/03/2021,100,product not as described,USA
1,102,04/02/2021,200,paid,bank transfer,PROMO2,USA,04/03/2021,200,bank transfer,paid,0,USA,04/06/2021,200,defective product,USA
2,104,04/03/2021,75,disputed,e-wallet,PROMO3,UK,04/04/2021,50,e-wallet,overdue,0,UK,04/08/2021,50,product not received,UK
3,105,04/04/2021,125,paid,credit card,PROMO4,USA,04/05/2021,125,credit card,paid,0,USA,04/09/2021,25,product not as described,USA
4,107,04/06/2021,75,overdue,e-wallet,PROMO5,USA,04/07/2021,75,e-wallet,overdue,20,USA,04/12/2021,150,change of mind,USA
5,108,04/06/2021,100,overdue,bank transfer,PROMO6,USA,04/07/2021,100,bank transfer,overdue,30,USA,04/13/2021,75,product not as described,USA
6,110,04/07/2021,25,overdue,credit card,PROMO7,USA,04/08/2021,25,credit card,paid,0,USA,04/14/2021,50,product not received,USA
7,111,04/08/2021,175,paid,e-wallet,PROMO8,UK,04/09/2021,175,e-wallet,paid,0,UK,04/15/2021,175,defective product,UK
8,112,04/08/2021,200,paid,bank transfer,PROMO9,USA,04/10/2021,200,bank transfer,paid,0,USA,04/16/2021,200,change of mind,USA
9,113,04/09/2021,50,disputed,credit card,PROMO10,USA,04/10/2021,50,credit card,disputed,0,USA,04/16/2021,50,product not as described,USA
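
The merged output is hard to read because the shared column names end up as `date_x`, `date_y` and `date`. One possible alternative, sketched here on hypothetical miniature frames, is to prefix the shared columns with their source before merging and to ask pandas to verify that each customer appears only once per frame. The helper `tag` and the `SHARED` set are illustrative names, not part of the original notebook.

```python
import pandas as pd

# Hypothetical miniatures of the three cleaned frames (illustrative values only).
purchases = pd.DataFrame({
    "customer_id": [101, 102],
    "date": ["04/01/2021", "04/02/2021"],
    "country": ["USA", "USA"],
    "bill": [100, 200],
})
payments = pd.DataFrame({
    "customer_id": [101, 102],
    "date": ["04/01/2021", "04/03/2021"],
    "country": ["USA", "USA"],
    "amount_paid": [100, 200],
})
refunds = pd.DataFrame({
    "customer_id": [101, 102],
    "date": ["04/03/2021", "04/06/2021"],
    "country": ["USA", "USA"],
    "refund_amount": [100, 200],
})

# Column names that occur in more than one of the cleaned frames.
SHARED = {"date", "country", "payment_status", "payment_method"}

def tag(df, source):
    """Prefix columns that appear in several datasets with their source name."""
    return df.rename(columns=lambda c: f"{source}_{c}" if c in SHARED else c)

# validate="one_to_one" raises a MergeError if a customer_id is duplicated.
merged = (
    tag(purchases, "purchase")
    .merge(tag(payments, "payment"), on="customer_id", validate="one_to_one")
    .merge(tag(refunds, "refund"), on="customer_id", validate="one_to_one")
)
print(merged.columns.tolist())
```

If a customer can have several purchases or refunds, the `validate` argument would need to be relaxed, and the join keys would need to be reconsidered.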


**Load the data**

Load the transformed data into a database or a file, such as a CSV file,
that can be easily analyzed.

In [19]:
#We load the data into a csv file

merged_df.to_csv('merged.csv',encoding= 'utf-8-sig',index=False)
#files.download('merged.csv')
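
The cell above covers the file option. For the database option, a minimal sketch using Python's built-in `sqlite3` module is shown below. The table name `billing`, the in-memory database, and the sample rows are assumptions for illustration; a production pipeline would more likely target a server database.

```python
import sqlite3
import pandas as pd

# Hypothetical slice of the merged data (illustrative values only).
merged = pd.DataFrame({
    "customer_id": [101, 102, 104, 111],
    "bill": [100, 200, 75, 175],
    "country": ["USA", "USA", "UK", "UK"],
})

# An in-memory database keeps the example self-contained;
# passing a file path instead would persist the table.
conn = sqlite3.connect(":memory:")
merged.to_sql("billing", conn, if_exists="replace", index=False)

# Query the loaded table, e.g. total billed per country.
revenue = pd.read_sql_query(
    "SELECT country, SUM(bill) AS total_billed "
    "FROM billing GROUP BY country ORDER BY country",
    conn,
)
conn.close()
print(revenue)
```

Using `if_exists="replace"` rewrites the table on every run, which suits a full refresh; `if_exists="append"` would suit incremental loads.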

**Automating the process**

Automate the data pipeline by scheduling it to run at a fixed interval,
such as daily or weekly, so that the analysis data is updated automatically.


In [21]:
#The provider is published on PyPI as apache-airflow-providers-google
!pip install apache-airflow-providers-google

In [24]:
# Define the DAG
default_args = {
    'owner': 'telecom',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 25),
    'email': ['telecom@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'billing_pipeline',
    default_args=default_args,
    description='ETL pipeline for billing records',
    schedule_interval='@daily',
)

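# Staging files used to hand data from one task to the next (placeholder paths).
# Each Airflow task runs in its own process, so DataFrames are written to disk
# and only the file path is passed between tasks through XCom.
RAW_PATH = '/path/to/cdr_raw.csv'
TRANSFORMED_PATH = '/path/to/cdr_transformed.csv'

# Function to extract data from CSVs (not wired into the DAG below)
def extract_csv():
    df1 = pd.read_csv('/path/to/file1.csv')
    df2 = pd.read_csv('/path/to/file2.csv')
    df3 = pd.read_csv('/path/to/file3.csv')
    return df1, df2, df3

# Function to extract data from the Postgres database
def extract_data():
    conn = psycopg2.connect(host="localhost", database="mydb", user="myuser", password="mypassword")
    try:
        df = pd.read_sql_query("SELECT * FROM cdr", conn)
    finally:
        conn.close()
    df.to_csv(RAW_PATH, index=False)
    return RAW_PATH  # the return value is pushed to XCom

# Function to transform data
def transform_data(ti):
    # Pull the staged file path from the extract task and parse the timestamps
    raw_path = ti.xcom_pull(task_ids='extract_data')
    df = pd.read_csv(raw_path, parse_dates=['starttime', 'endtime'])
    df['duration'] = df['endtime'] - df['starttime']
    df['date'] = df['starttime'].dt.date
    df['hour'] = df['starttime'].dt.hour
    df = df[['date', 'hour', 'duration', 'caller', 'callee']]
    df.to_csv(TRANSFORMED_PATH, index=False)
    return TRANSFORMED_PATH

# Function to load data into a dated CSV file
def load_data(ti):
    df = pd.read_csv(ti.xcom_pull(task_ids='transform_data'))
    filename = datetime.today().strftime("%Y%m%d") + '_billing.csv'
    df.to_csv(filename, index=False)
    return filename

# Define tasks (named *_task so they do not shadow the functions above)
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

# Set task dependencies
extract_task >> transform_task >> load_task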

<Task(PythonOperator): load_data>
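
The DAG above reads from a `cdr` table, whereas the earlier cells work with the three billing CSV files. A billing-specific step could reuse the notebook's own cleaning and merge logic in a single plain function, sketched below. The function name `run_billing_etl`, its parameters, and the sample rows are hypothetical; the demo writes temporary files only to keep the example self-contained.

```python
import os
import tempfile
import pandas as pd

def run_billing_etl(purchases_csv, payments_csv, refunds_csv, out_csv):
    """Extract, clean, merge and load in one callable; returns the row count."""
    purchases = pd.read_csv(purchases_csv).dropna()
    payments = pd.read_csv(payments_csv).dropna()
    refunds = pd.read_csv(refunds_csv).dropna()
    merged = (
        purchases
        .merge(payments, on="customer_id", suffixes=("_purchase", "_payment"))
        .merge(refunds, on="customer_id")
    )
    merged.to_csv(out_csv, index=False)
    return len(merged)

# Demo on hypothetical data: customer 103 has no promo code and is dropped.
with tempfile.TemporaryDirectory() as tmp:
    paths = {name: os.path.join(tmp, f"{name}.csv")
             for name in ("purchases", "payments", "refunds", "merged")}
    pd.DataFrame({
        "customer_id": [101, 103],
        "total_amount_billed": [100, 50],
        "payment_status": ["paid", "overdue"],
        "promo_code": ["PROMO1", None],
    }).to_csv(paths["purchases"], index=False)
    pd.DataFrame({
        "customer_id": [101, 103],
        "amount_paid": [100, 75],
        "payment_status": ["paid", "paid"],
    }).to_csv(paths["payments"], index=False)
    pd.DataFrame({
        "customer_id": [101, 103],
        "refund_amount": [100, 75],
    }).to_csv(paths["refunds"], index=False)

    rows = run_billing_etl(paths["purchases"], paths["payments"],
                           paths["refunds"], paths["merged"])
    result = pd.read_csv(paths["merged"])

print(rows, result.columns.tolist())
```

Because the function takes only file paths, it could be passed to a `PythonOperator` through `python_callable` with the paths supplied in `op_kwargs`, and it can be tested outside Airflow as shown.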