# **Data Pipelines with Python Project**

# **Project Deliverable**
Telecom companies often have to extract billing data from multiple CSV files generated by
various systems and transform it into a structured format for analysis and revenue reporting.
Done manually, this process is time-consuming and error-prone: analyzing and reconciling
billing data from different sources is tedious, delays revenue reports, and hinders the
decisions that depend on them. An automated data pipeline is therefore needed to extract
billing data from multiple sources and transform it into a structured format for
efficient analysis and revenue reporting.

In [1]:
from google.colab import files
uploaded = files.upload()

Saving dataset1.csv to dataset1.csv
Saving dataset2.csv to dataset2.csv
Saving dataset3.csv to dataset3.csv


**Extract the data**

In [2]:
import pandas as pd

In [3]:
df1 = pd.read_csv('dataset1.csv')
df1.head()

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase
0,101,04/01/2021,100,paid,credit card,PROMO1,USA
1,102,04/02/2021,200,paid,bank transfer,PROMO2,USA
2,103,04/02/2021,50,overdue,credit card,,UK
3,104,04/03/2021,75,disputed,e-wallet,PROMO3,UK
4,105,04/04/2021,125,paid,credit card,PROMO4,USA


In [4]:
df2 = pd.read_csv('dataset2.csv')
df2.head()

Unnamed: 0,customer_id,date_of_payment,amount_paid,payment_method,payment_status,late_payment_fee,country_of_payment
0,101,04/01/2021,100,credit card,paid,0,USA
1,102,04/03/2021,200,bank transfer,paid,0,USA
2,103,04/03/2021,75,credit card,paid,10,UK
3,104,04/04/2021,50,e-wallet,overdue,0,UK
4,105,04/05/2021,125,credit card,paid,0,USA


In [5]:
df3 = pd.read_csv('dataset3.csv')
df3.head()

Unnamed: 0,customer_id,date_of_refund,refund_amount,reason_for_refund,country_of_refund
0,101,04/03/2021,100,product not as described,USA
1,102,04/06/2021,200,defective product,USA
2,103,04/07/2021,75,change of mind,UK
3,104,04/08/2021,50,product not received,UK
4,105,04/09/2021,25,product not as described,USA


**Clean the data:**

In [None]:
from datetime import datetime, timedelta

In [8]:
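# Dates are month-first (MM/DD/YYYY); an explicit format avoids ambiguous parsing
df1['date_of_purchase'] = pd.to_datetime(df1['date_of_purchase'], format='%m/%d/%Y')
df2['date_of_payment'] = pd.to_datetime(df2['date_of_payment'], format='%m/%d/%Y')
df3['date_of_refund'] = pd.to_datetime(df3['date_of_refund'], format='%m/%d/%Y')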

In [9]:
transformed_data1 = df1.dropna()
transformed_data2 = df2.dropna()
transformed_data3 = df3.dropna()
transformed_data1

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status,payment_method,promo_code,country_of_purchase
0,101,2021-04-01,100,paid,credit card,PROMO1,USA
1,102,2021-04-02,200,paid,bank transfer,PROMO2,USA
3,104,2021-04-03,75,disputed,e-wallet,PROMO3,UK
4,105,2021-04-04,125,paid,credit card,PROMO4,USA
6,107,2021-04-06,75,overdue,e-wallet,PROMO5,USA
7,108,2021-04-06,100,overdue,bank transfer,PROMO6,USA
9,110,2021-04-07,25,overdue,credit card,PROMO7,USA
10,111,2021-04-08,175,paid,e-wallet,PROMO8,UK
11,112,2021-04-08,200,paid,bank transfer,PROMO9,USA
12,113,2021-04-09,50,disputed,credit card,PROMO10,USA
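
The `dropna()` call above discards a row if any column is missing, which is why customer 103 (no promo code) no longer appears in the output. If an empty `promo_code` is acceptable for reporting, one option is to check only the columns that must be present. The following is a minimal sketch under that assumption; the sample frame and the `required` list are hypothetical and only mirror a few columns of `dataset1.csv`.

```python
import pandas as pd

# Hypothetical sample mirroring a few columns of dataset1.csv
sample = pd.DataFrame({
    "customer_id": [101, 102, 103],
    "total_amount_billed": [100, 200, 50],
    "payment_status": ["paid", "paid", "overdue"],
    "promo_code": ["PROMO1", "PROMO2", None],
})

# Plain dropna() removes customer 103 because promo_code is missing
strict = sample.dropna()

# Checking only the required columns (an assumed list) keeps that row
required = ["customer_id", "total_amount_billed", "payment_status"]
lenient = sample.dropna(subset=required)

print(len(strict), len(lenient))  # prints: 2 3
```

Whether to keep such rows depends on how the revenue report treats purchases without a promo code.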


**Merge the datasets**

In [10]:
df_merged = transformed_data1.merge(transformed_data2, on="customer_id").merge(transformed_data3, on="customer_id")
df_merged

Unnamed: 0,customer_id,date_of_purchase,total_amount_billed,payment_status_x,payment_method_x,promo_code,country_of_purchase,date_of_payment,amount_paid,payment_method_y,payment_status_y,late_payment_fee,country_of_payment,date_of_refund,refund_amount,reason_for_refund,country_of_refund
0,101,2021-04-01,100,paid,credit card,PROMO1,USA,2021-04-01,100,credit card,paid,0,USA,2021-04-03,100,product not as described,USA
1,102,2021-04-02,200,paid,bank transfer,PROMO2,USA,2021-04-03,200,bank transfer,paid,0,USA,2021-04-06,200,defective product,USA
2,104,2021-04-03,75,disputed,e-wallet,PROMO3,UK,2021-04-04,50,e-wallet,overdue,0,UK,2021-04-08,50,product not received,UK
3,105,2021-04-04,125,paid,credit card,PROMO4,USA,2021-04-05,125,credit card,paid,0,USA,2021-04-09,25,product not as described,USA
4,107,2021-04-06,75,overdue,e-wallet,PROMO5,USA,2021-04-07,75,e-wallet,overdue,20,USA,2021-04-12,150,change of mind,USA
5,108,2021-04-06,100,overdue,bank transfer,PROMO6,USA,2021-04-07,100,bank transfer,overdue,30,USA,2021-04-13,75,product not as described,USA
6,110,2021-04-07,25,overdue,credit card,PROMO7,USA,2021-04-08,25,credit card,paid,0,USA,2021-04-14,50,product not received,USA
7,111,2021-04-08,175,paid,e-wallet,PROMO8,UK,2021-04-09,175,e-wallet,paid,0,UK,2021-04-15,175,defective product,UK
8,112,2021-04-08,200,paid,bank transfer,PROMO9,USA,2021-04-10,200,bank transfer,paid,0,USA,2021-04-16,200,change of mind,USA
9,113,2021-04-09,50,disputed,credit card,PROMO10,USA,2021-04-10,50,credit card,disputed,0,USA,2021-04-16,50,product not as described,USA
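
The merged output above contains `payment_status_x` and `payment_method_x` alongside their `_y` counterparts because both source tables share those column names. A small sketch of how more descriptive suffixes and a key check might look is shown below; the two sample frames are hypothetical, and the suffix names `_billed` and `_paid` are assumptions chosen for illustration.

```python
import pandas as pd

# Hypothetical samples mirroring a few columns of dataset1.csv and dataset2.csv
billing = pd.DataFrame({
    "customer_id": [101, 102, 104],
    "total_amount_billed": [100, 200, 75],
    "payment_status": ["paid", "paid", "disputed"],
})
payments = pd.DataFrame({
    "customer_id": [101, 102, 104],
    "amount_paid": [100, 200, 50],
    "payment_status": ["paid", "paid", "overdue"],
})

merged = billing.merge(
    payments,
    on="customer_id",
    how="inner",                     # keep only customers present in both tables
    suffixes=("_billed", "_paid"),   # replaces the default _x / _y
    validate="one_to_one",           # raises MergeError if customer_id repeats
)

print(merged.columns.tolist())
```

`validate="one_to_one"` is useful here because a duplicated `customer_id` in either table would otherwise silently multiply rows in the merged result.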


**Load the data**

In [None]:
# Load the merged data into a CSV file



In [12]:
df_merged.to_csv('merged.csv', encoding='utf-8-sig')
files.download('merged.csv')


In [14]:
from google.colab import drive
drive.mount('/content/drive')
path = '/content/drive/My Drive/output.csv'
with open(path, 'w', encoding = 'utf-8-sig') as f:
  df_merged.to_csv(f)

Mounted at /content/drive
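
Both `to_csv` calls above write the DataFrame index as an extra, unnamed first column. When the file is read back, pandas labels that column `Unnamed: 0`. The sketch below illustrates the difference with a hypothetical two-row frame and temporary files; passing `index=False` is one way to avoid the extra column if the index carries no meaning.

```python
import os
import tempfile

import pandas as pd

# Hypothetical stand-in for df_merged
df = pd.DataFrame({"customer_id": [101, 102], "amount_paid": [100, 200]})

with tempfile.TemporaryDirectory() as tmp:
    with_index = os.path.join(tmp, "with_index.csv")
    without_index = os.path.join(tmp, "without_index.csv")

    # Default behaviour: the index is written as the first column
    df.to_csv(with_index, encoding="utf-8-sig")
    # index=False writes only the data columns
    df.to_csv(without_index, encoding="utf-8-sig", index=False)

    cols_with = pd.read_csv(with_index, encoding="utf-8-sig").columns.tolist()
    cols_without = pd.read_csv(without_index, encoding="utf-8-sig").columns.tolist()

print(cols_with)     # ['Unnamed: 0', 'customer_id', 'amount_paid']
print(cols_without)  # ['customer_id', 'amount_paid']
```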


# **Automating the process**

In [27]:
from datetime import datetime, timedelta
from airflow import DAG
# Airflow 2 import path; airflow.operators.python_operator is deprecated
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2


In [25]:
pip install apache-airflow


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
pip install apache-airflow-providers-google

In [29]:
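# GCSHook replaces the older GoogleCloudStorageHook (needs apache-airflow-providers-google)
from airflow.providers.google.cloud.hooks.gcs import GCSHook

default_args = {
    'owner': 'telecom',
    'depends_on_past': False,
    'start_date': datetime(2022, 2, 18),
    'email': ['telecom@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'cdr_pipeline',
    default_args=default_args,
    description='ETL pipeline for call detail records',
    schedule_interval=timedelta(days=1),
)

# Tasks run in separate processes, so each step writes a file and returns its
# path; Airflow stores the return value in XCom for the next task to pull.
# Assumption: all tasks run on a worker that shares this local filesystem.
RAW_FILE = '/tmp/cdr_raw.csv'
TRANSFORMED_FILE = '/tmp/cdr_transformed.csv'

def extract_data():
    conn = psycopg2.connect(host="localhost", database="mydb", user="myuser", password="mypassword")
    try:
        df = pd.read_sql_query("SELECT * FROM cdr", conn)
    finally:
        conn.close()
    df.to_csv(RAW_FILE, index=False)
    return RAW_FILE

def transform_data(ti):
    # ti is the task instance that Airflow 2 passes in from the task context
    raw_file = ti.xcom_pull(task_ids='extract_data')
    df = pd.read_csv(raw_file, parse_dates=['starttime', 'endtime'])
    df['duration'] = df['endtime'] - df['starttime']
    df['date'] = df['starttime'].dt.date
    df['hour'] = df['starttime'].dt.hour
    df = df[['date', 'hour', 'duration', 'caller', 'callee']]
    df.to_csv(TRANSFORMED_FILE, index=False)
    return TRANSFORMED_FILE

def load_data(ti):
    df = pd.read_csv(ti.xcom_pull(task_ids='transform_data'))
    filename = datetime.today().strftime("%Y%m%d") + '_cdr.csv'
    df.to_csv(filename, index=False)
    return filename

def upload_to_gcs(ti):
    filename = ti.xcom_pull(task_ids='load_data')
    gcs_hook = GCSHook()  # uses the default Google Cloud connection
    gcs_hook.upload(
        bucket_name='mybucket',
        object_name='data/cdrs/' + filename,
        filename=filename,
    )

extract = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

load = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

upload = PythonOperator(
    task_id='upload_to_gcs',
    python_callable=upload_to_gcs,
    dag=dag,
)

# Run the tasks in order
extract >> transform >> load >> upload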


**Monitoring the process**

---



* Enabling logging
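
As a rough illustration of what enabling logging could look like, the sketch below uses Python's standard `logging` module to record how many rows a cleaning step keeps. The logger name `billing_pipeline`, the `clean` helper, and the sample data are hypothetical and not part of the notebook above.

```python
import logging

import pandas as pd

# Basic configuration: timestamp, level, logger name, message
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger("billing_pipeline")  # hypothetical logger name


def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Drop incomplete rows and log how many rows survive."""
    before = len(df)
    cleaned = df.dropna()
    logger.info("clean: kept %d of %d rows", len(cleaned), before)
    return cleaned


# Hypothetical sample: one complete row, one with a missing promo code
sample = pd.DataFrame({"customer_id": [101, 103], "promo_code": ["PROMO1", None]})
cleaned = clean(sample)
```

Messages emitted through the standard `logging` module inside an Airflow task callable are captured in that task's log, so the same pattern can be reused in the DAG functions.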


