# Course: Data Engineering
# **Practical Exercise: Building a Basic Data Pipeline With Error Handling**
# Prepared by: Georges Assaf




# Step 2: Extract Data

Write a Python function to extract the data from the CSV file using Pandas. If there are any issues (e.g., file not found), log the error and raise it for further investigation.

In [15]:
import logging
import pandas as pd

# Configure logging: overwrite pipeline.log on each run and timestamp every entry
logging.basicConfig(
    level=logging.INFO,
    filename='pipeline.log',
    filemode='w',
    format='%(asctime)s - %(levelname)s - %(message)s',
)


def extract_data(file_path='./sample_data/salesee.csv'):
    """Extract data from the CSV file, logging and re-raising any error."""
    try:
        logging.info(f"Extracting data from {file_path}")
        data = pd.read_csv(file_path)
        logging.info("Data extracted successfully")
        return data
    except FileNotFoundError as e:
        # Log the missing file, then re-raise so the caller can investigate
        logging.error(f"File not found: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error during data extraction: {e}")
        raise
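A minimal, self-contained sketch of the same extract-and-log pattern, assuming the file path is passed in as a parameter. The `extract_csv` name, the `"pipeline"` logger, and the temporary CSV are illustrative only, not part of the exercise. The sketch writes a small file, reads it back, and shows that a missing file is logged and re-raised rather than swallowed:

```python
import logging
import os
import tempfile

import pandas as pd

logger = logging.getLogger("pipeline")


def extract_csv(file_path):
    """Read a CSV into a DataFrame, logging and re-raising any failure."""
    try:
        logger.info("Extracting %s", file_path)
        df = pd.read_csv(file_path)
        logger.info("Extracted %d rows from %s", len(df), file_path)
        return df
    except FileNotFoundError:
        # logger.exception logs at ERROR level and includes the traceback
        logger.exception("File not found: %s", file_path)
        raise
    except Exception:
        logger.exception("Unexpected error while extracting %s", file_path)
        raise


# Hypothetical sample file created on the fly for the demonstration
tmp_dir = tempfile.mkdtemp()
csv_path = os.path.join(tmp_dir, "sales.csv")
with open(csv_path, "w") as f:
    f.write("transaction_id,customer_id,product_id,quantity,price\n")
    f.write("T001,C001,P001,2,100\n")
    f.write("T002,C002,P002,1,200\n")
    f.write("T003,C001,P003,3,50\n")

sales = extract_csv(csv_path)
print(sales.shape)  # (3, 5)

try:
    extract_csv(os.path.join(tmp_dir, "missing.csv"))
except FileNotFoundError:
    print("missing file was logged and re-raised")
```

Re-raising after logging keeps the failure visible to whatever orchestrates the pipeline, while the log file still records what went wrong.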


In [30]:
data=extract_data()
data

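|   | date | product id | sales amount | store location |
|---|------|------------|--------------|----------------|
| 0 | 2/5/2025 | P001 | 150 | New York |
| 1 | 2/5/2025 | P002 | 300 | Los Angeles |
| 2 | 2/5/2025 | P003 | 450 | Chicago |
| 3 | 2/5/2025 | P004 | 600 | Houston |
| 4 | 2/5/2025 | P005 | 750 | Seattle |
| 5 | 2/5/2025 | P001 | 900 | New York |
| 6 | 2/5/2025 | P002 | 950 | Los Angeles |
| 7 | 2/6/2025 | P001 | 150 | New York |
| 8 | 2/6/2025 | P002 | 300 | Los Angeles |
| 9 | 2/6/2025 | P003 | 450 | Chicago |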


In [2]:
sales_df=extract_data()
sales_df

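|   | transaction_id | customer_id | product_id | quantity | price |
|---|----------------|-------------|------------|----------|-------|
| 0 | T001 | C001 | P001 | 2 | 100 |
| 1 | T002 | C002 | P002 | 1 | 200 |
| 2 | T003 | C001 | P003 | 3 | 50 |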


# Step 3: Transform Data

Transform the extracted data by calculating the total revenue of each transaction as `quantity * price` and storing it in a new `total_revenue` column.

In [3]:
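def transform_data(sales_df):
    """Add a total_revenue column computed as quantity * price."""
    try:
        logging.info("Transforming data")
        # Revenue per transaction: units sold times unit price
        sales_df['total_revenue'] = sales_df['quantity'] * sales_df['price']
        logging.info("Data transformation completed successfully")
        return sales_df
    except KeyError as e:
        # A required column (quantity or price) is missing from the input
        logging.error(f"Missing column during data transformation: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error during data transformation: {e}")
        raise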

In [4]:
transformed_sales_df=transform_data(sales_df)
transformed_sales_df

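|   | transaction_id | customer_id | product_id | quantity | price | total_revenue |
|---|----------------|-------------|------------|----------|-------|---------------|
| 0 | T001 | C001 | P001 | 2 | 100 | 200 |
| 1 | T002 | C002 | P002 | 1 | 200 | 200 |
| 2 | T003 | C001 | P003 | 3 | 50 | 150 |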
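The transformation is plain row-wise arithmetic: T001 gives 2 × 100 = 200, T002 gives 1 × 200 = 200, and T003 gives 3 × 50 = 150. A slightly more defensive sketch, assuming you prefer not to mutate the caller's DataFrame and want a clear error when `quantity` or `price` is missing (the `add_total_revenue` name and `REQUIRED_COLUMNS` constant are hypothetical):

```python
import logging

import pandas as pd

logger = logging.getLogger("pipeline")

# Hypothetical list of columns the transformation depends on
REQUIRED_COLUMNS = ("quantity", "price")


def add_total_revenue(sales_df):
    """Return a copy of sales_df with total_revenue = quantity * price."""
    missing = [c for c in REQUIRED_COLUMNS if c not in sales_df.columns]
    if missing:
        # Fail early with an explicit message instead of a bare KeyError
        logger.error("Transformation failed, missing columns: %s", missing)
        raise KeyError(f"missing required columns: {missing}")
    result = sales_df.copy()  # leave the caller's DataFrame untouched
    result["total_revenue"] = result["quantity"] * result["price"]
    logger.info("Computed total_revenue for %d rows", len(result))
    return result


sales_df = pd.DataFrame(
    {
        "transaction_id": ["T001", "T002", "T003"],
        "customer_id": ["C001", "C002", "C001"],
        "product_id": ["P001", "P002", "P003"],
        "quantity": [2, 1, 3],
        "price": [100, 200, 50],
    }
)
transformed = add_total_revenue(sales_df)
print(transformed["total_revenue"].tolist())  # [200, 200, 150]
print("total_revenue" in sales_df.columns)  # False: the input was not modified
```

Copying costs memory on large frames, so the in-place version in the exercise is a reasonable choice when the raw DataFrame is not needed afterwards.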


# Step 4: Load Data into MongoDB

Use PyMongo to load the transformed data into a MongoDB collection. Implement error handling to log any connection or data insertion issues.
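PyMongo's `insert_many` takes an iterable of documents (dicts), so the DataFrame is first converted with `to_dict(orient="records")`, which produces one dict per row keyed by column name. A small sketch of that conversion on the transformed rows, runnable without a MongoDB connection:

```python
import pandas as pd

# Same rows as the transformed output, rebuilt here so the sketch is self-contained
transformed_sales_df = pd.DataFrame(
    {
        "transaction_id": ["T001", "T002", "T003"],
        "customer_id": ["C001", "C002", "C001"],
        "product_id": ["P001", "P002", "P003"],
        "quantity": [2, 1, 3],
        "price": [100, 200, 50],
        "total_revenue": [200, 200, 150],
    }
)

# One dict per row: these are the documents insert_many would receive
documents = transformed_sales_df.to_dict(orient="records")
print(len(documents))  # 3
print(documents[0])
```

Note that PyMongo adds an `_id` field to each dict it inserts, so the list is modified in place by the insert.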

In [5]:
from pymongo import MongoClient
import time


In [6]:
import os
from pymongo.errors import ConnectionFailure


def load_data(sales_df, retries=3, delay=5):
    """Insert sales_df into the e-commerce.sales collection, retrying on connection failures."""
    # Read the Atlas connection string from the environment instead of hard-coding
    # credentials in the notebook (MONGODB_URI is an assumed variable name)
    connection_string = os.environ["MONGODB_URI"]
    for attempt in range(1, retries + 1):
        try:
            # Connect to the MongoDB Atlas cluster; the with-block closes the client
            with MongoClient(connection_string) as client:
                # Access the e-commerce database and its sales collection
                sales = client['e-commerce']['sales']
                # Convert the DataFrame to a list of dicts, one document per row
                sales_dict = sales_df.to_dict(orient="records")
                # Insert the sales data into MongoDB
                logging.info("Start inserting the data into MongoDB")
                sales.insert_many(sales_dict)
                logging.info("Data insertion completed successfully")
                print("Data loaded in MongoDB successfully")
                return
        except ConnectionFailure as e:
            # PyMongo raises ConnectionFailure (not the built-in ConnectionError)
            logging.error(f"Connection failed on attempt {attempt}/{retries}: {e}")
            if attempt == retries:
                # Out of retries: surface the error instead of returning silently
                raise
            print(f"Connection failed: {e}. Retrying in {delay} seconds...")
            time.sleep(delay)
        except Exception as e:
            logging.error(f"Error loading data into MongoDB: {e}")
            print(f"Error loading data into MongoDB: {e}")
            raise

In [7]:
load_data(transformed_sales_df)

Data loaded in MongoDB successfully
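The retry loop in `load_data` can be factored into a small helper that is easy to test. This is an illustrative sketch under stated assumptions, not part of the exercise: `retry` and `flaky_insert` are hypothetical names, and the built-in `ConnectionError` stands in for `pymongo.errors.ConnectionFailure` so the example runs without a MongoDB server. It waits a fixed `delay` between attempts and re-raises once the last attempt fails:

```python
import logging
import time

logger = logging.getLogger("pipeline")


def retry(func, retries=3, delay=5.0, retry_on=(ConnectionError,)):
    """Call func(), retrying on the given exception types.

    Sleeps `delay` seconds between attempts and re-raises the last
    error when every attempt has failed.
    """
    for attempt in range(1, retries + 1):
        try:
            return func()
        except retry_on as e:
            logger.warning("Attempt %d/%d failed: %s", attempt, retries, e)
            if attempt == retries:
                logger.error("Giving up after %d attempts", retries)
                raise
            time.sleep(delay)


# Hypothetical loader that fails twice, then succeeds
calls = {"count": 0}


def flaky_insert():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("cluster unreachable")
    return "inserted"


result = retry(flaky_insert, retries=3, delay=0.01)
print(result, calls["count"])  # inserted 3
```

In the notebook, `func` would be a closure around `sales.insert_many(sales_dict)` and `retry_on` would be `(ConnectionFailure,)`; errors outside `retry_on` propagate immediately, matching the `except Exception` branch in `load_data`.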
