# NER Entity Extraction from Parliamentary Speeches

This notebook extracts Named Entity Recognition (NER) entities from Turkish parliamentary speeches and stores them in Elasticsearch.

## ‚ö° PERFORMANCE OPTIMIZATIONS:
- **Batch Processing**: Processes multiple texts simultaneously (64-128 texts per batch)
- **GPU Acceleration**: Automatically uses GPU if available (CUDA)
- **Large ES Batches**: Bulk updates 500-1000 documents at once
- **Memory Efficient**: Processes in chunks to handle large datasets
- **Parallel Processing**: Ready for Wikipedia linking parallelization

## Workflow:
1. Connect to Elasticsearch (local or GCP VM)
2. Load TerminatorPower/nerT Turkish NER model (with GPU support)
3. Process all speeches in batches and extract entities (PERSON, LOCATION, ORGANIZATION)
4. Optionally link entities to Wikipedia via Wikidata API
5. Update Elasticsearch documents with `ner_entities` field using bulk operations

## Features:
- **NER Model**: TerminatorPower/nerT (Turkish language model)
- **Entity Types**: PERSON, LOCATION, ORGANIZATION
- **Wikipedia Linking**: Optional Wikidata API integration
- **Caching**: Wikipedia lookups are cached to avoid redundant API calls
- **Batch Processing**: Processes 64+ texts simultaneously for 10-50x speedup
- **GPU Support**: Automatic GPU detection and usage for faster inference


## 1. Installation & Setup

Install required packages (run this first):


In [28]:
# Install required packages
%pip install -q transformers "elasticsearch==8.6.2" requests tqdm torch


## 2. Configuration

Set your Elasticsearch connection details. For local development, use `localhost:9200`. For GCP VM, use the VM's IP address.


In [29]:
import os
import sys

# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(''))))

# Configuration
# For local: use "http://localhost:9200"
# For GCP VM: use "http://VM_IP:9200"
ELASTICSEARCH_HOST = os.getenv("ELASTICSEARCH_HOST", "https://cab-teach-src-oven.trycloudflare.com")
ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX", "parliament_speeches")

print(f"üì° Elasticsearch Host: {ELASTICSEARCH_HOST}")
print(f"üìä Index Name: {ELASTICSEARCH_INDEX}")


üì° Elasticsearch Host: https://cab-teach-src-oven.trycloudflare.com
üìä Index Name: parliament_speeches


## 3. Import Libraries


In [30]:
import collections
import time
import warnings
from typing import List, Dict, Any, Optional
from transformers import pipeline
import requests
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import torch
import numpy as np

# Suppress NumPy 2.0 deprecation warnings from transformers library
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*np.float_.*")
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*np.int_.*")
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*np.complex_.*")

# Wikidata API headers
HEADERS = {
    # Optional: Add User-Agent header
     "User-Agent": "Turkish-NEL-Research/1.0"
}

# Check for GPU availability
device = 0 if torch.cuda.is_available() else -1
print(f"‚úÖ All libraries imported successfully!")
print(f"üîß Device: {'GPU (CUDA)' if device == 0 else 'CPU'}")


‚úÖ All libraries imported successfully!
üîß Device: GPU (CUDA)


## 4. Helper Functions

Define functions for Wikidata search, entity extraction, and processing.


In [31]:
def wikidata_search(entity: str, lang: str = "tr", limit: int = 1, sleep: float = 2.0, max_retries: int = 3) -> List[Dict]:
    """Search for entity in Wikidata with rate limiting and retries."""
    url = "https://www.wikidata.org/w/api.php"
    params = {
        "action": "wbsearchentities",
        "search": entity,
        "language": lang,
        "format": "json",
        "limit": limit
    }

    for attempt in range(max_retries):
        try:
            r = requests.get(url, params=params, headers=HEADERS, timeout=10)

            # Handle rate limiting
            if r.status_code == 429:
                wait_time = (2 ** attempt) * 5  # Exponential backoff: 5s, 10s, 20s
                if attempt < max_retries - 1:
                    print(f"[Wikidata 429] Rate limited. Waiting {wait_time}s before retry {attempt + 1}/{max_retries}...")
                    time.sleep(wait_time)
                    continue
                else:
                    print(f"[Wikidata 429] Entity: {entity} | Max retries reached. Skipping.")
                    return []

            r.raise_for_status()
            data = r.json()
            time.sleep(sleep)  # Standard delay between requests
            return data.get("search", [])

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 403:
                print(f"[Wikidata 403] Entity: {entity} | Add User-Agent header")
            elif e.response.status_code == 429:
                wait_time = (2 ** attempt) * 5
                if attempt < max_retries - 1:
                    print(f"[Wikidata 429] Rate limited. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                else:
                    print(f"[Wikidata 429] Entity: {entity} | Max retries reached. Skipping.")
            else:
                print(f"[Wikidata error] Entity: {entity} | {e}")
            return []
        except Exception as e:
            print(f"[Wikidata error] Entity: {entity} | {e}")
            return []

    return []


def wikidata_to_wikipedia(qid: str, lang: str = "tr") -> Optional[str]:
    """Convert Wikidata QID to Wikipedia URL."""
    url = "https://www.wikidata.org/w/api.php"
    params = {
        "action": "wbgetentities",
        "ids": qid,
        "props": "sitelinks",
        "format": "json"
    }

    try:
        r = requests.get(url, params=params, headers=HEADERS, timeout=10)
        r.raise_for_status()
        data = r.json()
        entity = data["entities"][qid]
        key = f"{lang}wiki"
        return entity["sitelinks"][key]["url"] if key in entity["sitelinks"] else None
    except Exception:
        return None


def aggregate_tokens(entities: List[Dict]) -> List[Dict]:
    """Aggregate subword tokens (handle ## prefixes from BERT tokenization)."""
    if not entities:
        return []

    merged = []
    i = 0

    while i < len(entities):
        token = entities[i]
        word = token["word"]
        entity_group = token["entity_group"]
        score = token["score"]
        start = token["start"]
        end = token["end"]

        # Keep merging consecutive ## tokens of the same entity type
        j = i + 1
        while j < len(entities):
            next_token = entities[j]
            # Check if next token is a subword (starts with ##) and same entity group
            if next_token["word"].startswith("##") and next_token["entity_group"] == entity_group:
                word += next_token["word"][2:]  # remove ##
                end = next_token["end"]
                # Average the scores
                score = (score + next_token["score"]) / 2
                j += 1
            else:
                break

        merged.append({
            "entity_group": entity_group,
            "word": word,
            "score": score,
            "start": start,
            "end": end
        })

        i = j if j > i + 1 else i + 1

    return merged


print("‚úÖ Helper functions defined!")


‚úÖ Helper functions defined!


In [32]:
def extract_entities(text: str, ner_pipeline) -> List[Dict]:
    """Extract entities from text using NER pipeline (single text)."""
    if not text or not text.strip():
        return []

    try:
        # Suppress warnings for this specific call
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=DeprecationWarning)
            raw_entities = ner_pipeline(text)

        if not raw_entities:
            return []

        # Aggregate subword tokens
        entities = aggregate_tokens(raw_entities)

        # Count entity frequencies
        entity_counter = collections.Counter(
            e["word"] for e in entities
        )

        # Build entity list with metadata
        entity_list = []
        for entity_name, freq in entity_counter.items():
            # Find the first occurrence for metadata
            first_occurrence = next(
                (e for e in entities if e["word"] == entity_name),
                None
            )

            if first_occurrence:
                entity_list.append({
                    "entity": entity_name,
                    "entity_group": first_occurrence["entity_group"],
                    "frequency": freq,
                    "confidence": float(first_occurrence["score"])  # Ensure float type
                })

        return entity_list
    except Exception as e:
        # Only log actual errors, not deprecation warnings
        if "np.float_" not in str(e) and "np.int_" not in str(e):
            print(f"[NER error] {e}")
        return []


def extract_entities_batch(texts: List[str], ner_pipeline) -> List[List[Dict]]:
    """Extract entities from multiple texts using batch processing (MUCH FASTER)."""
    if not texts:
        return []

    # Filter out empty texts and track indices
    valid_indices = []
    valid_texts = []
    for i, text in enumerate(texts):
        if text and text.strip():
            valid_indices.append(i)
            valid_texts.append(text)

    if not valid_texts:
        return [[] for _ in texts]

    try:
        # Suppress warnings for this specific call
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=DeprecationWarning)
            # Process batch - pipeline handles batching automatically
            # The pipeline returns a list of lists, one per input text
            raw_entities_batch = ner_pipeline(valid_texts)

        # Initialize results list
        results = [[] for _ in texts]

        # Process each text's entities
        for batch_idx, orig_idx in enumerate(valid_indices):
            if batch_idx < len(raw_entities_batch):
                raw_entities = raw_entities_batch[batch_idx]

                if not raw_entities:
                    continue

                # Handle case where pipeline returns single list vs nested list
                if isinstance(raw_entities, list) and len(raw_entities) > 0 and isinstance(raw_entities[0], dict):
                    # Aggregate subword tokens
                    entities = aggregate_tokens(raw_entities)

                    # Count entity frequencies
                    entity_counter = collections.Counter(
                        e["word"] for e in entities
                    )

                    # Build entity list with metadata
                    entity_list = []
                    for entity_name, freq in entity_counter.items():
                        # Find the first occurrence for metadata
                        first_occurrence = next(
                            (e for e in entities if e["word"] == entity_name),
                            None
                        )

                        if first_occurrence:
                            entity_list.append({
                                "entity": entity_name,
                                "entity_group": first_occurrence["entity_group"],
                                "frequency": freq,
                                "confidence": float(first_occurrence["score"])
                            })

                    results[orig_idx] = entity_list

        return results
    except Exception as e:
        # Fallback to individual processing if batch fails
        if "np.float_" not in str(e) and "np.int_" not in str(e):
            print(f"[NER batch error, falling back to individual processing] {e}")
        return [extract_entities(text, ner_pipeline) for text in texts]


def link_entities_to_wikipedia(entities: List[Dict], cache: Dict[str, Optional[str]] = None) -> List[Dict]:
    """Link entities to Wikipedia via Wikidata API with caching."""
    if cache is None:
        cache = {}

    linked_entities = []

    for entity_data in entities:
        entity_name = entity_data["entity"]

        # Check cache first
        if entity_name in cache:
            wiki_url = cache[entity_name]
        else:
            # Search Wikidata
            candidates = wikidata_search(entity_name)

            if candidates:
                qid = candidates[0]["id"]
                wiki_url = wikidata_to_wikipedia(qid)
            else:
                wiki_url = None

            # Cache the result (even if None to avoid retrying)
            cache[entity_name] = wiki_url

        # Add Wikipedia URL if found
        entity_data["wikipedia_url"] = wiki_url

        linked_entities.append(entity_data)

    return linked_entities


def process_speech_document(doc: Dict[str, Any], ner_pipeline, link_wikipedia: bool = True, cache: Dict[str, Optional[str]] = None) -> Optional[Dict]:
    """Process a single speech document and extract entities."""
    source = doc.get("_source", {})
    content = source.get("content", "")

    if not content:
        return None

    # Extract entities
    entities = extract_entities(content, ner_pipeline)

    if not entities:
        return None

    # Link to Wikipedia if requested
    if link_wikipedia:
        entities = link_entities_to_wikipedia(entities, cache=cache)

    # Return update document
    return {
        "_id": doc["_id"],
        "_source": {
            "ner_entities": entities
        }
    }


print("‚úÖ Entity extraction functions defined!")


‚úÖ Entity extraction functions defined!


## 6. Connect to Elasticsearch


In [33]:
# Connect to Elasticsearch
print(f"üì° Connecting to Elasticsearch at {ELASTICSEARCH_HOST}...")
es = Elasticsearch(hosts=[ELASTICSEARCH_HOST])

try:
    if not es.ping():
        raise Exception("Failed to ping Elasticsearch")
    print("‚úÖ Connected to Elasticsearch")
    print(f"   Version: {es.info()['version']['number']}")
except Exception as e:
    print(f"‚ùå Connection error: {e}")
    print(f"   Make sure Elasticsearch is running at {ELASTICSEARCH_HOST}")
    raise


üì° Connecting to Elasticsearch at https://cab-teach-src-oven.trycloudflare.com...
‚úÖ Connected to Elasticsearch
   Version: 8.6.1


## 7. Update Elasticsearch Mapping

Ensure the index has the correct mapping for `ner_entities` field.


In [34]:
def update_elasticsearch_mapping(es: Elasticsearch, index_name: str):
    """Update Elasticsearch mapping to include ner_entities field."""
    mapping = {
        "properties": {
            "ner_entities": {
                "type": "nested",
                "properties": {
                    "entity": {"type": "keyword"},
                    "entity_group": {"type": "keyword"},
                    "frequency": {"type": "integer"},
                    "wikipedia_url": {"type": "keyword"},
                    "confidence": {"type": "float"}
                }
            }
        }
    }

    try:
        es.indices.put_mapping(index=index_name, body=mapping)
        print(f"‚úÖ Updated mapping for '{index_name}' with ner_entities field")
    except Exception as e:
        print(f"‚ö†Ô∏è  Warning: Could not update mapping: {e}")
        print("   The field will be added dynamically, but explicit mapping is recommended.")


# Update mapping
print(f"\nüîß Updating Elasticsearch mapping...")
update_elasticsearch_mapping(es, ELASTICSEARCH_INDEX)



üîß Updating Elasticsearch mapping...


  es.indices.put_mapping(index=index_name, body=mapping)


‚úÖ Updated mapping for 'parliament_speeches' with ner_entities field


## 8. Load NER Model

Load the TerminatorPower/nerT Turkish NER model. This may take a few minutes on first run.


In [35]:
# Load NER model with GPU support and batch processing
print(f"\nü§ñ Loading NER model (TerminatorPower/nerT)...")
print(f"   Device: {'GPU (CUDA)' if device == 0 else 'CPU'}")
print("   This may take a few minutes on first run...")

try:
    ner_pipeline = pipeline(
        "token-classification",
        model="TerminatorPower/nerT",
        aggregation_strategy="simple",
        device=device,  # Use GPU if available
        batch_size=32  # Process 32 texts at once (adjust based on GPU memory)
    )
    print("‚úÖ NER model loaded successfully")
    if device == 0:
        print("   üöÄ Using GPU acceleration for faster processing!")
except Exception as e:
    print(f"‚ùå Failed to load NER model: {e}")
    raise



ü§ñ Loading NER model (TerminatorPower/nerT)...
   Device: GPU (CUDA)
   This may take a few minutes on first run...


Device set to use cuda:0


‚úÖ NER model loaded successfully
   üöÄ Using GPU acceleration for faster processing!


## 9. Configuration - OPTIMIZED FOR SPEED

Set your processing preferences. With sufficient RAM/GPU, you can significantly increase batch sizes:
- **NER Batch Size**: Number of texts processed simultaneously (64-128 recommended for GPU, 16-32 for CPU)
- **ES Batch Size**: Documents per Elasticsearch bulk update (500-1000 recommended)
- **Wikipedia Linking**: Adds ~0.5s per unique entity (disable for maximum speed)


In [36]:
# Configuration - OPTIMIZED FOR SPEED
# Set these variables to control processing behavior

# Link entities to Wikipedia? (True/False)
# WARNING: Wikipedia linking adds ~0.5s per unique entity
# Without caching, this can take 30+ hours for 8,930 documents
# With caching (recommended), it's much faster after initial lookups
LINK_WIKIPEDIA = False  # Set to True to enable Wikipedia linking

# Batch processing settings
NER_BATCH_SIZE = 1024  # Number of texts to process at once (increase if you have GPU/RAM)
ES_BATCH_SIZE = 2000  # Number of documents to bulk update at once (increase for faster updates)
PARALLEL_WORKERS = 4  # Number of parallel workers for Wikipedia linking (if enabled)

# Memory optimization
MAX_TEXT_LENGTH = 0  # Truncate very long texts to avoid memory issues (0 = no limit)

print("=" * 80)
print("Configuration (OPTIMIZED FOR SPEED):")
print("=" * 80)
print(f"   Wikipedia Linking: {LINK_WIKIPEDIA}")
print(f"   NER Batch Size: {NER_BATCH_SIZE} texts per batch")
print(f"   ES Batch Size: {ES_BATCH_SIZE} documents per bulk update")
print(f"   Parallel Workers: {PARALLEL_WORKERS}")
print(f"   Max Text Length: {MAX_TEXT_LENGTH if MAX_TEXT_LENGTH > 0 else 'Unlimited'}")
if LINK_WIKIPEDIA:
    print("   ‚úÖ Wikipedia linking enabled (with caching)")
    print("   First pass will be slower, but subsequent entities will be instant")
else:
    print("   ‚úÖ Wikipedia linking disabled - faster processing")
print("=" * 80)


Configuration (OPTIMIZED FOR SPEED):
   Wikipedia Linking: False
   NER Batch Size: 1024 texts per batch
   ES Batch Size: 2000 documents per bulk update
   Parallel Workers: 4
   Max Text Length: Unlimited
   ‚úÖ Wikipedia linking disabled - faster processing


## 10. Get Document Count

Check how many documents need to be processed.


In [37]:
# Get total document count
print(f"\nüìä Counting documents in '{ELASTICSEARCH_INDEX}'...")
try:
    count_response = es.count(index=ELASTICSEARCH_INDEX)
    total_docs = count_response["count"]
    print(f"   Found {total_docs:,} documents")
except Exception as e:
    print(f"‚ùå Error counting documents: {e}")
    raise

if total_docs == 0:
    print("‚ö†Ô∏è  No documents found in index. Exiting.")
    raise ValueError("No documents in index")



üìä Counting documents in 'parliament_speeches'...
   Found 27,662 documents


## 11. Process Documents

Process all speeches and extract NER entities. This may take a while depending on the number of documents.


In [38]:
# Initialize cache for Wikipedia lookups
wiki_cache: Dict[str, Optional[str]] = {}
NER_BATCH_SIZE = 2048
HEADERS = {
    # Optional: Add User-Agent header
     "User-Agent": "Turkish-NEL-Research/1.0"
}

# Process documents with BATCH PROCESSING for maximum speed
print("\n" + "=" * 80)
print("Processing speeches with BATCH PROCESSING (OPTIMIZED)...")
print("=" * 80)

# Collect all documents first (or process in chunks)
print("üì• Loading documents from Elasticsearch...")
all_docs = []
for doc in tqdm(scan(es, query={"query": {"match_all": {}}, "_source": ["content"]},
                     index=ELASTICSEARCH_INDEX), total=total_docs, desc="Loading"):
    all_docs.append(doc)

print(f"‚úÖ Loaded {len(all_docs):,} documents into memory")

# Process in batches
processed = 0
updated = 0
errors = 0
es_batch = []

# Process documents in batches
print(f"\nüöÄ Processing {len(all_docs):,} documents in batches of {NER_BATCH_SIZE}...")

for batch_start in tqdm(range(0, len(all_docs), NER_BATCH_SIZE), desc="Batches"):
    batch_end = min(batch_start + NER_BATCH_SIZE, len(all_docs))
    doc_batch = all_docs[batch_start:batch_end]

    try:
        # Extract texts and IDs
        texts = []
        doc_ids = []
        doc_indices = []

        for idx, doc in enumerate(doc_batch):
            content = doc.get("_source", {}).get("content", "")
            if MAX_TEXT_LENGTH > 0 and len(content) > MAX_TEXT_LENGTH:
                content = content[:MAX_TEXT_LENGTH]  # Truncate if needed

            texts.append(content)
            doc_ids.append(doc["_id"])
            doc_indices.append(batch_start + idx)

        # Process batch with NER model (MUCH FASTER than individual processing)
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=DeprecationWarning)
            entities_batch = extract_entities_batch(texts, ner_pipeline)

        # Process each document's entities
        for doc_idx, (doc_id, entities) in enumerate(zip(doc_ids, entities_batch)):
            processed += 1

            if not entities:
                continue

            # Link to Wikipedia if requested
            if LINK_WIKIPEDIA:
                entities = link_entities_to_wikipedia(entities, cache=wiki_cache)

            # Add to ES batch
            es_batch.append({
                "_op_type": "update",
                "_index": ELASTICSEARCH_INDEX,
                "_id": doc_id,
                "doc": {"ner_entities": entities}
            })
            updated += 1

            # Bulk update when ES batch is full
            if len(es_batch) >= ES_BATCH_SIZE:
                try:
                    success, failed = bulk(es, es_batch, stats_only=False, raise_on_error=False)
                    if failed:
                        tqdm.write(f"\n‚ö†Ô∏è  Warning: {len(failed)} bulk update failures")
                        errors += len(failed)
                except Exception as bulk_err:
                    tqdm.write(f"\n‚ùå Bulk update error: {bulk_err}")
                    errors += len(es_batch)
                es_batch = []

        # Progress update
        if processed % (NER_BATCH_SIZE * 10) == 0:
            cache_info = ""
            if LINK_WIKIPEDIA:
                cache_hits = len([v for v in wiki_cache.values() if v is not None])
                cache_total = len(wiki_cache)
                cache_info = f" | Cache: {cache_total} entities ({cache_hits} linked)"
            tqdm.write(f"Processed: {processed:,} | Updated: {updated:,} | Errors: {errors}{cache_info}")

    except Exception as e:
        error_msg = str(e)
        if "np.float_" not in error_msg and "np.int_" not in error_msg and "np.complex_" not in error_msg:
            errors += len(doc_batch)
            if errors <= 10:
                tqdm.write(f"\n‚ö†Ô∏è  Error processing batch {batch_start}-{batch_end}: {e}")

# Process remaining ES batch
if es_batch:
    try:
        success, failed = bulk(es, es_batch, stats_only=False, raise_on_error=False)
        if failed:
            print(f"\n‚ö†Ô∏è  Warning: {len(failed)} failures in final batch")
            errors += len(failed)
    except Exception as bulk_err:
        print(f"\n‚ùå Final batch error: {bulk_err}")
        errors += len(es_batch)

print("\n‚úÖ Batch processing complete!")



Processing speeches with BATCH PROCESSING (OPTIMIZED)...
üì• Loading documents from Elasticsearch...


Loading: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 27662/27662 [02:56<00:00, 156.79it/s]


‚úÖ Loaded 27,662 documents into memory

üöÄ Processing 27,662 documents in batches of 2048...


Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/urllib3/connectionpool.py", line 787, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/urllib3/connectionpool.py", line 534, in _make_request
    response = conn.getresponse()
               ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/urllib3/connection.py", line 565, in getresponse
    httplib_response = super().getresponse()
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/http/client.py", line 1430, in getresponse
    response.begin()
  File "/usr/lib/python3.12/http/client.py", line 331, in begin
    version, status, reason = self._read_status()
                              ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/http/client.py", line 300, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
http.client.RemoteDisconnected: Remote end cl

Processed: 20,480 | Updated: 19,604 | Errors: 0


Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 14/14 [12:13<00:00, 52.37s/it]



‚úÖ Batch processing complete!


In [45]:
# Force refresh to ensure all updates are visible
print("\n‚ôªÔ∏è  Refreshing index to ensure updates are committed...")
try:
    es.indices.refresh(index=ELASTICSEARCH_INDEX)
    print("‚úÖ Index refreshed")
except Exception as refresh_err:
    print(f"‚ö†Ô∏è  Refresh warning: {refresh_err}")

# Display summary
print("\n" + "=" * 80)
print("‚úÖ Processing complete!")
print("=" * 80)
print(f"   Total processed: {processed:,}")
print(f"   Documents updated: {updated:,}")
print(f"   Errors: {errors}")
if LINK_WIKIPEDIA:
    cache_total = len(wiki_cache)
    cache_linked = len([v for v in wiki_cache.values() if v is not None])
    print(f"   Unique entities cached: {cache_total:,}")
    print(f"   Entities with Wikipedia links: {cache_linked:,}")
print("=" * 80)



‚ôªÔ∏è  Refreshing index to ensure updates are committed...
‚úÖ Index refreshed

‚úÖ Processing complete!
   Total processed: 27,662
   Documents updated: 26,769
   Errors: 0


## 13. Verify Results

Check a sample document to verify that NER entities were extracted correctly.


In [47]:
# Get a sample document with NER entities
# FIXED: Use nested query for nested fields + updated Elasticsearch 8.x API

print("üîç Checking for documents with NER entities...")

try:
    # First, check total count
    total_count = es.count(index=ELASTICSEARCH_INDEX)['count']
    print(f"   Total documents in index: {total_count:,}")

    # Use nested query to check for ner_entities (required for nested fields)
    count_query = {
        "query": {
            "nested": {
                "path": "ner_entities",
                "query": {
                    "exists": {"field": "ner_entities.entity"}
                }
            }
        },
        "size": 0  # Just get count
    }

    # Updated for Elasticsearch 8.x API (no 'body' parameter)
    response = es.search(index=ELASTICSEARCH_INDEX, **count_query)
    docs_with_ner = response['hits']['total']['value']

    print(f"   Documents with NER entities: {docs_with_ner:,}")

    if docs_with_ner > 0:
        # Get a sample document
        sample_query = {
            "query": {
                "nested": {
                    "path": "ner_entities",
                    "query": {
                        "exists": {"field": "ner_entities.entity"}
                    }
                }
            },
            "size": 1
        }

        # Updated for Elasticsearch 8.x API (no 'body' parameter)
        response = es.search(index=ELASTICSEARCH_INDEX, **sample_query)
        sample_doc = response['hits']['hits'][0]['_source']

        print("\n‚úÖ Sample document with NER entities:")
        print(f"\n   Document ID: {response['hits']['hits'][0]['_id']}")
        print(f"   Speech Giver: {sample_doc.get('speech_giver', 'N/A')}")
        print(f"   Term: {sample_doc.get('term', 'N/A')}, Year: {sample_doc.get('year', 'N/A')}")
        print(f"\n   Found {len(sample_doc.get('ner_entities', []))} entities:")

        # Group entities by type
        entities_by_type = {}
        for entity in sample_doc.get('ner_entities', []):
            entity_type = entity.get('entity_group', 'UNKNOWN')
            if entity_type not in entities_by_type:
                entities_by_type[entity_type] = []
            entities_by_type[entity_type].append(entity)

        for entity_type, entities in entities_by_type.items():
            print(f"\n   {entity_type}:")
            for entity in entities[:5]:  # Show first 5 of each type
                wiki_link = entity.get('wikipedia_url', '')
                wiki_info = f" [Wikipedia: {wiki_link}]" if wiki_link else ""
                print(f"      - {entity.get('entity')} (freq: {entity.get('frequency', 0)}, conf: {entity.get('confidence', 0):.3f}){wiki_info}")
            if len(entities) > 5:
                print(f"      ... and {len(entities) - 5} more")
    else:
        print("\n‚ö†Ô∏è  No documents with NER entities found yet")
        print("\n   Possible reasons:")
        print("   1. Processing is still running (check the progress bar)")
        print("   2. Processing completed but no entities were found in documents")
        print("   3. Bulk updates haven't been committed yet")
        print("\n   Checking a random document to see its structure...")

        # Check a random document
        random_query = {"query": {"match_all": {}}, "size": 1}
        random_response = es.search(index=ELASTICSEARCH_INDEX, **random_query)
        if random_response['hits']['total']['value'] > 0:
            random_doc = random_response['hits']['hits'][0]['_source']
            has_content = 'content' in random_doc and len(random_doc.get('content', '')) > 0
            has_ner = 'ner_entities' in random_doc
            print(f"   Random document has 'content' field: {has_content}")
            print(f"   Random document has 'ner_entities' field: {has_ner}")
            if has_ner:
                print(f"   NER entities count: {len(random_doc.get('ner_entities', []))}")

except Exception as e:
    print(f"‚ùå Error verifying results: {e}")
    import traceback
    traceback.print_exc()


üîç Checking for documents with NER entities...
   Total documents in index: 27,662
   Documents with NER entities: 10,000

‚úÖ Sample document with NER entities:

   Document ID: term26-year3-session51-26
   Speech Giver: Ahmet Yƒ±ldƒ±rƒ±m
   Term: 26, Year: 3

   Found 5 entities:

   PER:
      - ahmet yildirim (freq: 3, conf: 0.999)

   LOC:
      - mus (freq: 3, conf: 0.970)
      - ege (freq: 1, conf: 0.995)

   ORG:
      - tbmm (freq: 1, conf: 0.997)
      - turkiye buyuk millet meclisinin (freq: 1, conf: 0.988)


## 14. Next Steps

After running this notebook locally:

1. **Verify Results**: Check that documents have `ner_entities` populated
2. **Sync to GCP VM**: Copy the enriched data to your GCP VM Elasticsearch instance
3. **Test API**: Verify that API endpoints return NER entities correctly

### Syncing to GCP VM

You can sync the data using Elasticsearch reindex API or by:
- Exporting documents with NER entities from local ES
- Importing them into GCP VM ES

Alternatively, you can run this notebook on a GCP VM or Vertex AI Workbench instance connected to your GCP VM Elasticsearch.
