# S3‑to‑Mongo ETL Pipeline

**What this notebook does**  
This notebook implements our AWS S3 → MongoDB ingestion pipeline to:  
1. Initialize the S3 client and load configuration.  
2. List and sort all GZIP (`.gz`) files under a given date prefix.  
3. Read and parse JSON lines from each `.gz` file in S3.  
4. Batch‑insert articles into MongoDB (via `insert_articles`) with duplicate handling.  
5. Report totals of inserted vs. duplicate records per day.  


# 1. Imports & Environment Setup  
Import required libraries, load environment variables, and initialize modules.


In [None]:
import boto3
import gzip
import io
import json
import os
from dotenv import load_dotenv
from mongo import insert_articles
import time

# Populate os.environ from a local .env file if one is found; variables that
# are already set in the process environment keep their existing values.
load_dotenv(override=False)


# 2. Initialize AWS S3 Client & Constants  
Create the `boto3` S3 client and define the batch size.


In [None]:
# Initialize S3 client
s3 = boto3.client("s3")

BATCH_SIZE = 500  # safe, small batches


# 3. List GZIP Files for a Given Day  
List and sort all `.gz` keys under the `date_str` prefix in the S3 bucket.


In [None]:
def list_gz_files_for_day(bucket, date_str, client=None):
    """Return the sorted S3 keys of all ``.gz`` objects under ``<date_str>/``.

    A single ``list_objects_v2`` call returns at most 1,000 keys, so the
    listing goes through the paginator; otherwise files beyond the first page
    would be silently skipped.

    Args:
        bucket: Name of the S3 bucket to list.
        date_str: Prefix "folder" for the day; a trailing "/" is appended.
        client: Optional boto3 S3 client; defaults to the module-level ``s3``.

    Returns:
        Lexicographically sorted list of keys ending in ``.gz``.
    """
    client = s3 if client is None else client
    prefix = f"{date_str}/"
    paginator = client.get_paginator("list_objects_v2")

    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page may have no "Contents" entry (e.g. when nothing matches).
        keys.extend(
            obj["Key"] for obj in page.get("Contents", []) if obj["Key"].endswith(".gz")
        )
    return sorted(keys)


# 4. Read & Parse a GZIP File from S3  
Download the GZIP object, decompress line by line, and parse each JSON record.


In [None]:
def read_gz_file_from_s3(bucket, key, client=None):
    """Download a gzip-compressed JSON-lines object from S3 and parse it.

    The object body is decompressed as a stream. Each line is decoded as UTF-8
    (undecodable bytes are replaced), stripped, and parsed with ``json.loads``.
    Blank lines are ignored. Lines that are not valid JSON are reported and
    skipped.

    Args:
        bucket: S3 bucket name.
        key: Object key of the ``.gz`` file.
        client: Optional boto3 S3 client; defaults to the module-level ``s3``.

    Returns:
        List of parsed JSON values, in file order.
    """
    client = s3 if client is None else client
    body = client.get_object(Bucket=bucket, Key=key)["Body"]

    records = []
    try:
        with gzip.GzipFile(fileobj=body, mode="rb") as gz:
            for raw_line in gz:
                line = raw_line.decode("utf-8", errors="replace").strip()
                if not line:
                    continue
                try:
                    records.append(json.loads(line))
                except json.JSONDecodeError as e:
                    print(f"Skipping corrupted line in {key}: {e}")
    finally:
        # GzipFile never closes a caller-supplied fileobj, so release the
        # underlying HTTP stream explicitly, including on decompression errors.
        body.close()
    return records


# 5. Process & Insert Articles for One Day  
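For each `.gz` file under the `date_str` prefix, read its records and buffer them across files. An `insert_articles` call is triggered whenever the buffer reaches `BATCH_SIZE` records, and any remainder is flushed at the end.  
Returns the total inserted and duplicate counts.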


In [None]:
def process_articles_for_day(bucket, date_str, batch_size=None):
    """Read every ``.gz`` file for one day from S3 and insert its articles.

    Articles are buffered across files and passed to ``insert_articles`` in
    chunks of exactly ``batch_size`` (the final flush may be smaller). The
    function sleeps one second after each full chunk to throttle write load.

    Args:
        bucket: S3 bucket name.
        date_str: Day prefix passed to ``list_gz_files_for_day``.
        batch_size: Articles per ``insert_articles`` call; defaults to the
            module-level ``BATCH_SIZE`` (looked up at call time).

    Returns:
        ``(total_inserted, total_duplicates)`` summed over all calls to
        ``insert_articles``.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size is None:
        batch_size = BATCH_SIZE
    if batch_size < 1:
        # A non-positive size would make the drain loop below spin forever.
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

    total_inserted = 0
    total_duplicates = 0
    pending_batch = []

    for key in list_gz_files_for_day(bucket, date_str):
        print(f"Processing file: {key}")
        pending_batch.extend(read_gz_file_from_s3(bucket, key))

        # Drain in fixed-size chunks so that a single large file never turns
        # into one oversized insert. The offset avoids re-copying the buffer
        # on every chunk.
        start = 0
        while len(pending_batch) - start >= batch_size:
            inserted, duplicates = insert_articles(pending_batch[start:start + batch_size])
            total_inserted += inserted
            total_duplicates += duplicates
            start += batch_size
            time.sleep(1)  # let Mongo breathe
        # Keep only the not-yet-inserted tail for the next file or final flush.
        del pending_batch[:start]

    if pending_batch:
        inserted, duplicates = insert_articles(pending_batch)
        total_inserted += inserted
        total_duplicates += duplicates

    return total_inserted, total_duplicates
