<a href="https://colab.research.google.com/github/WKhisa/Data-Pipelines-with-Python-with-MongoDB-Project/blob/main/Data_Pipelines_with_Python_with_MongoDB_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
#Install and import prerequisite libraries
!pip install pymongo

Collecting pymongo
  Downloading pymongo-4.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (671 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m671.3/671.3 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.4.2-py3-none-any.whl (300 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m300.4/300.4 kB[0m [31m26.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.4.2 pymongo-4.5.0


In [5]:
import pandas as pd
import pymongo
import logging

In [6]:
# Extraction function
def extract_data(call_logs_path='call_logs.csv', billing_data_path='billing_systems.csv'):
    """Load the call-log and billing CSV files and join them on their date column.

    Parameters
    ----------
    call_logs_path : str or path-like, default 'call_logs.csv'
        CSV file with the call logs. Its ``call_date`` column is renamed to
        ``date`` and its ``call_duration`` column is divided by 60 to produce
        ``duration_minutes``.
    billing_data_path : str or path-like, default 'billing_systems.csv'
        CSV file with the billing records. Its ``transaction_date`` column is
        renamed to ``date``.

    Returns
    -------
    pandas.DataFrame
        Inner join of both files on ``date`` with an extra
        ``duration_minutes`` column.

    Raises
    ------
    Exception
        Any error raised while reading or merging the files is logged and
        re-raised unchanged.
    """
    logger = logging.getLogger(__name__)

    try:
        # rename() returns a new frame, so nothing is mutated in place
        call_logs = pd.read_csv(call_logs_path).rename(columns={'call_date': 'date'})
        billing_data = pd.read_csv(billing_data_path).rename(
            columns={'transaction_date': 'date'}
        )

        # NOTE(review): 'date' is the only join key, so every call on a day is
        # paired with every billing row of that day -- confirm this many-to-many
        # pairing is intended.
        merged_data = pd.merge(call_logs, billing_data, on=['date'])

        # Convert call duration to minutes for easier analysis
        merged_data['duration_minutes'] = merged_data['call_duration'] / 60
    except Exception:
        logger.exception("Data extraction failed.")
        raise

    logger.info("Data extraction completed.")
    return merged_data

In [7]:
# Run the extraction; as the cell's last expression, the returned frame is displayed.
extract_data()

Unnamed: 0,call_id,caller_number,receiver_number,call_duration,call_type,date,transaction_id,customer_id,transaction_amount,transaction_type,duration_minutes
0,1,700123456,712345678,120,Outgoing,2022-02-21,1,1001,500.0,Recharge,2.0
1,1,700123456,712345678,120,Outgoing,2022-02-21,2,1002,200.0,Recharge,2.0
2,2,712345678,755555555,60,Incoming,2022-02-21,1,1001,500.0,Recharge,1.0
3,2,712345678,755555555,60,Incoming,2022-02-21,2,1002,200.0,Recharge,1.0
4,3,722222222,777777777,180,Outgoing,2022-02-22,3,1001,50.0,Data,3.0
5,3,722222222,777777777,180,Outgoing,2022-02-22,4,1003,1000.0,Recharge,3.0
6,4,712345678,766666666,90,Incoming,2022-02-23,5,1004,500.0,Recharge,1.5
7,5,733333333,722222222,240,Outgoing,2022-02-23,5,1004,500.0,Recharge,4.0


In [8]:
# Transformation function
def transform_data(merged_data=None, num_std=3):
    """Clean the merged data, aggregate it per customer and flag outlier groups.

    Parameters
    ----------
    merged_data : pandas.DataFrame, optional
        Frame with at least ``customer_id``, ``transaction_type``,
        ``transaction_amount`` and ``date`` columns. When omitted,
        ``extract_data()`` is called to produce it. The frame passed in is
        not modified.
    num_std : float, default 3
        How many standard deviations above the mean transaction amount a
        group's average must lie to be reported.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``(customer_id, transaction_type)`` with columns ``sum``,
        ``count`` and ``avg_transaction_amount``; contains only the groups
        whose average amount exceeds ``mean + num_std * std`` of all cleaned
        transaction amounts. Empty when no group qualifies.
    """
    logger = logging.getLogger(__name__)

    if merged_data is None:
        merged_data = extract_data()

    # Data cleaning and handling missing values (each step returns a new frame)
    cleaned = (
        merged_data
        .drop_duplicates()
        .fillna(value={'call_type': 'unknown', 'call_duration': 0})
        .assign(date=lambda frame: pd.to_datetime(frame['date']))
    )

    # NOTE(review): if merged_data comes from extract_data(), the date-only join
    # can repeat a billing row once per call on that date, which inflates the
    # sums and counts below -- confirm whether de-duplicating on transaction_id
    # is wanted.
    aggregate_df = (
        cleaned
        .groupby(['customer_id', 'transaction_type'])['transaction_amount']
        .agg(['sum', 'count'])
    )

    # Average transaction amount for each customer and transaction type
    aggregate_df['avg_transaction_amount'] = aggregate_df['sum'] / aggregate_df['count']

    # Flag groups whose average is more than num_std standard deviations above
    # the mean amount. The previous test, sum > 3 * avg, reduces to count > 3
    # and never looked at the spread of the amounts.
    amounts = cleaned['transaction_amount']
    threshold = amounts.mean() + num_std * amounts.std()
    # A NaN threshold (fewer than two amounts) compares False, yielding no rows
    suspicious_transactions = aggregate_df[
        aggregate_df['avg_transaction_amount'] > threshold
    ]

    logger.info("Data transformation completed.")
    return suspicious_transactions


In [10]:
# Run the transformation; as the cell's last expression, the returned frame is displayed.
transform_data()


Unnamed: 0_level_0,Unnamed: 1_level_0,sum,count,avg_transaction_amount
customer_id,transaction_type,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1


# Loading Function

In [None]:
# Loading function
def load_data(transformed_data, host='localhost', port=27017,
              db_name='billing_data', collection_name='billing_data',
              tls=True, tls_allow_invalid_certificates=True):
    """Insert the rows of ``transformed_data`` into a MongoDB collection.

    Parameters
    ----------
    transformed_data : pandas.DataFrame
        Frame to store. Named index levels are turned into ordinary fields so
        they are kept in the stored documents.
    host : str, default 'localhost'
        MongoDB host name.
    port : int, default 27017
        MongoDB port.
    db_name : str, default 'billing_data'
        Target database.
    collection_name : str, default 'billing_data'
        Target collection.
    tls : bool, default True
        Whether to connect over TLS.
    tls_allow_invalid_certificates : bool, default True
        Skip server-certificate validation when ``tls`` is on.

    Returns
    -------
    int
        Number of documents inserted (0 when the frame has no rows, in which
        case no connection is opened).

    Raises
    ------
    Exception
        Any error raised while connecting or inserting is logged and
        re-raised unchanged.
    """
    logger = logging.getLogger(__name__)

    # Keep named index levels (e.g. customer_id / transaction_type) as fields;
    # an unnamed default index carries no information and is dropped.
    keep_index = any(level is not None for level in transformed_data.index.names)
    documents = transformed_data.reset_index(drop=not keep_index).to_dict('records')

    # insert_many() rejects an empty list, so there is nothing to do here
    if not documents:
        logger.info("No rows to load; skipping MongoDB insert.")
        return 0

    # PyMongo 4 removed ssl / ssl_cert_reqs in favour of the tls* options.
    client_options = {'tls': tls}
    if tls:
        # NOTE(security): the default keeps the original behaviour of not
        # validating the server certificate; pass
        # tls_allow_invalid_certificates=False outside local experiments.
        client_options['tlsAllowInvalidCertificates'] = tls_allow_invalid_certificates

    try:
        # The context manager closes the client even when the insert fails
        with pymongo.MongoClient(host, port, **client_options) as client:
            # Acknowledged (w=1), journaled (j=True) writes
            collection = client[db_name][collection_name].with_options(
                write_concern=pymongo.WriteConcern(w=1, j=True)
            )
            result = collection.insert_many(documents)
    except Exception:
        logger.exception("Data loading failed.")
        raise

    logger.info("Data loading completed.")
    return len(result.inserted_ids)


# Example usage
if __name__ == '__main__':
    # Without configuration the INFO records emitted by the pipeline are not shown
    logging.basicConfig(level=logging.INFO)

    # MongoDB connection settings
    host = 'localhost'
    port = 27017
    db_name = 'billing_data'
    collection_name = 'billing_data'

    # transform_data() runs the extraction step itself
    transformed_data = transform_data()
    load_data(transformed_data, host, port, db_name, collection_name)