In [9]:
#OLLAMAPool Server Prototype Code
import os
# Service Bus connection strings for the queries, results and node-status
# queues. Each is None when its environment variable is unset.
EndPoint_Queries, Endpoint_Results, EndPoint_NodeStatus = (
    os.environ.get(var_name)
    for var_name in ('EndPoint_Queries', 'Endpoint_Results', 'EndPoint_NodeStatus')
)

def get_queue_name_from_connection_string(connection_string):
    """Return the queue name carried in a Service Bus connection string.

    The connection string is a ``;``-separated list of ``Key=Value`` pairs;
    the queue name is the value of the ``EntityPath`` key.

    Args:
        connection_string: the full connection string (``None``/empty allowed).

    Returns:
        The ``EntityPath`` value, or ``None`` when it is missing or empty.
    """
    if not connection_string:
        return None
    for pair in connection_string.split(';'):
        # partition() splits on the FIRST '=' only, so a value that itself
        # contains '=' is returned intact (split('=')[1] would truncate it).
        key, sep, value = pair.partition('=')
        # Match the key case-insensitively and ignore surrounding whitespace.
        if sep and key.strip().lower() == 'entitypath':
            return value.strip() or None
    return None

# Fail fast if any required connection string is missing or empty
# (an empty string is as unusable as an unset variable).
if not EndPoint_Queries:
    raise ValueError("EndPoint_Queries is not set")
if not Endpoint_Results:
    raise ValueError("Endpoint_Results is not set")
if not EndPoint_NodeStatus:
    raise ValueError("EndPoint_NodeStatus is not set")

# Get the queue names (the EntityPath values) from the connection strings.
QueueName_Queries = get_queue_name_from_connection_string(EndPoint_Queries)
QueueName_Results = get_queue_name_from_connection_string(Endpoint_Results)
QueueName_NodeStatus = get_queue_name_from_connection_string(EndPoint_NodeStatus)

# Every sender/receiver below needs a queue name; without this check a
# connection string lacking EntityPath only fails later inside the SDK.
for _env_name, _queue_name in (("EndPoint_Queries", QueueName_Queries),
                               ("Endpoint_Results", QueueName_Results),
                               ("EndPoint_NodeStatus", QueueName_NodeStatus)):
    if not _queue_name:
        raise ValueError(f"{_env_name} has no EntityPath (queue name)")
print("Environment Variables are set OK")

Environment Variables are set OK


In [2]:
#Business Classes (Requests/Results)

import json
from typing import List
import uuid
from azure.servicebus import ServiceBusClient, ServiceBusMessage

class LLMRequest:
    """A chat request to run on an Ollama node.

    All instance attributes are serialized by ``to_json``:
    UUID (generated per request and echoed in the LLMResult), Model,
    systemMessage, query, and Messages (the system + user chat messages
    passed to the Ollama chat call).
    """
    def __init__(self,Model:str="",systemMessage:str="",query:str=""):
        self.UUID=str(uuid.uuid4())
        self.Model=Model
        self.systemMessage=systemMessage
        self.query=query
        self.Messages=self._build_messages(systemMessage, query)

    @staticmethod
    def _build_messages(systemMessage:str, query:str) -> list:
        """Build the [system, user] message list for the chat call."""
        return [{'role': 'system', 'content': systemMessage},
                {'role': 'user', 'content': query}]

    def to_json(self) -> dict:
        """Return the request's fields as a JSON-serializable dict.

        Despite the name this returns a dict (callers pass it to json.dumps).
        A copy is returned so callers cannot mutate the request through it.
        """
        return dict(self.__dict__)

    def from_json(self, json_str: str) -> None:
        """Populate this request from a JSON object string.

        Keys present in the JSON overwrite the current attributes; missing
        keys keep their current values. If ``Messages`` is absent it is
        rebuilt from systemMessage/query so the request stays runnable.

        Raises:
            json.JSONDecodeError: if json_str is not valid JSON.
            TypeError: if the JSON value is not an object.
        """
        data = json.loads(json_str)
        if not isinstance(data, dict):
            raise TypeError(f"LLMRequest JSON must be an object, got {type(data).__name__}")
        self.__dict__.update(data)
        if 'Messages' not in data:
            self.Messages = self._build_messages(self.systemMessage, self.query)
        
class LLMResult:
    """Outcome of one LLMRequest, posted back on the results queue.

    Attributes (all serialized by ``to_json``): UUID of the originating
    request, result (the chat response; LLMRequestServer passes the value
    returned by the Ollama chat call), errorMsg, HasError (True when errorMsg
    is non-empty) and timeDelta (elapsed time as a string).
    """
    def __init__(self,UUID:str="",result:str="",errorMsg:str="",timeDelta:str=""):
        self.UUID=UUID
        self.result=result
        self.errorMsg=errorMsg
        self.HasError=errorMsg!=""
        self.timeDelta=timeDelta

    def to_json(self) -> dict:
        """Return the result's fields as a dict ready for json.dumps.

        A copy is returned so callers cannot mutate the result through it.
        """
        return dict(self.__dict__)

    def from_json(self, json_str: str) -> None:
        """Populate this result from a JSON object string.

        Keys present in the JSON overwrite the current attributes; missing
        keys keep their current values. When ``HasError`` is absent it is
        recomputed from ``errorMsg`` so the two stay consistent.

        Raises:
            json.JSONDecodeError: if json_str is not valid JSON.
            TypeError: if the JSON value is not an object.
        """
        data = json.loads(json_str)
        if not isinstance(data, dict):
            raise TypeError(f"LLMResult JSON must be an object, got {type(data).__name__}")
        self.__dict__.update(data)
        if 'HasError' not in data:
            self.HasError = self.errorMsg != ""
    

In [16]:
#NodeStatus Class - tracks the health of the node, available models and manages communicating state
import socket
from ollama import Client

class NodeStatus():
    """Tracks this node's health and Ollama models and publishes them.

    Every status change is serialized with ``to_json`` and sent to the
    node-status Service Bus queue by ``SyncStatus``.
    """

    # JSON field name (as produced by to_json) -> attribute name on this
    # object. Note the published "OllamaHost" maps to attribute "Ollamahost".
    _JSON_FIELDS = {"Host": "Host",
                    "OllamaHost": "Ollamahost",
                    "Status": "Status",
                    "Message": "Message",
                    "Models": "Models",
                    "LastQueryTime": "LastQueryTime"}

    def __init__(self,Ollamahost:str,QueueName:str,ConnectionString:str):
        """Open a sender on the status queue and record this host's identity.

        Args:
            Ollamahost: base URL of the Ollama server (e.g. "http://localhost:11434").
            QueueName: name of the node-status queue.
            ConnectionString: Service Bus connection string for that queue.
        """
        self.__Client__=ServiceBusClient.from_connection_string(ConnectionString)
        self.__sender__ = self.__Client__.get_queue_sender(queue_name=QueueName)
        self.Host=socket.gethostname()
        self.Ollamahost=Ollamahost
        self.Models=[]
        self.Status="Initializing..."
        self.Message=""
        self.LastQueryTime=0
        self.Client=None  # Ollama client, created by Connect()

    def to_json(self):
        """Return the published status fields as a dict (for json.dumps)."""
        return {json_key: getattr(self, attr) for json_key, attr in self._JSON_FIELDS.items()}

    def from_json(self,json_str):
        """Update the status fields from a JSON string produced by to_json.

        Only the published fields are touched, so the Service Bus client and
        sender (and the Ollama client) survive and SyncStatus keeps working.
        Keys may use either the JSON names or the attribute names; unknown
        keys are ignored.

        Raises:
            json.JSONDecodeError: if json_str is not valid JSON.
            TypeError: if the JSON value is not an object.
        """
        data = json.loads(json_str)
        if not isinstance(data, dict):
            raise TypeError(f"NodeStatus JSON must be an object, got {type(data).__name__}")
        for json_key, attr in self._JSON_FIELDS.items():
            if json_key in data:
                setattr(self, attr, data[json_key])
            elif attr in data:
                setattr(self, attr, data[attr])

    def SyncStatus(self):
        """Send the current status to the status queue (best effort; errors are printed)."""
        try:
            message = ServiceBusMessage(json.dumps(self.to_json()))
            self.__sender__.send_messages(message)
            print("Sent status to Queue")
        except Exception as e:
            print(f"Error Sending Status to Queue: {str(e)}")

    def SetStatus(self,Status:str,Message:str):
        """Record a new status/message and publish it."""
        print(f"Status: {Message}")
        self.Status=Status
        self.Message=Message
        self.SyncStatus()

    def SetErrorStatus(self,Message:str):
        """Record an "Error" status with Message and publish it."""
        print(f"Error: {Message}")
        self.Status="Error"
        self.Message=Message
        self.SyncStatus()

    def HasModel(self,Model:str)->bool:
        """Return True if Model is among the known models.

        Comparison is case-insensitive on both sides, and an untagged name
        also matches its ":latest" variant.
        """
        wanted = Model.lower()
        known = {name.lower() for name in self.Models}
        return wanted in known or f"{wanted}:latest" in known

    #Connects to OLLAMA server and gets the list of models
    def Connect(self):
        """Connect to the Ollama server, refresh Models and publish the status.

        Returns:
            True on success. On failure the error status is published,
            Models is cleared and False is returned.
        """
        try:
            self.Client=Client(host=self.Ollamahost)
            models=self.Client.list()
            # NOTE(review): older ollama clients return dict entries keyed "name";
            # newer ones are believed to return subscriptable objects keyed
            # "model". Accept either and skip entries with no name.
            names=(entry.get("name") or entry.get("model") for entry in models["models"])
            self.Models=[name for name in names if name]
            self.SyncStatus()
            return True
        except Exception as e:
            self.SetErrorStatus(str(e))
            self.Models=[]
            return False

#Test code
#node=NodeStatus("http://localhost:11434",QueueName_NodeStatus,EndPoint_NodeStatus)

# models=node.Connect()
# node.SetStatus("Ready","Connected to OLLAMA Server")
# # for model in models:
# #     print(model)
# node.HasModel("llama3.1")

In [25]:
#Ollama Processing Code
import datetime
from ollama import Client

#Handles running the LLMRequest and posting the result back to the results queue
class LLMRequestServer():
    """Runs LLMRequests on the node's Ollama server and posts LLMResults back.

    Every request's outcome (success or error) is posted as JSON to the
    results Service Bus queue, and the node's status is updated around it.
    """

    def __init__(self,node:NodeStatus,ResultsConnectionString:str,ResultsQueueName:str):
        """Create an Ollama client for node.Ollamahost and remember the results queue.

        Args:
            node: status tracker for this node (also supplies the Ollama host URL).
            ResultsConnectionString: Service Bus connection string for the results queue.
            ResultsQueueName: name of the results queue.
        """
        self.Client=Client(host=node.Ollamahost)
        self.ResultsConnectionString=ResultsConnectionString
        self.ResultsQueueName=ResultsQueueName
        self.node=node

    @staticmethod
    def _json_default(obj):
        """json.dumps fallback for values that are not natively serializable.

        NOTE(review): newer ollama client releases are believed to return
        pydantic response objects (with model_dump) rather than dicts; those
        are dumped to plain data. Anything else falls back to str().
        """
        if hasattr(obj, "model_dump"):
            return obj.model_dump()
        return str(obj)

    #Post to a service bus queue
    def AzurePost_ServiceBus(self,json_payload):
        """Send json_payload as one JSON message to the results queue.

        A fresh ServiceBusClient is opened and closed for each post. Errors are
        printed rather than raised, so a failed post never aborts processing.

        Returns:
            True if the message was sent, False otherwise.
        """
        try:
            # Serialize first so an unserializable payload fails before any
            # connection is opened.
            body = json.dumps(json_payload, default=self._json_default)
            with ServiceBusClient.from_connection_string(self.ResultsConnectionString) as client:
                with client.get_queue_sender(queue_name=self.ResultsQueueName) as sender:
                    sender.send_messages(ServiceBusMessage(body))
            print(f"Queued message: {json_payload.get('UUID')}")
            return True
        except Exception as e:
            print(f"Error Sending to Queue: {str(e)}")
            return False

    def _ensure_model(self, model:str):
        """Pull model into Ollama unless the node already lists it."""
        if self.node.HasModel(model):
            return
        self.node.SetStatus("Downloading",f"Downloading Model {model}")
        self.Client.pull(model)
        # Record the pull so later requests for the same model do not re-pull.
        self.node.Models.append(model)
        self.node.SetStatus("Ready",f"Model {model} Downloaded")

    def ProcessLLMRequest(self,request:LLMRequest)->LLMResult:
        """Run request on Ollama, post the LLMResult to the results queue and return it.

        Processing errors are not raised: they are reported via
        node.SetErrorStatus and posted/returned as an error LLMResult.
        """
        try:
            self._ensure_model(request.Model)

            self.node.SetStatus("Running",f"Processing {request.UUID}")
            # Start timing after the status post so only the chat call is measured.
            timerStart=datetime.datetime.now()
            ret = self.Client.chat(
                model=request.Model,
                messages=request.Messages,
                stream=False)
            timeDelta=datetime.datetime.now()-timerStart

            # Post the result/timing back to the results queue.
            result=LLMResult(UUID=request.UUID,result=ret,timeDelta=str(timeDelta))
            self.AzurePost_ServiceBus(result.to_json())
            self.node.LastQueryTime=timeDelta.total_seconds()
            self.node.SetStatus("Finished",f"Processed {request.UUID}")

        except Exception as e:
            # getattr: the request may be malformed (e.g. missing UUID), and the
            # error path must not raise a second exception.
            request_id=getattr(request,"UUID","")
            # An exception with an empty str() would otherwise yield errorMsg ""
            # and therefore HasError == False; fall back to its repr.
            error_text=str(e) or repr(e)
            print("Exception!-------------------------------------------------")
            print(e)
            self.node.SetErrorStatus(f"Error Processing {request_id}: {error_text}")
            print("Exception!-------------------------------------------------\n")
            result=LLMResult(UUID=request_id,errorMsg=error_text)
            self.AzurePost_ServiceBus(result.to_json())

        return result
    
#node=NodeStatus("http://localhost:11434",QueueName_NodeStatus,EndPoint_NodeStatus)
#llmserver=LLMRequestServer(node,Endpoint_Results,QueueName_Results)


In [27]:
#Main Message Handling Loop
import time
from typing import List
from azure.servicebus import ServiceBusClient, ServiceBusMessage
import signal

#Startup
node=NodeStatus("http://localhost:11434",QueueName_NodeStatus,EndPoint_NodeStatus)
llmserver=LLMRequestServer(node,Endpoint_Results,QueueName_Results)
# Load the model list from Ollama (Connect also publishes it). Without this,
# node.Models stays empty, HasModel() is always False and every request
# triggers a model pull. If Connect fails it has already published an
# "Error" status, which is left in place rather than overwritten with "Ready".
if node.Connect():
    node.SetStatus("Ready","Waiting for Queries")
running=True

def handle_signal(signal_number, frame):
    """SIGINT/SIGTERM handler: ask the main receive loop to stop.

    Only clears the module-level ``running`` flag; the loop notices it once
    the in-flight receive/process call returns. ``frame`` is unused.
    """
    global running
    print("Signal received:", signal_number)
    running = False

# Register the signal handlers, remembering the previous ones so they can be
# restored when the loop exits (the kernel/process may keep running afterwards).
previous_signal_handlers = {
    signum: signal.signal(signum, handle_signal)
    for signum in (signal.SIGINT, signal.SIGTERM)
}

# Create a Service Bus client
servicebus_client = ServiceBusClient.from_connection_string(conn_str=EndPoint_Queries)

def receive_messages_from_queue(node:NodeStatus):
    """Receive at most one query from the queries queue and process it.

    Waits up to 30 seconds for a message. The message is completed (removed
    from the queue) before processing, so delivery is at most once: if
    processing raises, ProcessLLMRequest posts an error LLMResult, while a
    message that cannot be parsed is dropped and reported as a receive error.
    Receive errors are published via node.SetErrorStatus, followed by a
    5 second back-off.
    """
    try:
        with servicebus_client.get_queue_receiver(queue_name=QueueName_Queries) as receiver:
            print("Receiving messages from the queue...")
            for msg in receiver.receive_messages(max_message_count=1, max_wait_time=30):
                print(f"Received message: {str(msg)}")
                # NOTE(review): completing before processing presumably avoids the
                # message lock expiring during long LLM runs; the trade-off is
                # that a crash mid-processing loses the request.
                receiver.complete_message(msg)
                llmRequest=LLMRequest()
                llmRequest.from_json(str(msg))
                llmserver.ProcessLLMRequest(llmRequest)
    except Exception as e:
        print(f"Error Receiving from Queue: {str(e)}")
        node.SetErrorStatus(f"Error Receiving from Queue: {str(e)}")
        time.sleep(5)
        return

#Main loop
try:
    while running:
        receive_messages_from_queue(node)
except Exception as e:
    print(f"Error in Main Loop: {str(e)}")
finally:
    node.SetStatus("Shutdown","Shut Down Complete")
    # Hand the signals back so the kernel/process can be interrupted and
    # terminated normally. signal.signal() returns None for handlers that
    # were not installed from Python; those cannot be re-installed.
    for signum, handler in previous_signal_handlers.items():
        if handler is not None:
            signal.signal(signum, handler)
    # Release the Service Bus connections opened by this notebook.
    for closeable in (servicebus_client, node.__sender__, node.__Client__):
        try:
            closeable.close()
        except Exception as e:
            print(f"Error closing Service Bus connection: {str(e)}")

Status: Waiting for Queries
Sent status to Queue
Receiving messages from the queue...
Received message: {"UUID": "9aa8aa5e-daec-4425-8b3f-628740b6100a", "Model": "mistral-small", "systemMessage": "You are a science fiction fantasy writer writing beautifully creative descriptions from a list of character traits. \nYou have a list of character traits, be creative and expand on this (going so far as to write a short background story if you wish).\nTry to keep it within medieval fantasy RPG lore and appropriate to this context.\n", "query": "They have the followng traits:\nrace: gnome\ncomplection: ruddy\nheight: tall\nbuild: muscular\nfeature: sharp\neye: dull\nhair: curly\nclothes: elegant\nsophistication: refined\nmannerism: clumsy\nvirtue: brave\n", "Messages": [{"role": "system", "content": "You are a science fiction fantasy writer writing beautifully creative descriptions from a list of character traits. \nYou have a list of character traits, be creative and expand on this (going so 