In [None]:
# Installing pymongo package

In [1]:
!pip install pymongo

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pymongo
  Downloading pymongo-4.3.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (492 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m492.1/492.1 KB[0m [31m12.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dnspython<3.0.0,>=1.16.0
  Downloading dnspython-2.3.0-py3-none-any.whl (283 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m283.7/283.7 KB[0m [31m25.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.3.0 pymongo-4.3.3


In [2]:
import logging
import os

import pandas as pd
import pymongo
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

In [5]:
# Print this runtime's public IP address as reported by ipecho.net.
# NOTE(review): presumably needed for the MongoDB Atlas IP access list — confirm,
# and clear the cell output before sharing the notebook.
!curl ipecho.net/plain

34.125.30.149

In [12]:
# Extraction function
def extract_data(file_path, billing_path='billing_data.csv'):
    """Load call logs and billing data from CSV and merge them.

    Parameters
    ----------
    file_path : str or path-like
        CSV file holding the call logs. It must contain the columns
        'phone_number', 'call_date' and 'call_duration'.
    billing_path : str or path-like, optional
        CSV file holding the billing data. It must contain the columns
        'phone_number' and 'call_date'. Defaults to 'billing_data.csv'.

    Returns
    -------
    pandas.DataFrame
        Inner join of both files on ('phone_number', 'call_date') with an
        extra 'duration_minutes' column equal to 'call_duration' / 60.
    """
    logger = logging.getLogger(__name__)

    # Read the call logs from the path the caller passed in instead of a
    # hard-coded file name, so the function works for any input file.
    call_logs = pd.read_csv(file_path)
    billing_data = pd.read_csv(billing_path)

    # Merge the two datasets on their common key columns (inner join).
    merged_data = pd.merge(call_logs, billing_data, on=['phone_number', 'call_date'])

    # Convert call duration to minutes for easier analysis.
    merged_data['duration_minutes'] = merged_data['call_duration'] / 60

    logger.info("Data extraction completed.")

    return merged_data

In [16]:
# Transformation function
def transform_data(df):
    """Aggregate merged call/billing rows into one row per phone per week.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the columns 'phone_number', 'call_date',
        'duration_minutes' and 'billing_type'. It is not modified.

    Returns
    -------
    pandas.DataFrame
        Columns 'phone', 'date', 'duration' (sum over the week), 'billing'
        (first value in the week), 'weekday' and 'hour'.
    """
    logger = logging.getLogger(__name__)

    # Rename columns for better readability and parse the dates. Both steps
    # build a new frame, so the caller's DataFrame is left untouched and the
    # function can be re-run on the same input.
    renamed = df.rename(columns={
        'phone_number': 'phone',
        'call_date': 'date',
        'duration_minutes': 'duration',
        'billing_type': 'billing',
    }).assign(date=lambda frame: pd.to_datetime(frame['date']))

    # Group by phone and weekly date bin, then aggregate.
    group_cols = ['phone', pd.Grouper(key='date', freq='W')]
    agg_cols = {
        'duration': 'sum',
        'billing': 'first',
    }
    transformed_data = renamed.groupby(group_cols).agg(agg_cols).reset_index()

    # Extract weekday and hour from the date column.
    # `.dt.weekday_name` was removed in pandas 1.0; `.dt.day_name()` replaces it.
    # After weekly binning, 'date' holds the bin label rather than an individual
    # call timestamp, so these two columns describe that label.
    transformed_data['weekday'] = transformed_data['date'].dt.day_name()
    transformed_data['hour'] = transformed_data['date'].dt.hour

    logger.info("Data transformation completed.")

    return transformed_data

In [6]:
# Loading function
def load_data(transformed_data, mongo_uri=None):
    """Insert the transformed records into the 'Mongo_records' collection.

    Parameters
    ----------
    transformed_data : pandas.DataFrame
        Frame whose rows are written as documents. It is expected to contain
        the 'phone' and 'date' columns used by the index.
    mongo_uri : str, optional
        MongoDB connection string. When omitted, it is read from the
        MONGODB_URI environment variable. Credentials are deliberately not
        stored in the notebook; special characters in the user name or
        password must be percent-encoded inside the URI.

    Raises
    ------
    ValueError
        If no connection string is supplied and MONGODB_URI is not set.
    pymongo.errors.ConnectionFailure
        Re-raised after logging when the server cannot be reached.
    """
    logger = logging.getLogger(__name__)

    uri = mongo_uri or os.environ.get('MONGODB_URI')
    if not uri:
        raise ValueError(
            "No MongoDB connection string: pass mongo_uri or set the "
            "MONGODB_URI environment variable."
        )

    documents = transformed_data.to_dict('records')
    if not documents:
        # insert_many() rejects an empty list, so there is nothing to do.
        logger.info("No records to load; skipping MongoDB insert.")
        return

    try:
        # The context manager closes the client even if a write fails.
        with MongoClient(uri) as client:
            db = client.get_database('Mongo_db')

            # The write concern is configured on the collection handle:
            # w=1 waits for acknowledgement and j=True waits for the journal.
            collection = db.get_collection(
                'Mongo_records',
                write_concern=pymongo.WriteConcern(w=1, j=True),
            )

            # Create indexes on the collection.
            collection.create_index(
                [('phone', pymongo.ASCENDING), ('date', pymongo.ASCENDING)]
            )

            # Use a bulk insert to optimize performance.
            collection.insert_many(documents)
    except ConnectionFailure:
        logger.exception("Could not connect to MongoDB.")
        raise

    logger.info("Data loading completed.")

In [None]:
# Example usage: run the extract -> transform -> load pipeline end to end.
if __name__ == '__main__':
    # Without a configured handler at INFO level, the pipeline's
    # logger.info() messages are not shown.
    logging.basicConfig(level=logging.INFO)

    file_path = 'call_logs.csv'
    data = extract_data(file_path)

    # Call the transform_data function and keep the result under a different
    # name, so the function is not shadowed when the cell is re-run.
    transformed_data = transform_data(data)
    load_data(transformed_data)