In [203]:
import os
import json
from opensearchpy import OpenSearch, RequestsHttpConnection, helpers
import boto3
from requests_aws4auth import AWS4Auth
from openai import OpenAI
from config import *

In [204]:
# ─── Configuration ──────────────────────────────────────────────────────────────
OPENSEARCH_ENDPOINT = "search-redcoat-express-wvba3hxtx72luhtzgh7tb56k4i.aos.eu-north-1.on.aws"
INDEX_NAME  = "redcoat-express"
AWS_REGION = "eu-north-1"

MASTER_USER        = "admin"              # the internal username you configured
MASTER_PASSWORD    = "ASINnumber1!"       # the password you set

DATA_DIRS = [email_chunks_dir, attachment_chunks_dir, thread_documents_dir]

Access over IAM (Switched Off)

In [205]:
# # Creating a session for a specific local user "rag-admin-user"
# session = boto3.Session(profile_name="rag-admin-user")

# # Accessing STS to assume the role with the right permissions"
# sts = session.client("sts")
# response = sts.assume_role(
#     RoleArn="arn:aws:iam::169189304433:role/RedcoatExpressRagAdmin",
#     RoleSessionName="OpenSearchUpload"
# )

# # Extracting credentials from the session
# credentials = response["Credentials"]

# # Passing credentials to AWSAUTH so we can authenticate to OpenSearch
# awsauth = AWS4Auth(
#     credentials["AccessKeyId"],
#     credentials["SecretAccessKey"],
#     REGION,
#     "es",
#     session_token=credentials["SessionToken"]
# )

# # Creating OpenSearch client with A.) my created OpenSearch domain and B.) my AWSAUTH credentials
# client = OpenSearch(
#     hosts=[{"host": DOMAIN, "port": 443}],
#     http_auth=awsauth,
#     use_ssl=True,
#     verify_certs=True,
#     connection_class=RequestsHttpConnection
# )

# # Testing connection:
# print(client.cat.indices(format="json"))

Access over OpenSearch Master User

In [206]:

client = OpenSearch(
    hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
    http_auth=(MASTER_USER, MASTER_PASSWORD),
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Test it:
indecies = client.cat.indices(format="json")
for i in indecies[:3]:
    print(i)
    print()


{'health': 'green', 'status': 'open', 'index': '.plugins-ml-model-group', 'uuid': 'GK4aZPJnTUGosjC69CfIyQ', 'pri': '1', 'rep': '2', 'docs.count': '0', 'docs.deleted': '0', 'store.size': '11.2kb', 'pri.store.size': '228b'}

{'health': 'green', 'status': 'open', 'index': 'otel-v1-apm-service-map-sample', 'uuid': 'InKqxx9zRB-xtgzA46VoUQ', 'pri': '1', 'rep': '2', 'docs.count': '49', 'docs.deleted': '0', 'store.size': '52.9kb', 'pri.store.size': '17.6kb'}

{'health': 'green', 'status': 'open', 'index': 'otel-v1-apm-span-sample', 'uuid': 'd8hxfImBTbG9YDzanqYMqw', 'pri': '1', 'rep': '2', 'docs.count': '13061', 'docs.deleted': '0', 'store.size': '19.1mb', 'pri.store.size': '6.8mb'}



Clean the existing Index if necessary

In [196]:
if client.indices.exists(INDEX_NAME):
    client.indices.delete(index=INDEX_NAME)
    print(f"[INFO] Deleted index {INDEX_NAME!r}")

[INFO] Deleted index 'redcoat-express'


In [197]:
if not client.indices.exists(INDEX_NAME):
    print(f"[INFO] creating index {INDEX_NAME!r}")
    print()
    mapping = {
        "settings": {"index": {"knn": True}},
        "mappings": {
            "dynamic": True,
            "properties": {
                "doc_id":      {"type": "keyword"},
                "thread_id":   {"type": "keyword"},
                "message_id":  {"type": "keyword"},
                "embedding":   {"type": "knn_vector", "dimension": 1536},
                "type":        {"type": "keyword"},
                "date":        {"type": "date"},
                "subject":     {"type": "text"},
                "chunk_text":  {"type": "text"},
                "chunk_index": {"type": "integer"},
                "filename":    {"type": "keyword"},
                "summary_text":{"type": "text"},
                "participants":{"type": "keyword"}
            }
        }
    }
    client.indices.create(INDEX_NAME, body=mapping)

[INFO] creating index 'redcoat-express'



Uploading Emails to Index

In [198]:

def actionas_generator():
    for directory in DATA_DIRS:
        print(f"Pulling data from: '{directory}'")
        print()
        for filename in os.listdir(directory):
            if not filename.endswith(".json"):
                continue
            path = os.path.join(directory, filename)
            with open(path, "r", encoding="utf-8") as f:
                doc = json.load(f)
            yield {
                "_index":  INDEX_NAME,
                "_id":     doc.get("doc_id", filename),
                "_source": doc
                }


print(f"[INFO] Documents ready to index")
total = 0
for i in DATA_DIRS:
    total += len(os.listdir(i))

success = 0
for idx, (ok, result) in enumerate(
    helpers.streaming_bulk(
        client,
        actionas_generator(),
        chunk_size=100,           # smaller batches
        request_timeout=60,       # give it a minute per request
        max_retries=2,
        initial_backoff=2,
        max_backoff=10,
        yield_ok=True
        )
    ):
    if ok:
        success += 1
    else:
        # result looks like {"index": {"_id": "...", "error": {...}}}
        print(f"[ERROR] Doc #{idx} failed to index:", result)

    if idx % verbosity == 0:
        print(f"[INFO] {idx}/{total} uploaded -> {round(success/total*100, 0)}%")

print(f"[DONE ] Finished indexing: {success}/{total} succeeded -> {round(success/total*100, 0)}%")

print("Refreshing index...")
client.indices.refresh(index=INDEX_NAME)

print("Index refreshed.")

[INFO] Documents ready to index
Pulling data from: 'C:\Users\jklas\email_processor\email_rag\data\chunked_emails'

[INFO] 0/363 uploaded -> 0.0%
[INFO] 20/363 uploaded -> 6.0%
[INFO] 40/363 uploaded -> 11.0%
[INFO] 60/363 uploaded -> 17.0%
[INFO] 80/363 uploaded -> 22.0%
Pulling data from: 'C:\Users\jklas\email_processor\email_rag\data\chunked_attachments'

[INFO] 100/363 uploaded -> 28.0%
[INFO] 120/363 uploaded -> 33.0%
[INFO] 140/363 uploaded -> 39.0%
[INFO] 160/363 uploaded -> 44.0%
[INFO] 180/363 uploaded -> 50.0%
[INFO] 200/363 uploaded -> 55.0%
[INFO] 220/363 uploaded -> 61.0%
[INFO] 240/363 uploaded -> 66.0%
[INFO] 260/363 uploaded -> 72.0%
[INFO] 280/363 uploaded -> 77.0%
Pulling data from: 'C:\Users\jklas\email_processor\email_rag\data\thread_documents'

[INFO] 300/363 uploaded -> 83.0%
[INFO] 320/363 uploaded -> 88.0%
[INFO] 340/363 uploaded -> 94.0%
[INFO] 360/363 uploaded -> 99.0%
[DONE ] Finished indexing: 363/363 succeeded -> 100.0%
Refreshing index...
Index refreshed.


In [None]:
# Connect to OpenSearch
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_ENDPOINT, "port": 443}],
    http_auth=(MASTER_USER, MASTER_PASSWORD),
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)


def inspect_index(client):
    
    # Searching the index w/ max 5 results
    resp = client.search(
        index=INDEX_NAME,
        body={
        "size": 0,
        "aggs": {
            "types": {
            "terms": { "field": "type", "size": 5 }
            }
        }
        }
    )

    print("Type buckets:")
    for bucket in resp["aggregations"]["types"]["buckets"]:
        print(f"  {bucket['key']}  → {bucket['doc_count']} docs")


inspect_index(client)


Type buckets:
  attachment  → 191 docs
  email  → 120 docs
  thread  → 52 docs


In [201]:
"""
── OPENSEARCH DATA STRUCTURE ──────────────────────────────────────────────────────────

{
  "settings": {
    "index": {
      "knn": true                                       // k-NN plugin enabled
    }
  },
  "mappings": {
    "dynamic": true,                                   // allow extra fields beyond those listed
    "properties": {
      "doc_id":       { "type": "keyword"        },
      "thread_id":    { "type": "keyword"        },
      "message_id":   { "type": "keyword"        },
      "embedding":    { "type": "knn_vector",
                        "dimension": 1536       },     // embedding vector field
      "type":         { "type": "keyword"        }, 
      "date":         { "type": "date"           },
      "subject":      { "type": "text"           },
      "chunk_text":   { "type": "text"           },
      "summary_text": { "type": "text"           },
      "chunk_index":  { "type": "integer"        },
      "filename":     { "type": "keyword"        },
      "participants": { "type": "keyword"        }
    }
  }
}
"""

"""
── OPENSEARCH EXAMPLE ──────────────────────────────────────────────────────────

{
  "_index": "email_rag",
  "_id":    "thread123-0",          // unique per chunk
  "_source": {
    "doc_id":      "XXXXXX",
    "type":        "email",
    "thread_id":   "thread123",
    "message_id":  "msg-abc-001",
    "date":        "2025-05-12T09:39:48Z",
    "subject":     "Re: Please book collection for Thursday",
    "participants":[
                    "alice@example.com",
                    "bob@example.com"
                   ],
    "chunk_index": 0,
    "chunk_text":  "Great, thank you!\n<END OF MESSAGE>",
    "embedding":   [ -0.02526, 0.01429, … ]  // length 1536
  }
}
"""

'\n── OPENSEARCH EXAMPLE ──────────────────────────────────────────────────────────\n\n{\n  "_index": "email_rag",\n  "_id":    "thread123-0",          // unique per chunk\n  "_source": {\n    "doc_id":      "XXXXXX",\n    "type":        "email",\n    "thread_id":   "thread123",\n    "message_id":  "msg-abc-001",\n    "date":        "2025-05-12T09:39:48Z",\n    "subject":     "Re: Please book collection for Thursday",\n    "participants":[\n                    "alice@example.com",\n                    "bob@example.com"\n                   ],\n    "chunk_index": 0,\n    "chunk_text":  "Great, thank you!\n<END OF MESSAGE>",\n    "embedding":   [ -0.02526, 0.01429, … ]  // length 1536\n  }\n}\n'

In [None]:
print( client.get(index=INDEX_NAME, id="fd88ebfc0b7840138b4f3c665698d892@thy.com")["_source"])

NotFoundError: NotFoundError(404, '{"_index":"redcoat-express","_id":"fd88ebfc0b7840138b4f3c665698d892@thy.com","found":false}')

In [186]:
mapping = client.indices.get_mapping(index=INDEX_NAME)
print(json.dumps(mapping[INDEX_NAME]["mappings"]["properties"]["type"], indent=2))


{
  "type": "keyword"
}


In [187]:
def keyword_search(text, field="summary_text", size=5):
    body = {
      "size": size,
      "query": { "match": { field: text }}
    }
    return client.search(index=INDEX_NAME, body=body)["hits"]["hits"]

def knn_search(query_text, size=5):
    llm_client = OpenAI(api_key=SECRET_KEY)

    q_vec = llm_client.embeddings.create(
        model="text-embedding-ada-002", input=query_text
    ).data[0].embedding

    body = {
      "size": size,
      "query": {
        "knn": {
          "embedding": { "vector": q_vec, "k": size }
        }
      }
    }
    return client.search(index=INDEX_NAME, body=body)["hits"]["hits"]


In [188]:
text = "Who is the managing director of Redcoat?"
search = keyword_search(text)[0]["_source"]
search = knn_search(text)[0]["_source"]

for i in search:
    if i == "embedding":
        continue
    print(f"{i}: {search[i]}")

type: thread
thread_id: cabgx6kfhqu8ctuwch2w=6no6q54d18osbqxdjfcnfuhhfumvsg@mail.gmail.com
subject: Swissport Imports LHR.HCHImports@swissport.com
participants: ['Jason <jason.leadbeatter@redcoat.org.uk>', 'Simon leadbeatter <simon.leadbeatter@redcoat.org.uk>']
first_date: 2025-05-12T08:33:52+00:00
last_date: 2025-05-12T08:33:52+00:00
summary_text: This email thread contains a single message which is a signature block from Simon Leadbeatter, the Operations Manager at Redcoat Express Ltd, providing his contact information and the company's address.
