# Setting up Elasticsearch

The process we'll follow is:

1. Create an Elasticsearch client
2. Check if the required ingest pipeline exists, and create it if not
3. Check if the index with proper mappings exists, and create it if not
4. Load the Reddit data into the index

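Because steps 2 and 3 only create what is missing, an existing pipeline or index keeps its current settings. Documents are written with the `create` operation, which refuses to overwrite a document whose ID already exists, so that re-running the notebook does not ingest duplicate data!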

In [None]:
import glob
import json
import os
import time
import uuid
from collections.abc import Iterator

from decouple import config
from tqdm.notebook import tqdm

from elasticsearch import Elasticsearch

# Elastic Cloud credentials, looked up through decouple's `config`;
# an unset value falls back to an empty string
ES_API_KEY = config("ES_API_KEY", default="")
ES_CLOUD_ID = config("ES_CLOUD_ID", default="")

# Directory that holds the Reddit data files
data_dir = "../data/reddit"

# Ingest pipeline: the JSON file that defines it and the name it is stored under
es_ingest_pipeline_file = "../elasticsearch/pipelines/reddit-chat-data-pipeline.json"
es_ingest_pipeline_name = "reddit-chat-data-pipeline"

# Target index: the JSON file with its definition and the index name
es_index_settings_file = "../elasticsearch/indices/demo-chatroom.data-reddit.json"
es_index_name = "demo-chatroom.data-reddit"

In [None]:
# Fail fast unless both credentials are present (each defaults to "")
if not (ES_CLOUD_ID and ES_API_KEY):
    raise ValueError(
        "ES_CLOUD_ID and ES_API_KEY must be set in the environment variables."
    )

# Build the client for the Elastic Cloud deployment
es_client = Elasticsearch(api_key=ES_API_KEY, cloud_id=ES_CLOUD_ID)

# Ask the cluster for its info; as the cell's last expression the response is
# displayed, which presumably doubles as a connectivity check
es_client.info()

In [None]:
def create_ingest_pipeline(
    es_client: Elasticsearch, pipeline_file: str, pipeline_name: str
) -> bool:
    """Create the ingest pipeline if it doesn't already exist.

    Only a "not found" answer (an error carrying ``status_code == 404``) from
    the existence check leads to creation. Any other failure, such as a
    connectivity or authentication problem, is re-raised instead of being
    mistaken for a missing pipeline.

    Args:
        es_client: Elasticsearch client
        pipeline_file: Path to the JSON file holding the pipeline definition
        pipeline_name: Name of the pipeline to create

    Returns:
        bool: True if pipeline was created, False if it already existed

    Raises:
        Exception: Whatever the client raised during the existence check when
            it is not a 404, or any error raised while creating the pipeline.
        OSError: If ``pipeline_file`` cannot be opened.
        json.JSONDecodeError: If ``pipeline_file`` does not contain valid JSON.
    """
    try:
        es_client.ingest.get_pipeline(id=pipeline_name)
    except Exception as error:
        # The broad catch is narrowed right here: everything except
        # "not found" is propagated unchanged to the caller.
        if getattr(error, "status_code", None) != 404:
            raise
    else:
        print(f"Pipeline '{pipeline_name}' already exists")
        return False

    print(f"Pipeline '{pipeline_name}' not found, creating it...")

    # Load pipeline definition from file
    with open(pipeline_file, encoding="utf-8") as file:
        pipeline_definition = json.load(file)

    # Create the pipeline
    es_client.ingest.put_pipeline(id=pipeline_name, body=pipeline_definition)

    print(f"Pipeline '{pipeline_name}' created successfully")
    return True

In [None]:
def create_index(es_client: Elasticsearch, index_file: str, index_name: str) -> bool:
    """Create the index if it doesn't already exist.

    Args:
        es_client: Elasticsearch client
        index_file: Path to the JSON file holding the index definition
            (passed as the request body of the create call)
        index_name: Name of the index to create

    Returns:
        bool: True if the index was created, False if it already existed or
        the cluster did not acknowledge the creation

    Raises:
        OSError: If ``index_file`` cannot be opened.
        json.JSONDecodeError: If ``index_file`` does not contain valid JSON.
    """
    # An existing index is left untouched; report that nothing was created
    if es_client.indices.exists(index=index_name).body:
        print(f"Index '{index_name}' already exists")
        return False

    print(f"Index '{index_name}' not found, creating it with proper mappings...")

    # Load index definition from file
    with open(index_file, encoding="utf-8") as file:
        index_definition = json.load(file)

    # Create the index with settings and mappings
    response = es_client.indices.create(index=index_name, body=index_definition)

    # A non-empty response body is always truthy, so look at the explicit
    # acknowledgement flag rather than at the body itself
    if response.body.get("acknowledged"):
        print(f"Index '{index_name}' created successfully")
        return True

    print(f"Failed to create index '{index_name}'")
    return False

In [None]:
def reddit_ndjson_generator(filepath: str) -> Iterator[dict]:
    """Yield one JSON object at a time from an NDJSON file.

    Blank (whitespace-only) lines are skipped silently. Lines that are not
    valid JSON are skipped with a printed warning that names the file and the
    1-based line number.

    Args:
        filepath: Path to the NDJSON file, read as UTF-8

    Yields:
        The parsed value of each valid line, in file order
    """
    with open(filepath, encoding="utf-8") as file:
        for line_number, line in enumerate(file, start=1):
            # Empty lines (e.g. a trailing newline) are not malformed data
            if not line.strip():
                continue

            # Keep the try body to the parse only, so an exception raised by
            # the consumer at the yield point is never swallowed here
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                print(f"Skipping malformed JSON in {filepath} (line {line_number})")
                continue

            yield record

In [None]:
def _create_document_with_retry(
    es_client: Elasticsearch,
    index_name: str,
    doc_id: str,
    document: dict,
    retry_count: int,
    timeout: int,
) -> bool:
    """Create one document, retrying failures; return True only if it was newly created."""
    for attempt in range(1, retry_count + 1):
        try:
            es_client.create(
                index=index_name,
                id=doc_id,
                document=document,
                timeout=f"{timeout}s",
            )
        except Exception as error:
            # 409 means a document with this ID already exists: skip it
            if getattr(error, "status_code", None) == 409:
                return False

            print(f"Attempt {attempt}/{retry_count} failed: {error!s}")
            if attempt >= retry_count:
                print(f"Failed to upload document after {retry_count} attempts")
                raise
            # Fixed pause before the next attempt
            time.sleep(2)
        else:
            return True

    # Only reached when retry_count < 1, i.e. no attempt was made
    return False


def load_reddit_data_to_elasticsearch(
    es_client: Elasticsearch,
    es_index_name: str,
    data_dir: str = "../data/reddit",
    excluded_subreddits: list[str] | None = None,
    retry_count: int = 3,
    timeout: int = 30,
) -> None:
    """Read Reddit NDJSON files, construct documents, and load to Elasticsearch using create (no duplicates).

    Each document ID is a UUIDv5 derived from the document's canonical JSON
    (keys sorted), so the same record always maps to the same ID. Loading it
    again is answered with a 409 conflict by the ``create`` operation and the
    record is skipped, which is what keeps re-runs from duplicating data.

    Args:
        es_client: Elasticsearch client
        es_index_name: The index name to load data into
        data_dir: Directory containing Reddit ``*.ndjson`` files
        excluded_subreddits: optional list of strings; a file is skipped when
            its base name contains any of them
        retry_count: Maximum number of attempts per document
        timeout: Timeout in seconds, passed to each create call as ``"<n>s"``

    Raises:
        Exception: The last error from the client once a document has failed
            ``retry_count`` times (conflicts are not errors).
    """
    excluded_subreddits = excluded_subreddits or []

    # Sorted so files are processed in a stable order between runs
    files_to_process = [
        filepath
        for filepath in sorted(glob.glob(f"{data_dir}/*.ndjson"))
        if not any(
            excluded in os.path.basename(filepath) for excluded in excluded_subreddits
        )
    ]
    print(f"Found {len(files_to_process)} files to process")

    total_docs = 0

    for filepath in files_to_process:
        filename = os.path.basename(filepath)
        # Assumes names shaped like "<part>-<part>-<subreddit>-..." (TODO confirm
        # against the data set); otherwise fall back to the bare file name
        # instead of failing with an IndexError.
        name_parts = filename.split("-")
        if len(name_parts) > 2:
            subreddit = name_parts[2]
        else:
            subreddit = os.path.splitext(filename)[0]
        print(f"Processing {subreddit}...")

        # Count lines up front so the progress bar has a total
        with open(filepath, encoding="utf-8") as file:
            total_lines = sum(1 for _ in file)

        doc_count = 0
        for data in tqdm(
            reddit_ndjson_generator(filepath),
            desc=f"Loading {subreddit}",
            total=total_lines,
            unit="doc",
        ):
            # Deterministic, content-derived ID (see docstring)
            canonical = json.dumps(data, sort_keys=True)
            doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, canonical))

            if _create_document_with_retry(
                es_client, es_index_name, doc_id, data, retry_count, timeout
            ):
                doc_count += 1

        total_docs += doc_count
        print(f"Loaded {doc_count} documents from {subreddit}")

    print(f"Total documents loaded: {total_docs}")

In [None]:
# Provision the ingest pipeline first, then the index; each helper only
# creates its target when it is not there yet
create_ingest_pipeline(
    es_client,
    pipeline_name=es_ingest_pipeline_name,
    pipeline_file=es_ingest_pipeline_file,
)

create_index(
    es_client,
    index_name=es_index_name,
    index_file=es_index_settings_file,
)

In [None]:
# Subreddits to leave out of the load. Uncomment an entry to exclude it: the
# loader skips every data file whose base name contains the entry
# (case-sensitive substring match). With all entries commented out the list is
# empty and every file is loaded.
excluded_subreddits = [
    # "mildlyinteresting",
    # "personalfinance",
    # "philosophy",
    # "podcasts",
    # "programming",
    # "relationship_advice",
    # "science",
    # "scifi",
    # "Showerthoughts",
    # "SkincareAddiction",
    # "socialskills",
    # "space",
    # "sports",
    # "suggestmeabook",
    # "technology",
    # "tifu",
    # "todayilearned",
    # "travel",
    # "UpliftingNews",
    # "WritingPrompts",
    # "YouShouldKnow",
]

In [None]:
# Ingest every non-excluded Reddit file from data_dir into the index
load_reddit_data_to_elasticsearch(
    es_client,
    es_index_name,
    excluded_subreddits=excluded_subreddits,
    data_dir=data_dir,
)