Skip to content

Commit

Permalink
General refactoring, updates for adding real time monitornig & data l…
Browse files Browse the repository at this point in the history
…ogging via InfluxDB
  • Loading branch information
MarkhamLee committed Mar 29, 2024
1 parent 962578a commit 27f4978
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 19 deletions.
20 changes: 9 additions & 11 deletions api/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@ def __init__(self):
# load variables
self.load_variables()

# get MQTT clientID
self.clientID = str(uuid.uuid4())

# get MQTT client
self.client = self.mqttClient()

# Load variables
def load_variables(self):

self.userName = os.environ['MQTT_USER']
self.user_name = os.environ['MQTT_USER']
self.pwd = os.environ['MQTT_SECRET']
self.host = os.environ['MQTT_BROKER']
self.port = os.environ['MQTT_PORT']
self.port = int(os.environ['MQTT_PORT'])

def get_client_id(self):

return str(uuid.uuid4())

# Generate MQTT client
def mqttClient(self):
def mqttClient(self, client_id):

def connectionStatus(client, userdata, flags, code):

Expand All @@ -40,8 +38,8 @@ def connectionStatus(client, userdata, flags, code):
else:
logger.debug(f'connection error occured, return code: {code}, retrying...') # noqa: E501

client = mqtt.Client(self.clientID)
client.username_pw_set(username=self.userName, password=self.pwd)
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id)
client.username_pw_set(username=self.user_name, password=self.pwd)
client.on_connect = connectionStatus

code = client.connect(self.host, self.port)
Expand Down
4 changes: 2 additions & 2 deletions api/score_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def cosine_score(reference: float, sample: float) -> float:
cosd = F.cosine_similarity(reference, sample)
score = (1 - cosd.item())

logger.debug(f'Cosine distance calculated: {score}')
logger.info(f'Cosine distance calculated: {score}')

return score

Expand All @@ -35,7 +35,7 @@ def euclidean_distance(reference: float, sample: float) -> float:
# pull the float value out of the tensor object
dist = dist.item()

logger.debug(f'Euclidean distance is: {dist}')
logger.info(f'Euclidean distance is: {dist}')

return dist

Expand Down
37 changes: 31 additions & 6 deletions api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@
com_utilities = ReportingCommunication()

# get MQTT Client
mqttClient = com_utilities.mqttClient()

# get client ID
client_id = com_utilities.get_client_id()

# mqtt client
mqttClient, code = com_utilities.mqttClient(client_id)
logger.info('Communications class instantiated')

MONITORING_TOPIC = os.environ['TOPIC']
MONITORING_TOPIC = os.environ['MONITORING_TOPIC']


# endpoint for API health check
Expand Down Expand Up @@ -93,7 +98,7 @@ def embeddings():
score_type, threshold)

# send data to MQTT topic for data logging/real time monitoring
mqttClient.publish(MONITORING_TOPIC, resultjson)
send_monitoring_message(resultjson)

logger.info('response sent back to client')
return flask.Response(response=resultjson, status=200,
Expand Down Expand Up @@ -141,13 +146,19 @@ def cached():
# and builds response payload
resultjson = build_response(latency, cached_tensor, sample_tensor,
score_type, threshold)
logger.info(resultjson)

# send data to MQTT topic for data logging/real time monitoring
send_monitoring_message(resultjson)

return flask.Response(response=resultjson, status=200,
mimetype='application/json')


# method that aggregates data, prepares json response and sends the data
# back to the client
# TODO: move this and the methods below to a separate class, add field
# for the endpoint the data was received on.
def build_response(latency: float, tensor1: object, tensor2: object,
score_type: float, threshold: float) -> dict:

Expand All @@ -169,20 +180,34 @@ def build_response(latency: float, tensor1: object, tensor2: object,
# return data
results = {"match_status": status,
"score": score,
"score_type": 'cosine distance',
"score_type": score_type,
"score_threshold": threshold,
"Inferencing_latency": latency_message}
"inferencing_latency": latency_message}

resultjson = json.dumps(results)

return resultjson


def load_images(image):
# loading images
# TODO: may need to add transformations in the future
def load_images(image: object) -> object:

with Image.open(image) as photo:
photo.load()

app.logger.info('photo loaded')

return photo


# TODO: add QOS parameters and message re-send logic
def send_monitoring_message(message: dict):

try:
result = mqttClient.publish(MONITORING_TOPIC, message)
status = result[0]
logger.info(f'Monitoring message sent successfully with status {status}') # noqa: E501

except Exception as e:
logger.error(f'MQTT publishing failed with error: {e}')

0 comments on commit 27f4978

Please sign in to comment.