# DEMO  
## Anomaly detection using Milvus, Presto and Kafka on a Lakehouse

![Top](../images/cover1.png "cover1")


### Install pre-requisites

The pre-requisites should have been already installed during the setup process but we can run it again to be sure. No worries if you see many messages like "Requirement already satisfied".

In [None]:
# Install (or confirm) third-party packages imported by the cells below:
# pymilvus, confluent-kafka, astropy and scikit-image.
%system python3 -m pip install pymilvus confluent-kafka astropy scikit-image

### 1. Initialize Milvus with vectorized images to create a private repository.

This section loads sky pictures into a Milvus collection. The images were taken in FITS format and contain the metadata in the header of the file. The collection is built with the original file name, the vector corresponding to the image and a few trailing fields as metadata. Strictly speaking, it would not have been necessary to store the metadata in the collection because it is supposed to be managed separately in an Iceberg / Presto / Lakehouse environment. However, for demo purposes it is handy to include it in Milvus for easy experimentation.

You need to run this section only once or if you want to re-create the full collection from scratch.

First of all, we need to extract a certificate that we will use for authentication. As it will not change during the life of the environment, it is only necessary to run this cell the first time the demo is run and never again.

In [None]:
# Ask openssl for the certificates presented by watsonxdata:8443 and let awk
# keep only the BEGIN/END CERTIFICATE blocks, which are written to ./presto.crt.
rc = %system echo QUIT | openssl s_client -showcerts -connect watsonxdata:8443 | \
        awk '/-----BEGIN CERTIFICATE-----/ {p=1}; p; /-----END CERTIFICATE-----/ {p=0}' > ./presto.crt 

Now, a few functions are implemented. They will be called during the main workflow of the section. This cell will trigger no action. It is only definitions.

In [None]:
import glob
import numpy as np

from pymilvus import(
    Milvus,
    IndexType,
    Status,
    connections,
    FieldSchema,
    DataType,
    Collection,
    CollectionSchema,
    utility,
    MilvusClient
)

from astropy.io import fits
from skimage.transform import resize


# Connects to Milvus database using the provided host, port, user, password, 
# and server certificate path.
def connect_to_milvus() :
    """Register the pymilvus connection named 'default'.

    All settings are fixed in this function: host 'watsonxdata', port 19530,
    user 'ibmlhadmin', and a TLS connection (secure=True) that uses the
    'presto.crt' file as the server certificate.
    """
    settings = {
        'alias': 'default',
        'host': 'watsonxdata',
        'port': 19530,
        'user': 'ibmlhadmin',
        'password': 'password',
        # Certificate file extracted earlier in the notebook.
        'server_pem_path': 'presto.crt',
        'server_name': 'watsonxdata',
        'secure': True,
    }
    connections.connect(**settings)
 

# Creates a collection for storing image embeddings. 
# It is created with an IVF_FLAT index on the "embedding" field.
# Returns the created collection object
def create_collection():
    """Rebuild the "image_embeddings" collection and return it.

    The collection is dropped first, then re-created with an auto-generated
    INT64 primary key, the file path, a 16600-dimension float vector and
    thirteen VARCHAR metadata fields. An IVF_FLAT index using the L2 metric
    is created on the "embedding" field before the collection is flushed.
    """
    collection_name = "image_embeddings"
    utility.drop_collection(collection_name)

    # Metadata columns, in schema order; all of them are stored as strings.
    metadata_names = (
        "image_width",
        "image_height",
        "image_utz",
        "object_name",
        "object_ra",
        "object_dec",
        "object_alt",
        "object_az",
        "camera_focus",
        "local_temp",
        "local_lat",
        "local_long",
        "local_weather",
    )

    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="file_path", dtype=DataType.VARCHAR, max_length=128),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=16600),
    ]
    fields.extend(
        FieldSchema(name=column, dtype=DataType.VARCHAR, max_length=128)
        for column in metadata_names
    )

    schema = CollectionSchema(fields, "Embedding of FITS image file")
    fits_coll = Collection(collection_name, schema)

    fits_coll.create_index(
        field_name="embedding",
        index_params={
            'metric_type': 'L2',
            'index_type': "IVF_FLAT",
            'params': {"nlist": 2048},
        },
    )
    fits_coll.flush()

    return fits_coll

# Load a FITS file and return the header and resized data
# of the image file passed as parameter
def load_fits_file(file_path) :
    """Open a FITS file and return (header, resized image) of its first HDU.

    The image is resized to the output shape (166, 100). scikit-image
    interprets that shape as (rows, columns), i.e. 166 pixels high and
    100 pixels wide, which flattens to 16600 values.
    """
    with fits.open(file_path) as hdul:
        primary_hdu = hdul[0]
        image_header = primary_hdu.header
        # mode='reflect' fills points that fall outside the input boundaries
        # by reflecting the image at its edges.
        image_resized = resize(primary_hdu.data, (166, 100), mode='reflect')

    return image_header, image_resized

# Generates an embedding vector from an image
# and returns it as a NumPy array
def generate_embedding(image_data) :
    """Flatten an image array and scale it to unit L2 norm.

    Parameters:
        image_data: NumPy array holding the image pixels (any number of
            dimensions).

    Returns:
        A one-dimensional NumPy array. When the input has a non-zero norm
        the result has Euclidean length 1. When the norm is zero (an
        all-zero or empty image) the flattened values are returned as
        floats, unscaled, instead of dividing by zero and producing NaN.
    """
    # flatten() collapses the (possibly 2D or 3D) pixel grid into one vector.
    embedding = image_data.flatten()

    # The L2 norm is the square root of the sum of the squared values.
    norm = np.linalg.norm(embedding)
    if norm == 0:
        # 0 / 0 would fill the vector with NaN; keep the zero vector instead.
        return embedding.astype(float)

    return embedding / norm

# Inserts an embedding into the Milvus collection
def insert_embedding(fits_coll, file_path, header, embedding):
    """Insert one image row into the collection and load the collection.

    Parameters:
        fits_coll: object providing insert() and load().
        file_path: value stored in the file_path column.
        header: mapping indexed by FITS keyword (NAXIS1, NAXIS2, UT-OBS, ...).
        embedding: vector stored in the embedding column.
    """
    # (FITS keyword, convert with str()?) in the column order that follows
    # file_path and embedding. UT-OBS and OBJECT are passed through unchanged.
    header_columns = (
        ('NAXIS1', True),
        ('NAXIS2', True),
        ('UT-OBS', False),
        ('OBJECT', False),
        ('RA', True),
        ('DEC', True),
        ('TELALT', True),
        ('TELAZ', True),
        ('CAMFOCUS', True),
        ('TELTEMP', True),
        ('LATITUDE', True),
        ('LONGITUD', True),
        ('WEATHER', True),
    )

    # insert() receives one list per column, each holding a single row value.
    columns = [[file_path], [embedding]]
    for keyword, as_text in header_columns:
        value = header[keyword]
        columns.append([str(value) if as_text else value])

    fits_coll.insert(columns)
    fits_coll.load()

# Takes a bunch of images in FITS format and inserts them
# in a Milvus collection, generating vectors for the graphical content
def initialize_collection():
    """Re-create the collection and fill it from the local FITS images.

    Every file matching ./images/m31*.FITS is processed in sorted order:
    it is loaded, turned into an embedding and inserted. Returns the
    collection object.
    """
    fits_coll = create_collection()
    # NOTE(review): the pattern uses an upper-case .FITS extension; confirm it
    # matches the actual file names on a case-sensitive file system.
    for image_file in sorted(glob.glob("./images/m31*.FITS")):
        print("Inserting file: ", image_file)
        header, pixels = load_fits_file(image_file)
        insert_embedding(fits_coll, image_file, header, generate_embedding(pixels))
    return fits_coll

The next cell triggers the initialization of the Milvus collection using the functions defined in the previous cell.

In [None]:
# Connect to Milvus, rebuild the collection from the local FITS files and
# keep the resulting collection object in fits_coll.
connect_to_milvus()
fits_coll = initialize_collection()
# Release the 'default' connection once the load is finished.
connections.disconnect(alias="default")

### 2. Initialize Metadata in watsonx.data

This section is intended to illustrate how to extract the metadata of a set of images in FITS format and store it in a relational table using the Presto and Iceberg technology contained in watsonx.data 

In this cell, a few functions are implemented. They will be called during the main workflow of the section. This cell will trigger no action. It is only definitions.

In [None]:
import base64
import prestodb
import glob

import numpy as np
import pandas as pd

from astropy.io import fits

# Connect to watsonx.data using the Presto engine.
# Note that this function is intended to run on the same system
# where watsonx.data is installed. If you want to execute the code
# directly on your laptop, the address, the port and the certificate
# must be updated 
def connect_to_watsonxdata() :
    """Create a prestodb DB-API connection to watsonx.data.

    The connection uses HTTPS with basic authentication and the fixed
    parameters below. When a certificate file is configured it is set as
    the `verify` value of the connection's HTTP session.

    Returns:
        The connection object, or None when creating it raised an
        exception (the error is printed). Callers must check for None
        before using the result.
    """
    # Connection Parameters
    userid     = 'ibmlhadmin'
    password   = 'password'
    hostname   = 'watsonxdata'
    port       = '8443'
    catalog    = 'tpch'
    schema     = 'tiny'
    certfile   = "/certs/lh-ssl-ts.crt"

    # Connect Statement
    try:
        wxdconnection = prestodb.dbapi.connect(
                host=hostname,
                port=port,
                user=userid,
                catalog=catalog,
                schema=schema,
                http_scheme='https',
                auth=prestodb.auth.BasicAuthentication(userid, password)
        )
        # Identity comparison is the correct way to test against None.
        if certfile is not None:
            wxdconnection._http_session.verify = certfile
        print("Connection successful")
        return wxdconnection
    except Exception as e:
        print("Unable to connect to the database.")
        print(repr(e))
        return None

# Creates an empty table to hold the metadata of the FITS image files
# For illustration, experimentation and demo purposes an extra field has 
# been added (filebytes) to hold the graphical contents too. 
def create_metadata_table(wxdconnection) :
    """Create an empty iceberg_data.fits."fits-images" table.

    Three statements are executed in order on one cursor: create the schema
    if it is missing, drop the table if it exists, create the table. A
    failure of any statement is printed and the remaining statements are
    still attempted. The cursor is closed at the end.
    """
    create_schema_sql = '''
        CREATE SCHEMA IF NOT EXISTS 
            iceberg_data.fits 
        WITH (location = 's3a://iceberg-bucket/fits') 
    '''

    drop_table_sql = '''
        DROP TABLE IF EXISTS 
            iceberg_data.fits."fits-images"
    '''

    create_table_sql = '''
         CREATE TABLE 
            iceberg_data.fits."fits-images" (
                filename   VARCHAR,
                filebytes  VARCHAR,
                image_width VARCHAR,
                image_height VARCHAR,
                image_utz VARCHAR,
                object_name VARCHAR,
                object_ra VARCHAR,
                object_dec VARCHAR,
                object_alt VARCHAR,
                object_az VARCHAR,
                camera_focus VARCHAR,
                local_temp VARCHAR,
                local_lat VARCHAR,
                local_long VARCHAR,
                local_weather VARCHAR
            )
    '''

    cursor = wxdconnection.cursor()

    for statement in (create_schema_sql, drop_table_sql, create_table_sql):
        try:
            cursor.execute(statement)
        except Exception as err:
            # Best effort: report the problem and continue with the next one.
            print(repr(err))

    cursor.close()

# Extract the metadata of a FITS file and store it into the table
# created in the previous function. 
# For illustration, experimentation and demo purposes an extra field has 
# been added (filebytes) to hold the graphical contents too. Note how there
# is no embedding or vector in this field, only the contents of the file.
# However, the file has been encoded with base64 in order to avoid potential 
# problems with control characters, escape sequences and other things that
# may cause strange behaviour.
def insert_file(wxdconnection, image_file):
    """Insert one FITS file and its header metadata into "fits-images".

    The whole file is read and base64-encoded for the filebytes column.
    The metadata comes from the header of the first HDU. Every value is
    written as a SQL string literal.

    Parameters:
        wxdconnection: connection providing cursor() and commit().
        image_file: path of the FITS file to store.

    Errors raised while executing the INSERT are printed, not re-raised.
    The cursor is always closed.
    """

    def sql_literal(value):
        # Render a value as a single-quoted SQL string literal. Embedded
        # single quotes are doubled so a value such as an object name with
        # an apostrophe cannot terminate the literal.
        return "'" + str(value).replace("'", "''") + "'"

    with open(image_file, 'rb') as file:
        encoded_file_content = base64.b64encode(file.read()).decode('utf-8')

    # FITS header keyword for each metadata column, in column order.
    header_keywords = (
        'NAXIS1', 'NAXIS2', 'UT-OBS', 'OBJECT', 'RA', 'DEC', 'TELALT',
        'TELAZ', 'CAMFOCUS', 'TELTEMP', 'LATITUDE', 'LONGITUD', 'WEATHER',
    )
    with fits.open(image_file) as hdul:
        image_header = hdul[0].header
        metadata = [image_header[keyword] for keyword in header_keywords]

    columns = (
        'filename', 'filebytes', 'image_width', 'image_height', 'image_utz',
        'object_name', 'object_ra', 'object_dec', 'object_alt', 'object_az',
        'camera_focus', 'local_temp', 'local_lat', 'local_long',
        'local_weather',
    )
    values = [image_file, encoded_file_content] + metadata

    column_list = ",\n            ".join(columns)
    # Each literal holds exactly the value: no padding inside the quotes.
    value_list = ",\n            ".join(sql_literal(value) for value in values)

    sql = f'''
        INSERT INTO iceberg_data.fits."fits-images" (
            {column_list}
        )
        VALUES (
            {value_list}
        )
    '''

    cursor = wxdconnection.cursor()
    try:
        cursor.execute(sql)
        wxdconnection.commit()
    except Exception as err:
        print(f"Error executing SQL: {repr(err)}")
    finally:
        cursor.close()

The next cell triggers the metadata extraction of the image files that build up the private image repository. This metadata will be inserted into a table in watsonx.data using the Presto engine. 
As in the previous section, this initialization must be run once. A re-creation from scratch can be done at any time, but it is not necessary.

In [None]:
wxdconnection = connect_to_watsonxdata()
create_metadata_table(wxdconnection)

# Store every matching FITS file, in sorted order, in the freshly created table.
for image_file in sorted(glob.glob("./images/m31*.FITS")):
    print("Inserting file: ", image_file)
    insert_file(wxdconnection, image_file)

### 3. Generate picture events

This section implements the role of the event generator described in the demo documentation. Essentially, it:

- reads pictures captured by the astronomical camera (FITS files)
- extracts the metadata
- composes an event consisting of:
  - the FITS file encoded as base64 to protect the content
  - the metadata
- optionally, creates a Kafka topic from scratch matching the event
- sends the event as a message through the topic

The next cell implements a few functions. They will be called during the main workflow of the section. This cell will trigger no action. It is only definitions.

In [None]:
import json
import time
import base64

from astropy.io import fits
from confluent_kafka import Producer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic

# Bootstrap server address used by the Kafka admin client and producer below.
BROKER = 'watsonxdata:29092' 
# Name of the Kafka topic that carries the picture events.
topic = 'fits-images'  

# Creates a new Kafka topic with the specified name, number of partitions
# and replication factor. It first checks if the topic already exists by listing 
# all topics and checking if the given topic name is present. If the topic exists, 
# it deletes the topic and waits for it to be completely deleted before creating a new one.
def create_kafka_topic(topic_name, num_partitions=1, replication_factor=1):
    """Create a Kafka topic, deleting any existing topic of that name first.

    Parameters:
        topic_name: name of the topic to (re)create.
        num_partitions: number of partitions of the new topic.
        replication_factor: replication factor of the new topic.

    When the topic already exists it is deleted and the broker metadata is
    polled for up to 10 seconds until the topic has disappeared. The topic
    is then created in every case. Failures are printed, not raised.
    """
    admin_client = AdminClient({'bootstrap.servers': BROKER})

    existing_topics = admin_client.list_topics(timeout=10).topics

    if topic_name in existing_topics:
        delete_futures = admin_client.delete_topics([topic_name], operation_timeout=30)
        for deleted_name, future in delete_futures.items():
            try:
                future.result()
                print(f"Topic '{deleted_name}' has been marked for deletion.")
            except Exception as e:
                print(f"Failed to delete topic '{deleted_name}': {e}")

        start_time = time.time()
        timeout = 10  # total time to wait for the deletion, in seconds
        check_interval = 5  # pause between two metadata checks, in seconds

        while time.time() - start_time < timeout:

            metadata = admin_client.list_topics(timeout=timeout)

            if topic_name not in metadata.topics:
                print(f"Topic '{topic_name}' has been successfully deleted.")
                # Leave only the wait loop: the topic still has to be
                # re-created below. Returning here would skip the creation.
                break

            print(f"Topic '{topic_name}' is still being deleted. Checking again in {check_interval} seconds...")
            time.sleep(check_interval)

    topic_list = [NewTopic(topic=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)]

    fs = admin_client.create_topics(new_topics=topic_list)

    for created_name, f in fs.items():
        try:
            f.result()
            print(f"Created Kafka topic: {created_name}")
        except Exception as e:
            print(f"Failed to create topic {created_name}: {e}")

# Creates a Kafka producer using the given configuration
def create_kafka_producer():
    """Return a Producer bound to BROKER with client id 'fits_image_producer'."""
    return Producer({
        'bootstrap.servers': BROKER,
        'client.id': 'fits_image_producer',
    })

# Reads a FITS image file and returns its header plus the full content as a base64-encoded string.
def read_fits_image_as_base64(fits_image_path) :
    """Return (header, base64 text) for a FITS file.

    The file is read once as raw bytes for the base64 encoding and opened a
    second time with astropy to obtain the header of its first HDU.
    """
    with open(fits_image_path, 'rb') as raw_file:
        encoded_bytes = base64.b64encode(raw_file.read())

    with fits.open(fits_image_path) as hdul:
        image_header = hdul[0].header

    return image_header, encoded_bytes.decode('utf-8')

# Send a FITS image file and its metadata to a Kafka topic
def send_file_image_to_kafka(producer, topic, file, image_base64, header):
    """Publish one picture event as a JSON message and flush the producer.

    Parameters:
        producer: object providing produce() and flush().
        topic: destination topic name.
        file: value of the event's 'file' entry.
        image_base64: base64 text stored under 'image_data'.
        header: mapping indexed by FITS keyword.

    The message key is "fits_image" and delivery_report is registered as
    the delivery callback.
    """
    # (event field, FITS keyword, convert with str()?) in event order.
    # UT-OBS and OBJECT are copied as they are.
    field_map = (
        ('image_width', 'NAXIS1', True),
        ('image_height', 'NAXIS2', True),
        ('image_utz', 'UT-OBS', False),
        ('object_name', 'OBJECT', False),
        ('object_ra', 'RA', True),
        ('object_dec', 'DEC', True),
        ('object_alt', 'TELALT', True),
        ('object_az', 'TELAZ', True),
        ('camera_focus', 'CAMFOCUS', True),
        ('local_temp', 'TELTEMP', True),
        ('local_lat', 'LATITUDE', True),
        ('local_long', 'LONGITUD', True),
        ('local_weather', 'WEATHER', True),
    )

    event = {'file': file}
    for field, keyword, as_text in field_map:
        value = header[keyword]
        event[field] = str(value) if as_text else value
    event['image_data'] = image_base64

    payload = json.dumps(event)
    producer.produce(topic, key="fits_image", value=payload, callback=delivery_report)
    producer.flush()

# This function is called by the Kafka producer when it has received a 
# delivery report from the broker. It is used to print out information
# about the message delivery status.
def delivery_report(err, msg):
    """Print the outcome of a message delivery.

    Parameters:
        err: delivery error, or None when the message was delivered.
        msg: delivered message; only used when err is None.
    """
    if err is None:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
        return
    print(f"Message delivery failed: {err}")

Run this only the first time you run the demo or if you want to recreate the topic from scratch. If you run it, you may need to register the topic in watsonx.data again (upload the json file) if you want to access the virtual kafka table.

In [None]:
# (Re)create the topic with the function's default partition and replication settings.
create_kafka_topic(topic)

This cell runs the workflow of the event generation. It emulates the capture of two pictures and sends them as Kafka messages. It can be run many times and you can change the files to be sent (`file_image_path`).

In [None]:

producer = create_kafka_producer()

# The first file is a normal picture, the second one is the anomaly.
for file_image_path in ('./images/m31.fits', './images/m31dot.fits'):
    image_header, image_base64 = read_fits_image_as_base64(file_image_path)
    send_file_image_to_kafka(producer, topic, file_image_path, image_base64, image_header)

    print(f'File "{file_image_path}" sent to Kafka topic "{topic}".')

### 4. Event reception and archival

In [None]:
import json
import time
import base64
import prestodb
import numpy as np
from astropy.io import fits
from confluent_kafka import Consumer, KafkaError

def create_kafka_consumer():
    """Return a Consumer in group 'fits_image_group' reading from the earliest offset."""
    return Consumer({
        'bootstrap.servers': 'watsonxdata:29092',
        'group.id': 'fits_image_group',
        'auto.offset.reset': 'earliest',
    })

def save_base64_fits_image(base64_image, output_fits_path):
    """Decode base64 content and write the resulting bytes to a file.

    Parameters:
        base64_image: base64-encoded content.
        output_fits_path: path of the file to create or overwrite.
    """
    # Decode before opening the file so invalid input never truncates it.
    decoded = base64.b64decode(base64_image)

    with open(output_fits_path, 'wb') as target:
        target.write(decoded)

def connect_to_watsonxdata() :
    """Create a prestodb DB-API connection to watsonx.data.

    The connection uses HTTPS with basic authentication and the fixed
    parameters below. When a certificate file is configured it is set as
    the `verify` value of the connection's HTTP session.

    Returns:
        The connection object, or None when creating it raised an
        exception (the error is printed). Callers must check for None
        before using the result.
    """
    userid     = 'ibmlhadmin'
    password   = 'password'
    hostname   = 'watsonxdata'
    port       = '8443'
    catalog    = 'tpch'
    schema     = 'tiny'
    certfile   = "/certs/lh-ssl-ts.crt"

    try:
        wxdconnection = prestodb.dbapi.connect(
                host=hostname,
                port=port,
                user=userid,
                catalog=catalog,
                schema=schema,
                http_scheme='https',
                auth=prestodb.auth.BasicAuthentication(userid, password)
        )
        # Identity comparison is the correct way to test against None.
        if certfile is not None:
            wxdconnection._http_session.verify = certfile
        print("Connection successful")
        return wxdconnection
    except Exception as e:
        print("Unable to connect to the database.")
        print(repr(e))
        return None


def insert_into_watsonxdata(wxdconnection,
                            file,
                            image_width  ,
                            image_height ,
                            image_utz    ,
                            object_name  ,
                            object_ra    ,
                            object_dec   ,
                            object_alt   ,
                            object_az    ,
                            camera_focus ,
                            local_temp   ,
                            local_lat    ,
                            local_long   ,
                            local_weather,
                            image_base64
                            ) :
    """Archive one received event as a row of "fits-images-from-message".

    The schema and the table are created only when they do not exist yet,
    so rows archived by earlier calls are kept. Dropping the table on every
    call would leave only the last received message in the archive.

    Parameters:
        wxdconnection: connection providing cursor() and commit().
        file ... local_weather: event fields, each written as a string.
        image_base64: base64 text stored in the image_data column.

    Errors are printed, not raised. "Inserted" is printed only when the
    INSERT and the commit succeeded. The cursor is always closed.
    """

    def sql_literal(value):
        # Render a value as a single-quoted SQL string literal. Embedded
        # single quotes are doubled so the value cannot end the literal.
        return "'" + str(value).replace("'", "''") + "'"

    cursor = wxdconnection.cursor()

    setup_statements = (
        '''
        CREATE SCHEMA IF NOT EXISTS
            iceberg_data.angel
        WITH (location = 's3a://iceberg-bucket/angel')
        ''',
        '''
        CREATE TABLE IF NOT EXISTS iceberg_data.angel."fits-images-from-message" (
            file VARCHAR,
            image_width VARCHAR,
            image_height VARCHAR,
            image_utz VARCHAR,
            object_name VARCHAR,
            object_ra VARCHAR,
            object_dec VARCHAR,
            object_alt VARCHAR,
            object_az VARCHAR,
            camera_focus VARCHAR,
            local_temp VARCHAR,
            local_lat VARCHAR,
            local_long VARCHAR,
            local_weather VARCHAR,
            image_data VARCHAR
        )
        ''',
    )
    for statement in setup_statements:
        try:
            cursor.execute(statement)
        except Exception as err:
            # Best effort: report the problem and still attempt the insert.
            print(repr(err))

    columns = (
        'file', 'image_width', 'image_height', 'image_utz', 'object_name',
        'object_ra', 'object_dec', 'object_alt', 'object_az', 'camera_focus',
        'local_temp', 'local_lat', 'local_long', 'local_weather',
        'image_data',
    )
    values = (
        file, image_width, image_height, image_utz, object_name,
        object_ra, object_dec, object_alt, object_az, camera_focus,
        local_temp, local_lat, local_long, local_weather,
        image_base64,
    )

    column_list = ",\n            ".join(columns)
    value_list = ",\n            ".join(sql_literal(value) for value in values)

    sql = f'''
        INSERT INTO iceberg_data.angel."fits-images-from-message" (
            {column_list}
        )
        VALUES (
            {value_list}
        )
    '''
    try:
        cursor.execute(sql)
        wxdconnection.commit()
    except Exception as err:
        print(f"Error executing SQL: {repr(err)}")
    else:
        print(f'Inserted: {file}')
    finally:
        cursor.close()

Action

In [None]:

topic = 'fits-images'

consumer = create_kafka_consumer()
consumer.subscribe([topic])

duration = 30
start_time = time.time()

# Event entries forwarded after 'file', in the positional order expected by
# insert_into_watsonxdata.
detail_keys = (
    'image_width', 'image_height', 'image_utz', 'object_name', 'object_ra',
    'object_dec', 'object_alt', 'object_az', 'camera_focus', 'local_temp',
    'local_lat', 'local_long', 'local_weather', 'image_data',
)

print(f'Waiting "{duration}" seconds for messages on topic "{topic}"...')

try:
    while time.time() - start_time < duration:
        msg = consumer.poll(1.0)

        # Nothing arrived within the poll timeout: try again.
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition is informational; keep polling.
                print(f"Reached end of partition for topic {msg.topic()}, partition {msg.partition()}")
                continue
            # Any other error stops the reception loop.
            print(f"Error occurred: {msg.error()}")
            break

        event = json.loads(msg.value().decode('utf-8'))
        file = event.get('file')

        print(f'Received message: {file}')

        wxdconnection = connect_to_watsonxdata()
        insert_into_watsonxdata(wxdconnection,
                                file,
                                *(event.get(key) for key in detail_keys))

except KeyboardInterrupt:
    print("Shutting down consumer...")

finally:
    consumer.close()


### 5. Read image from watsonx.data and search Milvus

Preliminary stuff

In [None]:
import io
import base64
import prestodb

import numpy as np
import pandas as pd

from astropy.io import fits
from skimage.transform import resize

from pymilvus import(
    Milvus,
    IndexType,
    Status,
    connections,
    FieldSchema,
    DataType,
    Collection,
    CollectionSchema,
    utility,
    MilvusClient
)


def connect_to_milvus() :
    """Register the pymilvus connection named 'default' and list connections.

    The settings are fixed in this function: host 'watsonxdata', port
    19530, user 'ibmlhadmin', and a TLS connection (secure=True) that uses
    the 'presto.crt' file as the server certificate. After connecting, the
    registered connections are printed.
    """
    # Settings for the local watsonx.data image.
    # NOTE: do not keep credentials for other environments in this file,
    # not even inside commented-out code.
    host         = 'watsonxdata'
    port         = 19530
    user         = 'ibmlhadmin'
    key          = 'password'
    server_pem_path = 'presto.crt'
    connections.connect(alias='default',
                       host=host,
                       port=port,
                       user=user,
                       password=key,
                       server_pem_path=server_pem_path,
                       server_name='watsonxdata',
                       secure=True)

    print("\nList connections:")
    print(connections.list_connections())

    
def load_fits_file(file_path) :
    """Return the image of a FITS file's first HDU resized to shape (166, 100).

    scikit-image interprets the shape as (rows, columns); pixels outside the
    input boundaries are filled by reflection (mode='reflect').
    """
    with fits.open(file_path) as hdul:
        pixels = hdul[0].data
        resized = resize(pixels, (166, 100), mode='reflect')

    return resized




def generate_embedding(image_data) :
    """Flatten an image array and scale it to unit L2 norm.

    Parameters:
        image_data: NumPy array holding the image pixels.

    Returns:
        A one-dimensional NumPy array. When the input has a non-zero norm
        the result has Euclidean length 1. When the norm is zero (an
        all-zero or empty image) the flattened values are returned as
        floats, unscaled, instead of dividing by zero and producing NaN.
    """
    embedding = image_data.flatten()

    norm = np.linalg.norm(embedding)
    if norm == 0:
        # 0 / 0 would fill the vector with NaN; keep the zero vector instead.
        return embedding.astype(float)

    return embedding / norm  # Normalizing the embedding
    
def search_image(search_collection, kafka_data) :
    """Search the collection for the images closest to a base64-encoded FITS file.

    Parameters:
        search_collection: collection providing load() and search().
        kafka_data: base64 text of a complete FITS file.

    The file is decoded in memory, the image of its first HDU is resized to
    shape (166, 100) and turned into an embedding. The three nearest entries
    according to the L2 metric are printed with their file path and distance.
    """
    # Decode in memory: no temporary file is written.
    with fits.open(io.BytesIO(base64.b64decode(kafka_data))) as hdul:
        image_data = hdul[0].data

    resized = resize(image_data, (166, 100), mode='reflect')
    query_embedding = [generate_embedding(resized)]

    search_collection.load()
    results = search_collection.search(
        data=query_embedding,
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 1000}},
        limit=3,
        output_fields=["id", "file_path"],
        expr=None,
    )

    # results holds one list of hits per query vector; only one was sent.
    for result in results[0]:
        print(f"Image File: {result.file_path}, Difference: {result.distance:.2%}")


def connect_to_watsonxdata() :
    """Create a prestodb DB-API connection to watsonx.data.

    Before connecting, the pandas UserWarning about non-SQLAlchemy
    connections is silenced, because the connection returned here is later
    passed to pandas. The connection uses HTTPS with basic authentication
    and the fixed parameters below. When a certificate file is configured
    it is set as the `verify` value of the connection's HTTP session.

    Returns:
        The connection object, or None when creating it raised an
        exception (the error is printed). Callers must check for None
        before using the result.
    """
    import warnings

    # Suppress the specific UserWarning from pandas
    warnings.filterwarnings("ignore", message="pandas only supports SQLAlchemy connectable")

    # Connection Parameters
    userid     = 'ibmlhadmin'
    password   = 'password'
    hostname   = 'watsonxdata'
    port       = '8443'
    catalog    = 'tpch'
    schema     = 'tiny'
    certfile   = "/certs/lh-ssl-ts.crt"

    # Connect Statement
    try:
        wxdconnection = prestodb.dbapi.connect(
                host=hostname,
                port=port,
                user=userid,
                catalog=catalog,
                schema=schema,
                http_scheme='https',
                auth=prestodb.auth.BasicAuthentication(userid, password)
        )
        # Identity comparison is the correct way to test against None.
        if certfile is not None:
            wxdconnection._http_session.verify = certfile
        print("Connection successful")
        return wxdconnection
    except Exception as e:
        print("Unable to connect to the database.")
        print(repr(e))
        return None

def get_images_from_watsonxdata(wxdconnection) :
    """Read up to 100 picture events from the Kafka-backed table.

    Parameters:
        wxdconnection: DB-API connection handed to pandas.read_sql.

    Returns:
        A DataFrame with the columns "file" and "image_data". When the
        query fails the error is printed and an empty DataFrame with those
        two columns is returned, so callers can always iterate the result.
    """
    sql = '''
    SELECT json_extract_scalar(_message, '$.file') AS "file",
           json_extract_scalar(_message, '$.image_data') AS "image_data"
    FROM "kafka"."default"."fits-images" 
    LIMIT 100 
    '''
    try:
        df = pd.read_sql(sql, wxdconnection)
    except Exception as e:
        print(repr(e))
        # Without this, df would be unbound at the return statement.
        return pd.DataFrame(columns=["file", "image_data"])

    if df.empty:
        print("No rows found.")

    return df



Action

In [None]:
connect_to_milvus()
fits_coll = Collection("image_embeddings")

wxdconnection = connect_to_watsonxdata()
data_images = get_images_from_watsonxdata(wxdconnection)

# Compare every received picture against the collection, one at a time.
for file_name, image_data in zip(data_images['file'], data_images['image_data']):
    print("\n\n")
    print("Comparing the file:", file_name)
    search_image(fits_coll, image_data)
    print("\n\n")

connections.disconnect(alias="default")