# Part 3: Transaction

## 1. Setups

In [1]:
from elasticsearch import Elasticsearch, helpers
import pandas as pd
import json
from time import sleep
import threading
from pprint import pprint

In [2]:
es = Elasticsearch(
        "http://localhost:9200",
)
if es.ping():
    print("Connected to Elasticsearch")
else:
    print("Could not connect to Elasticsearch")

es.cluster.put_settings(
    transient={
        "cluster.routing.allocation.disk.watermark.low": "92%",
        "cluster.routing.allocation.disk.watermark.high": "95%",
        "cluster.routing.allocation.disk.watermark.flood_stage": "97%",
    }
)

Connected to Elasticsearch


ObjectApiResponse({'acknowledged': True, 'persistent': {}, 'transient': {'cluster': {'routing': {'allocation': {'disk': {'watermark': {'low': '92%', 'flood_stage': '97%', 'high': '95%'}}}}}}})

In [4]:
excel_file = "../../data/text/Radiologists Report.xlsx"
TEXT_INDEX = "radiology_reports"
df = pd.read_excel(excel_file)
df_clean = df.dropna(subset=['Patient ID', "Clinician's Notes"])
print(f"Dataset shape: {df.shape}")
print(df.head())

Dataset shape: (575, 2)
   Patient ID                                  Clinician's Notes
0           1  L4-5: degenerative annular disc bulge is noted...
1           2  No evidence of disc herniation.\nNo significan...
2           3  LSS MRI\nFeatures of muscle spasm.\nsmall cent...
3           4  Feature of muscle spasm.\nDiffuse disc bulges ...
4           5  LSS MRI :\nFeature of muscle spasm.\nDiffuse d...


In [5]:
# Define index mapping, with 1 shard and 1 replica
index_name = TEXT_INDEX
index_mapping = {
    "mappings": {
        "properties": {
            "patient_id": {
                "type": "integer"
            },
            "clinicians_notes": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword"
                    },
                    "english": {
                        "type": "text",
                    }
                }
            }
        }
    },
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1,
    }
}

# Delete index if it exists
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name, timeout='2m')

# # Observe indices
# indices = es.cat.indices(format="json")  # list of dicts
# for row in indices:
#     print(row["health"], row["status"], row["index"], row["docs.count"], row["store.size"])


# Create index
es.indices.create(index=index_name, body=index_mapping, timeout='2m')


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'radiology_reports'})

In [6]:
# Wait for index creation to propagate
sleep(0.1)  
# Check index shards and replicas location, likely that primary shard is on data node and replica on master node
shards_info = es.cat.shards(index=index_name, h=["index", "shard", "prirep", "node"], format="json")
print(json.dumps(shards_info.body, indent=2))

[
  {
    "index": "radiology_reports",
    "shard": "0",
    "prirep": "r",
    "node": "es_data"
  },
  {
    "index": "radiology_reports",
    "shard": "0",
    "prirep": "p",
    "node": "es_master"
  },
  {
    "index": "radiology_reports",
    "shard": "1",
    "prirep": "p",
    "node": "es_data"
  },
  {
    "index": "radiology_reports",
    "shard": "1",
    "prirep": "r",
    "node": "es_master"
  }
]


In [7]:
# Prepare documents for indexing 
documents = [] 
for idx, row in df_clean.iterrows(): 
    doc = { 
           "_index": index_name, 
           "_id": idx, 
           "_source": { 
               "patient_id": row['Patient ID'], 
               "clinicians_notes": row["Clinician's Notes"] 
            }
    } 
    documents.append(doc)

In [8]:
# Index the documents in bulk
try:
    success, failed = helpers.bulk(es, documents, stats_only=True)
    print(f"Successfully indexed {success} documents. Failed: {failed}")
except Exception as e:
    print(f"Error during bulk indexing: {e}")

sleep(1)

# Verify the data was indexed
count = es.count(index=index_name)['count']
print(f"Total documents in index: {count}")

Successfully indexed 515 documents. Failed: 0
Total documents in index: 515


## 2. Refresh segment

### 1. Example of searching for a not-yet-refreshed document

Search query can return old results if requested before index refreshes.

In [29]:
search_query = {
    "query": {
        "match": {
            "patient_id": 1
        }
    }
}

res = es.search(index=index_name, body=search_query)
hits = res['hits']['hits']

if not hits:
    print("No documents found for patient_id 1")
else:
    hit = hits[0]
    doc_id = hit['_id']
    try:
        es.update(
            index=index_name, 
            id=doc_id, 
            body={"doc": {"clinicians_notes": "unknown"}}
        )
        print(f"Updated document id {doc_id}")
    except Exception as e:
        print(f"Error updating id {doc_id}: {e}")

    # search will return the old value since refresh has not occurred yet
    result = es.search(index=index_name, body=search_query)
    print(json.dumps(result['hits']['hits'], indent=2))

Updated document id 0
[
  {
    "_index": "radiology_reports",
    "_id": "0",
    "_score": 1.0,
    "_source": {
      "patient_id": 1,
      "clinicians_notes": "L4-5: degenerative annular disc bulge is noted more to the left side compressing thecal sac, compressing left nerve root and narrowing right neural foramen. // Evidence of hyperintense signal within the annulus fibrosus at left paramedian/posterolateral area which probably represents a torn annulus."
    }
  }
]


... but will return updated results after index refreshes => eventual consistency.

In [30]:
search_query = {
    "query": {
        "match": {
            "patient_id": 1
        }
    }
}

result = es.search(index=index_name, body=search_query)
print(json.dumps(result['hits']['hits'], indent=2))

[
  {
    "_index": "radiology_reports",
    "_id": "0",
    "_score": 1.0,
    "_source": {
      "patient_id": 1,
      "clinicians_notes": "unknown"
    }
  }
]


### 2. ES provides an API to set the refresh interval of an index

In [33]:
es.indices.put_settings(
    index=index_name,
    body={
        "index": {
            "refresh_interval": "2s"
        }
    }
)

ObjectApiResponse({'acknowledged': True})

In [32]:
# query after refresh
es.indices.refresh(index=index_name)
search_query = {
    "query": {
        "match": {
            "patient_id": 11
        }
    }
}

result = es.search(index=index_name, body=search_query)
print(json.dumps(result['hits']['hits'], indent=2))

[
  {
    "_index": "radiology_reports",
    "_id": "10",
    "_score": 1.0,
    "_source": {
      "patient_id": 11,
      "clinicians_notes": "LSS MRI :\nAbout 3*2 cm lesion with inhomogenous signal intensity noted just posterior to L1 veretbral body ,largely compressing the Rt side of the thecal sac. contrast study is advsied.\nDiffuse disc bulge noted at L4/L5 level, compresing the thecal sac and encroaching upon both neural canals.\nWide base disc bulge noted at L5/S1 level, extending to Lt lateral recess , compressin the thecal sac and nerve roots, more to left side, associated with ligamnetum flavum hyperatrophy. \n  "
    }
  }
]


## 3. Full rewrite for document update

In [10]:
# initial insert
es.index(
    index=index_name,
    id="1001",
    body={
        "patient_id": 1001,
        "clinicians_notes": "Initial note"
    }
)

ObjectApiResponse({'_index': 'radiology_reports', '_id': '1001', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 228, '_primary_term': 1})

In [11]:
def show_segments(label):
    segs = es.indices.segments(index=TEXT_INDEX)
    shards = segs["indices"][TEXT_INDEX]["shards"]
    print(f"\n--- {label} ---")
    table_rows = []
    for shard_id, shard_list in shards.items():
        for shard in shard_list:
            for idx, segment in enumerate(shard['segments'].values()):
                table_rows.append([
                    shard_id,
                    list(shard['segments'].keys())[idx],
                    segment['num_docs'],
                    segment['deleted_docs']
                ])
    print(pd.DataFrame(table_rows, columns=["shard", "segments", "num_docs", "deleted_docs"]))

show_segments("After initial insert")


--- After initial insert ---
  shard segments  num_docs  deleted_docs
0     0       _0       287             0
1     0       _0       287             0
2     1       _0       228             0
3     1       _1         1             0
4     1       _0       228             0
5     1       _1         1             0


In [12]:
# Close the Elasticsearch connection
es.close()
print("Elasticsearch connection closed")

Elasticsearch connection closed


## 4. Delete is logical (tombstone) until merged

In [12]:
def show_segments(label):
    segs = es.indices.segments(index=TEXT_INDEX)
    shards = segs["indices"][TEXT_INDEX]["shards"]
    print(f"\n--- {label} ---")
    table_rows = []
    for shard_id, shard_list in shards.items():
        for shard in shard_list:
            for idx, segment in enumerate(shard['segments'].values()):
                table_rows.append([
                    shard_id,
                    list(shard['segments'].keys())[idx],
                    segment['num_docs'],
                    segment['deleted_docs']
                ])
    print(pd.DataFrame(table_rows, columns=["shard", "segments", "num_docs", "deleted_docs"]))

# delete the doc (logical delete => tombstone)
es.delete(index=TEXT_INDEX, id=18, ignore=[404])
sleep(2)
show_segments("After DELETE (tombstone present)")

# force merge to 1 segment (physical removal)
es.indices.forcemerge(index=TEXT_INDEX, max_num_segments=1)
sleep(2)
show_segments("After force merge (tombstone cleaned)")


  es.delete(index=TEXT_INDEX, id=18, ignore=[404])



--- After DELETE (tombstone present) ---
  shard segments  num_docs  deleted_docs
0     0       _0       287             0
1     0       _1         0             1
2     0       _0       287             0
3     0       _1         0             1
4     1       _0       228             0
5     1       _1         1             0
6     1       _0       228             0
7     1       _1         1             0

--- After force merge (tombstone cleaned) ---
  shard segments  num_docs  deleted_docs
0     0       _2       286             1
1     0       _2       286             1
2     1       _2       229             0
3     1       _2       229             0


In [13]:
es.indices.forcemerge(index=TEXT_INDEX, max_num_segments=1)
show_segments("After force merge (tombstone cleaned)")


--- After force merge (tombstone cleaned) ---
  shard segments  num_docs  deleted_docs
0     0       _3       286             0
1     0       _3       286             0
2     1       _2       229             0
3     1       _2       229             0
