In [1]:
import os
import pandas as pd
import subprocess
from pymongo import MongoClient
from datetime import datetime

In [15]:
# MongoDB Connection
# The connection string embeds a username and password, so it is read from the
# MONGODB_URI environment variable rather than being stored in the notebook.
# Set it before starting the kernel, for example:
#   export MONGODB_URI="mongodb+srv://<user>:<password>@<cluster-host>/?retryWrites=true&w=majority"
# NOTE(review): the credentials that used to be hardcoded on this line should
# be rotated, because they were saved in plain text with the notebook.
MONGODB_URI = os.environ.get("MONGODB_URI", "")
if not MONGODB_URI:
    print("WARNING: MONGODB_URI environment variable is not set; set it before running the pipeline.")
DATABASE_NAME = "GDDA707_A2_BTC_Transactions"
COLLECTION_NAME = "BTC_DailyTransactions"

# HDFS Configuration
# Path of the processed CSV inside HDFS, the local file name it is copied to,
# and the name of the Docker container the hadoop/docker commands target.
HDFS_FILE_PATH = "/user/data/bitcoin_transactions/GDDA707_A2_Processed_Bitcoin_Transaction_HDFS.csv"
LOCAL_FILE_PATH = "GDDA707_A2_Processed_Bitcoin_Transaction_Data_Part_B.csv"
HADOOP_CONTAINER_NAME = "namenode"

# Log file (appended to by log_message)
LOG_FILE = "mongodb_ingestion_log.txt"

def log_message(message):
    """Append a timestamped entry for ``message`` to LOG_FILE and echo it to stdout.

    Args:
        message: Text to record. The file entry is prefixed with the current
            local timestamp; the console copy is printed without it.
    """
    entry = f"{datetime.now()}: {message}\n"
    with open(LOG_FILE, 'a') as log_handle:
        log_handle.write(entry)
    print(message)

def connect_mongo():
    """Connect to MongoDB and return a handle to the configured database.

    Constructing a ``MongoClient`` does not contact the server by itself, so a
    ``ping`` command is sent to confirm the deployment is reachable before
    success is logged.

    Returns:
        The ``DATABASE_NAME`` database object on success, or ``None`` if the
        client could not be created or the server did not answer the ping
        (the error is logged via ``log_message``).
    """
    client = None
    try:
        client = MongoClient(MONGODB_URI)
        # Force a round trip so a bad host or bad credentials fail here rather
        # than on the first real operation later in the pipeline.
        client.admin.command("ping")
        db = client[DATABASE_NAME]
        log_message("Connected to MongoDB successfully.")
        return db
    except Exception as e:
        log_message(f"MongoDB Connection Error: {e}")
        # Release the half-initialised client so it is not left open.
        if client is not None:
            client.close()
        return None

In [17]:
def fetch_file_from_hdfs(hdfs_file_path, local_file_path, container_name):
    """
    Fetches the file from HDFS to the local environment using Docker.

    The transfer happens in three steps: delete any stale copy in the
    container's /tmp directory, ``hadoop fs -get`` the file from HDFS into
    /tmp inside the container, then ``docker cp`` it out to the local machine.

    Args:
        hdfs_file_path: Path of the file inside HDFS.
        local_file_path: Destination path on the local machine.
        container_name: Name of the Docker container to run the commands in.

    Returns:
        True if the file was copied to ``local_file_path``; False if a command
        exited with a non-zero status or the ``docker`` executable could not be
        launched (the reason is logged via ``log_message``).
    """
    container_tmp_path = f"/tmp/{os.path.basename(hdfs_file_path)}"
    try:
        # Remove existing file in Docker container's /tmp directory.
        # The path is passed as its own argument (no ``bash -c`` string), so
        # file names containing spaces or shell metacharacters are handled safely.
        subprocess.run(
            ["docker", "exec", container_name, "rm", "-f", container_tmp_path],
            check=False,  # Ignore errors if the file does not exist
            text=True,
            capture_output=True,
        )

        # Copy the file from HDFS to the container's /tmp directory
        subprocess.run(
            ["docker", "exec", container_name, "hadoop", "fs", "-get", hdfs_file_path, "/tmp/"],
            check=True,
            text=True,
            capture_output=True,
        )

        # Copy the file from the container to the local machine
        subprocess.run(
            ["docker", "cp", f"{container_name}:{container_tmp_path}", local_file_path],
            check=True,
            text=True,
            capture_output=True,
        )

        log_message(f"File successfully fetched from HDFS: {hdfs_file_path}")
        return True
    except subprocess.CalledProcessError as e:
        log_message(f"Failed to fetch file from HDFS: {(e.stderr or '').strip()}")
        return False
    except OSError as e:
        # Raised when the docker executable itself cannot be started
        # (for example, it is not installed or not on PATH).
        log_message(f"Failed to fetch file from HDFS: {e}")
        return False


def load_data_to_mongodb(local_file_path, db, collection_name):
    """
    Loads data from a CSV file into MongoDB Atlas.

    Every row of the CSV becomes one document whose keys are the column names.

    Args:
        local_file_path: Path of the CSV file to read.
        db: Database object; ``db[collection_name]`` must provide ``insert_many``.
        collection_name: Name of the target collection.

    Returns:
        True if the rows were inserted; False if the file has no data rows or
        if reading/inserting raised an error (the reason is logged).

    NOTE(review): each call appends to the collection, so re-running the load
    inserts the same rows again.
    """
    try:
        # Read the CSV file
        data = pd.read_csv(local_file_path)

        # Convert data to a list of dictionaries
        records = data.to_dict(orient='records')

        # insert_many rejects an empty document list, so report the empty-file
        # case explicitly instead of surfacing it as a driver error.
        if not records:
            log_message(
                f"No records found in '{local_file_path}'; "
                f"nothing was inserted into MongoDB collection '{collection_name}'."
            )
            return False

        db[collection_name].insert_many(records)

        log_message(f"Successfully loaded {len(records)} records into MongoDB collection '{collection_name}'.")
        return True
    except Exception as e:
        log_message(f"Failed to load data into MongoDB: {e}")
        return False

def query_mongodb(db, collection_name, date="2024-07-06"):
    """
    Executes example queries on the MongoDB collection to demonstrate functionality.

    Two aggregations are run and their results printed:
      1. the sum of "Transaction Fee (USD)" over documents whose "Date" equals
         ``date``;
      2. per-month document counts and input/output totals, where the month is
         the first seven characters of the "Date" field.

    Args:
        db: Database object; ``db[collection_name]`` must provide ``aggregate``.
        collection_name: Name of the collection to query.
        date: Value matched against the "Date" field in the first query.
            Defaults to "2024-07-06", the date previously hard-coded here.

    Errors raised while querying are logged via ``log_message`` and not re-raised.
    """
    try:
        collection = db[collection_name]

        # Example Query 1: Total fees for a specific date
        total_fees = collection.aggregate([
            {"$match": {"Date": date}},
            {"$group": {"_id": None, "TotalFees": {"$sum": "$Transaction Fee (USD)"}}}
        ])
        # The $group stage yields no document when nothing matches, in which
        # case nothing is printed for this query.
        for result in total_fees:
            print(f"Total fees on {date}: {result['TotalFees']} USD")

        # Example Query 2: Aggregate data by month
        monthly_data = collection.aggregate([
            {"$group": {
                "_id": {"$substr": ["$Date", 0, 7]},  # Extract year-month
                "TotalTransactions": {"$sum": 1},
                "TotalInput": {"$sum": "$Input Total (USD)"},
                "TotalOutput": {"$sum": "$Output Total (USD)"}
            }},
            {"$sort": {"_id": 1}}
        ])
        print("Monthly Data Aggregation:")
        for result in monthly_data:
            print(result)

        log_message("MongoDB queries executed successfully.")
    except Exception as e:
        log_message(f"Failed to execute MongoDB queries: {e}")

In [19]:
def main_pipeline():
    """
    Main function to execute the data ingestion pipeline.

    Steps: connect to MongoDB, fetch the CSV from HDFS, then load it into the
    target collection. The final log entry says whether the run completed or
    failed, and the MongoDB client is closed once the steps have run.

    Returns:
        None. Progress and failures are reported through ``log_message``.
    """
    log_message(">>>>>>>>>>> MongoDB ingestion pipeline >>>>>>>>>>>")

    # Step 1: Establish MongoDB connection
    db = connect_mongo()

    if db is None:  # Explicit comparison to None
        log_message("FAILED: MONGODB CONNECTION...... Exiting pipeline.....")
        return

    try:
        # Step 2: Fetch File from HDFS, Step 3: Load Data into MongoDB.
        # ``and`` short-circuits, so the load only runs after a successful fetch.
        succeeded = bool(
            fetch_file_from_hdfs(HDFS_FILE_PATH, LOCAL_FILE_PATH, HADOOP_CONTAINER_NAME)
            and load_data_to_mongodb(LOCAL_FILE_PATH, db, COLLECTION_NAME)
        )
    finally:
        # Release the connection whether or not the steps succeeded.
        db.client.close()

    if succeeded:
        log_message("MONGODB INGESTION PIPELINE COMPLETED.")
    else:
        # Do not report completion when a step failed; details were logged by the step.
        log_message("MONGODB INGESTION PIPELINE FAILED. See earlier log entries for details.")

# Entry point: run the pipeline only when this code is executed directly
# (``__name__`` is "__main__"), not when it is imported as a module.
if __name__ == "__main__":
    main_pipeline()


Starting MongoDB ingestion pipeline.
Connected to MongoDB successfully.
File successfully fetched from HDFS: /user/data/bitcoin_transactions/GDDA707_A2_Processed_Bitcoin_Transaction_HDFS.csv
Successfully loaded 184 records into MongoDB collection 'BTC_DailyTransactions'.
MongoDB ingestion pipeline completed.
