In [1]:
pip install opensearch-py

Note: you may need to restart the kernel to use updated packages.


In [2]:
from opensearchpy import OpenSearch,RequestsHttpConnection, helpers
import boto3
import gzip
from io import BytesIO
import json
import re

In [3]:
host = 'search-opensearch-domain-veqeg5v3lo36rov2j7w3hjdtty.us-west-2.es.amazonaws.com' # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
port = 443
region = 'us-west-2' # e.g. us-west-1


credentials = ("","")

In [4]:
client = OpenSearch(
    hosts = [f'{host}:{port}'],
    http_auth = credentials,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

In [5]:
info = client.info()

In [31]:
index_name = "stream_index"
if not client.indices.exists(index=index_name):
    # Define the index settings and mappings
    index_body = {
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 1
        },
        'mappings': {
            'properties': {
                'patientId': {'type': 'keyword'},
                'name': {'type': 'text'},
                'age': {'type': 'integer'},
                'heartRate': {'type': 'integer'},
                'respiratoryRate': {'type': 'integer'},
                'oxygenSaturation': {'type': 'integer'},
                'seizureDetected': {'type': 'boolean'},
                'seizureDuration': {'type': 'integer'},
                'seizureSeverity': {'type': 'keyword'},
                'emergencyContact': {
                    'properties': {
                        'name': {'type': 'text'},
                        'relationship': {'type': 'keyword'},
                        'phone': {'type': 'keyword'},
                        'email': {'type': 'keyword'}
                    }
                },
                'location': {
                    'properties': {
                        'currentLocation': {'type': 'text'},
                        'hospitalId': {'type': 'keyword'}
                    }
                },
                'createdAt': {
                    'type': 'date'
                },
                'eventTimestamp': {
                    'type': 'date'
                }
            }
        }
    }

    # Create the index
    client.indices.create(index=index_name, body=index_body)
    print(f"Index {index_name} created successfully.")
else:
    print(f"Index {index_name} already exists.")


Index stream_index created successfully.


In [32]:
s3 = boto3.client("s3")

def process_gz_file(bucket_name, key):
    
    # Get the GZ file from S3
    response = s3.get_object(Bucket=bucket_name, Key=key)
    
    # Read and extract the GZ file
    with gzip.GzipFile(fileobj=response["Body"]) as gzipfile:
        content = gzipfile.read()

    json_objects = str(content).split("}{")
     
    processed_jsons = []  
    for json_object in json_objects:
            try:
                json_object = '{' + json_object + '}'
                data = json.loads(json_object)
                processed_jsons.append(data)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                continue  # Skip invalid JSON entries
        
    return processed_jsons

In [33]:
bucket_name = 'streamanalyticsstack-rawdatabucket57f26c03-5wauf1ctwth3'
prefix = 'raw/'  

result = s3.list_objects(Bucket=bucket_name, Prefix = prefix)
processed_jsons = []

# Process each file in the result
if 'Contents' in result:
    for obj in result['Contents']:
        key = obj['Key']
        if key.endswith('.gz'):  # Check if the file is a TGZ file
            print(f"Processing file: {key}")

            processed_jsons = process_gz_file(bucket_name, key)

Processing file: raw/2024/08/13/09/StreamRawToS3-1-2024-08-13-09-52-05-eab92269-aea3-4edf-9894-58b74f82c64a.gz
Error decoding JSON: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
Error decoding JSON: Extra data: line 1 column 570 (char 569)
Processing file: raw/2024/08/14/09/StreamRawToS3-1-2024-08-14-09-43-51-ee72ba55-219e-4891-944c-307048a86e9c.gz
Error decoding JSON: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
Error decoding JSON: Extra data: line 1 column 560 (char 559)
Processing file: raw/2024/08/14/09/StreamRawToS3-1-2024-08-14-09-44-52-2b729741-0f74-4cfe-98cb-8f6703a15279.gz
Error decoding JSON: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
Error decoding JSON: Extra data: line 1 column 578 (char 577)
Processing file: raw/2024/08/14/09/StreamRawToS3-1-2024-08-14-09-45-53-7d8ccb75-0495-470c-a8fe-fcad8ab6233e.gz
Error decoding JSON: Expecting property name enclosed in double quotes: line 1 

In [34]:
print(len(processed_jsons))

25


In [35]:
from datetime import datetime

def format_date(date_str):
    # Parse the date string and remove microseconds
    date_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f")
    return date_obj.strftime("%Y-%m-%dT%H:%M:%S")  # ISO format without microseconds

In [36]:
def prepare_data_for_indexing(docs, index):
    for i in range(len(docs)):
        docs[i]["_index"] = index
        docs[i]['createdAt'] = format_date(docs[i]['createdAt'])
        docs[i]['eventTimestamp'] = format_date(docs[i]['eventTimestamp'])
    return docs

In [37]:
post_processed_jsons = prepare_data_for_indexing(processed_jsons, index_name)
print(post_processed_jsons)


[{'patientId': 'e946f336-02aa-43de-8ac1-0ecd2d0475c7', 'name': 'Jason Serrano', 'age': 52, 'heartRate': 109, 'respiratoryRate': 29, 'oxygenSaturation': 89, 'seizureDetected': False, 'seizureDuration': 0, 'seizureSeverity': 'None', 'emergencyContact': {'name': 'Shirley Graves', 'relationship': 'Guardian', 'phone': '6295751247', 'email': 'thernandez@example.org'}, 'location': {'currentLocation': '9806 Brian Rue\\nLake Yvonne, MH 88166', 'hospitalId': 'MSFRGBGB'}, 'createdAt': '2024-08-14T15:36:57', 'eventTimestamp': '2024-08-14T15:36:57', '_index': 'stream_index'}, {'patientId': '94b273c5-f46a-45e6-b920-7706fbfa8eee', 'name': 'Kathy Webster', 'age': 63, 'heartRate': 84, 'respiratoryRate': 13, 'oxygenSaturation': 96, 'seizureDetected': False, 'seizureDuration': 0, 'seizureSeverity': 'None', 'emergencyContact': {'name': 'Stephen Castro', 'relationship': 'Child', 'phone': '842-707-4800x2626', 'email': 'raguirre@example.org'}, 'location': {'currentLocation': '0474 Blankenship Lock\\nNorth St

In [38]:
response = helpers.bulk(client, processed_jsons, max_retries=3)
print(response)

(25, [])


In [39]:
# search for all documents in the 'amit-pdf-index' index
response = client.search(index=index_name)

# extract the count of hits from the response
hits_count = response['hits']['total']['value']


# print the count of hits
print("Total Hits: ", hits_count)

Total Hits:  25


In [30]:
response = client.indices.delete(
    index = index_name
)

print(response)

{'acknowledged': True}
