Permalink
executable file 228 lines (181 sloc) 8.76 KB
#!/usr/bin/env python
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import random
import time
import sys
import iothub_client
from iothub_client import IoTHubClient, IoTHubClientError, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError, DeviceMethodReturnValue
from iothub_client_args import get_iothub_opt, OptionError
# HTTP options
# Because it can poll "after 9 seconds" polls will happen effectively
# at ~10 seconds.
# Note that for scalabilty, the default value of minimumPollingTime
# is 25 minutes. For more information, see:
# https://azure.microsoft.com/documentation/articles/iot-hub-devguide/#messaging
TIMEOUT = 241000
MINIMUM_POLLING_TIME = 9
# messageTimeout - the maximum time in milliseconds until a message times out.
# The timeout period starts at IoTHubClient.send_event_async.
# By default, messages do not expire.
MESSAGE_TIMEOUT = 10000
RECEIVE_CONTEXT = 0
AVG_WIND_SPEED = 10.0
MIN_TEMPERATURE = 20.0
MIN_HUMIDITY = 60.0
MESSAGE_COUNT = 5
RECEIVED_COUNT = 0
TWIN_CONTEXT = 0
METHOD_CONTEXT = 0
# global counters
RECEIVE_CALLBACKS = 0
SEND_CALLBACKS = 0
BLOB_CALLBACKS = 0
TWIN_CALLBACKS = 0
SEND_REPORTED_STATE_CALLBACKS = 0
METHOD_CALLBACKS = 0
# chose HTTP, AMQP or MQTT as transport protocol
PROTOCOL = IoTHubTransportProvider.MQTT
# String containing Hostname, Device Id & Device Key in the format:
# "HostName=<host_name>;DeviceId=<device_id>;SharedAccessKey=<device_key>"
CONNECTION_STRING = "[Device Connection String]"
MSG_TXT = "{\"deviceId\": \"myPythonDevice\",\"windSpeed\": %.2f,\"temperature\": %.2f,\"humidity\": %.2f}"
def send_confirmation_callback(message, result, user_context):
global SEND_CALLBACKS
print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
map_properties = message.properties()
key_value_pair = map_properties.get_internals()
print ( " Properties: %s" % key_value_pair )
SEND_CALLBACKS += 1
print ( " Total calls confirmed: %d" % SEND_CALLBACKS )
def receive_message_callback(message, counter):
global RECEIVE_CALLBACKS
message_buffer = message.get_bytearray()
size = len(message_buffer)
print ( "Received Message [%d]:" % counter )
print ( " Data: <<<%s>>> & Size=%d" % (message_buffer[:size].decode('utf-8'), size) )
map_properties = message.properties()
key_value_pair = map_properties.get_internals()
print ( " Properties: %s" % key_value_pair )
counter += 1
RECEIVE_CALLBACKS += 1
print ( " Total calls received: %d" % RECEIVE_CALLBACKS )
return IoTHubMessageDispositionResult.ACCEPTED
def device_twin_callback(update_state, payload, user_context):
global TWIN_CALLBACKS
print ( "\nTwin callback called with:\nupdateStatus = %s\npayload = %s\ncontext = %s" % (update_state, payload, user_context) )
TWIN_CALLBACKS += 1
print ( "Total calls confirmed: %d\n" % TWIN_CALLBACKS )
def send_reported_state_callback(status_code, user_context):
global SEND_REPORTED_STATE_CALLBACKS
print ( "Confirmation for reported state received with:\nstatus_code = [%d]\ncontext = %s" % (status_code, user_context) )
SEND_REPORTED_STATE_CALLBACKS += 1
print ( " Total calls confirmed: %d" % SEND_REPORTED_STATE_CALLBACKS )
def device_method_callback(method_name, payload, user_context):
global METHOD_CALLBACKS
print ( "\nMethod callback called with:\nmethodName = %s\npayload = %s\ncontext = %s" % (method_name, payload, user_context) )
METHOD_CALLBACKS += 1
print ( "Total calls confirmed: %d\n" % METHOD_CALLBACKS )
device_method_return_value = DeviceMethodReturnValue()
device_method_return_value.response = "{ \"Response\": \"This is the response from the device\" }"
device_method_return_value.status = 200
return device_method_return_value
def blob_upload_conf_callback(result, user_context):
global BLOB_CALLBACKS
print ( "Blob upload confirmation[%d] received for message with result = %s" % (user_context, result) )
BLOB_CALLBACKS += 1
print ( " Total calls confirmed: %d" % BLOB_CALLBACKS )
class HubManager(object):
def __init__(
self,
connection_string,
protocol=IoTHubTransportProvider.MQTT):
self.client_protocol = protocol
self.client = IoTHubClient(connection_string, protocol)
if protocol == IoTHubTransportProvider.HTTP:
self.client.set_option("timeout", TIMEOUT)
self.client.set_option("MinimumPollingTime", MINIMUM_POLLING_TIME)
# set the time until a message times out
self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
# some embedded platforms need certificate information
# self.set_certificates()
self.client.set_message_callback(receive_message_callback, RECEIVE_CONTEXT)
self.client.set_device_twin_callback(device_twin_callback, TWIN_CONTEXT)
self.client.set_device_method_callback(device_method_callback, METHOD_CONTEXT)
def set_certificates(self):
from iothub_client_cert import CERTIFICATES
try:
self.client.set_option("TrustedCerts", CERTIFICATES)
print ( "set_option TrustedCerts successful" )
except IoTHubClientError as iothub_client_error:
print ( "set_option TrustedCerts failed (%s)" % iothub_client_error )
def send_event(self, event, properties, send_context):
if not isinstance(event, IoTHubMessage):
event = IoTHubMessage(bytearray(event, 'utf8'))
if len(properties) > 0:
prop_map = event.properties()
for key in properties:
prop_map.add_or_update(key, properties[key])
self.client.send_event_async(
event, send_confirmation_callback, send_context)
def send_reported_state(self, reported_state, size, user_context):
self.client.send_reported_state(
reported_state, size,
send_reported_state_callback, user_context)
def upload_to_blob(self, destinationfilename, source, size, usercontext):
self.client.upload_blob_async(
destinationfilename, source, size,
blob_upload_conf_callback, usercontext)
def main(connection_string, protocol):
try:
print ( "\nPython %s\n" % sys.version )
print ( "IoT Hub Client for Python" )
hub_manager = HubManager(connection_string, protocol)
print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol )
filename = "hello_python_blob.txt"
content = "Hello World from Python Blob APi"
hub_manager.upload_to_blob(filename, content, len(content), 1001)
reported_state = "{\"newState\":\"standBy\"}"
hub_manager.send_reported_state(reported_state, len(reported_state), 1002)
while True:
# send a few messages every minute
print ( "IoTHubClient sending %d messages" % MESSAGE_COUNT )
for message_counter in range(0, MESSAGE_COUNT):
temperature = MIN_TEMPERATURE + (random.random() * 10)
humidity = MIN_HUMIDITY + (random.random() * 20)
msg_txt_formatted = MSG_TXT % (
AVG_WIND_SPEED + (random.random() * 4 + 2),
temperature,
humidity)
msg_properties = {
"temperatureAlert": 'true' if temperature > 28 else 'false'
}
hub_manager.send_event(msg_txt_formatted, msg_properties, message_counter)
print ( "IoTHubClient.send_event_async accepted message [%d] for transmission to IoT Hub." % message_counter )
# Wait for Commands or exit
print ( "IoTHubClient waiting for commands, press Ctrl-C to exit" )
status_counter = 0
while status_counter < 6:
status = hub_manager.client.get_send_status()
print ( "Send status: %s" % status )
time.sleep(10)
status_counter += 1
except IoTHubError as iothub_error:
print ( "Unexpected error %s from IoTHub" % iothub_error )
return
except KeyboardInterrupt:
print ( "IoTHubClient sample stopped" )
def usage():
print ( "Usage: iothub_client_sample_class.py -p <protocol> -c <connectionstring>" )
print ( " protocol : <amqp, http, mqtt>" )
print ( " connectionstring: <HostName=<host_name>;DeviceId=<device_id>;SharedAccessKey=<device_key>>" )
if __name__ == '__main__':
try:
(CONNECTION_STRING, PROTOCOL) = get_iothub_opt(sys.argv[1:], CONNECTION_STRING)
except OptionError as option_error:
print ( option_error )
usage()
sys.exit(1)
main(CONNECTION_STRING, PROTOCOL)