# Building an Operational Data Store with Firestore and GCS

This notebook implements a multi-layer ODS approach using Google Cloud services. 

**Architecture (Best Practices):**
1. **Operational Layer (Firestore):** Stores recent orders and serves individual document lookups.
2. **Aggregate Document Pattern (Firestore):** Instead of real-time aggregation (which can be expensive), we maintain a 'summary' document that stores pre-computed metrics like revenue per category. This is a common and efficient pattern in Firestore.
3. **Archive Layer (GCS):** Stores monthly exports for long-term durability and compliance.

## Part 0: Setup

Install necessary packages if not already present:

In [None]:
%pip install google-cloud-firestore google-cloud-storage python-dotenv pandas numpy --quiet

### Imports

In [None]:
import os
import json
import time
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
from google.cloud import firestore, storage
from dotenv import load_dotenv

print("Imports OK")

## Part 1: Credentials

Load the project settings (`GCP_PROJECT_ID` and `GCS_BUCKET_NAME`) from the `.env` file in the parent directory.

In [None]:
# Pull settings from the .env file that sits one directory above the
# current working directory.
env_file = Path("..") / ".env"
load_dotenv(env_file)

# Each value is None when the corresponding variable is not defined.
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
GCS_BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME")

print(f"Project ID: {GCP_PROJECT_ID}")

## Part 2: Connect to Firestore

In [None]:
# Create the Firestore client for the configured project.
# The resulting `db` handle is reused by every later cell that reads or
# writes the "orders" and "aggregates" collections.
db = firestore.Client(project=GCP_PROJECT_ID)
print("Connected to Firestore.")

## Part 3: Load Data into Firestore

Generate 100 synthetic orders (drawn from 20 synthetic products and spread over the last 35 days) and insert them into Firestore in a single batch.

In [None]:
# Fixed seed so the random draws repeat between runs. Order timestamps are
# still relative to the moment the cell is executed.
np.random.seed(42)

CATEGORIES = ["footwear", "apparel", "accessories", "electronics"]
REGIONS = ["North", "South", "East", "West"]
STATUSES = ["pending", "shipped", "delivered", "returned"]


def _make_product(index):
    """Build one catalogue entry: the category is drawn first, then the price."""
    category = np.random.choice(CATEGORIES)
    return {
        "product_id": f"PROD_{index:03d}",
        "name": f"{category.title()} Item {index}",
        "category": category,
        "price": round(np.random.uniform(20, 300), 2),
    }


# 20 products, numbered PROD_001 .. PROD_020.
products_data = [_make_product(n) for n in range(1, 21)]

# Orders are placed at a random day/hour offset inside a 35-day window
# that ends at the time this cell runs.
start_date = datetime.now() - timedelta(days=35)


def _make_order(index):
    """Build one order for a randomly chosen product.

    The random draws happen in a fixed sequence (product, quantity, day,
    hour, customer, region, status) so seeded runs stay reproducible.
    """
    product = np.random.choice(products_data)
    quantity = np.random.randint(1, 4)
    day_offset = int(np.random.randint(0, 35))
    hour_offset = int(np.random.randint(0, 24))
    return {
        "order_id": f"ORD_{index:05d}",
        "customer_id": f"CUST_{np.random.randint(1, 101):04d}",
        "region": np.random.choice(REGIONS),
        "product_id": product["product_id"],
        "category": product["category"],
        "unit_price": product["price"],
        "quantity": quantity,
        "total": round(product["price"] * quantity, 2),
        # Weights line up with STATUSES: "delivered" is drawn 75% of the time.
        "status": np.random.choice(STATUSES, p=[0.05, 0.15, 0.75, 0.05]),
        "created_at": start_date + timedelta(days=day_offset, hours=hour_offset),
    }


# 100 orders, numbered ORD_00001 .. ORD_00100.
orders_data = [_make_order(n) for n in range(1, 101)]

# Queue every order in one write batch, keyed by its order_id, then commit
# the whole batch in a single call.
orders_ref = db.collection("orders")
batch = db.batch()
for record in orders_data:
    batch.set(orders_ref.document(record["order_id"]), record)
batch.commit()
print(f"Inserted {len(orders_data)} orders into Firestore.")

## Part 4: The 'Aggregate Document' Pattern

Instead of running a heavy query across thousands of orders every time you load a dashboard, we maintain a summary document.

In [None]:
def update_aggregate_document():
    """Recompute per-category revenue for the last 30 days and store it.

    Streams every order whose ``created_at`` falls within the last 30 days,
    skips returned orders, totals revenue and order count per category, and
    overwrites the ``aggregates/category_revenue`` document with the result
    plus a server-side ``last_updated`` timestamp.

    In a real system, this would be updated on every new order (e.g. via
    Cloud Functions) or scheduled every few minutes.
    """
    cutoff = datetime.now() - timedelta(days=30)

    # Only the date range is filtered server-side. Adding a
    # `status != "returned"` filter to the same query would put inequality
    # filters on two different fields, which Firestore either rejects or
    # only serves once a matching composite index has been created. Returned
    # orders are therefore dropped below, at the cost of reading them.
    recent_orders = (
        db.collection("orders").where("created_at", ">=", cutoff).stream()
    )

    summary = {}
    for snapshot in recent_orders:
        order = snapshot.to_dict()
        # Keep the semantics of a `!=` filter: a document that lacks the
        # field never matches, and "returned" is excluded.
        if "status" not in order or order["status"] == "returned":
            continue
        totals = summary.setdefault(order["category"], {"revenue": 0, "orders": 0})
        totals["revenue"] += order["total"]
        totals["orders"] += 1

    # Save the summary to a single, dedicated document.
    db.collection("aggregates").document("category_revenue").set({
        "last_updated": firestore.SERVER_TIMESTAMP,
        "data": summary,
    })
    print("Aggregate document updated.")


update_aggregate_document()

## Part 5: Efficient Dashboard Querying

Now we get the dashboard metrics by reading just ONE document, rather than hundreds of orders.

In [None]:
def get_dashboard_metrics():
    """Return dashboard rows read from the pre-computed aggregate document.

    Reads ``aggregates/category_revenue`` and returns a list of
    ``{"_id", "revenue", "orders"}`` dicts, one per category, sorted by
    revenue from highest to lowest. Revenue is rounded to two decimals.
    If the document does not exist, a notice is printed and an empty list
    is returned.
    """
    snapshot = db.collection("aggregates").document("category_revenue").get()
    if not snapshot.exists:
        print("Aggregate not found. This shouldn't happen if the worker is running.")
        return []

    per_category = snapshot.to_dict()["data"]
    rows = [
        {"_id": name, "revenue": round(stats["revenue"], 2), "orders": stats["orders"]}
        for name, stats in per_category.items()
    ]
    # Stable in-place sort, largest revenue first.
    rows.sort(key=lambda row: row["revenue"], reverse=True)
    return rows


print("Dashboard Results (from Aggregate Document):")
for entry in get_dashboard_metrics():
    category, revenue, order_count = entry["_id"], entry["revenue"], entry["orders"]
    print(f"  {category:<15}  ${revenue:>8,.2f}   ({order_count} orders)")

## Part 6: Archive to GCS

Export month-to-date orders to a local JSONL file and upload it to the GCS bucket.

In [None]:
# Archiving is skipped unless a real bucket name has been configured
# (unset, empty, or still the placeholder value all count as "not set").
if not GCS_BUCKET_NAME or GCS_BUCKET_NAME == "YOUR_BUCKET_NAME":
    print("GCS_BUCKET_NAME not set. Archiving skipped.")
else:
    gcs_client = storage.Client(project=GCP_PROJECT_ID)
    bucket = gcs_client.bucket(GCS_BUCKET_NAME)
    export_path = Path("./orders_archive.jsonl")

    # Midnight on the first day of the current month.
    month_start = datetime.now().replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    docs = db.collection("orders").where("created_at", ">=", month_start).stream()

    # One JSON object per line; the timestamp is converted to an ISO-8601
    # string because json.dumps cannot serialise datetime objects.
    with export_path.open("w") as out:
        for snapshot in docs:
            fields = snapshot.to_dict()
            record = {**fields, "created_at": fields["created_at"].isoformat()}
            out.write(f"{json.dumps(record)}\n")

    # Objects are grouped under a year/month/day prefix for the upload date.
    date_prefix = datetime.now().strftime("%Y/%m/%d")
    gcs_path = f"shopstream/orders/{date_prefix}/orders_archive.jsonl"
    bucket.blob(gcs_path).upload_from_filename(str(export_path))
    print(f"Archived to: gs://{GCS_BUCKET_NAME}/{gcs_path}")