# GeoZarr Pipeline Operator - Setup

## Prerequisites

Before running this notebook, ensure you have:

1. **Python 3.11+** with the data-pipeline environment
2. **Jupyter dependencies** installed
3. **Kubernetes access** configured
4. **RabbitMQ credentials** in a `.env` file

## 🚀 Quick Setup

Run this **once** in your terminal before opening the notebook:

```bash
# From the repository root
cd /path/to/data-pipeline

# Install all dependencies including notebook support
uv sync --all-extras

# Or if using pip:
pip install -e ".[notebooks]"

# Get the RabbitMQ password from Kubernetes
kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d

# Create the .env file and add the password
cp notebooks/.env.example notebooks/.env
# Edit notebooks/.env and set:
# AMQP_PASSWORD=your_password_here
```

## ⚠️ Common Issues

**"requires the ipykernel package"**
```bash
# Install notebook dependencies
uv sync --extra notebooks
# or
pip install ipykernel ipywidgets ipyleaflet pystac-client python-dotenv
```

**"ModuleNotFoundError: No module named 'operator_utils'"**
```bash
# Launch Jupyter from the repository root (or from the notebooks/ directory)
cd /path/to/data-pipeline
jupyter lab notebooks/operator.ipynb
```

**RabbitMQ connection errors**
- Check that the `.env` file has the correct `AMQP_PASSWORD`
- Verify cluster access: `kubectl get pods -n core -l app.kubernetes.io/name=rabbitmq`
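
Before opening the notebook, it can help to confirm that `notebooks/.env` actually defines `AMQP_PASSWORD`. The following is a minimal sketch using only the standard library. The helper names `read_env_file` and `has_amqp_password` are hypothetical and not part of `operator_utils`, and the parser only handles simple `KEY=value` lines (the `python-dotenv` package listed in the notebook dependencies covers the full syntax).

```python
from pathlib import Path


def read_env_file(path):
    """Parse simple KEY=value lines; skip blanks, comments, and lines without '='."""
    values = {}
    for raw_line in Path(path).read_text().splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop surrounding whitespace and optional quotes around the value
        values[key.strip()] = value.strip().strip("'\"")
    return values


def has_amqp_password(path):
    """Return True if the file exists and defines a non-empty AMQP_PASSWORD."""
    env_path = Path(path)
    if not env_path.is_file():
        return False
    return bool(read_env_file(env_path).get("AMQP_PASSWORD"))
```

For example, `has_amqp_password("notebooks/.env")` run from the repository root returns `False` if the file is missing or the password line is still empty.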

---

Once setup is complete, proceed to the next cell to start the pipeline!

# GeoZarr Pipeline Operator

**Trigger and monitor GeoZarr conversion workflows on Kubernetes.**

## Prerequisites

- Kubernetes access (`kubectl` configured)
- RabbitMQ password in `.env` file

**Setup `.env`:**
```bash
cp .env.example .env
# Get password: kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d
# Add to .env: AMQP_PASSWORD=your_password_here
```

## Quick Start (3 steps)

1. **Setup** → Load config, start port-forward
2. **Publish & Monitor** → Send payload, track workflow
3. **Validate** → Check STAC catalog

**Optional:** Run **Interactive Search** before step 2 to pick a different scene.

## How It Works

```
This Notebook → pika (AMQP) → RabbitMQ → EventSource → Argo Workflow → Convert → Register → STAC
```

**AMQP Flow:**
- `publish_amqp_message()` uses the `pika` library to connect to RabbitMQ (via port-forward)
- It publishes the JSON payload to the `geozarr` exchange with routing key `eopf.items.convert`
- The EventSource (K8s) consumes these messages and triggers the Argo Workflow
- The workflow converts Zarr → GeoZarr and registers the STAC item
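
The publish step in the flow above can be sketched as follows. This is a minimal illustration, not the actual `publish_amqp_message()` implementation from `operator_utils`: the function names, the default port 5672, and the message properties are assumptions, and only the `geozarr` exchange and the `eopf.items.convert` routing key come from this notebook. `pika` is imported lazily so that the payload encoding can be checked without the library or a broker.

```python
import json

EXCHANGE = "geozarr"
ROUTING_KEY = "eopf.items.convert"


def encode_payload(payload):
    """Serialize the payload dict to UTF-8 JSON bytes for the message body."""
    return json.dumps(payload).encode("utf-8")


def publish_payload(payload, user, password, host="localhost", port=5672):
    """Publish one JSON message through the local port-forward (hypothetical helper)."""
    import pika  # imported here so encode_payload works without pika installed

    params = pika.ConnectionParameters(
        host=host,
        port=port,
        credentials=pika.PlainCredentials(user, password),
    )
    connection = pika.BlockingConnection(params)
    try:
        channel = connection.channel()
        channel.basic_publish(
            exchange=EXCHANGE,
            routing_key=ROUTING_KEY,
            body=encode_payload(payload),
            properties=pika.BasicProperties(content_type="application/json"),
        )
    finally:
        # Always release the connection, even if the publish fails
        connection.close()
```

Keeping the encoding separate from the network call makes it possible to inspect exactly what would be sent before a port-forward is running.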

📚 **Docs:** [README.md](../README.md) | [CONTRIBUTING.md](../CONTRIBUTING.md)

In [None]:
# Setup
import json
from pathlib import Path

from operator_utils import Config, start_port_forward

print("🔧 Loading configuration...")
config = Config()
if not config.verify():
    raise RuntimeError("❌ Config validation failed - check .env file")

print("\n🔌 Starting RabbitMQ port-forward...")
pf_process = start_port_forward(config)

payload_file = Path("../workflows/payload.json")
with open(payload_file) as f:
    payload = json.load(f)

print(f"\n✅ Ready! Using payload: {payload.get('source_url', 'N/A')[:60]}...")

🔍 Searching for kubectl...
   ✅ Found: /opt/homebrew/bin/kubectl

🔧 Configuration:
  kubectl: /opt/homebrew/bin/kubectl
  Kubeconfig: /Users/w/Documents/Github/data-pipeline/.work/kubeconfig
  Workflow Namespace: devseed
  RabbitMQ Namespace: core
  RabbitMQ Service: rabbitmq
  AMQP User: user
  AMQP Password: ***
  STAC API: https://api.explorer.eopf.copernicus.eu/stac
  Raster API: https://api.explorer.eopf.copernicus.eu/raster

✅ Kubeconfig exists
✅ pika library available

🐰 Checking RabbitMQ service in core...

In [None]:
# Interactive Search (Optional - skip to Publish & Monitor for default)
from operator_utils import create_search_ui

print("🗺️  Opening interactive map search...")
print("   Select a scene, click 'Update Payload', then re-run Setup")

try:
    create_search_ui(payload_file)
except ImportError:
    print("⚠️  Missing dependencies: uv pip install ipywidgets ipyleaflet pystac-client")
    raise


In [None]:
# Publish & Monitor
import time

from IPython.display import HTML, display
from operator_utils import get_latest_workflow, monitor_workflow, publish_amqp_message

# Step 1: Publish payload via AMQP (pika → RabbitMQ → EventSource)
print("📤 Publishing payload via AMQP...")
print(f"   Source: {payload.get('source_url', 'N/A')[:60]}...")
print(f"   Target: localhost:{config.amqp_local_port} → RabbitMQ → geozarr exchange")

item_id = publish_amqp_message(config, payload)
if not item_id:
    raise RuntimeError("❌ Publish failed - check RabbitMQ connection")

# Step 2: Wait for Argo Workflow to be triggered (6 attempts x 5s = 30s)
print("\n⏱️  Waiting for EventSource to trigger workflow (30s timeout)...")
workflow_name = None
for attempt in range(6):
    time.sleep(5)
    workflow_name = get_latest_workflow(config, min_age_seconds=60)
    if workflow_name:
        print(f"✅ Workflow created: {workflow_name}")
        break
    print(f"   Attempt {attempt + 1}/6...")

if not workflow_name:
    # Use the configured workflow namespace rather than a hardcoded one
    print(
        f"\n💡 Debug: kubectl logs -n {config.namespace} -l sensor-name=geozarr-sensor --tail=20"
    )
    raise RuntimeError("❌ No workflow created - check EventSource and Sensor logs")

# Step 3: Monitor workflow progress
argo_ui = (
    f"https://argo-workflows.hub-eopf-explorer.eox.at/workflows/{config.namespace}/{workflow_name}"
)
display(HTML(f'<h3>🔗 <a href="{argo_ui}" target="_blank">View Workflow in Argo UI</a></h3>'))

print("\n📊 Monitoring workflow progress...")
success = monitor_workflow(config, workflow_name, timeout_minutes=10)

if success:
    print("\n✅ Workflow completed! Ready to validate STAC item.")
else:
    print(
        f"\n❌ Workflow incomplete - check Argo UI or: kubectl get wf {workflow_name} -n {config.namespace}"
    )

📤 Publishing: https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/it...
💡 Auto-derived item_id: S2C_MSIL2A_20251006T100041_N0511_R122_T33TTG_20251006T152515
📝 Payload:
{
  "source_url": "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2C_MSIL2A_20251006T100041_N0511_R122_T33TTG_20251006T152515",
  "collection": "sentinel-2-l2a-dp-test",
  "groups": [
    "/measurements/reflectance/r10m",
    "/measurements/reflectance/r20m",
    "/measurements/reflectance/r60m"
  ],
  "spatial_chunk": 4096,
  "tile_width": 256,
  "crs_groups": [
    "/conditions/geometry"
  ],
  "item_id": "S2C_MSIL2A_20251006T100041_N0511_R122_T33TTG_20251006T152515"
}

🚀 Publishing to RabbitMQ...
✅ Payload published successfully!
   Exchange: geozarr
   Routing key: eopf.items.convert
   Output item ID: S2C_MSIL2A_20251006T100041_N0511_R122_T33TTG_20251006T152515
   Collection: sentinel-2-l2a-dp-test
✅ Published → S2C_MSIL2A_20251006T100041_N0511_R122_T33TTG_20251006T152515

In [None]:
# Validate
from operator_utils import validate_stac_item

print("🔍 Validating STAC item in catalog...")
print(f"   Item ID: {item_id}")
print(f"   Collection: {payload['collection']}")

validate_stac_item(config, item_id, payload["collection"])
print("\n✅ Validation complete! Check map visualization above.")

## ✅ Workflow Complete

Your GeoZarr data is now in the STAC catalog, with a preview visualized above!

### Next Steps

- [STAC Browser](https://api.explorer.eopf.copernicus.eu/browser) - Browse catalog
- [Argo Workflows](https://argo-workflows.hub-eopf-explorer.eox.at/workflows/devseed) - View all workflows
- [STAC API](https://api.explorer.eopf.copernicus.eu/stac) - Query API
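
To check a registered item outside the notebook, the STAC API listed above can be queried directly. The sketch below assumes the deployment follows the standard STAC API item path (`/collections/{collection}/items/{item_id}`). The helper names `item_url` and `fetch_item` are hypothetical and unrelated to `validate_stac_item` in `operator_utils`.

```python
import json
import urllib.error
import urllib.request
from urllib.parse import quote

STAC_API = "https://api.explorer.eopf.copernicus.eu/stac"


def item_url(collection, item_id, base_url=STAC_API):
    """Build the standard STAC API URL for a single item."""
    base = base_url.rstrip("/")
    return f"{base}/collections/{quote(collection, safe='')}/items/{quote(item_id, safe='')}"


def fetch_item(collection, item_id, base_url=STAC_API, timeout=30):
    """Return the item as a dict, or None if the API answers 404."""
    url = item_url(collection, item_id, base_url)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return json.load(response)
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return None
        raise
```

With the values from the Publish & Monitor cell, `fetch_item(payload["collection"], item_id)` would return the item document once registration has finished, and `None` before that.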

### Troubleshooting

**No workflow created?**
```bash
kubectl logs -n devseed -l sensor-name=geozarr-sensor --tail=50
```

**Workflow failed?**
```bash
kubectl get wf -n devseed --sort-by=.metadata.creationTimestamp | tail -5
kubectl describe wf <workflow-name> -n devseed
```
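
The same check can be scripted from Python. This is a minimal sketch that shells out to `kubectl` and reads the workflow's `status.phase` field. The helper names are hypothetical, and the `devseed` default simply mirrors the namespace used in the commands above.

```python
import subprocess


def workflow_phase_command(workflow_name, namespace="devseed"):
    """Build the kubectl command that prints the workflow's status.phase."""
    return [
        "kubectl", "get", "wf", workflow_name,
        "-n", namespace,
        "-o", "jsonpath={.status.phase}",
    ]


def get_workflow_phase(workflow_name, namespace="devseed"):
    """Run kubectl and return the phase string (for example Running, Succeeded, Failed)."""
    result = subprocess.run(
        workflow_phase_command(workflow_name, namespace),
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if kubectl exits non-zero
    )
    return result.stdout.strip()
```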

**RabbitMQ connection issues?**
```bash
# Check port-forward
ps aux | grep "kubectl port-forward"
# Restart port-forward (re-run Setup above)
```