# REST API Publishing Examples Validation (Python)

This notebook validates Python code examples from the Fiddler REST API publishing guides:
- [Publishing via REST API](../python-client-guides/publishing-production-data/publishing-via-rest-api.md) (Quick-start guide)
- [Advanced REST API Publishing](../python-client-guides/publishing-production-data/publishing-via-rest-api-advanced.md) (Advanced guide)

## Prerequisites

### Environment Variables
Set these environment variables before running:
```bash
export FIDDLER_API_KEY="your-api-key"
export FIDDLER_ENDPOINT="https://your-instance.fiddler.ai"
export MODEL_ID="your-model-uuid"
```
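
A quick way to see which of these variables are still unset before running the notebook is sketched below. The `missing_env_vars` helper and the `REQUIRED_VARS` tuple are illustrative names, not part of the Fiddler guides; only the variable names come from the list above.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variable names taken from the export commands above.
REQUIRED_VARS = ("FIDDLER_API_KEY", "FIDDLER_ENDPOINT", "MODEL_ID")


def missing_env_vars(
    names: Iterable[str] = REQUIRED_VARS,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
else:
    print("All required environment variables are set")
```

Reporting the specific missing names makes a misconfigured shell easier to diagnose than a single combined error.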

### Python Dependencies
```bash
pip install requests ipykernel
```

## Notebook Structure

1. **Setup** - Environment variables and test data creation
2. **Quick-Start Examples** - Python examples from the quick-start guide
3. **Advanced Examples** - Production-ready Python publisher with retry logic
4. **Cleanup** - Remove temporary files

## Note on TypeScript Examples

For TypeScript examples, see the dedicated [TypeScript REST API Validation Notebook](validate-typescript-rest-api-example.nnb).

## Note on curl Examples

The markdown documentation includes curl examples for language-agnostic reference. This notebook uses Python `requests` for clarity and simplicity.

---
## Section 1: Setup

Initialize environment variables and create test data.

In [None]:
import json
import os
from datetime import datetime, timezone
from typing import Dict, List

import requests

# Load environment variables
api_key = os.environ.get("FIDDLER_API_KEY")
fiddler_endpoint = os.environ.get("FIDDLER_ENDPOINT")
model_id = os.environ.get("MODEL_ID")

# Validate environment
if not all([api_key, fiddler_endpoint, model_id]):
    raise ValueError(
        "Missing required environment variables. Set: "
        "FIDDLER_API_KEY, FIDDLER_ENDPOINT, MODEL_ID"
    )

print("✓ Environment configured")
print(f"  Endpoint: {fiddler_endpoint}")
print(f"  Model ID: {model_id}")

# Create test CSV file for batch publishing with bank churn schema
csv_content = """customer_id,creditscore,geography,gender,age,tenure,balance,numofproducts,hascrcard,isactivemember,estimatedsalary,predicted_churn,churn,timestamp
27c349a2,559,California,Male,52,2,0.0,1,1,0,129013.59,0.007447,no,2025-12-28T12:00:00Z
27c35cee,482,California,Male,55,5,97318.25,1,0,1,78416.14,0.804852,yes,2025-12-27T12:01:00Z
27c364f0,651,Florida,Female,46,4,89743.05,1,1,0,156425.57,0.012754,no,2025-12-29T12:02:00Z
"""

with open("churn_events.csv", "w") as f:
    f.write(csv_content)

print("✓ Created test CSV file: churn_events.csv")

---
## Section 2: Quick-Start Examples

Examples from [publishing-via-rest-api.md](../python-client-guides/publishing-production-data/publishing-via-rest-api.md)

### 2.1 Batch Upload - Python Example

Upload the CSV file and save the returned `file_id` for batch publishing.

In [None]:
# Upload file
with open("churn_events.csv", "rb") as f:
    response = requests.post(
        f"{fiddler_endpoint}/v3/files/upload",
        headers={"Authorization": f"Bearer {api_key}"},
        files={"file": f}
    )

# Fail fast on HTTP errors before reading the response body
response.raise_for_status()
file_id = response.json()["data"]["id"]
print(f"✓ File uploaded. File ID: {file_id}")

# Save for next step
with open("file_id.txt", "w") as f:
    f.write(file_id)

### 2.2 Batch Publish - Python Example

Publish events from the uploaded file.

In [None]:
# Read file_id from previous upload
with open("file_id.txt", "r") as f:
    file_id = f.read().strip()

payload = {
    "model_id": model_id,
    "env_type": "PRODUCTION",
    "source": {
        "type": "FILE",
        "file_id": file_id
    }
}

response = requests.post(
    f"{fiddler_endpoint}/v3/events",
    headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
    data=json.dumps(payload)
)

# Fail fast on HTTP errors before reading the response body
response.raise_for_status()
job_id = response.json()["data"]["job"]["id"]
print(f"✓ Batch publish started. Job ID: {job_id}")

# Save for reference
with open("job_id.txt", "w") as f:
    f.write(job_id)

### 2.3 Streaming - Python Example

Stream individual events directly to Fiddler.
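
The test CSV in Section 1 writes timestamps with a `Z` suffix, while `datetime.isoformat()` on a UTC-aware datetime emits `+00:00`. Both are valid ISO 8601 spellings of UTC. If one consistent form is preferred across batch and streaming data, a conversion might look like the sketch below. The `to_utc_z` helper is a hypothetical name, and this example makes no claim about which formats the Fiddler API accepts.

```python
from datetime import datetime, timezone


def to_utc_z(moment: datetime) -> str:
    """Format a timezone-aware datetime as UTC with a trailing 'Z'.

    Sub-second precision is dropped, matching the CSV rows in Section 1.
    """
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


sample = datetime(2025, 12, 28, 12, 0, 0, tzinfo=timezone.utc)
print(sample.isoformat())  # 2025-12-28T12:00:00+00:00
print(to_utc_z(sample))    # 2025-12-28T12:00:00Z
```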

In [None]:
events = [
    {
        "customer_id": "27c349a2",
        "creditscore": 559,
        "geography": "California",
        "gender": "Male",
        "age": 52,
        "tenure": 2,
        "balance": 0.0,
        "numofproducts": 1,
        "hascrcard": 1,
        "isactivemember": 0,
        "estimatedsalary": 129013.59,
        "predicted_churn": 0.007447,
        "churn": "no",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
]

payload = {
    "model_id": model_id,
    "env_type": "PRODUCTION",
    "source": {
        "type": "EVENTS",
        "events": events
    }
}

response = requests.post(
    f"{fiddler_endpoint}/v3/events",
    headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
    data=json.dumps(payload)
)

# Fail fast on HTTP errors before reading the response body
response.raise_for_status()
event_ids = response.json()["data"]["event_ids"]
print(f"✓ Streaming publish completed. Published {len(event_ids)} event(s)")

# Save first event_id for updates
with open("event_id.txt", "w") as f:
    f.write(event_ids[0])

### 2.4 Updates - Python Example

Update existing events using PATCH.

**IMPORTANT:** Only PRODUCTION events can be updated. Non-production (PRE_PRODUCTION) events are immutable and cannot be modified through either batch or streaming publishing.
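
One way to enforce this rule on the client side is to reject non-production environments before any request is sent. The sketch below assumes the same payload shape used throughout this notebook. `build_update_payload` is a hypothetical helper, not a Fiddler API.

```python
import json
from typing import Any, Dict, List


def build_update_payload(
    model_id: str,
    updates: List[Dict[str, Any]],
    env_type: str = "PRODUCTION",
) -> Dict[str, Any]:
    """Build a PATCH body for event updates, refusing non-production data."""
    if env_type != "PRODUCTION":
        raise ValueError(
            f"Events in {env_type} cannot be updated; only PRODUCTION events can"
        )
    if not updates:
        raise ValueError("updates must contain at least one event")
    return {
        "model_id": model_id,
        "env_type": env_type,
        "source": {"type": "EVENTS", "events": updates},
    }


payload = build_update_payload(
    "your-model-uuid",  # placeholder model ID
    [{"customer_id": "27c349a2", "churn": "yes"}],
)
print(json.dumps(payload, indent=2))
```

Failing locally gives a clearer error message than waiting for the server to reject the request.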

In [None]:
updates = [
    {
        "customer_id": "27c349a2",
        "churn": "yes"
    }
]

payload = {
    "model_id": model_id,
    "env_type": "PRODUCTION",
    "source": {
        "type": "EVENTS",
        "events": updates
    }
}

response = requests.patch(
    f"{fiddler_endpoint}/v3/events",
    headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
    data=json.dumps(payload)
)

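# Fail on HTTP errors so the success message is printed only for a successful update
response.raise_for_status()
print(f"✓ Update completed. Response status: {response.status_code}")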

---
## Section 3: Advanced Examples

Production-ready Python publisher from [publishing-via-rest-api-advanced.md](../python-client-guides/publishing-production-data/publishing-via-rest-api-advanced.md)

### 3.1 Python ProductionStreamingPublisher

Production-grade publisher with exponential backoff retry logic.
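
To make the retry timing concrete, the sketch below lists the wait before each retry under exponential backoff, where the wait before retry `n` (counting from 0) is `initial_delay * backoff_multiplier ** n`. The `backoff_schedule` helper is a hypothetical name used only for illustration. Its defaults mirror the constructor defaults of the publisher class in this section.

```python
from typing import List


def backoff_schedule(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_multiplier: float = 2.0,
) -> List[float]:
    """Return the delay in seconds before each retry, in order."""
    return [
        initial_delay * (backoff_multiplier ** attempt)
        for attempt in range(max_retries)
    ]


delays = backoff_schedule()
print(delays)       # [1.0, 2.0, 4.0]
print(sum(delays))  # 7.0 seconds of waiting in the worst case
```

With the defaults, a request that keeps failing waits 7 seconds in total across three retries, excluding the time spent on the requests themselves.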

In [None]:
import time  # required for time.sleep() in the retry loop
from typing import Optional


class ProductionStreamingPublisher:
    def __init__(
        self,
        api_key: str,
        endpoint: str,
        max_retries: int = 3,
        initial_delay: float = 1.0,
        backoff_multiplier: float = 2.0
    ):
        self.api_key = api_key
        self.endpoint = endpoint
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.backoff_multiplier = backoff_multiplier

    def _get_headers(self, is_retry: bool = False) -> Dict[str, str]:
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        if is_retry:
            headers["X-Fiddler-Client-Retry"] = "true"
        return headers

    def _calculate_backoff(self, attempt: int) -> float:
        return self.initial_delay * (self.backoff_multiplier ** attempt)

    def _is_retryable(self, status_code: int) -> bool:
        return status_code in [429, 500, 502, 503, 504]

    def publish_events_with_retry(
        self,
        model_id: str,
        events: List[Dict]
    ) -> Optional[List[str]]:
        """Publish events with exponential backoff retry logic."""

        if not events:
            raise ValueError("Events list cannot be empty")

        if len(events) > 1000:
            raise ValueError(f"Batch size {len(events)} exceeds maximum 1000")

        payload = {
            "model_id": model_id,
            "env_type": "PRODUCTION",
            "source": {
                "type": "EVENTS",
                "events": events
            }
        }

        last_error = None

        for attempt in range(self.max_retries + 1):
            try:
                # Add delay and retry header for retries
                if attempt > 0:
                    delay = self._calculate_backoff(attempt - 1)
                    print(f"Retry attempt {attempt}/{self.max_retries} after {delay:.1f}s")
                    time.sleep(delay)

                headers = self._get_headers(is_retry=attempt > 0)

                response = requests.post(
                    f"{self.endpoint}/v3/events",
                    headers=headers,
                    data=json.dumps(payload),
                    timeout=30
                )

                # Batch and streaming publishes both return 202 Accepted;
                # 200 is also treated as success
                if response.status_code in (200, 202):
                    result = response.json()
                    if attempt > 0:
                        print(f"✓ Succeeded on retry attempt {attempt}")
                    return result["data"].get("event_ids", [])

                # Parse the error body once; fall back to raw text if it is not JSON
                try:
                    message = response.json().get("message", "Unknown error")
                except ValueError:
                    message = response.text or "Unknown error"

                # Non-retryable errors fail immediately
                if not self._is_retryable(response.status_code):
                    raise Exception(
                        f"Non-retryable error ({response.status_code}): {message}"
                    )

                # Retryable error: remember it and try again
                last_error = Exception(f"HTTP {response.status_code}: {message}")

            except requests.exceptions.RequestException as e:
                last_error = e
                # Network errors are retryable
                if attempt == self.max_retries:
                    break

        raise Exception(
            f"Max retries ({self.max_retries}) exceeded. "
            f"Last error: {str(last_error)}"
        )

    def publish_batch(
        self,
        model_id: str,
        events: List[Dict],
        batch_size: int = 1000
    ) -> List[str]:
        """Publish large event lists in batches with retry."""
        all_event_ids = []

        for i in range(0, len(events), batch_size):
            batch = events[i:i + batch_size]
            print(f"Publishing batch {i // batch_size + 1} ({len(batch)} events)")

            event_ids = self.publish_events_with_retry(model_id, batch)
            all_event_ids.extend(event_ids)

            print(f"✓ Published {len(event_ids)} events")

        return all_event_ids

print("✓ ProductionStreamingPublisher class defined")

# Test the class with sample events
publisher = ProductionStreamingPublisher(
    api_key=api_key,
    endpoint=fiddler_endpoint,
    max_retries=2,
    initial_delay=1.0
)

# Create test events with bank churn schema
test_events = [
    {
        "customer_id": "27c349a2",
        "creditscore": 559,
        "geography": "California",
        "gender": "Male",
        "age": 52,
        "tenure": 2,
        "balance": 0.0,
        "numofproducts": 1,
        "hascrcard": 1,
        "isactivemember": 0,
        "estimatedsalary": 129013.59,
        "predicted_churn": 0.007447,
        "churn": "no",
        "timestamp": datetime.now(timezone.utc).isoformat()
    },
    {
        "customer_id": "27c35cee",
        "creditscore": 482,
        "geography": "California",
        "gender": "Male",
        "age": 55,
        "tenure": 5,
        "balance": 97318.25,
        "numofproducts": 1,
        "hascrcard": 0,
        "isactivemember": 1,
        "estimatedsalary": 78416.14,
        "predicted_churn": 0.804852,
        "churn": "yes",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
]

try:
    event_ids = publisher.publish_events_with_retry(
        model_id=model_id,
        events=test_events
    )
    print(f"✓ Successfully published {len(event_ids)} events with ProductionStreamingPublisher")
except Exception as e:
    print(f"Publishing test: {e}")

---
## Section 4: Cleanup

Remove temporary files created during validation.

In [None]:
import os

temp_files = [
    "churn_events.csv",
    "file_id.txt",
    "job_id.txt",
    "event_id.txt"
]

for file in temp_files:
    if os.path.exists(file):
        os.remove(file)
        print(f"✓ Removed {file}")

print("\n=== Validation Complete ===")
print("All Python code examples from REST API publishing guides have been tested.")