-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.py
109 lines (87 loc) · 3.29 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import os
import sys
import threading
import signal
import json
import uuid
import traceback
from joblib import load
from azure.iot.device import IoTHubModuleClient, Message
# globals
RECEIVED_MESSAGES = 0
def create_client():
def load_model():
clf = load("models/temp_model.pkl")
return clf
def get_prediction_payload(telemetry_sample):
temp = telemetry_sample['temperature']
humid = telemetry_sample['humidity']
timestamp = telemetry_sample['timestamp']
prediction = clf.predict([[temp, humid]])
log_proba = clf.predict_proba([[temp, humid]])
return {"timestamp": timestamp,
"temperature": temp,
"humidity": humid,
"prediction": int(prediction[0]),
"probability": float(max(log_proba[0]))}
# define function for handling received messages
def receive_message_handler(message):
# NOTE: This function only handles messages sent to "telemetryInput".
# Messages sent to other inputs or to the default will be discarded.
global RECEIVED_MESSAGES
if message.input_name == "telemetryInput":
RECEIVED_MESSAGES += 1
print("Message received on telemetryInput. Beginning prediction...")
try:
telemetry_sample = json.loads(message.data)
prediction_str = json.dumps(get_prediction_payload(telemetry_sample))
out_msg = Message(prediction_str)
out_msg.message_id = uuid.uuid4()
except Exception as e:
print(e)
print(traceback.format_exc())
print( " Data: <<{}>>".format(out_msg.data) )
print( " Properties: {}".format(out_msg.custom_properties))
print( " Total calls received: {}".format(RECEIVED_MESSAGES))
print("Forwarding message to IoT Hub")
client.send_message_to_output(out_msg, "upstream_sink")
print("Message successfully forwarded")
else:
print("Message received on unknown input: {}".format(message.input_name))
print("Starting module. Creating client.")
client = IoTHubModuleClient.create_from_edge_environment()
print("Loading ML model")
clf = load_model()
print("Init message reception > ")
try:
# Set handler
client.on_message_received = receive_message_handler
except:
# Cleanup
client.shutdown()
return client
def main():
print ( "\nPython {}\n".format(sys.version) )
print ( "IoT Hub Client for Python" )
# Event indicating sample stop
stop_event = threading.Event()
# Define a signal handler that will indicate Edge termination of the Module
def module_termination_handler(signal, frame):
print ("IoTHubClient sample stopped")
stop_event.set()
# Attach the signal handler
signal.signal(signal.SIGTERM, module_termination_handler)
# Create the client
client = create_client()
try:
# This will be triggered by Edge termination signal
stop_event.wait()
except Exception as e:
print("Unexpected error %s " % e)
raise
finally:
# Graceful exit
print("Shutting down client")
client.shutdown()
if __name__ == '__main__':
main()