# clearblade_adapter_library.py
import os, sys, argparse, string, logging, json, random
from clearblade.ClearBladeCore import System, Query, cbLogs
class AdapterLibrary:
    """Bootstrap helper for adapters built on the ClearBlade SDK.

    Typical usage is ``parse_arguments()`` -> ``initialize_clearblade()`` ->
    ``connect_MQTT()``. Arguments are gathered from environment variables and
    command-line flags into ``self._args``; flags are parsed second and so
    override environment variables that share the same key.
    """

    DEFAULT_LOG_LEVEL = "info"
    DEFAULT_PLATFORM_URL = "http://localhost:9000"
    DEFAULT_MESSAGING_URL = "localhost:1883"
    DEFAULT_ADAPTER_CONFIG_COLLECTION_NAME = "adapter_config"

    SYSTEM_KEY_ARG_KEY = "CB_SYSTEM_KEY"
    SYSTEM_SECRET_ARG_KEY = "CB_SYSTEM_SECRET"
    DEVICE_NAME_ARG_KEY = "device_name"
    ACTIVE_KEY_ARG_KEY = "active_key"
    SERVICE_ACCOUNT_ARG_KEY = "CB_SERVICE_ACCOUNT"
    SERVICE_ACCOUNT_TOKEN_ARG_KEY = "CB_SERVICE_ACCOUNT_TOKEN"
    PLATFORM_URL_ARG_KEY = "platform_URL"
    MESSAGING_URL_ARG_KEY = "messaging_URL"
    ADAPTER_CONFIG_COLLECTION_NAME_ARG_KEY = "adapter_config_collection_name"
    LOG_LEVEL_ARG_KEY = "log_level"

    # Argument keys that hold credentials; their values must never be logged.
    _SENSITIVE_ARG_KEYS = frozenset((
        SYSTEM_SECRET_ARG_KEY,
        ACTIVE_KEY_ARG_KEY,
        SERVICE_ACCOUNT_TOKEN_ARG_KEY,
    ))
    _REDACTED = "<redacted>"

    def __init__(self, adapter_name):
        """Create an adapter helper.

        Args:
            adapter_name: Name of the adapter. Used as the default device
                name, the default topic root, the MQTT client-id prefix and
                the lookup value in the adapter config collection.
        """
        cbLogs.info("Initializing AdapterLibrary with adapter name: " + adapter_name)
        self.adapter_name = adapter_name
        self._args = {}
        self._cb_system = None
        self._device_client = None
        self._cb_mqtt = None  # created by connect_MQTT()
        self._sub_topic = None
        self._cb_message_handler = None

    def parse_arguments(self):
        """Collect and validate adapter arguments.

        Reads environment variables, then command-line flags, configures the
        logging level, and verifies that the required credentials are present.

        Raises:
            SystemExit: with status -1 when a required argument is missing.
        """
        cbLogs.info("AdapterLibrary - parse_arguments - parsing environment variables and flags")
        self.__parse_env_variables()
        self.__parse_flags()

        log_level = self._args[self.LOG_LEVEL_ARG_KEY].upper()
        self._args[self.LOG_LEVEL_ARG_KEY] = log_level
        logging.basicConfig(level=log_level)
        if log_level != "DEBUG":
            cbLogs.DEBUG = False
            cbLogs.MQTT_DEBUG = False

        # Log a redacted copy so secrets/passwords/tokens never reach the log.
        cbLogs.info("AdapterLibrary - parse_arguments - parsed adapter arguments: " + str(self._redacted_args()))
        cbLogs.info("AdapterLibrary - parse_arguments - Verifying required adapter arguments")

        # sys.exit is used instead of the builtin exit(), which is injected by
        # the site module and is not guaranteed to exist.
        if self.SYSTEM_KEY_ARG_KEY not in self._args:
            cbLogs.error("System Key is required, can be supplied with --systemKey flag or " + self.SYSTEM_KEY_ARG_KEY + " environment variable")
            sys.exit(-1)
        if self.SYSTEM_SECRET_ARG_KEY not in self._args:
            cbLogs.error("System Secret is required, can be supplied with --systemSecret flag or " + self.SYSTEM_SECRET_ARG_KEY + " environment variable")
            sys.exit(-1)
        if self.ACTIVE_KEY_ARG_KEY not in self._args and self.SERVICE_ACCOUNT_ARG_KEY not in self._args:
            cbLogs.error("Device Password is required when not using a Service Account, can be supplied with --password flag")
            sys.exit(-1)
        if self.SERVICE_ACCOUNT_ARG_KEY in self._args and self.SERVICE_ACCOUNT_TOKEN_ARG_KEY not in self._args:
            cbLogs.error("Service Account Token is required when a Service Account is specified, this should have automatically been supplied. Check for typos then try again")
            sys.exit(-1)
        cbLogs.info("AdapterLibrary - parse_arguments - Adapter arguments parsed and verified!")

    def initialize_clearblade(self):
        """Create the ClearBlade system, authenticate and fetch adapter config.

        Authenticates as a service account when one was supplied, otherwise
        as a device.

        Returns:
            The adapter config dict (keys ``topic_root`` and
            ``adapter_settings``), or ``None`` when retrieving it failed.
        """
        cbLogs.info("AdapterLibrary - initialize_clearblade - initializing with ClearBlade")
        self._cb_system = System(
            self._args[self.SYSTEM_KEY_ARG_KEY],
            self._args[self.SYSTEM_SECRET_ARG_KEY],
            self._args[self.PLATFORM_URL_ARG_KEY],
        )

        if self.SERVICE_ACCOUNT_ARG_KEY in self._args:
            self.__auth_with_service_account()
        else:
            self.__auth_with_device()

        try:
            return self.__fetch_adapter_config()
        # NOTE(review): SystemExit is caught on purpose. The original bare
        # ``except:`` also swallowed it, and the SDK may terminate the process
        # on a failed request - confirm. KeyboardInterrupt now propagates.
        except (Exception, SystemExit) as err:
            cbLogs.error("AdapterLibrary - initialize_clearblade - Failed to retrieve adapter config: " + str(err))
            return None

    def connect_MQTT(self, topic="", cb_message_handler=None):
        """Create the MQTT client and connect to the broker.

        Args:
            topic: Topic to subscribe to once connected. When empty, no
                subscribe/message callbacks are registered.
            cb_message_handler: Optional callable invoked as
                ``handler(client, message)`` for every received message.
        """
        cbLogs.info("AdapterLibrary - connect_MQTT - Initializing the ClearBlade MQTT message broker")
        self._cb_message_handler = cb_message_handler
        # A random suffix keeps client ids of concurrently running adapters
        # with the same name distinct.
        # NOTE(review): the messaging URL collected by __parse_flags is not
        # passed to Messaging() here - confirm whether it should be forwarded.
        client_id = self.adapter_name + "-" + str(random.randint(0, 10000))
        self._cb_mqtt = self._cb_system.Messaging(self._device_client, client_id=client_id)
        self._cb_mqtt.on_connect = self.__on_MQTT_connect
        self._cb_mqtt.on_disconnect = self.__on_MQTT_disconnect
        if topic != "":
            self._cb_mqtt.on_subscribe = self.__on_MQTT_subscribe
            self._cb_mqtt.on_message = self.__on_MQTT_message_received
            self._sub_topic = topic
        self._cb_mqtt.connect()

    def publish(self, topic, message):
        """Publish ``message`` on ``topic``; requires a prior connect_MQTT()."""
        cbLogs.info("AdapterLibrary - publish - Publishing MQTT message on topic " + topic)
        self._cb_mqtt.publish(topic, message)

    def disconnect_MQTT(self):
        """Disconnect from the broker; requires a prior connect_MQTT()."""
        cbLogs.info("AdapterLibrary - disconnect_MQTT - Disconnecting from ClearBlade MQTT message broker")
        self._cb_mqtt.disconnect()

    def __auth_with_service_account(self):
        """Authenticate using the service account name and token."""
        cbLogs.info("AdapterLibrary - __auth_with_service_account - Authenticating as service account")
        self._device_client = self._cb_system.ServiceUser(
            self._args[self.SERVICE_ACCOUNT_ARG_KEY],
            self._args[self.SERVICE_ACCOUNT_TOKEN_ARG_KEY],
        )

    def __auth_with_device(self):
        """Authenticate using the device name and active key."""
        cbLogs.info("AdapterLibrary - __auth_with_device - Authenticating as device")
        self._device_client = self._cb_system.Device(
            self._args[self.DEVICE_NAME_ARG_KEY],
            self._args[self.ACTIVE_KEY_ARG_KEY],
        )

    def __fetch_adapter_config(self):
        """Read this adapter's row from the adapter config collection.

        Returns:
            A dict with ``topic_root`` (defaults to the adapter name) and
            ``adapter_settings`` (parsed JSON, or ``""`` when unset). Defaults
            are used unless exactly one matching row is found.
        """
        cbLogs.info("AdapterLibrary - __fetch_adapter_config - Retrieving adapter config")
        adapter_config = {"topic_root": self.adapter_name, "adapter_settings": ""}

        collection = self._cb_system.Collection(
            self._device_client,
            collectionName=self._args[self.ADAPTER_CONFIG_COLLECTION_NAME_ARG_KEY],
        )
        query = Query()
        query.equalTo("adapter_name", self.adapter_name)
        rows = collection.getItems(query)

        if len(rows) == 1:
            row = rows[0]
            # None is skipped as well as "": str(None) would otherwise yield
            # the literal topic root "None", and json.loads("None") fails.
            topic_root = row["topic_root"]
            if topic_root is not None and topic_root != "":
                adapter_config["topic_root"] = str(topic_root)
            raw_settings = row["adapter_settings"]
            if raw_settings is not None and raw_settings != "":
                adapter_config["adapter_settings"] = self.__byteify(json.loads(str(raw_settings)))
        else:
            cbLogs.warn("No adapter config found for adapter name " + self.adapter_name + ". Using defaults")

        cbLogs.info("AdapterLibrary - __fetch_adapter_config - Using adapter config: " + str(adapter_config))
        return adapter_config

    def __on_MQTT_connect(self, client, userdata, flags, rc):
        """Connect callback: subscribe to the configured topic, if any."""
        cbLogs.info("AdapterLibrary - __on_MQTT_connect - MQTT successfully connected!")
        if self._sub_topic is not None:
            self._cb_mqtt.subscribe(self._sub_topic)

    def __on_MQTT_disconnect(self, client, userdata, rc):
        """Disconnect callback: log the result code and a permission hint."""
        cbLogs.info("AdapterLibrary - __on_MQTT_disconnect - MQTT disconnected with rc " + str(rc))
        if self._sub_topic is not None and rc == 1:
            cbLogs.warn("AdapterLibrary - __on_MQTT_disconnect - Verify that your service account has permission to subscribe to the topic: " + self._sub_topic)

    def __on_MQTT_subscribe(self, client, userdata, mid, granted_qos):
        """Subscribe callback: log the subscribed topic."""
        cbLogs.info("AdapterLibrary - __on_MQTT_subscribe - MQTT successfully subcribed to topic " + self._sub_topic)

    def __on_MQTT_message_received(self, client, userdata, message):
        """Message callback: forward the message to the user handler, if set."""
        cbLogs.info("AdapterLibrary - __on_MQTT_message_received - MQTT message received on topic " + message.topic)
        if self._cb_message_handler is not None:
            cbLogs.info("calling message handler")
            self._cb_message_handler(client, message)

    @classmethod
    def _display_value(cls, key, value):
        """Return a log-safe string for ``value``, masking credential keys."""
        if key in cls._SENSITIVE_ARG_KEYS:
            return cls._REDACTED
        return str(value)

    def _redacted_args(self):
        """Return a copy of the parsed arguments that is safe to log."""
        return dict(
            (key, self._display_value(key, value))
            for key, value in self._args.items()
        )

    def __parse_env_variables(self):
        """Copy the supported environment variables into the argument dict."""
        env = os.environ
        possible_vars = [
            self.SYSTEM_KEY_ARG_KEY,
            self.SYSTEM_SECRET_ARG_KEY,
            self.SERVICE_ACCOUNT_ARG_KEY,
            self.SERVICE_ACCOUNT_TOKEN_ARG_KEY,
        ]
        for var in possible_vars:
            if var in env:
                cbLogs.info("Setting adapter arguments from environment variable: " + var + ": " + self._display_value(var, env[var]))
                self._args[var] = env[var]

    def __parse_flags(self):
        """Parse command-line flags into the argument dict.

        Flags that were not supplied and have no default (value ``None``) or
        that are empty strings are ignored, so they do not overwrite values
        taken from the environment.
        """
        parser = argparse.ArgumentParser(description='ClearBlade Adapter')
        parser.add_argument(
            '--systemKey', dest=self.SYSTEM_KEY_ARG_KEY,
            help='The System Key of the ClearBlade '
                 'Platform "System" the adapter will connect to.')
        parser.add_argument(
            '--systemSecret', dest=self.SYSTEM_SECRET_ARG_KEY,
            help='The System Secret of the '
                 'ClearBlade Platform "System" the adapter will connect to.')
        parser.add_argument(
            '--deviceName', dest=self.DEVICE_NAME_ARG_KEY, default=self.adapter_name,
            help='The name of the device that will be used for device '
                 'authentication against the ClearBlade Platform or Edge, defined '
                 'within the devices table of the ClearBlade platform. The default is ' + self.adapter_name)
        parser.add_argument(
            '--password', dest=self.ACTIVE_KEY_ARG_KEY,
            help='The password (active key) of the device that will be used for device '
                 'authentication against the ClearBlade Platform or Edge, defined within '
                 'the devices table of the ClearBlade platform.')
        parser.add_argument(
            '--platformURL', dest=self.PLATFORM_URL_ARG_KEY, default=self.DEFAULT_PLATFORM_URL,
            help='The HTTP URL of the ClearBlade Platform or Edge the adapter will '
                 'connect to (including port if non-standard). The default is ' + self.DEFAULT_PLATFORM_URL)
        parser.add_argument(
            '--messagingURL', dest=self.MESSAGING_URL_ARG_KEY, default=self.DEFAULT_MESSAGING_URL,
            help='The MQTT URL of the ClearBlade Platform or Edge the adapter will '
                 'connect to (including port if non-standard). The default is ' + self.DEFAULT_MESSAGING_URL)
        parser.add_argument(
            '--adapterConfigCollection', dest=self.ADAPTER_CONFIG_COLLECTION_NAME_ARG_KEY,
            default=self.DEFAULT_ADAPTER_CONFIG_COLLECTION_NAME,
            help='The name of the ClearBlade Platform data collection which contains '
                 'runtime configuration settings for the adapter. The default is ' + self.DEFAULT_ADAPTER_CONFIG_COLLECTION_NAME)
        parser.add_argument(
            '--logLevel', dest=self.LOG_LEVEL_ARG_KEY, default=self.DEFAULT_LOG_LEVEL,
            choices=['fatal', 'error', 'warn', 'info', 'debug'],
            help='The level of logging that '
                 'should be utilized by the adapter. The default is ' + self.DEFAULT_LOG_LEVEL)

        args = vars(parser.parse_args(args=sys.argv[1:]))
        for var, value in args.items():
            if value is not None and value != "":
                cbLogs.info("Setting adapter arguments from command line argument: " + var + ": " + self._display_value(var, value))
                self._args[var] = value

    def __byteify(self, data):
        """Recursively convert Python 2 ``unicode`` strings to UTF-8 ``str``.

        Helper for structures produced by ``json.loads`` on Python 2.7
        (https://stackoverflow.com/a/13105359). On Python 3 strings are left
        untouched; containers are still rebuilt so the result has the same
        shape on both versions.
        """
        if isinstance(data, dict):
            return {self.__byteify(key): self.__byteify(value)
                    for key, value in data.items()}
        if isinstance(data, list):
            return [self.__byteify(element) for element in data]
        # ``unicode`` only exists on Python 2; the version test short-circuits
        # before the name is looked up on Python 3.
        if sys.version_info[0] < 3 and isinstance(data, unicode):  # noqa: F821
            return data.encode('utf-8')
        return data
if __name__ == "__main__":
    # Demo entry point. The string below documents how to launch it; its
    # content is kept verbatim.
    """
Start up command for connecting to platform brokers -
python clearblade_adapter_library.py \
--systemKey="<your system key>" \
--systemSecret="<your system secret>" \
--platformURL="https://<your instance name>.clearblade.com" \
--messagingURL="<your instance name>.clearblade.com"
Start up command on the edges -
python clearblade_adapter_library.py
"""
    import time

    def on_message(client, message):
        # Demo handler: decode the JSON payload, echo it, then publish a
        # fixed reply on "pub_topic" through the client that delivered it.
        decoded = json.loads(message.payload.decode("utf-8"))
        print("Message Recieved: ", str(decoded))
        client.publish("pub_topic", "hello")
        print("Message Published")

    # To authenticate as a service user (for example against a platform
    # broker), uncomment the two lines below and fill in the service account
    # email and token. Without them, and without --password, argument
    # validation fails with "Device Password is required when not using a
    # Service Account, can be supplied with --password flag".
    # os.environ['CB_SERVICE_ACCOUNT'] = ""
    # os.environ['CB_SERVICE_ACCOUNT_TOKEN'] = ""

    demo_adapter = AdapterLibrary("test_adapter")
    demo_adapter.parse_arguments()
    demo_adapter.initialize_clearblade()
    demo_adapter.connect_MQTT(cb_message_handler=on_message, topic="sub_topic")
    print('\nConnected..\n\n')

    # Keep the process alive so MQTT callbacks continue to be delivered.
    while True:
        print('Listening...')
        time.sleep(1)