<a href="https://colab.research.google.com/github/Francisss3/Data-Engineering/blob/main/Building_basic_data_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pandas pymongo dnspython

Collecting pymongo
  Downloading pymongo-4.13.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.13.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m28.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.7.0-py3-none-any.whl (313 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.7.0 pymongo-4.13.2


In [3]:
# %% [code]
import pandas as pd
from pymongo import MongoClient
import logging
import sys

In [4]:
# 0) Setup logging
logger = logging.getLogger('etl_pipeline')
logger.setLevel(logging.INFO)
fh = logging.FileHandler('pipeline.log')
fh.setLevel(logging.INFO)
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh.setFormatter(fmt)
logger.addHandler(fh)

In [5]:
def extract(path: str) -> pd.DataFrame:
    """Read CSV into DataFrame, log any errors."""
    try:
        df = pd.read_csv(path, encoding='latin1')
        logger.info(f"Extracted {len(df)} rows from {path}")
        return df
    except FileNotFoundError as e:
        logger.error(f"File not found: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to read CSV: {e}")
        raise

In [6]:
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Compute revenue per line then aggregate to total revenue per order."""
    try:
        # calculate line-level revenue
        df['line_revenue'] = df['QUANTITYORDERED'] * df['PRICEEACH']
        logger.info("Added column 'line_revenue' = QUANTITYORDERED * PRICEEACH")

        # aggregate per transaction (ORDERNUMBER)
        agg = (
            df
            .groupby('ORDERNUMBER', as_index=False)['line_revenue']
            .sum()
            .rename(columns={'ORDERNUMBER': 'transaction_id',
                             'line_revenue': 'total_revenue'})
        )
        logger.info(f"Aggregated to {len(agg)} unique transactions")
        return agg

    except KeyError as e:
        logger.error(f"Missing expected column: {e}")
        raise
    except Exception as e:
        logger.error(f"Transformation error: {e}")
        raise


In [7]:
def load(df: pd.DataFrame, uri: str):
    """Insert the transformed DataFrame into MongoDB."""
    try:
        client = MongoClient(uri)
        db = client.ecommerce
        col = db.transactions
        records = df.to_dict(orient='records')
        res = col.insert_many(records)
        logger.info(f"Loaded {len(res.inserted_ids)} records into MongoDB")
    except Exception as e:
        logger.error(f"Loading error: {e}")
        raise


In [9]:
def main():
    CSV_PATH = '/content/sales_data_sample.csv'
    MONGO_URI = (
        "mongodb+srv://charbelfrancis03:"
        "hUmsVOJeW3LRlLZ6"
        "@cluster0.8otd0br.mongodb.net/"
        "?retryWrites=true&w=majority"
    )

    # Run ETL
    df_raw = extract(CSV_PATH)
    df_trans = transform(df_raw)
    load(df_trans, MONGO_URI)
    logger.info("ETL pipeline completed successfully.")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        logger.critical(f"ETL pipeline failed: {e}")
        sys.exit(1)

INFO:etl_pipeline:Extracted 2823 rows from /content/sales_data_sample.csv
INFO:etl_pipeline:Added column 'line_revenue' = QUANTITYORDERED * PRICEEACH
INFO:etl_pipeline:Aggregated to 307 unique transactions
INFO:etl_pipeline:Loaded 307 records into MongoDB
INFO:etl_pipeline:ETL pipeline completed successfully.
