**Data Pipelines with Python Project**

**Project Deliverable**

● A GitHub repository with a Python file (.py) or notebook (.ipynb) containing your solution.

**Project Description**

Telecom companies often have to extract billing data from multiple CSV files generated by various systems and transform it into a structured format for analysis and revenue reporting.
This process can be time-consuming and error-prone, and it can hinder decision-making. Manually analyzing and reconciling billing data from different sources is tedious and often delays revenue reports. Thus, there is a need for an automated data pipeline that can extract billing data from multiple sources and transform it into a structured format for efficient analysis and revenue reporting.

**Guidelines**
Here are some guidelines and hints to help you create the data pipeline:


1. Determine the requirements: First, define the requirements of the data pipeline, including the source and destination of the data, the type of data that needs to be processed, the transformations that need to be applied, and the output format.
2. Extract the data: Use Python to read the CSV files and extract the data.
3. Clean the data: Clean the extracted data to handle missing values and outliers. For example, you can replace missing values with an appropriate value or remove them altogether.
4. Transform the data: Apply any necessary transformations to the data, such as data type conversion, data aggregation, and data filtering, to prepare it for analysis.
5. Merge the datasets: Join the different datasets into a single dataset that can be used for analysis.
6. Load the data: Load the transformed data into a database or a file, such as a CSV file, that can be easily analyzed.
7. Automate the process: Automate the data pipeline by scheduling it to run at a specific time, such as daily or weekly, so that it updates the analysis data automatically.
8. Test the pipeline: Test the data pipeline to ensure it produces the correct results. This can be done by comparing the results with the expected output or by using a test dataset.
9. Optimize the pipeline: Optimize the data pipeline to improve performance and reduce errors, for example by optimizing the code, using parallel processing, and reducing the data size.
10. Monitor the pipeline: Monitor the data pipeline to ensure that it runs smoothly and without errors.

**Datasets**

Here are three sample datasets (https://bit.ly/416WE1X) with billing data that can be joined. The datasets contain some missing values and outliers:

1. Dataset 1:
* Customer ID (numeric)
* Date of purchase (MM/DD/YYYY)
* Total amount billed (numeric)
* Payment status (categorical - paid, overdue, disputed)
* Payment method (categorical - credit card, bank transfer, e-wallet)
* Promo code (text)
* Country of purchase (categorical)

2. Dataset 2:
*  Customer ID (numeric)
*  Date of payment (MM/DD/YYYY)
*  Amount paid (numeric)
*  Payment method (categorical - credit card, bank transfer, e-wallet)
*  Payment status (categorical - paid, overdue, disputed)
*  Late payment fee (numeric)
*  Country of payment (categorical)

3. Dataset 3:
*  Customer ID (numeric)
*  Date of refund (MM/DD/YYYY)
*  Refund amount (numeric)
*  Reason for refund (text)
*  Country of refund (categorical)

**Notes:**
1. The datasets can be joined using Customer ID, Date of purchase/payment/refund, and Country of purchase/payment/refund as keys.
2. The datasets may contain missing values and outliers in some fields, such as the total amount billed or the refund amount.
3. The payment status may be missing or incomplete for some of the transactions.
4. The promo code field may be empty for some of the purchases.
5. The reason for the refund may be missing for some of the refund transactions.

Importing the necessary libraries

In [1]:
#Importing the necessary libraries
import pandas as pd
import numpy as np
import os


Extracting and loading the provided datasets
We use the pandas library to load three datasets into dataframes.

The first step defines the file paths for the three datasets as relative paths. Relative paths are resolved against the current working directory, so this assumes the notebook or script is run from the folder that contains the CSV files.

The next step loads each dataset into a dataframe with pandas' pd.read_csv() function. This allows quick checks of the datasets, such as identifying missing data, checking naming conventions, and inspecting the general data structure.

In [7]:
# Define the file paths for the three datasets. We want to see what is inside the datasets
file1 = "dataset1.csv"
file2 = "dataset2.csv"
file3 = "dataset3.csv"

# Loading the datasets into dataframes for quick checks (checking missing data, naming conventions and the general data structure)
df1 = pd.read_csv(file1)
df2 = pd.read_csv(file2)
df3 = pd.read_csv(file3)
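The quick checks described above can be made explicit with a small helper that reports shape, dtypes, and missing values in one call. This is a minimal sketch: `quick_check` is a hypothetical helper, and the inline CSV is invented sample data shaped like dataset1.csv rather than the real file.

```python
import io

import pandas as pd


def quick_check(df: pd.DataFrame, name: str) -> pd.Series:
    """Print shape, dtypes, and columns with missing values; return per-column missing counts."""
    print(f"{name}: {df.shape[0]} rows x {df.shape[1]} columns")
    print(df.dtypes)
    missing = df.isna().sum()
    print("Columns with missing values:")
    print(missing[missing > 0])
    return missing


# Invented sample shaped like dataset1.csv: one missing amount, one missing payment status
sample_csv = io.StringIO(
    "customer_id,date_of_purchase,total_amount_billed,payment_status\n"
    "101,04/01/2021,100,paid\n"
    "102,04/02/2021,,paid\n"
    "103,04/02/2021,50,\n"
)
sample = pd.read_csv(sample_csv)
missing_counts = quick_check(sample, "sample")
```

In the notebook itself, the same helper would be called as `quick_check(df1, "df1")` and so on.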


We can make the loading step more robust by catching a missing file instead of letting the cell fail:

In [29]:
import pandas as pd

# Define the file paths for the three datasets
file1 = "dataset1.csv"
file2 = "dataset2.csv"
file3 = "dataset3.csv"

# Load the datasets into dataframes for quick checks
try:
    df1 = pd.read_csv(file1)
    print(f"Loaded {len(df1)} rows from {file1}")
except FileNotFoundError:
    print(f"{file1} not found")

try:
    df2 = pd.read_csv(file2)
    print(f"Loaded {len(df2)} rows from {file2}")
except FileNotFoundError:
    print(f"{file2} not found")

try:
    df3 = pd.read_csv(file3)
    print(f"Loaded {len(df3)} rows from {file3}")
except FileNotFoundError:
    print(f"{file3} not found")


Loaded 15 rows from dataset1.csv
Loaded 15 rows from dataset2.csv
Loaded 15 rows from dataset3.csv
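The three try/except blocks repeat the same logic, and a missing file leaves its dataframe undefined for later cells. A loop that collects the results in a dictionary avoids both problems. This is a sketch: `load_csvs` is a hypothetical helper, and catching `pandas.errors.EmptyDataError` and `pandas.errors.ParserError` goes beyond what the original cell handles.

```python
import os
import tempfile

import pandas as pd


def load_csvs(paths):
    """Load each CSV into a dict keyed by path, reporting common failures instead of raising."""
    frames = {}
    for path in paths:
        try:
            frames[path] = pd.read_csv(path)
            print(f"Loaded {len(frames[path])} rows from {path}")
        except FileNotFoundError:
            print(f"{path} not found")
        except pd.errors.EmptyDataError:
            print(f"{path} is empty")
        except pd.errors.ParserError as exc:
            print(f"{path} could not be parsed: {exc}")
    return frames


# Demo with temporary files: one valid CSV, one empty file, and one path that does not exist
tmp_dir = tempfile.mkdtemp()
good = os.path.join(tmp_dir, "good.csv")
empty = os.path.join(tmp_dir, "empty.csv")
with open(good, "w") as f:
    f.write("customer_id,total_amount_billed\n101,100\n102,200\n")
open(empty, "w").close()

frames = load_csvs([good, empty, os.path.join(tmp_dir, "missing.csv")])
```

Later cells can then check whether a path is in `frames` before using that dataset, rather than failing with a NameError.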


Checking data in df1

In [9]:
# General overview of data in df1
print(df1)

    customer_id date_of_purchase  total_amount_billed payment_status  \
0           101       04/01/2021                  100           paid   
1           102       04/02/2021                  200           paid   
2           103       04/02/2021                   50        overdue   
3           104       04/03/2021                   75       disputed   
4           105       04/04/2021                  125           paid   
5           106       04/05/2021                  150           paid   
6           107       04/06/2021                   75        overdue   
7           108       04/06/2021                  100        overdue   
8           109       04/07/2021                   50           paid   
9           110       04/07/2021                   25        overdue   
10          111       04/08/2021                  175           paid   
11          112       04/08/2021                  200           paid   
12          113       04/09/2021                   50       disp

Checking data in df2

In [10]:
# General overview of data in df2
print(df2)

    customer_id date_of_payment  amount_paid payment_method payment_status  \
0           101      04/01/2021          100    credit card           paid   
1           102      04/03/2021          200  bank transfer           paid   
2           103      04/03/2021           75    credit card           paid   
3           104      04/04/2021           50       e-wallet        overdue   
4           105      04/05/2021          125    credit card           paid   
5           106      04/06/2021          150    credit card           paid   
6           107      04/07/2021           75       e-wallet        overdue   
7           108      04/07/2021          100  bank transfer        overdue   
8           109      04/08/2021           50  bank transfer           paid   
9           110      04/08/2021           25    credit card           paid   
10          111      04/09/2021          175       e-wallet           paid   
11          112      04/10/2021          200  bank transfer     

Checking data in df3

In [11]:
# General overview of data in df3
print(df3)

    customer_id date_of_refund  refund_amount         reason_for_refund  \
0           101     04/03/2021            100  product not as described   
1           102     04/06/2021            200         defective product   
2           103     04/07/2021             75            change of mind   
3           104     04/08/2021             50      product not received   
4           105     04/09/2021             25  product not as described   
5           106     04/11/2021            125         defective product   
6           107     04/12/2021            150            change of mind   
7           108     04/13/2021             75  product not as described   
8           109     04/13/2021            100         defective product   
9           110     04/14/2021             50      product not received   
10          111     04/15/2021            175         defective product   
11          112     04/16/2021            200            change of mind   
12          113     04/16

Defining the input and output paths afresh

```
# Let us define new paths for more analysis
```



We define the file paths for three input datasets (set1_path, set2_path, and set3_path) and an output file path (output_path).

The relative paths assume that the datasets are in, and the output file will be written to, the current working directory (usually the folder the notebook or script is run from).

In [30]:
# Defining new paths to the CSV files
set1_path = "dataset1.csv"
set2_path = "dataset2.csv"
set3_path = "dataset3.csv"

# Defining the output path for the transformed data. We will call the transformed CSV "final_output.csv"
output_path = "final_output.csv"

We can modify the code to include error handling for the file paths defined above:

In [31]:
set1_path = "dataset1.csv"
set2_path = "dataset2.csv"
set3_path = "dataset3.csv"
output_path = "final_output.csv"

# Check if input file paths exist
if not all(map(os.path.isfile, [set1_path, set2_path, set3_path])):
    raise FileNotFoundError("One or more input files not found")

# Check if output file path exists
if os.path.isfile(output_path):
    print("Output file already exists. It will be overwritten.")

# Use absolute paths instead of relative paths
set1_path = os.path.abspath(set1_path)
set2_path = os.path.abspath(set2_path)
set3_path = os.path.abspath(set3_path)
output_path = os.path.abspath(output_path)
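Because os.path.abspath resolves relative paths against the current working directory, the result depends on where the code is launched from. An alternative is to resolve file names against an explicit base directory and name the missing file in the error. A sketch under that assumption; `resolve_paths` is a hypothetical helper built on the standard-library pathlib module.

```python
import tempfile
from pathlib import Path


def resolve_paths(base_dir, input_names, output_name):
    """Resolve file names against base_dir and report exactly which inputs are missing."""
    base = Path(base_dir).resolve()
    inputs = [base / name for name in input_names]
    missing = [p.name for p in inputs if not p.is_file()]
    if missing:
        raise FileNotFoundError(f"Input files not found in {base}: {missing}")
    output = base / output_name
    if output.exists():
        print(f"{output} already exists. It will be overwritten.")
    return inputs, output


# Demo: create two of the three inputs so the error names the missing one
demo_dir = tempfile.mkdtemp()
for name in ["dataset1.csv", "dataset2.csv"]:
    (Path(demo_dir) / name).write_text("customer_id\n101\n")

try:
    resolve_paths(demo_dir, ["dataset1.csv", "dataset2.csv", "dataset3.csv"], "final_output.csv")
except FileNotFoundError as exc:
    error_message = str(exc)
    print(error_message)
```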


Transforming the data: 


```
# We need to apply any necessary transformations to the data, such as converting data types, filling missing values, lowercasing text, and replacing spaces in column names with underscores
```



We then define the data types for each column in the three input datasets using dictionaries.

For example, set1_dtypes specifies the data types for the columns in dataset1.csv: customer_id as np.int64, total_amount_billed as np.float64, payment_status, payment_method and country_of_purchase as "category", and promo_code as str. The date columns are commented out because read_csv does not accept datetime types in its dtype argument; they are parsed separately through parse_dates when the files are read. Note also that np.int64 requires every customer_id to be present, since a plain integer column cannot hold NaN.

Using the correct data types matters because it can save memory (a "category" column stores each distinct label once plus a small integer code per row), reduces the chance of errors in analysis and visualization, and can improve performance.

In [15]:
# Defining the data types for each column in the datasets read using the paths
set1_dtypes = {
    "customer_id": np.int64,
    # "date_of_purchase": np.datetime64,
    "total_amount_billed": np.float64,
    "payment_status": "category",
    "payment_method": "category",
    "promo_code": str,
    "country_of_purchase": "category"
}

set2_dtypes = {
    "customer_id": np.int64,
    # "date_of_payment": np.datetime64,
    "amount_paid": np.float64,
    "payment_method": "category",
    "payment_status": "category",
    "late_payment_fee": np.float64,
    "country_of_payment": "category"
}

set3_dtypes = {
    "customer_id": np.int64,
    # "date_of_refund": np.datetime64,
    "refund_amount": np.float64,
    "reason_for_refund": str,
    "country_of_refund": "category"
}
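The memory benefit of the "category" dtype is easy to measure with memory_usage(deep=True). A small sketch using the three payment statuses from the datasets, repeated to an invented 30,000 rows:

```python
import numpy as np
import pandas as pd

# 30,000 rows drawn from the three payment statuses; dtype=object mimics plain text columns
statuses = pd.Series(np.tile(["paid", "overdue", "disputed"], 10_000), dtype=object)

as_object = statuses.memory_usage(deep=True)
as_category = statuses.astype("category").memory_usage(deep=True)
print(f"object: {as_object:,} bytes, category: {as_category:,} bytes")
```

The exact byte counts vary by Python and pandas version, but the category version is consistently much smaller for low-cardinality columns like these.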


We then define three lambda functions to parse the dates in each of the three datasets.

The set1_date_parser function takes a date string in the format "month/day/year" (e.g., "01/25/2022") and converts it to a Pandas datetime object. The format argument specifies the format of the input date string, and the errors argument tells Pandas to set any invalid date strings to NaT (Not a Time) instead of raising an error.

The set2_date_parser and set3_date_parser functions are identical to set1_date_parser; they are defined separately for the second and third datasets, although a single shared parser would work equally well.

Using the correct date parsers is important because it ensures that date columns are correctly interpreted and can be used in time-based analyses, such as time series forecasting or trend analysis.

In [14]:
# Defining the date parsers for each dataset
set1_date_parser = lambda x: pd.to_datetime(x, format="%m/%d/%Y", errors="coerce")
set2_date_parser = lambda x: pd.to_datetime(x, format="%m/%d/%Y", errors="coerce")
set3_date_parser = lambda x: pd.to_datetime(x, format="%m/%d/%Y", errors="coerce")
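To illustrate why parsed dates matter for time-based analysis, this small sketch parses purchase dates with the same format and errors="coerce" setting and totals the amount billed per day. The first three rows echo the df1 preview above; the malformed "bad" row is invented.

```python
import pandas as pd

billing = pd.DataFrame({
    "date_of_purchase": ["04/01/2021", "04/02/2021", "04/02/2021", "bad"],
    "total_amount_billed": [100.0, 200.0, 50.0, 75.0],
})
billing["date_of_purchase"] = pd.to_datetime(
    billing["date_of_purchase"], format="%m/%d/%Y", errors="coerce"
)

# The unparseable string became NaT; groupby drops NaT keys by default
print(billing["date_of_purchase"].isna().sum())
daily_billed = billing.groupby("date_of_purchase")["total_amount_billed"].sum()
print(daily_billed)
```

Note that the 75.0 on the malformed row silently drops out of the daily totals, so counting NaT values first is a useful sanity check.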

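We then read the CSV files with pandas, passing the dtype dictionaries and parsing each date column with its date parser while the file is read. Note that the date_parser argument is deprecated as of pandas 2.0; on newer versions, pass date_format="%m/%d/%Y" or convert the column afterward with pd.to_datetime.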


In [16]:
# Reading the datasets into pandas dataframes
set1 = pd.read_csv(set1_path, dtype=set1_dtypes, parse_dates=["date_of_purchase"], date_parser=set1_date_parser)
set2 = pd.read_csv(set2_path, dtype=set2_dtypes, parse_dates=["date_of_payment"], date_parser=set2_date_parser)
set3 = pd.read_csv(set3_path, dtype=set3_dtypes, parse_dates=["date_of_refund"], date_parser=set3_date_parser)
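If a numeric column contains a placeholder such as "-", read_csv with a float64 dtype raises a ValueError instead of reading it, so a later replace step can only reach text columns. One way to handle this, sketched here under the assumption that the placeholders are "-" and a single space, is to declare them through na_values and convert the dates afterward with pd.to_datetime (which also avoids the deprecated date_parser argument). The sample rows are invented.

```python
import io

import numpy as np
import pandas as pd

# Invented sample: "-" marks a missing amount, and one date is malformed
raw = io.StringIO(
    "customer_id,date_of_purchase,total_amount_billed,payment_status\n"
    "101,04/01/2021,100,paid\n"
    "102,04/02/2021,-,paid\n"
    "103,not a date,50,overdue\n"
)

dtypes = {"customer_id": np.int64, "total_amount_billed": np.float64, "payment_status": "category"}

# na_values turns the placeholders into NaN during parsing, so the float64 conversion succeeds
set1_demo = pd.read_csv(raw, dtype=dtypes, na_values=["-", " "])

# Parse dates afterward; errors="coerce" maps unparseable strings to NaT
set1_demo["date_of_purchase"] = pd.to_datetime(
    set1_demo["date_of_purchase"], format="%m/%d/%Y", errors="coerce"
)
print(set1_demo.dtypes)
```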

Cleaning the data: standardizing missing-value placeholders

Here we convert placeholder strings (empty strings, single spaces, and dashes) into NaN so that pandas recognizes them as missing values. Later, some numeric columns are filled with 0. Note that replacing all missing values with 0 can distort your data and produce incorrect analysis results, especially for numerical data; depending on your use case, imputation or removing rows/columns with missing values may be more appropriate.

In [17]:
# Standardize placeholder strings ("", " ", "-") to NaN so pandas treats them as missing
set1.replace(["", " ", "-"], np.nan, inplace=True)
set2.replace(["", " ", "-"], np.nan, inplace=True)
set3.replace(["", " ", "-"], np.nan, inplace=True)
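The project description also warns about outliers, which the cleaning step above does not address. One common heuristic, swapped in here rather than prescribed by the project, is Tukey's IQR rule: flag values more than 1.5 times the interquartile range below the first quartile or above the third. A minimal sketch; `flag_iqr_outliers` is a hypothetical helper, the first 13 values echo total_amount_billed in the df1 preview, and the 9999 and the missing value are invented.

```python
import pandas as pd


def flag_iqr_outliers(values: pd.Series, k: float = 1.5) -> pd.Series:
    """Return True where a value lies more than k * IQR outside the quartiles (NaN gives False)."""
    q1, q3 = values.quantile(0.25), values.quantile(0.75)
    iqr = q3 - q1
    return (values < q1 - k * iqr) | (values > q3 + k * iqr)


amounts = pd.Series(
    [100, 200, 50, 75, 125, 150, 75, 100, 50, 25, 175, 200, 50, 9999, None],
    dtype="float64",
)
outliers = flag_iqr_outliers(amounts)
print(amounts[outliers])
```

Whether flagged rows should be dropped, capped, or reviewed manually is a business decision; flagging first keeps that choice explicit.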


Merging the datasets:

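We first print the columns of each dataset. Printing them side by side helps identify which columns can serve as merge keys. (The output below already shows the shared names date and country; in a fresh top-to-bottom run this cell would print the original names, such as date_of_purchase, because the renaming happens two cells later.)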

In [33]:
# Preparing the data for merging by checking their column names to determine which columns can be merged
print(f'Dataset 1\n{set1.columns}')
print(f'Dataset 2\n{set2.columns}')
print(f'Dataset 3\n{set3.columns}')



Dataset 1
Index(['customer_id', 'date', 'total_amount_billed', 'payment_status',
       'payment_method', 'promo_code', 'country'],
      dtype='object')
Dataset 2
Index(['customer_id', 'date', 'amount_paid', 'payment_method',
       'payment_status', 'late_payment_fee', 'country'],
      dtype='object')
Dataset 3
Index(['customer_id', 'date', 'refund_amount', 'reason_for_refund', 'country'], dtype='object')


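We then reset the index of each dataset, discarding the old index rather than keeping it as a column. This is a precaution rather than a requirement: pd.merge joins on the key columns and builds a fresh index for the result, so duplicate index values would not affect the merge.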

In [34]:
# Resetting the index (drop=True discards the old index instead of adding it as a column)
set1 = set1.reset_index(drop=True)
set2 = set2.reset_index(drop=True)
set3 = set3.reset_index(drop=True)


We then rename the date and country columns to shared names (date and country), so that all three dataframes use identical key names for the merge.

In [35]:
# Renaming the columns
set1 = set1.rename(columns={"date_of_purchase": "date", "country_of_purchase": "country"})
set2 = set2.rename(columns={"date_of_payment": "date", "country_of_payment": "country"})
set3 = set3.rename(columns={"date_of_refund": "date", "country_of_refund": "country"})

We then merge the three datasets using Customer ID, date of purchase/payment/refund, and country of purchase/payment/refund as keys. The outer joins keep every row from all three datasets, and the resulting merged_data dataframe contains the columns from all three.

Before proceeding, check the merged data for missing values and outliers, as the project description warns. Note also that because the date is part of the join key, a purchase and its payment or refund on different days do not match and end up on separate rows. For example, customer 102 was billed on 04/02/2021 but paid on 04/03/2021, which is why amount_paid is empty on that customer's purchase row in the output below.

In [36]:
# Merge the datasets by Customer ID, Date of purchase/payment/refund, and country of purchase/payment/refund
merged_data = pd.merge(set1, set2, on=['customer_id','date','country'],
                          how='outer', suffixes = ('_purchase', '_payment'))

merged_data = pd.merge(merged_data, set3, on=['customer_id','date','country'], 
                          how='outer', suffixes = ('_payment', '_refund'))
merged_data.head()


|  | customer_id | date | total_amount_billed | payment_status_purchase | payment_method_purchase | promo_code | country | amount_paid | payment_method_payment | payment_status_payment | late_payment_fee | refund_amount | reason_for_refund |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 101 | 2021-04-01 | 100.0 | paid | credit card | PROMO1 | USA | 100.0 | credit card | paid | 0.0 |  |  |
| 1 | 102 | 2021-04-02 | 200.0 | paid | bank transfer | PROMO2 | USA |  |  |  |  |  |  |
| 2 | 103 | 2021-04-02 | 50.0 | overdue | credit card |  | UK |  |  |  |  |  |  |
| 3 | 104 | 2021-04-03 | 75.0 | disputed | e-wallet | PROMO3 | UK |  |  |  |  |  |  |
| 4 | 105 | 2021-04-04 | 125.0 | paid | credit card | PROMO4 | USA |  |  |  |  |  |  |
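Passing indicator=True to pd.merge adds a _merge column that shows which rows matched, which makes the effect of the date key visible. A small sketch: the dates and amounts for customers 101 and 102 come from the df1/df2 previews, while the country values for the payment rows are assumed. Matching on customer_id and country alone is only an illustration; it works here because each customer appears once per dataset, and with repeated customers it would produce many-to-many matches.

```python
import pandas as pd

purchases = pd.DataFrame({
    "customer_id": [101, 102],
    "date": pd.to_datetime(["2021-04-01", "2021-04-02"]),
    "country": ["USA", "USA"],
    "total_amount_billed": [100.0, 200.0],
})
payments = pd.DataFrame({
    "customer_id": [101, 102],
    "date": pd.to_datetime(["2021-04-01", "2021-04-03"]),
    "country": ["USA", "USA"],
    "amount_paid": [100.0, 200.0],
})

# indicator=True adds _merge with values "both", "left_only", or "right_only"
by_date = pd.merge(purchases, payments, on=["customer_id", "date", "country"],
                   how="outer", indicator=True)
print(by_date["_merge"].value_counts())

# Without date in the keys, 102's purchase and payment line up on one row;
# both dates are kept, suffixed, because "date" is now an overlapping non-key column
by_customer = pd.merge(purchases, payments, on=["customer_id", "country"],
                       how="outer", suffixes=("_purchase", "_payment"), indicator=True)
print(by_customer)
```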


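If renaming the columns causes problems, you can merge on the original column names instead, as in the commented-out code below. Because the key columns are named differently in each dataset, this requires left_on and right_on rather than a single on list, and the result keeps both versions of each date and country column.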

In [40]:
# Check the columns of each dataset
print(f"Columns in set1: {set1.columns}")
print(f"Columns in set2: {set2.columns}")
print(f"Columns in set3: {set3.columns}")

# Alternative: merge on the original column names (only works before the renaming step has run)
#merged_data = pd.merge(set1, set2,
#                       left_on=['customer_id', 'date_of_purchase', 'country_of_purchase'],
#                       right_on=['customer_id', 'date_of_payment', 'country_of_payment'],
#                       how='outer', suffixes=('_purchase', '_payment'))
#
#merged_data = pd.merge(merged_data, set3,
#                       left_on=['customer_id', 'date_of_purchase', 'country_of_purchase'],
#                       right_on=['customer_id', 'date_of_refund', 'country_of_refund'],
#                       how='outer')
#merged_data.head()


Columns in set1: Index(['customer_id', 'date', 'total_amount_billed', 'payment_status',
       'payment_method', 'promo_code', 'country'],
      dtype='object')
Columns in set2: Index(['customer_id', 'date', 'amount_paid', 'payment_method',
       'payment_status', 'late_payment_fee', 'country'],
      dtype='object')
Columns in set3: Index(['customer_id', 'date', 'refund_amount', 'reason_for_refund', 'country'], dtype='object')


Further cleaning of the merged data for more refined analysis

In [26]:
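# Cleaning the data further to prepare it for deeper analysis
# Each column is assigned back: chained calls like merged_data['col'].fillna(..., inplace=True)
# stop updating merged_data under pandas' Copy-on-Write mode (the default from pandas 3.0)

# Replacing the missing values in 'amount_paid' with 0
merged_data['amount_paid'] = merged_data['amount_paid'].fillna(0)

# Replace missing values in 'late_payment_fee' with 0
merged_data['late_payment_fee'] = merged_data['late_payment_fee'].fillna(0)

# Replace missing values in 'refund_amount' with 0
merged_data['refund_amount'] = merged_data['refund_amount'].fillna(0)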

In [27]:
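# Calculating the revenue by subtracting the refund amount and late payment fee from the total amount billed
# (a late payment fee is charged to the customer, so depending on the reporting convention it may belong on the income side instead)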
merged_data["revenue"] = merged_data["total_amount_billed"] - merged_data["refund_amount"] - merged_data["late_payment_fee"]
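Because the outer merge produces rows that exist in only one dataset, total_amount_billed is NaN on payment-only and refund-only rows, so their revenue is NaN as well. A sketch of a per-country revenue summary that makes this visible; the rows are invented, and grouping by country is an illustrative reporting choice rather than part of the original pipeline.

```python
import pandas as pd

# Invented rows: the last one is a refund with no matching purchase row,
# so its total_amount_billed is NaN, as it would be after the outer merge
demo = pd.DataFrame({
    "country": ["USA", "USA", "UK", "UK"],
    "total_amount_billed": [100.0, 200.0, 50.0, None],
    "refund_amount": [0.0, 0.0, 0.0, 75.0],
    "late_payment_fee": [0.0, 0.0, 0.0, 0.0],
})
demo["revenue"] = demo["total_amount_billed"] - demo["refund_amount"] - demo["late_payment_fee"]

# sum() skips NaN, so the unmatched refund silently drops out; count shows how many rows contributed
report = demo.groupby("country")["revenue"].agg(["sum", "count"])
print(report)
print(f"Rows with undefined revenue: {demo['revenue'].isna().sum()}")
```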

Loading the data to an external file: after merging and cleaning the datasets, we save the transformed data to a CSV file.
Printing the first five rows gives a quick visual check of the result. It does not by itself confirm that rows matched across the datasets; in the rows shown, only customer 101's purchase has a matching payment.

In [28]:
# Load the transformed data into a CSV file
merged_data.to_csv(output_path, index=False)

# Print the first 5 rows of the transformed data
merged_data.head()

|  | customer_id | date | total_amount_billed | payment_status_purchase | payment_method_purchase | promo_code | country | amount_paid | payment_method_payment | payment_status_payment | late_payment_fee | refund_amount | reason_for_refund | revenue |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 101 | 2021-04-01 | 100.0 | paid | credit card | PROMO1 | USA | 100.0 | credit card | paid | 0.0 | 0.0 |  | 100.0 |
| 1 | 102 | 2021-04-02 | 200.0 | paid | bank transfer | PROMO2 | USA | 0.0 |  |  | 0.0 | 0.0 |  | 200.0 |
| 2 | 103 | 2021-04-02 | 50.0 | overdue | credit card |  | UK | 0.0 |  |  | 0.0 | 0.0 |  | 50.0 |
| 3 | 104 | 2021-04-03 | 75.0 | disputed | e-wallet | PROMO3 | UK | 0.0 |  |  | 0.0 | 0.0 |  | 75.0 |
| 4 | 105 | 2021-04-04 | 125.0 | paid | credit card | PROMO4 | USA | 0.0 |  |  | 0.0 | 0.0 |  | 125.0 |
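Guideline 6 also allows loading the data into a database instead of a file. A minimal sketch using Python's built-in sqlite3 module with DataFrame.to_sql; the table name billing and the in-memory database are assumptions for the demo, and the rows are a subset of the merged output above.

```python
import sqlite3

import pandas as pd

final = pd.DataFrame({
    "customer_id": [101, 102, 103],
    "date": pd.to_datetime(["2021-04-01", "2021-04-02", "2021-04-02"]),
    "country": ["USA", "USA", "UK"],
    "revenue": [100.0, 200.0, 50.0],
})

# ":memory:" keeps the demo self-contained; a real pipeline would pass a database file path
conn = sqlite3.connect(":memory:")
final.to_sql("billing", conn, index=False, if_exists="replace")
totals = pd.read_sql_query(
    "SELECT country, SUM(revenue) AS revenue FROM billing GROUP BY country ORDER BY country",
    conn,
)
conn.close()
print(totals)
```

With if_exists="replace", each scheduled run overwrites the table, mirroring how final_output.csv is overwritten.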


**Automating the process using a cron job**

In [None]:
# We can automate the process by setting up a cron job to run the pipeline script on a schedule
# Here's an example of how to set up a cron job to run the data pipeline script every day at 3:00 AM:

0 3 * * * /path/to/python /path/to/python_data_pipeline.py

This Unix crontab entry schedules a command to run at a specified time interval.

The schedule "0 3 * * *" consists of five fields: minute (0), hour (3), day of month, month, and day of week, where * means "every". Together they run the command at 3:00 AM every day.

The command itself is "/path/to/python /path/to/python_data_pipeline.py", which runs the Python interpreter on the pipeline script that performs the extract, transform, and load steps above.

Overall, this crontab entry schedules a daily data processing job to run at 3:00 AM using the specified Python script.
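Cron needs a standalone .py file rather than a notebook. Below is a minimal sketch of what python_data_pipeline.py might contain, assuming the three CSVs sit in the same folder as the script and keep the column names shown earlier; `extract`, `transform`, `run`, and the logger name are hypothetical. It condenses the notebook's steps, resolves files against the script's folder (cron does not start jobs there), and logs failures instead of crashing, which also gives you something to monitor.

```python
import logging
import sys
from pathlib import Path

import pandas as pd

# Hypothetical layout: the three input CSVs and the output live in the script's folder
INPUTS = {"purchase": "dataset1.csv", "payment": "dataset2.csv", "refund": "dataset3.csv"}
OUTPUT = "final_output.csv"
KEYS = ["customer_id", "date", "country"]

log = logging.getLogger("billing_pipeline")


def extract(base_dir: Path) -> dict:
    """Read each input CSV, treating '-' and ' ' placeholders as missing."""
    return {
        kind: pd.read_csv(base_dir / name, na_values=["-", " "])
        for kind, name in INPUTS.items()
    }


def transform(frames: dict) -> pd.DataFrame:
    """Rename keys to shared names, parse dates, outer-merge, fill amounts, add revenue."""
    prepared = []
    for kind, df in frames.items():
        df = df.rename(columns={f"date_of_{kind}": "date", f"country_of_{kind}": "country"})
        df["date"] = pd.to_datetime(df["date"], format="%m/%d/%Y", errors="coerce")
        prepared.append(df)
    purchase, payment, refund = prepared

    merged = purchase.merge(payment, on=KEYS, how="outer", suffixes=("_purchase", "_payment"))
    merged = merged.merge(refund, on=KEYS, how="outer")

    amount_cols = ["amount_paid", "late_payment_fee", "refund_amount"]
    merged[amount_cols] = merged[amount_cols].fillna(0)
    merged["revenue"] = (
        merged["total_amount_billed"] - merged["refund_amount"] - merged["late_payment_fee"]
    )
    return merged


def run(base_dir: Path) -> bool:
    """Run extract -> transform -> load; log failures and return False instead of raising."""
    try:
        merged = transform(extract(base_dir))
        merged.to_csv(base_dir / OUTPUT, index=False)
    except (FileNotFoundError, KeyError, ValueError) as exc:
        log.error("Pipeline failed: %s", exc)
        return False
    log.info("Wrote %d rows to %s", len(merged), base_dir / OUTPUT)
    return True


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    # sys.argv[0] is the script path when cron runs `python /path/to/python_data_pipeline.py`
    run(Path(sys.argv[0]).resolve().parent)
```

Since run() returns False on failure, a variant could end with sys.exit(0 if ok else 1) so that cron or a monitoring tool can detect failed runs; redirecting the script's output to a log file in the crontab entry is another lightweight way to keep a run history.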

You can also have cron call a small bash script that in turn runs the Python script.
Here's an example bash script you can use for the cron job:

In [None]:
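#!/bin/bash
# cron starts jobs in the home directory; cd here first so relative CSV paths resolve (assumes CSVs sit beside this script)
cd "$(dirname "$0")" || exit 1
python /path/to/python_data_pipeline.py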


Save the script with a filename such as run_python_data_pipeline.sh in the same directory as your Python script.

Make sure to give the script execute permissions using the command chmod +x run_python_data_pipeline.sh.

Then, update your cron job to execute the bash script at the desired time:

In [None]:
0 3 * * * /path/to/run_python_data_pipeline.sh


This will execute the Python script at 3:00 AM every day using the run_python_data_pipeline.sh script.