# Advanced Usage Patterns

Production-ready patterns and best practices for py-gdelt.

## Contents
1. Configuration Management
2. Error Handling and Retry Strategies
3. Deduplication Strategies
4. Memory-Efficient Streaming
5. Combining Multiple Data Sources

In [None]:
# Setup
import nest_asyncio


nest_asyncio.apply()

import logging
from datetime import date, timedelta

from py_gdelt import GDELTClient, GDELTSettings
from py_gdelt.filters import DateRange, EventFilter


logging.basicConfig(level=logging.INFO)

## 1. Configuration Management

### 1.1 Programmatic Configuration

In [None]:
# Custom settings for production
settings = GDELTSettings(
    timeout=60,  # Longer timeout for slow connections
    max_retries=5,  # More retries for reliability
    max_concurrent_downloads=10,  # Higher concurrency
    cache_ttl=7200,  # 2 hour cache
    validate_codes=True,  # Enable validation
)

async with GDELTClient(settings=settings) as client:
    print(f"Timeout: {client.settings.timeout}s")
    print(f"Max retries: {client.settings.max_retries}")
    print(f"Cache TTL: {client.settings.cache_ttl}s")

### 1.2 Environment Variables

```bash
export GDELT_TIMEOUT=60
export GDELT_MAX_RETRIES=5
export GDELT_CACHE_DIR=/path/to/cache
```
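
As a rough illustration of how prefixed environment variables can map onto settings fields, the sketch below uses only the standard library. `EnvSettings`, `load_env_settings`, and the default values are hypothetical names and numbers chosen for this example; they are not part of py-gdelt, whose own loading logic may differ.

```python
import os
from dataclasses import dataclass, fields


@dataclass
class EnvSettings:
    """Hypothetical stand-in for a settings object (not py-gdelt's class)."""

    timeout: int = 30  # Illustrative default, not the library's
    max_retries: int = 3  # Illustrative default, not the library's
    cache_dir: str = ""


def load_env_settings(environ=None, prefix="GDELT_"):
    """Build EnvSettings from variables such as GDELT_TIMEOUT."""
    environ = os.environ if environ is None else environ
    values = {}
    for field in fields(EnvSettings):
        raw = environ.get(prefix + field.name.upper())
        if raw is not None:
            # Coerce the string to the type of the field's default value.
            values[field.name] = type(field.default)(raw)
    return EnvSettings(**values)
```

Passing a plain dict instead of `os.environ` makes the mapping easy to check without touching the real environment.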

## 2. Error Handling and Retry Strategies

In [None]:
from py_gdelt.exceptions import APIError, DataError
from py_gdelt.filters import DocFilter


async with GDELTClient() as client:
    try:
        # Build the filter, then query with error handling
        doc_filter = DocFilter(
            query="technology",
            timespan="24h",
            max_results=10,
        )

        articles = await client.doc.query(doc_filter)
        print(f"Success: Found {len(articles)} articles")

    except APIError as e:
        print(f"API Error: {e}")
        # Handle API-specific errors (rate limiting, etc.)

    except DataError as e:
        print(f"Data Error: {e}")
        # Handle data parsing errors

    except Exception as e:
        print(f"Unexpected error: {e}")
        # Handle any other errors
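
The cell above handles errors but does not retry. One common approach, sketched here with the standard library only, is exponential backoff with jitter. `retry_async` and its parameters are hypothetical helpers for this example, not part of py-gdelt, and the client's `max_retries` setting may already cover some of this internally.

```python
import asyncio
import random


async def retry_async(make_call, attempts=3, base_delay=0.5, retry_on=(Exception,)):
    """Await make_call() up to `attempts` times with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await make_call()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error.
            # The delay doubles each attempt; jitter spreads out concurrent retries.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
```

Inside the `async with` block above, a call might then look like `await retry_async(lambda: client.doc.query(doc_filter), retry_on=(APIError,))`, so that only API errors trigger another attempt while data errors fail immediately.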

## 3. Deduplication Strategies

GDELT data often contains duplicates. The library provides several strategies for removing them:

In [None]:
async with GDELTClient() as client:
    # Query a single day, two days back
    target_day = date.today() - timedelta(days=2)

    event_filter = EventFilter(
        date_range=DateRange(start=target_day, end=target_day),
    )

    try:
        # Without deduplication
        result_raw = await client.events.query(event_filter)
        print(f"Without dedup: {len(result_raw)} events")

        # With URL-based deduplication (import DedupeStrategy before uncommenting)
        # result_dedup = await client.events.query(
        #     event_filter,
        #     deduplicate=True,
        #     dedupe_strategy=DedupeStrategy.URL_ONLY,
        # )
        # print(f"With URL dedup: {len(result_dedup)} events")

    except Exception as e:
        print(f"Error: {e}")

### Available Deduplication Strategies

- `URL_ONLY` - Deduplicate by source URL only
- `URL_DATE` - Deduplicate by URL and date
- `URL_DATE_LOCATION` - Deduplicate by URL, date, and location
- `ACTOR_PAIR` - Deduplicate by actor pair
- `FULL` - Deduplicate by all available fields
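
To make the difference between the strategies concrete, here is a minimal sketch of key-based deduplication on plain dictionaries. The `dedupe` helper and the `url` and `date` field names are illustrative assumptions; this is not the library's implementation.

```python
def dedupe(records, key_fields):
    """Keep the first record seen for each combination of key_fields."""
    seen = set()
    unique = []
    for record in records:
        key = tuple(record.get(name) for name in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique


# Hypothetical records; the field names are illustrative only.
records = [
    {"url": "https://example.com/a", "date": "2024-01-01"},
    {"url": "https://example.com/a", "date": "2024-01-02"},
    {"url": "https://example.com/a", "date": "2024-01-01"},
]

url_only = dedupe(records, ["url"])  # Analogous to URL_ONLY
url_date = dedupe(records, ["url", "date"])  # Analogous to URL_DATE
print(len(url_only), len(url_date))
```

A narrower key such as the URL alone collapses more records, while adding the date keeps the same article when it appears on different days.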

## 4. Memory-Efficient Streaming

For large datasets, prefer streaming over loading every record into memory at once:

In [None]:
async with GDELTClient() as client:
    # Query a single day, two days back
    target_day = date.today() - timedelta(days=2)

    event_filter = EventFilter(
        date_range=DateRange(start=target_day, end=target_day),
    )

    try:
        # Process events one at a time
        event_count = 0
        us_events = 0

        async for event in client.events.stream(event_filter):
            event_count += 1

            # Filter and process in-stream
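            # Raw GDELT actor codes are three-letter CAMEO codes (e.g. "USA");
            # check which form your records use before relying on this count.
            actor1 = getattr(event, "actor1", None)
            if actor1 and getattr(actor1, "country_code", None) == "US":
                us_events += 1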

            # Early exit for demo
            if event_count >= 100:
                break

        print(f"Processed {event_count} events, found {us_events} US events")

    except Exception as e:
        print(f"Error: {e}")
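
When per-record processing is too fine-grained (for example, for bulk database writes), a stream can be grouped into fixed-size batches without giving up the memory benefit. The sketch below is self-contained: `batched` is a hypothetical helper and `fake_stream` merely stands in for `client.events.stream(...)`.

```python
import asyncio


async def batched(stream, size):
    """Yield lists of up to `size` items from an async iterator."""
    batch = []
    async for item in stream:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # Final partial batch.


async def fake_stream(n):
    """Hypothetical stand-in for client.events.stream(...)."""
    for i in range(n):
        yield i


async def collect_batch_sizes(n, size):
    """Return the size of each batch produced from n streamed items."""
    return [len(batch) async for batch in batched(fake_stream(n), size)]
```

Only one batch is held in memory at a time, so peak usage is bounded by `size` rather than by the length of the stream.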

## 5. Combining Multiple Data Sources

Combine Events with GKG for enriched analysis:

In [None]:
from py_gdelt.filters import GKGFilter


async with GDELTClient() as client:
    # Query a single day, two days back
    target_day = date.today() - timedelta(days=2)

    try:
        # Get events
        event_filter = EventFilter(
            date_range=DateRange(start=target_day, end=target_day),
            actor1_country="USA",
        )

        # Get GKG records for same period
        gkg_filter = GKGFilter(
            date_range=DateRange(start=target_day, end=target_day),
            themes=["ECON_STOCKMARKET"],
        )

        # Note: In production, you'd correlate these by URL or time
        print("Querying events...")
        events_count = 0
        async for event in client.events.stream(event_filter):
            events_count += 1
            if events_count >= 10:
                break

        print(f"Events sample: {events_count}")

        print("\nQuerying GKG...")
        gkg_count = 0
        async for record in client.gkg.stream(gkg_filter):
            gkg_count += 1
            if gkg_count >= 10:
                break

        print(f"GKG sample: {gkg_count}")

    except Exception as e:
        print(f"Error: {e}")
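
The cell above only counts records from each source. Correlating them by URL, as the inline note suggests, could look roughly like the following. It works on plain dictionaries with a hypothetical `url` key; the real record types and attribute names in py-gdelt may differ.

```python
from collections import defaultdict


def join_by_url(events, gkg_records):
    """Pair each event with the GKG records that share its source URL."""
    gkg_by_url = defaultdict(list)
    for record in gkg_records:
        gkg_by_url[record["url"]].append(record)
    return [
        (event, gkg_by_url[event["url"]])
        for event in events
        if event["url"] in gkg_by_url
    ]


# Hypothetical sample data; field names are illustrative only.
events = [
    {"id": 1, "url": "https://example.com/a"},
    {"id": 2, "url": "https://example.com/b"},
]
gkg_records = [
    {"url": "https://example.com/a", "themes": ["ECON_STOCKMARKET"]},
]

pairs = join_by_url(events, gkg_records)
print(len(pairs))
```

Indexing the GKG side first keeps the join linear in the number of records, at the cost of holding that index in memory.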

## Summary

You've learned:
- ✅ How to configure the client for production use
- ✅ Proper error handling strategies
- ✅ Deduplication techniques for cleaner data
- ✅ Memory-efficient streaming patterns
- ✅ Combining multiple GDELT data sources

**Best Practices:**
- Always use streaming for large datasets
- Handle errors gracefully with try/except
- Choose an appropriate deduplication strategy for your use case
- Configure timeouts and retries based on your network
- Use caching to reduce API calls