# PrivacyScrub V2 - API-First Media Anonymization (Master)

**Version:** 2.0 (SRS Compliant, Full Pipeline, Metrics, Frontend Deploy)  
* Live app available at https://privacyscrub.streamlit.app/

## Overview
This notebook implements the full PrivacyScrub V2 specification. It deploys a FastAPI service to Google Cloud Run and deploys a Streamlit Frontend to GitHub.

### Architecture Components:
1.  **Backend (Cloud Run):** FastAPI service with Sync (Image) and Async (Video) endpoints.
2.  **Worker (Cloud Tasks):** Async video processing using MoviePy and Cloud Vision.
3.  **Database (Firestore):** Job status and error tracking.
4.  **Storage (GCS):** Ephemeral input/output storage.
5.  **Frontend (Streamlit):** Interactive UI (Deployed to GitHub).

## Prerequisites
1.  **Google Cloud Project:** With Billing Enabled.
2.  **Colab Secrets:** Set the following in the sidebar (Key Icon):
    * `GCP_PROJECT_ID`
    * `GCP_REGION` (e.g., `us-central1`)
    * `SERVICE_NAME` (e.g., `privacyscrub-api`)
    * `GCS_BUCKET_NAME` (e.g., `privacyscrub-temp`)
    * `GITHUB_TOKEN` (For deploying the frontend)
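
A pre-flight check along these lines could catch a missing secret before any cloud command runs. This is a minimal sketch, not part of the notebook's own cells: the helper name `find_missing_secrets` and the plain-dict input are assumptions, and in Colab the values would come from `userdata.get`.

```python
# Hypothetical pre-flight check for the secrets listed above.
REQUIRED_SECRETS = (
    "GCP_PROJECT_ID",
    "GCP_REGION",
    "SERVICE_NAME",
    "GCS_BUCKET_NAME",
    "GITHUB_TOKEN",
)

def find_missing_secrets(secrets: dict) -> list:
    """Return the required keys that are absent or blank in `secrets`."""
    return [
        key for key in REQUIRED_SECRETS
        if not str(secrets.get(key) or "").strip()
    ]

# Example values are placeholders, not real project settings.
example = {
    "GCP_PROJECT_ID": "my-project",
    "GCP_REGION": "us-central1",
    "SERVICE_NAME": "privacyscrub-api",
    "GCS_BUCKET_NAME": "",
}
missing = find_missing_secrets(example)
print(missing)
```

An empty list means every key is set; anything else names the secrets still to be added in the sidebar.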

# 1.0 Setup & Configuration
This section installs necessary libraries, authenticates with Google Cloud, and sets up the local environment variables.

In [None]:
# --- 1.1 Install Dependencies (V2 Spec) ---
# We install the specific versions needed for the backend and cloud interaction.
# NOTE: We pin moviepy to 1.0.3 here as well to match the Docker environment.

print("Installing V2 Dependencies... this may take a minute.")

!pip install -U -q \
  "fastapi[all]" \
  uvicorn \
  python-multipart \
  google-cloud-vision \
  google-cloud-storage \
  google-cloud-tasks \
  google-cloud-firestore \
  opencv-python-headless \
  pillow \
  "moviepy==1.0.3" \
  gcsfs

print("Installation Complete. If you see a restart warning, restart the runtime and skip this cell.")

Installing V2 Dependencies... this may take a minute.


In [None]:
# --- 1.2 Authentication & Environment Setup ---
# This cell handles the OAuth 2.0 flow with Google Cloud and configures the gcloud CLI.

import os
import sys
from google.colab import auth, userdata

# Authenticate User via Pop-up
print("Authenticating with Google Cloud...")
auth.authenticate_user()

# Load Secrets from Colab Sidebar
try:
    PROJECT_ID = userdata.get('GCP_PROJECT_ID')
    REGION = userdata.get('GCP_REGION')
    SERVICE_NAME = userdata.get('SERVICE_NAME')
    BUCKET_NAME = userdata.get('GCS_BUCKET_NAME')

    os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID

    print(f"Config Loaded: {PROJECT_ID} ({REGION})")
    print(f"   Target Bucket: {BUCKET_NAME}")

    # Configure gcloud CLI to use these settings
    !gcloud config set project {PROJECT_ID}
    !gcloud config set run/region {REGION}

except Exception as e:
    print("\nERROR: Missing Colab Secrets.")
    print("Please ensure GCP_PROJECT_ID, GCP_REGION, SERVICE_NAME, and GCS_BUCKET_NAME are set.")
    raise e

Authenticating with Google Cloud...
Config Loaded: privacyscrub-backend (us-central1)
   Target Bucket: privacyscrub-backend-temp-videos
INFORMATION: Project 'privacyscrub-backend' has no 'environment' tag set. Use either 'Production', 'Development', 'Test', or 'Staging'. Add an 'environment' tag using `gcloud resource-manager tags bindings create`.
Updated property [core/project].
Updated property [run/region].


# 2.0 Application Logic (The Backend)
We will write the application files to the local disk so they can be packaged into a Docker container. This includes the Pydantic models, the OpenCV processing logic, and the FastAPI routes.

In [None]:
# --- 2.1 Create Application Directory ---
import os
os.makedirs("app", exist_ok=True)
print("Created 'app' directory for source code.")

Created 'app' directory for source code.


In [None]:
%%writefile app/config.py
# --- app/config.py ---
# Defines Data Models, Enums, and Default Configuration Constants (SRS Section 9).

from enum import Enum
from pydantic import BaseModel

class AnonymizeMode(str, Enum):
    BLUR = "blur"
    PIXELATE = "pixelate"
    BLACK_BOX = "black_box"

class ComplianceProfile(str, Enum):
    NONE = "NONE"
    GDPR = "GDPR"
    CCPA = "CCPA"
    HIPAA_SAFE_HARBOR = "HIPAA_SAFE_HARBOR"

class JobStatus(str, Enum):
    QUEUED = "QUEUED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"

# V2 Default Configurations
DEFAULT_CONFIDENCE = 0.5
DEFAULT_MODE = AnonymizeMode.BLUR

class PrivacyConfig(BaseModel):
    target_faces: bool = True
    target_plates: bool = True
    target_logos: bool = True
    target_text: bool = True
    mode: AnonymizeMode = DEFAULT_MODE
    confidence_threshold: float = DEFAULT_CONFIDENCE
    coordinates_only: bool = False
    strip_metadata: bool = True

Overwriting app/config.py
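
Because each enum in `app/config.py` mixes in `str`, its members compare equal to their raw string values, and building one from an unknown string raises `ValueError`. The sketch below re-declares `ComplianceProfile` so that it runs on its own; `parse_profile` is a hypothetical helper for illustration, not something the service defines.

```python
from enum import Enum

class ComplianceProfile(str, Enum):
    # Re-declared here so the example is self-contained.
    NONE = "NONE"
    GDPR = "GDPR"
    CCPA = "CCPA"
    HIPAA_SAFE_HARBOR = "HIPAA_SAFE_HARBOR"

def parse_profile(raw) -> ComplianceProfile:
    """Hypothetical helper: map a raw value to a profile, defaulting to NONE."""
    try:
        return ComplianceProfile(raw)
    except ValueError:
        # Enum lookup is by exact value, so unknown or mis-cased names land here.
        return ComplianceProfile.NONE

print(parse_profile("GDPR") is ComplianceProfile.GDPR)
print(parse_profile("gdpr") is ComplianceProfile.NONE)
print(ComplianceProfile.CCPA == "CCPA")
```

Lookup is case-sensitive, which is why a client should send the profile names exactly as declared.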


In [None]:
%%writefile app/logic.py
# --- app/logic.py ---
# Updated: Sets max_results=100 to detect crowds and heavy traffic.

import cv2
import numpy as np
from google.cloud import vision
from app.config import PrivacyConfig, AnonymizeMode, ComplianceProfile

def get_config_for_profile(profile: ComplianceProfile, base_config: PrivacyConfig) -> PrivacyConfig:
    """Updates configuration based on V2 SRS Compliance Profiles settings."""
    config = base_config.copy()

    if profile == ComplianceProfile.GDPR:
        config.confidence_threshold = max(config.confidence_threshold, 0.6)
        config.target_text = True
        config.target_faces = True

    elif profile == ComplianceProfile.CCPA:
        config.target_plates = True
        config.target_faces = True
        config.target_text = True

    elif profile == ComplianceProfile.HIPAA_SAFE_HARBOR:
        config.confidence_threshold = 0.4
        config.mode = AnonymizeMode.BLACK_BOX
        config.target_faces = True
        config.target_plates = True
        config.target_text = True
        config.target_logos = True
        config.strip_metadata = True

    return config

def detect_sensitive_features(image_content, config: PrivacyConfig):
    """
    Calls Cloud Vision API to detect requested features (Faces, Text, Objects).
    Returns a standardized list of bounding boxes.
    """
    client = vision.ImageAnnotatorClient()
    image = vision.Image(content=image_content)

    # FIX: Set a high limit (100) to ensure crowds/traffic are fully captured.
    # Default Cloud Vision limit is 10 if unspecified.
    MAX_RESULTS = 100

    features = []
    if config.target_faces:
        features.append({"type_": vision.Feature.Type.FACE_DETECTION, "max_results": MAX_RESULTS})
    if config.target_text:
        features.append({"type_": vision.Feature.Type.TEXT_DETECTION, "max_results": MAX_RESULTS})
    if config.target_logos:
        features.append({"type_": vision.Feature.Type.LOGO_DETECTION, "max_results": MAX_RESULTS})
    if config.target_plates:
        features.append({"type_": vision.Feature.Type.OBJECT_LOCALIZATION, "max_results": MAX_RESULTS})

    if not features:
        return []

    # Execute API Call
    request = vision.AnnotateImageRequest(image=image, features=features)
    response = client.annotate_image(request)

    boxes = []

    # 1. Process Faces
    if config.target_faces:
        for face in response.face_annotations:
            if face.detection_confidence >= config.confidence_threshold:
                vertices = [(v.x, v.y) for v in face.bounding_poly.vertices]
                boxes.append({"type": "face", "poly": vertices})

    # 2. Process Text
    if config.target_text:
        # We skip index 0 (full page text) and iterate over individual blocks/words
        for text in response.text_annotations[1:]:
            vertices = [(v.x, v.y) for v in text.bounding_poly.vertices]
            boxes.append({"type": "text", "poly": vertices})

    # 3. Process Logos
    if config.target_logos:
        for logo in response.logo_annotations:
            if logo.score >= config.confidence_threshold:
                vertices = [(v.x, v.y) for v in logo.bounding_poly.vertices]
                boxes.append({"type": "logo", "poly": vertices})

    # 4. Process Objects (License Plates)
    if config.target_plates:
        for obj in response.localized_object_annotations:
            if obj.name.lower() in ["license plate", "vehicle registration plate"] and obj.score >= config.confidence_threshold:
                vertices = [(v.x, v.y) for v in obj.bounding_poly.normalized_vertices]
                boxes.append({"type": "plate", "poly_norm": vertices})

    return boxes

def apply_redaction_numpy(img, boxes, config: PrivacyConfig):
    """Applies redaction directly to a Numpy array."""
    h, w, _ = img.shape

    for box in boxes:
        if "poly_norm" in box:
            pts = np.array([(int(v[0]*w), int(v[1]*h)) for v in box["poly_norm"]], np.int32)
        else:
            pts = np.array([(int(v[0]), int(v[1])) for v in box["poly"]], np.int32)

        rect = cv2.boundingRect(pts)
        x, y, rw, rh = rect

        x, y = max(0, x), max(0, y)
        rw, rh = min(w-x, rw), min(h-y, rh)

        roi = img[y:y+rh, x:x+rw]
        if roi.size == 0: continue

        if config.mode == AnonymizeMode.BLUR:
            ksize = max(3, rw // 4) | 1
            roi = cv2.GaussianBlur(roi, (ksize, ksize), 30)
            img[y:y+rh, x:x+rw] = roi

        elif config.mode == AnonymizeMode.PIXELATE:
            temp = cv2.resize(roi, (max(1, rw//10), max(1, rh//10)), interpolation=cv2.INTER_LINEAR)
            roi = cv2.resize(temp, (rw, rh), interpolation=cv2.INTER_NEAREST)
            img[y:y+rh, x:x+rw] = roi

        elif config.mode == AnonymizeMode.BLACK_BOX:
            cv2.rectangle(img, (x, y), (x+rw, y+rh), (0, 0, 0), -1)

    return img

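def apply_redaction(image_bytes, boxes, config: PrivacyConfig):
    """Decodes image bytes, applies redaction, and returns the result as JPEG bytes."""
    nparr = np.frombuffer(image_bytes, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        raise ValueError("Could not decode the uploaded file as an image.")

    img = apply_redaction_numpy(img, boxes, config)

    success, encoded_img = cv2.imencode('.jpg', img)
    if not success:
        raise ValueError("Could not encode the redacted image as JPEG.")
    return encoded_img.tobytes()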

Overwriting app/logic.py
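
The geometry in `apply_redaction_numpy` can be easier to follow without OpenCV in the way. The sketch below is a simplified, pure-Python stand-in: `to_pixel_rect` and `blur_kernel` are hypothetical names, the inclusive `+ 1` width is an assumption about how the bounding rectangle is measured, and, unlike the cell above, it clamps both edges of the rectangle to the image.

```python
def to_pixel_rect(box: dict, width: int, height: int):
    """Return a clamped (x, y, w, h) rectangle for one detection, or None if empty."""
    if "poly_norm" in box:
        # Normalized vertices (0..1) are scaled by the image size.
        pts = [(int(vx * width), int(vy * height)) for vx, vy in box["poly_norm"]]
    else:
        # Pixel vertices are used as they are.
        pts = [(int(vx), int(vy)) for vx, vy in box["poly"]]

    xs = [p[0] for p in pts]
    ys = [p[1] for p in pts]
    x, y = min(xs), min(ys)
    rw, rh = max(xs) - x + 1, max(ys) - y + 1  # assumed inclusive extent

    # Clamp the rectangle to the image on all four sides.
    x0, y0 = max(0, x), max(0, y)
    x1, y1 = min(width, x + rw), min(height, y + rh)
    if x1 <= x0 or y1 <= y0:
        return None
    return x0, y0, x1 - x0, y1 - y0

def blur_kernel(rect_width: int) -> int:
    """Kernel size as in the BLUR branch: at least 3, forced odd by `| 1`."""
    return max(3, rect_width // 4) | 1

plate = {"type": "plate", "poly_norm": [(0.25, 0.5), (0.75, 0.5), (0.75, 0.75), (0.25, 0.75)]}
text = {"type": "text", "poly": [(-5, 90), (40, 90), (40, 120), (-5, 120)]}
print(to_pixel_rect(plate, 200, 100))
print(to_pixel_rect(text, 200, 100))
print(blur_kernel(41))
```

Forcing the kernel size to be odd matters because Gaussian blur kernels need odd dimensions.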


In [None]:
%%writefile app/main.py
# --- app/main.py ---
# The FastAPI Entrypoint for PrivacyScrub V3.
# Updated to output HTTP URLs for frontend playback.

print("--- INFO: Initializing FastAPI Application... ---")
import os
import json
import uuid
import traceback
import cv2
import datetime
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import Response
from google.cloud import storage, tasks_v2, firestore

# App modules are imported eagerly; only MoviePy is lazy-loaded inside the worker to keep startup fast.
from app.config import PrivacyConfig, AnonymizeMode, ComplianceProfile, JobStatus
from app.logic import detect_sensitive_features, apply_redaction, apply_redaction_numpy, get_config_for_profile

print("--- INFO: Dependencies loaded. Starting Server... ---")

app = FastAPI(title="PrivacyScrub V3", version="3.1")

# --- Environment Variables ---
PROJECT_ID = os.environ.get("GCP_PROJECT_ID") or os.environ.get("GOOGLE_CLOUD_PROJECT")
BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME")
REGION = os.environ.get("GCP_REGION", "us-central1")
QUEUE_NAME = "privacyscrub-video-queue"
SERVICE_URL = os.environ.get("SERVICE_URL")

# Initialize Google Cloud Clients
try:
    db = firestore.Client(project=PROJECT_ID)
    storage_client = storage.Client(project=PROJECT_ID)
    tasks_client = tasks_v2.CloudTasksClient()
    print("--- INFO: GCP Clients Initialized Successfully ---")
except Exception as e:
    print(f"--- FATAL ERROR: Initializing GCP clients failed: {e} ---")

@app.get("/")
def root():
    return {
        "message": "PrivacyScrub V3 API is Active",
        "config": {"project": PROJECT_ID, "region": REGION}
    }

# ==========================================
# 1. Image Endpoint (Synchronous)
# ==========================================
@app.post("/v1/anonymize-image")
async def anonymize_image(
    file: UploadFile = File(...),
    profile: ComplianceProfile = Form(ComplianceProfile.NONE),
    mode: AnonymizeMode = Form(AnonymizeMode.BLUR),
    coordinates_only: bool = Form(False),
    target_faces: bool = Form(True),
    target_plates: bool = Form(True),
    target_text: bool = Form(True),
    target_logos: bool = Form(True)
):
    content = await file.read()
    base_config = PrivacyConfig(
        mode=mode, coordinates_only=coordinates_only,
        target_faces=target_faces, target_plates=target_plates,
        target_text=target_text, target_logos=target_logos
    )
    config = get_config_for_profile(profile, base_config)
    boxes = detect_sensitive_features(content, config)

    if config.coordinates_only:
        return {"detected_features": boxes, "count": len(boxes)}

    redacted_bytes = apply_redaction(content, boxes, config)
    return Response(content=redacted_bytes, media_type="image/jpeg")

# ==========================================
# 2. Video Endpoint (Asynchronous Ingest)
# ==========================================
@app.post("/v1/anonymize-video")
async def anonymize_video(
    file: UploadFile = File(...),
    profile: ComplianceProfile = Form(ComplianceProfile.NONE)
):
    try:
        job_id = f"job_{uuid.uuid4()}"
        if not BUCKET_NAME or not PROJECT_ID:
            raise ValueError("Configuration Error: GCS_BUCKET_NAME or PROJECT_ID is missing.")

        # 1. Upload Input
        bucket = storage_client.bucket(BUCKET_NAME)
        blob_in = bucket.blob(f"input/{job_id}_{file.filename}")
        await file.seek(0)
        blob_in.upload_from_file(file.file, content_type=file.content_type)
        input_uri = f"gs://{BUCKET_NAME}/{blob_in.name}"

        # 2. Create Job Record
        db.collection("jobs").document(job_id).set({
            "status": JobStatus.QUEUED,
            "input_uri": input_uri,
            "profile": profile,
            "created_at": firestore.SERVER_TIMESTAMP
        })

        # 3. Dispatch Task
        if not SERVICE_URL:
            print("WARNING: SERVICE_URL not set.")

        parent = tasks_client.queue_path(PROJECT_ID, REGION, QUEUE_NAME)
        task = {
            "http_request": {
                "http_method": tasks_v2.HttpMethod.POST,
                "url": f"{SERVICE_URL}/v1/process-video-task",
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"job_id": job_id}).encode()
            }
        }
        tasks_client.create_task(request={"parent": parent, "task": task})

        return {"job_id": job_id, "status": "QUEUED", "message": "Video accepted for async processing"}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")

# ==========================================
# 3. Video Processing Worker (Task Handler)
# ==========================================
from pydantic import BaseModel
class TaskPayload(BaseModel):
    job_id: str

@app.post("/v1/process-video-task")
def process_video_task(payload: TaskPayload):
    try:
        print(f"Processing Task for Job: {payload.job_id}")
        from moviepy.editor import VideoFileClip

        job_ref = db.collection("jobs").document(payload.job_id)
        job = job_ref.get().to_dict()
        if not job: return {"error": "Job not found"}

        job_ref.update({"status": JobStatus.PROCESSING})

        input_uri = job["input_uri"]

        # --- FILENAME HANDLING ---
        raw_filename = input_uri.split("/")[-1]
        local_input = f"/tmp/{raw_filename}"

        output_filename = f"processed_{raw_filename}"
        if not output_filename.lower().endswith(".mp4"):
            output_filename += ".mp4"
        local_output = f"/tmp/{output_filename}"

        # 1. Download
        bucket = storage_client.bucket(BUCKET_NAME)
        blob_in = bucket.blob(input_uri.split(f"gs://{BUCKET_NAME}/")[1])
        blob_in.download_to_filename(local_input)

        # 2. Config
        profile_str = job.get("profile", "NONE")
        try:
            profile_enum = ComplianceProfile(profile_str)
        except ValueError:
            # Unknown profile names fall back to no profile.
            profile_enum = ComplianceProfile.NONE
        config = get_config_for_profile(profile_enum, PrivacyConfig())

        # 3. Process
        clip = VideoFileClip(local_input)

        def process_frame(frame):
            img_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            success, encoded_img = cv2.imencode('.jpg', img_bgr)
            if not success: return frame

            boxes = detect_sensitive_features(encoded_img.tobytes(), config)
            img_redacted_bgr = apply_redaction_numpy(img_bgr, boxes, config)
            return cv2.cvtColor(img_redacted_bgr, cv2.COLOR_BGR2RGB)

        new_clip = clip.fl_image(process_frame)
        new_clip.write_videofile(local_output, codec='libx264', audio=False, verbose=False, logger=None)

        # 4. Upload & Generate URL
        output_uri_path = f"output/{output_filename}"
        blob_out = bucket.blob(output_uri_path)
        blob_out.upload_from_filename(local_output)

        # --- Generate a Playable HTTP URL ---
        # We request a V4 signed URL (valid for 60 minutes) so the frontend can play the file.
        # If signing fails, we fall back to the plain storage.googleapis.com link,
        # which only works when the object is publicly readable.
        output_url = None
        try:
            # Method A: Make Public (Simplest for demos)
            # blob_out.make_public()
            # output_url = blob_out.public_url

            # Method B: Signed URL (More secure, works if Service Account has permissions)
            output_url = blob_out.generate_signed_url(
                version="v4",
                expiration=datetime.timedelta(minutes=60),
                method="GET"
            )
        except Exception as e:
            print(f"Could not generate signed URL: {e}")
            # Fallback: Construct public link (requires bucket to be open)
            output_url = f"https://storage.googleapis.com/{BUCKET_NAME}/{output_uri_path}"

        output_uri = f"gs://{BUCKET_NAME}/{output_uri_path}"

        # Cleanup
        if os.path.exists(local_input): os.remove(local_input)
        if os.path.exists(local_output): os.remove(local_output)

        # 5. Update Status
        job_ref.update({
            "status": JobStatus.COMPLETED,
            "output_uri": output_uri,
            "output_url": output_url, # New Field
            "completed_at": firestore.SERVER_TIMESTAMP
        })

        return {"status": "COMPLETED", "job_id": payload.job_id}

    except Exception as e:
        traceback.print_exc()
        db.collection("jobs").document(payload.job_id).update({
            "status": JobStatus.FAILED,
            "error_message": str(e)
        })
        return {"error": str(e)}

# ==========================================
# 4. Status Endpoint
# ==========================================
@app.get("/v1/jobs/{job_id}")
def get_job_status(job_id: str):
    job = db.collection("jobs").document(job_id).get()
    if not job.exists:
        raise HTTPException(status_code=404, detail="Job not found")
    return job.to_dict()

Overwriting app/main.py
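
The job lifecycle in `app/main.py` (QUEUED, then PROCESSING, then COMPLETED or FAILED) can be traced without any cloud services. The sketch below is an assumption-laden stand-in: a plain dict replaces the Firestore `jobs` collection, and `submit_job`, `run_worker`, and the two processing functions are hypothetical names.

```python
import uuid

jobs = {}  # in-memory stand-in for the Firestore "jobs" collection

def submit_job(input_uri: str) -> str:
    """Record a new job as QUEUED and return its id."""
    job_id = f"job_{uuid.uuid4()}"
    jobs[job_id] = {"status": "QUEUED", "input_uri": input_uri}
    return job_id

def run_worker(job_id: str, process) -> dict:
    """Move a job through PROCESSING to COMPLETED, or to FAILED on error."""
    job = jobs.get(job_id)
    if job is None:
        return {"error": "Job not found"}
    job["status"] = "PROCESSING"
    try:
        output_uri = process(job["input_uri"])
    except Exception as exc:
        # The error text is stored so the status endpoint can report it.
        job.update({"status": "FAILED", "error_message": str(exc)})
        return {"error": str(exc)}
    job.update({"status": "COMPLETED", "output_uri": output_uri})
    return {"status": "COMPLETED", "job_id": job_id}

def fake_process(uri: str) -> str:
    # Placeholder for the real frame-by-frame redaction.
    return uri.replace("/input/", "/output/processed_")

def broken_process(uri: str) -> str:
    raise RuntimeError("codec missing")

ok_id = submit_job("gs://example-bucket/input/clip.mp4")
bad_id = submit_job("gs://example-bucket/input/other.mp4")
run_worker(ok_id, fake_process)
run_worker(bad_id, broken_process)
print(jobs[ok_id]["status"], jobs[bad_id]["status"])
```

Keeping the failure reason on the job record is what lets a polling client show a useful message instead of a bare FAILED status.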


# 3.0 Infrastructure & Deployment
This section constructs the Docker environment, configures Google Cloud services (Artifact Registry, Tasks, Firestore), and executes the Cloud Run deployment.

In [None]:
%%writefile Dockerfile
# --- Dockerfile ---
# Defines the runtime environment for Cloud Run.

FROM python:3.9-slim

WORKDIR /app

# Install System Dependencies
# 'ffmpeg' is crucial for MoviePy. 'libgl1' is for OpenCV.
RUN apt-get update && apt-get install -y libgl1 libglib2.0-0 ffmpeg && rm -rf /var/lib/apt/lists/*

# Install Python Deps
# FIX: We explicitly pin "moviepy==1.0.3" because 2.0 has breaking changes incompatible with our code.
RUN pip install fastapi uvicorn python-multipart google-cloud-vision google-cloud-storage google-cloud-tasks google-cloud-firestore opencv-python-headless pillow "moviepy==1.0.3" gcsfs

# Copy Code
COPY app /app/app

# Run
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]

Overwriting Dockerfile


In [None]:
# --- 3.1 Create Artifact Registry Repository (One-time) ---
!gcloud artifacts repositories create privacyscrub-repo \
    --repository-format=docker \
    --location={REGION} \
    --description="PrivacyScrub V2 Docker Repo" || echo "Repo likely exists, skipping creation."

ERROR: (gcloud.artifacts.repositories.create) ALREADY_EXISTS: the repository already exists
Repo likely exists, skipping creation.


In [None]:
# --- 3.2 Create Cloud Tasks Queue (One-time) ---
!gcloud tasks queues create privacyscrub-video-queue \
    --location={REGION} || echo "Queue likely exists, skipping."

ERROR: (gcloud.tasks.queues.create) ALREADY_EXISTS: Queue already exists
Queue likely exists, skipping.


In [None]:
# --- 3.3 Fix: Enable Firestore API & Create Database ---
# Auto-retry loop to handle API propagation delays. This ensures the DB exists for job tracking.
import time
import subprocess

print("Checking Firestore...")
!gcloud services enable firestore.googleapis.com

for i in range(6):
    print(f"Attempt {i+1}: Creating Database if needed...")
    result = subprocess.run(
        f"gcloud firestore databases create --location={REGION} --type=firestore-native --quiet",
        shell=True, capture_output=True, text=True
    )
    if result.returncode == 0 or "already exists" in result.stderr:
        print("Database Ready.")
        break
    else:
        print("API initializing... retrying in 10s.")
        time.sleep(10)
else:
    print("Could not verify database. Proceeding, but errors may occur if DB is missing.")

Checking Firestore...
Attempt 1: Creating Database if needed...
Database Ready.
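
The retry loop in 3.3 follows a pattern that can be factored into a small helper. This is a sketch under stated assumptions: `retry` is a hypothetical name, the operation is any callable that returns a truthy value on success, and the `sleep` parameter exists only so the wait can be swapped out in tests.

```python
import time

def retry(operation, attempts=6, delay_s=10.0, sleep=time.sleep):
    """Call `operation` until it returns a truthy value.

    Returns the number of attempts used, or None if every attempt failed.
    """
    for attempt in range(1, attempts + 1):
        if operation():
            return attempt
        if attempt < attempts:
            # Wait before the next try, but not after the final one.
            sleep(delay_s)
    return None

# Simulated operation that fails twice, then succeeds.
outcomes = iter([False, False, True])
attempts_used = retry(lambda: next(outcomes), sleep=lambda _s: None)
print(attempts_used)
```

Returning `None` on exhaustion plays the same role as the `for ... else` branch in the cell above: the caller decides whether to proceed or stop.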


In [None]:
# --- 3.4 Build & Deploy (With Extended Timeout) ---
# This step builds the container in Cloud Build and deploys it to Cloud Run.
# --timeout=300 gives each request up to 5 minutes, which leaves room for the video worker to finish.

IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/privacyscrub-repo/{SERVICE_NAME}:latest"

print(f"Building: {IMAGE_URI}")
!gcloud builds submit --tag {IMAGE_URI}

print(f"Deploying to Cloud Run: {SERVICE_NAME}...")
!gcloud run deploy {SERVICE_NAME} \
    --image {IMAGE_URI} \
    --region {REGION} \
    --allow-unauthenticated \
    --set-env-vars=GCP_PROJECT_ID={PROJECT_ID},GCP_REGION={REGION},GCS_BUCKET_NAME={BUCKET_NAME} \
    --memory=2Gi \
    --timeout=300

Building: us-central1-docker.pkg.dev/privacyscrub-backend/privacyscrub-repo/privacyscrub-api:latest
Creating temporary archive of 107 file(s) totalling 55.7 MiB before compression.
Uploading tarball of [.] to [gs://privacyscrub-backend_cloudbuild/source/1763608925.733584-85930225fa2648ff93147b9e26c8cad9.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/privacyscrub-backend/locations/global/builds/ba11af47-cb1c-4ed4-9db8-7e8b9ec39932].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/ba11af47-cb1c-4ed4-9db8-7e8b9ec39932?project=138163390354 ].
Waiting for build to complete. Polling interval: 1 second(s).
 REMOTE BUILD OUTPUT
starting build "ba11af47-cb1c-4ed4-9db8-7e8b9ec39932"

FETCHSOURCE
Fetching storage object: gs://privacyscrub-backend_cloudbuild/source/1763608925.733584-85930225fa2648ff93147b9e26c8cad9.tgz#1763608937755552
Copying gs://privacyscrub-backend_cloudbuild/source/1763608925.733584-85930225fa2648ff93147b9e26c8cad9.tgz#1763608937755552

In [None]:
# --- 3.5 Post-Deploy Configuration ---
# Retrieves the deployed service URL and updates the service env vars so it knows where to send tasks.

import subprocess
try:
    SERVICE_URL = subprocess.check_output(
        f"gcloud run services describe {SERVICE_NAME} --region {REGION} --format 'value(status.url)'",
        shell=True
    ).decode().strip()

    print(f"Service Deployed at: {SERVICE_URL}")
    print("Updating SERVICE_URL env var for Cloud Tasks routing...")

    !gcloud run services update {SERVICE_NAME} \
        --region {REGION} \
        --set-env-vars=SERVICE_URL={SERVICE_URL},GCP_PROJECT_ID={PROJECT_ID},GCP_REGION={REGION},GCS_BUCKET_NAME={BUCKET_NAME}

except Exception as e:
    print(f"Error retrieving URL: {e}")

Service Deployed at: https://privacyscrub-api-whbrskh54q-uc.a.run.app
Updating SERVICE_URL env var for Cloud Tasks routing...
Service [privacyscrub-api] revision [privacyscrub-api-00046-j6p] has been deployed and is serving 100 percent of traffic.
Service URL: https://privacyscrub-api-138163390354.us-central1.run.app


# 4.0 Validation & Metrics (SRS 8.0)
This section fulfills the testing and evaluation requirements. We run a test suite against the live API and calculate Precision/Recall/F1 scores.

In [None]:
# --- 4.1 Helper Functions for Validation ---
# This section defines utility functions to generate synthetic test data (Images and Videos).
# These assets are used to verify the API endpoints without relying on external datasets.

import time
import requests
import io
import cv2
import numpy as np
import sys

# --- Dependency Safety Check ---
# In Colab, updating Pillow often requires a runtime restart.
# We verify the import works before proceeding.
try:
    from PIL import Image, ImageDraw
except ImportError:
    print("🛑 CRITICAL ERROR: Pillow Library Mismatch.")
    print("👉 ACTION REQUIRED: Go to the top menu, click 'Runtime' > 'Restart Session'.")
    print("   Then re-run the notebook starting from Section 1.2 (skip installation if already done).")
    # We raise the error again to stop execution so the user sees the message
    raise

def generate_test_image(text="Secret Data"):
    """
    Generates a synthetic image with a white background and black text.
    Used to test the '/v1/anonymize-image' endpoint.

    Args:
        text (str): The sensitive text to burn into the image.

    Returns:
        io.BytesIO: In-memory JPEG file buffer.
    """
    # Create a simple RGB image (300x100)
    img = Image.new('RGB', (300, 100), color=(255, 255, 255))
    d = ImageDraw.Draw(img)

    # Draw text at specific coordinates
    d.text((10, 10), text, fill=(0, 0, 0))

    # Save to memory buffer as JPEG
    buf = io.BytesIO()
    img.save(buf, format='JPEG')
    buf.seek(0) # Reset pointer to start of file
    return buf

def generate_test_video(filename='test_video.mp4'):
    """
    Generates a synthetic random-noise video for async testing.

    Args:
        filename (str): Output filename.

    Returns:
        str: Path to the generated video file.
    """
    print(f"   🎥 Generating synthetic video: {filename}...")

    # Define codec (mp4v is widely compatible for testing)
    # Note: In production, we might use H.264, but mp4v is safe for OpenCV generation.
    out = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'mp4v'), 10, (100, 100))

    # Generate 30 frames (3 seconds at 10fps)
    for i in range(30):
        # Create random noise frame (randint's upper bound is exclusive, so 256 covers 0-255)
        frame = np.random.randint(0, 256, (100, 100, 3), dtype='uint8')
        out.write(frame)

    out.release()
    return filename

In [None]:
# --- 4.2 Test Suite: Image Anonymization & Profiles ---

print(f"Running Tests against {SERVICE_URL}...")

test_cases = [
    {"name": "Default Blur", "params": {}, "expect_type": "image/jpeg"},
    {"name": "Coordinates Only", "params": {"coordinates_only": True}, "expect_type": "application/json"},
    {"name": "Profile: HIPAA (Black Box)", "params": {"profile": "HIPAA_SAFE_HARBOR"}, "expect_type": "image/jpeg"},
    {"name": "Profile: GDPR (Pixelate)", "params": {"profile": "GDPR", "mode": "pixelate"}, "expect_type": "image/jpeg"},
]

for test in test_cases:
    print(f"\nTesting: {test['name']}")
    img_buf = generate_test_image()
    files = {'file': ('test.jpg', img_buf, 'image/jpeg')}

    try:
        resp = requests.post(f"{SERVICE_URL}/v1/anonymize-image", files=files, data=test['params'], timeout=30)
        if resp.status_code == 200:
            ct = resp.headers.get("content-type")
            if test['expect_type'] in ct:
                print(f"   Passed. Got {ct}")
                if test['expect_type'] == "application/json":
                    print("   Response:", resp.json())
            else:
                print(f"   Failed. Expected {test['expect_type']}, got {ct}")
        else:
            print(f"   Error {resp.status_code}: {resp.text}")
    except Exception as e:
        print(f"   Exception: {e}")

Running Tests against https://privacyscrub-api-whbrskh54q-uc.a.run.app...

Testing: Default Blur
   Passed. Got image/jpeg

Testing: Coordinates Only
   Passed. Got application/json
   Response: {'detected_features': [{'type': 'text', 'poly': [[10, 12], [40, 12], [40, 20], [10, 20]]}, {'type': 'text', 'poly': [[42, 12], [61, 12], [61, 20], [42, 20]]}], 'count': 2}

Testing: Profile: HIPAA (Black Box)
   Passed. Got image/jpeg

Testing: Profile: GDPR (Pixelate)
   Passed. Got image/jpeg


In [None]:
# --- 4.3 Test Suite: Full Video Lifecycle (Async + Polling) ---

print("\nTesting: Full Video Async Pipeline")
vid_file = generate_test_video()

with open(vid_file, 'rb') as f:
    files = {'file': f}
    resp = requests.post(f"{SERVICE_URL}/v1/anonymize-video", files=files, data={'profile': 'CCPA'})

if resp.status_code == 200:
    job_data = resp.json()
    job_id = job_data['job_id']
    print(f"   Job Submitted. ID: {job_id}")

    # Poll for completion
    print("   Polling for completion (Wait ~30s)...", end="")
    for _ in range(20):
        time.sleep(3)
        status_resp = requests.get(f"{SERVICE_URL}/v1/jobs/{job_id}")
        status_data = status_resp.json()
        status = status_data.get("status")
        print(".", end="")

        if status == "COMPLETED":
            print(f"\n   Job Completed! Output URI: {status_data.get('output_uri')}")
            break
        elif status == "FAILED":
            # Report the specific error from the backend
            error_msg = status_data.get("error_message", "Unknown error")
            print(f"\n   Job Failed. Reason: {error_msg}")
            break
    else:
        print("\n   Timed out waiting for completion. Check Cloud Logs.")

else:
    print(f"   Failed to submit video: {resp.text}")


Testing: Full Video Async Pipeline
   🎥 Generating synthetic video: test_video.mp4...
   Job Submitted. ID: job_8889d6e2-ee64-489e-b83c-b897db6425b7
   Polling for completion (Wait ~30s)......
   Job Completed! Output URI: gs://privacyscrub-backend-temp-videos/output/processed_job_8889d6e2-ee64-489e-b83c-b897db6425b7_test_video.mp4
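
The polling loop in 4.3 can also be written as a reusable helper. The sketch below is illustrative only: `poll_job` and its parameters are hypothetical, and `fetch_status` stands in for the `GET /v1/jobs/{job_id}` call so the example runs without a deployed service. With the defaults of 20 polls at 3 seconds each, the worst-case wait is about 60 seconds.

```python
import time

TERMINAL_STATUSES = {"COMPLETED", "FAILED"}

def poll_job(fetch_status, max_polls=20, interval_s=3.0, sleep=time.sleep):
    """Poll until the job reaches a terminal status.

    Returns the final job record, or None if the poll budget runs out.
    """
    for _ in range(max_polls):
        sleep(interval_s)
        job = fetch_status()
        if job.get("status") in TERMINAL_STATUSES:
            return job
    return None

# Simulated status responses; the URI is a placeholder.
responses = iter([
    {"status": "QUEUED"},
    {"status": "PROCESSING"},
    {"status": "COMPLETED", "output_uri": "gs://example-bucket/output/processed.mp4"},
])
final = poll_job(lambda: next(responses), sleep=lambda _s: None)
print(final["status"])
```

Treating FAILED as terminal avoids waiting out the full budget when the backend has already reported an error.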


In [None]:
# --- 5.0 Evaluation & Metrics Engine (SRS 8.0) ---
# Calculates Precision, Recall, and F1 Scores on synthetic data.

import numpy as np
from PIL import Image, ImageDraw
import requests
import io
import json

# Helper for IoU
def calculate_iou(boxA, boxB):
    def get_bounds(poly):
        xs = [p[0] for p in poly]; ys = [p[1] for p in poly]
        return min(xs), min(ys), max(xs), max(ys)

    xA_min, yA_min, xA_max, yA_max = get_bounds(boxA)
    xB_min, yB_min, xB_max, yB_max = get_bounds(boxB)
    inter_x_min = max(xA_min, xB_min)
    inter_y_min = max(yA_min, yB_min)
    inter_x_max = min(xA_max, xB_max)
    inter_y_max = min(yA_max, yB_max)
    if inter_x_max < inter_x_min or inter_y_max < inter_y_min: return 0.0
    inter_area = (inter_x_max - inter_x_min) * (inter_y_max - inter_y_min)
    boxA_area = (xA_max - xA_min) * (yA_max - yA_min)
    boxB_area = (xB_max - xB_min) * (yB_max - yB_min)
    union_area = boxA_area + boxB_area - inter_area
    return inter_area / union_area if union_area > 0 else 0

# Generator
def generate_evaluation_sample(sample_id):
    width, height = 800, 600
    img = Image.new('RGB', (width, height), color=(255, 255, 255))
    draw = ImageDraw.Draw(img)
    text_content = f"CONFIDENTIAL_{sample_id}"
    x = np.random.randint(50, width - 200)
    y = np.random.randint(50, height - 50)
    draw.text((x, y), text_content, fill=(0, 0, 0))
    gt_box = [(x, y), (x+150, y), (x+150, y+20), (x, y+20)]
    buf = io.BytesIO()
    img.save(buf, format='JPEG')
    buf.seek(0)
    return buf, [{"type": "text", "poly": gt_box}]

# Runner
def run_evaluation_suite(num_samples=5, iou_threshold=0.1):
    print(f"Starting Evaluation on {num_samples} synthetic samples...")
    metrics = {"tp": 0, "fp": 0, "fn": 0}
    skipped = 0  # Samples whose request failed; they are excluded from the metrics
    for i in range(num_samples):
        img_buf, ground_truths = generate_evaluation_sample(i)
        files = {'file': ('eval.jpg', img_buf, 'image/jpeg')}
        data = {'coordinates_only': True, 'target_text': True, 'confidence': 0.1}
        try:
            resp = requests.post(f"{SERVICE_URL}/v1/anonymize-image", files=files, data=data, timeout=60)
            if resp.status_code != 200:
                skipped += 1
                continue
            predictions = resp.json().get("detected_features", [])
            matched_gt_indices = set()
            for pred in predictions:
                if pred['type'] != 'text':
                    continue
                # Find the ground-truth box that overlaps this prediction the most
                best_iou, best_gt_idx = 0, -1
                for idx, gt in enumerate(ground_truths):
                    iou = calculate_iou(gt['poly'], pred['poly'])
                    if iou > best_iou:
                        best_iou, best_gt_idx = iou, idx
                # Only the first prediction to match a ground truth counts as a true positive;
                # duplicates and non-overlapping predictions are false positives.
                if best_iou >= iou_threshold and best_gt_idx not in matched_gt_indices:
                    metrics["tp"] += 1
                    matched_gt_indices.add(best_gt_idx)
                else:
                    metrics["fp"] += 1
            # Ground truths that no prediction matched are false negatives
            metrics["fn"] += len(ground_truths) - len(matched_gt_indices)
        except Exception as e:
            print(f"Sample {i} failed: {e}")
            skipped += 1

    if skipped:
        print(f"Skipped {skipped} of {num_samples} samples; they are not reflected in the metrics.")
    tp, fp, fn = metrics["tp"], metrics["fp"], metrics["fn"]
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
    print(json.dumps({"metrics": {"precision": round(precision, 4), "recall": round(recall, 4), "f1_score": round(f1, 4)}}, indent=2))

run_evaluation_suite()

Starting Evaluation on 5 synthetic samples...
{
  "metrics": {
    "precision": 1.0,
    "recall": 1.0,
    "f1_score": 1.0
  }
}
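
To make the numbers above easier to interpret, here is a small worked example of the arithmetic behind them. It is a sketch, not the evaluation code itself: `bbox_iou` and `precision_recall_f1` are hypothetical helpers, the predicted box is made up, and only the 150x20 ground-truth size and the 0.1 threshold come from the cell above.

```python
def bbox_iou(a, b):
    """IoU of two axis-aligned boxes given as (x_min, y_min, x_max, y_max)."""
    inter_w = min(a[2], b[2]) - max(a[0], b[0])
    inter_h = min(a[3], b[3]) - max(a[1], b[1])
    if inter_w <= 0 or inter_h <= 0:
        return 0.0  # No overlap
    inter = inter_w * inter_h
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    return inter / (area_a + area_b - inter)


def precision_recall_f1(tp, fp, fn):
    """Same formulas as the evaluation runner, with zero-division guards."""
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
    return precision, recall, f1


gt = (100, 100, 250, 120)    # 150 x 20 ground-truth box, as in the generator
pred = (100, 100, 190, 112)  # Made-up detection that hugs the text more tightly

iou = bbox_iou(gt, pred)     # intersection 90*12 = 1080, union 3000
print(round(iou, 2))         # 0.36

# Hypothetical counts: 4 matches, 1 spurious detection, 1 missed ground truth
print(tuple(round(v, 2) for v in precision_recall_f1(tp=4, fp=1, fn=1)))  # (0.8, 0.8, 0.8)
```

In this example a detection that tightly bounds the text scores an IoU of only 0.36 against the oversized ground-truth box, so it passes the 0.1 threshold used here but would fail a stricter 0.5 threshold.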


# 6.0 Frontend Deployment (SRS 1.2)
This section writes the Streamlit frontend files and pushes them to a GitHub repository, which can then be connected to Streamlit Cloud.

**Requirement:** You must have a `GITHUB_TOKEN` set in your Colab Secrets.

In [None]:
# --- 6.1 Create Frontend Directory ---
import os
import shutil

FRONTEND_DIR = "frontend_repo"
if os.path.exists(FRONTEND_DIR):
    shutil.rmtree(FRONTEND_DIR)
os.makedirs(FRONTEND_DIR)

print(f"Created '{FRONTEND_DIR}' for git staging.")

Created 'frontend_repo' for git staging.


In [None]:
%%writefile frontend_repo/requirements.txt
streamlit
requests
Pillow

Writing frontend_repo/requirements.txt


In [None]:
%%writefile frontend_repo/streamlit_app.py
import streamlit as st
import requests
import time
import os
from PIL import Image
import io

# --- Streamlit Config ---
st.set_page_config(page_title="PrivacyScrub V2", page_icon="🛡️", layout="wide")

# --- Connection Setup ---
# Checks Streamlit Secrets first, then falls back to env vars.
# st.secrets can raise when no secrets file is configured, so guard the lookup.
try:
    API_URL = st.secrets["SERVICE_URL"]
except Exception:
    API_URL = os.environ.get("SERVICE_URL", "http://localhost:8080")

# --- UI Header ---
# FIX: Removed accidental assignment (st.title = ...)
st.title("🛡️ PrivacyScrub V2")
st.markdown(f"**Backend Status:** Connecting to `{API_URL}`...")

# --- Sidebar Inputs ---
st.sidebar.header("Compliance Profile")
profile = st.sidebar.selectbox(
    "Select Profile",
    ["NONE", "GDPR", "CCPA", "HIPAA_SAFE_HARBOR"],
    index=0,
    help="Presets for detection targets and strictness based on legal frameworks."
)

st.sidebar.header("Anonymization Mode")
mode = st.sidebar.radio("Mode", ["blur", "pixelate", "black_box"])

st.sidebar.header("Manual Overrides")
target_faces = st.sidebar.checkbox("Faces", True)
target_plates = st.sidebar.checkbox("License Plates", True)
target_text = st.sidebar.checkbox("Text", True)
target_logos = st.sidebar.checkbox("Logos", True)

# --- Main Layout ---
tab1, tab2 = st.tabs(["🖼️ Anonymize Image", "🎥 Anonymize Video"])

# [Tab 1] Sync Image Endpoint
with tab1:
    st.header("Image Redaction")
    uploaded_file = st.file_uploader("Upload an image", type=["jpg", "png", "jpeg"])

    if uploaded_file:
        col1, col2 = st.columns(2)
        with col1:
            st.image(uploaded_file, caption="Original", use_column_width=True)

        if st.button("Anonymize Image"):
            with st.spinner("Processing..."):
                try:
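                    # Send the original filename and MIME type along with the bytes
                    files = {"file": (uploaded_file.name, uploaded_file.getvalue(), uploaded_file.type)}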
                    data = {
                        "profile": profile,
                        "mode": mode,
                        "target_faces": target_faces,
                        "target_plates": target_plates,
                        "target_text": target_text,
                        "target_logos": target_logos
                    }
                    resp = requests.post(f"{API_URL}/v1/anonymize-image", files=files, data=data)

                    if resp.status_code == 200:
                        with col2:
                            st.image(resp.content, caption="Anonymized Result", use_column_width=True)
                            st.success("Processing Complete!")
                    else:
                        st.error(f"Error {resp.status_code}: {resp.text}")
                except Exception as e:
                    st.error(f"Connection Error: {e}")

# [Tab 2] Async Video Endpoint
with tab2:
    st.header("Video Redaction (Async Pipeline)")
    video_file = st.file_uploader("Upload a video", type=["mp4", "mov", "avi"])

    if video_file:
        st.video(video_file)

        if st.button("Start Video Job"):
            status_container = st.empty()
            progress_bar = st.progress(0)

            try:
                # 1. Submit Job
                status_container.info("Uploading video and dispatching Cloud Task...")
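                # Send the original filename and MIME type along with the bytes
                files = {"file": (video_file.name, video_file.getvalue(), video_file.type)}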
                data = {"profile": profile}

                resp = requests.post(f"{API_URL}/v1/anonymize-video", files=files, data=data)

                if resp.status_code == 200:
                    job_data = resp.json()
                    job_id = job_data["job_id"]
                    status_container.success(f"Job Submitted! ID: `{job_id}`. Polling for status...")

                    # 2. Polling Loop
                    for i in range(100):
                        time.sleep(2)
                        status_resp = requests.get(f"{API_URL}/v1/jobs/{job_id}")
                        if status_resp.status_code == 200:
                            state = status_resp.json()
                            status = state.get("status")
                            if status == "COMPLETED":
                                progress_bar.progress(100)
                                status_container.success("Processing Complete!")
                                output_uri = state.get("output_uri")
                                st.markdown(f"**Output URI:** `{output_uri}`")
                                st.info("Video uploaded to GCS. Implement signed URLs for playback.")
                                break
                            elif status == "FAILED":
                                err = state.get("error_message", "Unknown")
                                status_container.error(f"Job Failed: {err}")
                                break
                            else:
                                # Queued or Processing
                                progress_bar.progress(min(90, 10 + i))
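                        else:
                            st.warning("Polling transient error...")
                    else:
                        # The loop ended without COMPLETED or FAILED (about 200s of polling)
                        status_container.warning(f"Timed out waiting for job `{job_id}`. It may still be processing.")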
                else:
                    st.error(f"Submission failed: {resp.text}")
            except Exception as e:
                st.error(f"Error: {e}")

Writing frontend_repo/streamlit_app.py
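
The polling loop in the video tab can also be expressed as a standalone helper with an explicit deadline, which is easier to test outside Streamlit. This is a sketch under assumptions: `poll_job` is a hypothetical name, `TIMED_OUT` is a client-side status invented for this example, and `PROCESSING` stands in for whatever non-terminal status the backend reports. Only `COMPLETED` and `FAILED` are taken from the app above.

```python
import time
from typing import Callable, Optional


def poll_job(
    fetch_state: Callable[[], Optional[dict]],
    timeout_s: float = 200.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll until the job reaches COMPLETED or FAILED, or the deadline passes.

    fetch_state returns the job document as a dict, or None on a transient error.
    """
    deadline = clock() + timeout_s
    while clock() < deadline:
        state = fetch_state()
        if state is not None and state.get("status") in ("COMPLETED", "FAILED"):
            return state
        sleep(interval_s)
    # Client-side marker only; the backend never sends this status.
    return {"status": "TIMED_OUT"}


# Usage with canned responses instead of HTTP calls (no real waiting).
responses = iter([
    None,                      # Transient error
    {"status": "PROCESSING"},  # Placeholder non-terminal status
    {"status": "COMPLETED", "output_uri": "gs://example-bucket/out.mp4"},
])
result = poll_job(lambda: next(responses), sleep=lambda _seconds: None)
print(result["status"])  # COMPLETED
```

In the app, `fetch_state` would wrap the `requests.get(f"{API_URL}/v1/jobs/{job_id}")` call and return `None` for non-200 responses. Injecting `sleep` and `clock` keeps the helper testable without real delays.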


In [None]:
%%writefile frontend_repo/README.md
# PrivacyScrub V2 - Frontend

This is the Streamlit frontend for the PrivacyScrub V2 API. It allows users to interactively anonymize images and upload videos for asynchronous processing.

## Setup

1. **Backend:** Ensure the backend service is deployed on Google Cloud Run.
2. **Deploy:** Connect this repository to [Streamlit Cloud](https://share.streamlit.io/).
3. **Configuration:** In the Streamlit Cloud Dashboard, go to **App Settings -> Secrets** and add your backend URL:

```toml
SERVICE_URL = "https://your-cloud-run-service-url.a.run.app"
```

Writing frontend_repo/README.md
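
Because the app builds endpoints as `f"{API_URL}/v1/..."`, a malformed `SERVICE_URL` secret (a trailing slash, or a pasted markdown link) leads to confusing request errors. One way to catch this early is to normalize the value once at startup. The helper below is a sketch; `normalize_service_url` is a hypothetical name and is not part of the frontend written above.

```python
from urllib.parse import urlparse


def normalize_service_url(raw: str) -> str:
    """Trim whitespace and trailing slashes, and require an absolute http(s) URL."""
    url = raw.strip().rstrip("/")
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"SERVICE_URL must be an absolute http(s) URL, got {raw!r}")
    return url


print(normalize_service_url("https://example.a.run.app/"))  # https://example.a.run.app

try:
    # A markdown-style link pasted into the secret is rejected.
    normalize_service_url("[https://example.a.run.app](https://example.a.run.app)")
except ValueError as err:
    print(f"Rejected: {err}")
```

Stripping the trailing slash avoids double slashes such as `https://example.a.run.app//v1/anonymize-image` when the path is appended.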


In [None]:
# --- 6.2 Push to GitHub (Requires 'GITHUB_TOKEN' in Secrets) ---
from google.colab import userdata

REPO_NAME = "privacyscrub-frontend"
USERNAME = "BURNSGREGM"

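try:
    # 1. Fetch Token from Secrets (userdata.get raises if the secret is unavailable)
    token = userdata.get('GITHUB_TOKEN')

    # 2. Configure Git User
    !git config --global user.email "colab@privacyscrub.ai"
    !git config --global user.name "PrivacyScrub Bot"

    # 3. Initialize and Commit
    %cd frontend_repo
    !git init
    !git add .
    !git commit -m "Auto-deploy from Colab"
    !git branch -M main

    # 4. Push
    # The token is embedded in the push URL, so never print repo_url.
    print("Pushing to GitHub...")
    repo_url = f"https://{token}@github.com/{USERNAME}/{REPO_NAME}.git"
    !git push -u {repo_url} main --force

    # NOTE: `!` shell commands do not raise Python exceptions when they fail, so
    # reaching this line only means the commands ran. Check the git output above.
    print(f"\nPush command finished. Target: https://github.com/{USERNAME}/{REPO_NAME}")
    %cd ..

except Exception as e:
    # Only the token lookup can raise here, and it runs before `%cd frontend_repo`,
    # so the working directory has not changed and no `%cd ..` is needed.
    print(f"\nDeployment Failed: {e}")
    print("Ensure you have added 'GITHUB_TOKEN' to Colab Secrets and the repo exists.")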

/content/frontend_repo
hint: Using 'master' as the name for the initial branch. This default branch name
hint: is subject to change. To configure the initial branch name to use in all
hint:
hint: 	git config --global init.defaultBranch <name>
hint:
hint: Names commonly chosen instead of 'master' are 'main', 'trunk' and
hint: 'development'. The just-created branch can be renamed via this command:
hint:
hint: 	git branch -m <name>
Initialized empty Git repository in /content/frontend_repo/.git/
[master (root-commit) 433c8a6] Auto-deploy from Colab
 3 files changed, 144 insertions(+)
 create mode 100644 README.md
 create mode 100644 requirements.txt
 create mode 100644 streamlit_app.py
Pushing to GitHub...
Enumerating objects: 5, done.
Counting objects: 100% (5/5), done.
Delta compression using up to 2 threads
Compressing objects: 100% (4/4), done.
Writing objects: 100% (5/5), 2.39 KiB | 2.39 MiB/s, done.
Total 5 (