In [5]:
# ============================================================
# CLOUD PIPELINE EMULATOR (RUNS DIRECTLY IN GOOGLE COLAB)
# ============================================================

import os
import json
import time
from datetime import datetime, timezone

# --- CONFIGURATION ---
# Folder names are relative to the current working directory.
INBOX = "inbox"            # watched folder; stands in for the source bucket
PROCESSED = "processed"    # destination folder for transformed files
DATA_DIR = "data"          # holds the metadata log
METADATA_FILE = os.path.join(DATA_DIR, "metadata.jsonl")  # one JSON object per line
POLL_INTERVAL = 1.5  # seconds between scans of INBOX

# Create every working folder up front so the handlers can assume they exist.
for _pipeline_dir in (INBOX, PROCESSED, DATA_DIR):
    os.makedirs(_pipeline_dir, exist_ok=True)


# ============================================================
# HANDLER: INGESTION (like AWS Lambda for S3 -> DynamoDB)
# ============================================================
def ingestion_handler(event):
    """Append one metadata line per S3-style record in ``event``.

    For each entry under ``event["Records"]`` a JSON object with the keys
    ``fileKey``, ``bucket``, ``status`` (always ``"RECEIVED"``) and
    ``ingestedAt`` (UTC, ISO 8601) is appended to ``METADATA_FILE`` in
    JSON Lines format.

    Args:
        event: Mapping shaped like an S3 notification. A missing, empty or
            ``None`` ``"Records"`` entry is treated as an empty batch.

    Returns:
        ``{"statusCode": 200, "body": ...}`` on success, or
        ``{"statusCode": 500, "body": ...}`` carrying the error text when
        anything fails. Exceptions are reported in the result rather than
        propagated to the caller.
    """
    try:
        # `or []` also covers an explicit `"Records": None`.
        records = event.get("Records") or []

        lines = []
        for record in records:
            s3 = record.get("s3", {})
            metadata = {
                "fileKey": s3.get("object", {}).get("key"),
                "bucket": s3.get("bucket", {}).get("name"),
                "status": "RECEIVED",
                "ingestedAt": datetime.now(timezone.utc).isoformat(),
            }
            lines.append(json.dumps(metadata) + "\n")

        # Serialize the whole batch first, then append once: the file is
        # opened a single time and a malformed record aborts the batch
        # before anything has been written.
        if lines:
            with open(METADATA_FILE, "a", encoding="utf-8") as f:
                f.writelines(lines)

        return {"statusCode": 200, "body": json.dumps({"message": "Metadata stored"})}

    except Exception as e:  # handler boundary: report the failure as a 500
        return {"statusCode": 500, "body": json.dumps({"error": str(e)})}


# ============================================================
# HANDLER: TRANSFORM (like ETL Lambda)
# ============================================================
def transform_handler(event):
    """Write an upper-cased copy of every file referenced by ``event``.

    Each record's bucket name is used as the source folder and its object
    key as the file name; the transformed text is written under
    ``PROCESSED`` using the same key.

    Args:
        event: Mapping with a ``"Records"`` list of S3-style records.

    Returns:
        ``{"statusCode": 200, "body": ...}`` when every record was
        transformed, or ``{"statusCode": 500, "body": ...}`` carrying the
        error text as soon as one step fails.
    """
    try:
        for entry in event.get("Records", []):
            s3_info = entry.get("s3", {})
            bucket_name = s3_info.get("bucket", {}).get("name")
            object_key = s3_info.get("object", {}).get("key")

            source = os.path.join(bucket_name, object_key)
            target = os.path.join(PROCESSED, object_key)
            os.makedirs(os.path.dirname(target), exist_ok=True)

            # The "transformation" is simply upper-casing the whole text.
            with open(source, "r", encoding="utf-8") as reader:
                upper_text = reader.read().upper()

            with open(target, "w", encoding="utf-8") as writer:
                writer.write(upper_text)
    except Exception as exc:
        failure_body = json.dumps({"error": str(exc)})
        return {"statusCode": 500, "body": failure_body}

    return {"statusCode": 200, "body": json.dumps("Transformation complete")}


# ============================================================
# FUNCTION: TRIGGER EVENT (simulate S3 event)
# ============================================================
def make_s3_event(file_path):
    """Build a minimal S3-style notification event for ``file_path``.

    The bucket name is always ``INBOX`` and the object key is the base name
    of ``file_path``; any directory part of the path is dropped.

    Returns:
        A dict with a single-element ``"Records"`` list.
    """
    bucket_part = {"name": INBOX}
    object_part = {"key": os.path.basename(file_path)}
    single_record = {"s3": {"bucket": bucket_part, "object": object_part}}
    return {"Records": [single_record]}


# ============================================================
# MAIN PIPELINE RUNNER
# ============================================================
def run_pipeline(max_polls=None):
    """Poll ``INBOX`` and run both handlers on every file not seen before.

    Each new file is wrapped in an S3-style event and passed to
    ``ingestion_handler`` and then ``transform_handler``. A file is handled
    at most once per call, whether or not the handlers succeeded, so a
    failing file is reported once instead of on every poll.

    Args:
        max_polls: Number of scans of ``INBOX`` to perform before returning.
            ``None`` (the default) keeps polling until interrupted.
    """
    print("📦 Watching folder:", INBOX)
    print("📑 Metadata file:", METADATA_FILE)
    print("⚙️  Drop any .txt file into the 'inbox' folder to trigger processing.")
    print("🔁 Press Stop (■) in Colab toolbar to end.\n")

    seen = set()
    os.makedirs(INBOX, exist_ok=True)

    polls_done = 0
    while max_polls is None or polls_done < max_polls:
        # Sleep between scans only, so a bounded run returns right after
        # its last scan.
        if polls_done:
            time.sleep(POLL_INTERVAL)

        # Sorted so files are handled in a deterministic order.
        files = sorted(
            f for f in os.listdir(INBOX) if os.path.isfile(os.path.join(INBOX, f))
        )
        for file in files:
            if file in seen:
                continue
            event = make_s3_event(os.path.join(INBOX, file))
            results = (
                ("ingestion", ingestion_handler(event)),
                ("transform", transform_handler(event)),
            )
            seen.add(file)

            # The handlers report errors through their return value, so the
            # status codes must be checked before claiming success.
            failures = [
                f"{stage}: {result.get('body')}"
                for stage, result in results
                if result.get("statusCode") != 200
            ]
            stamp = datetime.now().strftime('%H:%M:%S')
            if failures:
                details = "; ".join(failures)
                print(f"❌ Failed '{file}' at {stamp} ({details})")
            else:
                print(f"✅ Processed '{file}' at {stamp}")

        polls_done += 1


# ============================================================
# EXAMPLE: CREATE A SAMPLE FILE
# ============================================================
def create_sample_file(filename="example.txt", content="hello from colab"):
    """Create a UTF-8 text file in ``INBOX`` and return its path.

    Args:
        filename: Name of the file to create inside ``INBOX``. An existing
            file with the same name is overwritten.
        content: Text written to the file.

    Returns:
        The path of the created file (``INBOX`` joined with ``filename``).
    """
    os.makedirs(INBOX, exist_ok=True)
    file_path = os.path.join(INBOX, filename)
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(content)
    print("📝 Created:", file_path)
    return file_path


# ============================================================
# QUICK DEMO
# ============================================================
def _demo():
    """Seed the inbox with a sample file, then watch it until interrupted."""
    print("🚀 Starting local serverless-like data pipeline demo in Colab...\n")

    # Give the watcher something to pick up on its first scan.
    create_sample_file()

    # The watcher loops until the Colab stop button raises KeyboardInterrupt.
    try:
        run_pipeline()
    except KeyboardInterrupt:
        print("🛑 Pipeline stopped by user.")


_demo()


🚀 Starting local serverless-like data pipeline demo in Colab...

📝 Created: inbox/example.txt
📦 Watching folder: inbox
📑 Metadata file: data/metadata.jsonl
⚙️  Drop any .txt file into the 'inbox' folder to trigger processing.
🔁 Press Stop (■) in Colab toolbar to end.

✅ Processed 'example.txt' at 08:53:19
🛑 Pipeline stopped by user.
