Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .config_startup.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"EE_ID": "XXXXXXXXXX",
"SECURED" : true,
"IO_FORMATTER" : "",
"MAIN_LOOP_RESOLUTION" : 5,
"MAIN_LOOP_RESOLUTION" : 10,

"SYSTEM_TEMPERATURE_CHECK" : false,

Expand Down
141 changes: 77 additions & 64 deletions plugins/business/tutorials/net_config_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@

**BasePlugin.CONFIG,
'ALLOW_EMPTY_INPUTS' : True,

'MAX_INPUTS_QUEUE_SIZE' : 16,
'PLUGIN_LOOP_RESOLUTION' : 50, # we force this to be 50 Hz from the standard 20 Hz
'MAX_INPUTS_QUEUE_SIZE' : 128, # increase the queue size to 128 from std 1

'PROCESS_DELAY' : 0,

Expand Down Expand Up @@ -80,7 +80,8 @@ def __get_active_nodes(self, netmon_current_network : dict) -> dict:
if v.get("working", False) == self.const.DEVICE_STATUS_ONLINE
}
return active_network



def __get_active_nodes_summary_with_peers(self, netmon_current_network: dict):
"""
Looks in all whitelists and finds the nodes that is allowed by most other nodes.
Expand Down Expand Up @@ -172,8 +173,77 @@ def __maybe_send(self):
#endif have allowed nodes
#endif time to send
return




def __maybe_process_netmon(self, current_network : dict):
if len(current_network) == 0:
self.P("Received NET_MON_01 data without CURRENT_NETWORK data.", color='r ')
else:
self.__new_nodes_this_iter = 0
peers_status = self.__get_active_nodes_summary_with_peers(current_network)
if self.__debug_netmon_count > 0:
# self.P(f"NetMon debug:\n{self.json_dumps(self.__get_active_nodes(current_network), indent=2)}")
self.P(f"Peers status:\n{self.json_dumps(peers_status, indent=2)}")
self.__debug_netmon_count -= 1
for addr in peers_status:
if addr == self.ee_addr:
# its us, no need to check whitelist
continue
if peers_status[addr]["allows_me"]:
# we have found a whitelist that contains our address
if addr not in self.__allowed_nodes:
self.__allowed_nodes[addr] = {
"whitelist" : peers_status[addr]["whitelist"],
"last_config_get" : 0
}
self.__new_nodes_this_iter += 1
#endif addr not in __allowed_nodes
#endif addr allows me
#endfor each addr in peers_status
if self.__new_nodes_this_iter > 0:
self.P(f"Found {self.__new_nodes_this_iter} new peered nodes.")
#endif len(current_network) == 0
return


def __maybe_process_update_monitor_data(self, data: dict):
sender = data.get(self.const.PAYLOAD_DATA.EE_SENDER, None)
is_encrypted = data.get(self.const.PAYLOAD_DATA.EE_IS_ENCRYPTED, False)
encrypted_data = data.get(self.const.PAYLOAD_DATA.EE_ENCRYPTED_DATA, None)
if is_encrypted and encrypted_data is not None:
self.P("Received UPDATE_MONITOR_01 encrypted data. Decrypting...")
str_decrypted_data = self.bc.decrypt_str(
str_b64data=encrypted_data,
str_sender=sender,
)
decrypted_data = self.json_loads(str_decrypted_data)
if decrypted_data is not None:
received_pipelines = decrypted_data.get("EE_PIPELINES", [])
self.P("Decrypted data size {} with {} pipelines (speed: {:.1f} Hz):\n{}".format(
len(str_decrypted_data), len(received_pipelines),
self.actual_plugin_resolution,
self.json_dumps([
{
k:v for k,v in x.items()
if k in ["NAME", "TYPE", "MODIFIED_BY_ADDR", "LAST_UPDATE_TIME"]
}
for x in received_pipelines],
indent=2),
))
sender_no_prefix = self.bc.maybe_remove_prefix(sender)
self.__allowed_nodes[sender_no_prefix]["pipelines"] = received_pipelines
# now we can add the pipelines to the netmon cache
else:
self.P("Failed to decrypt data.", color='r')
#endif decrypted_data is not None
else:
self.P("Received unencrypted data.")
if sender in self.__allowed_nodes:
#
self.P(f"Updated last_config_get for node '{sender}'")
return


def __maybe_process_received(self):
data = self.dataapi_struct_data()
if data is not None:
Expand All @@ -190,68 +260,11 @@ def __maybe_process_received(self):
return
if signature == "NET_MON_01":
current_network = data.get("CURRENT_NETWORK", {})
if len(current_network) == 0:
self.P("Received NET_MON_01 data without CURRENT_NETWORK data.", color='r ')
else:
self.__new_nodes_this_iter = 0
peers_status = self.__get_active_nodes_summary_with_peers(current_network)
if self.__debug_netmon_count > 0:
# self.P(f"NetMon debug:\n{self.json_dumps(self.__get_active_nodes(current_network), indent=2)}")
self.P(f"Peers status:\n{self.json_dumps(peers_status, indent=2)}")
self.__debug_netmon_count -= 1
for addr in peers_status:
if addr == self.ee_addr:
# its us, no need to check whitelist
continue
if peers_status[addr]["allows_me"]:
# we have found a whitelist that contains our address
if addr not in self.__allowed_nodes:
self.__allowed_nodes[addr] = {
"whitelist" : peers_status[addr]["whitelist"],
"last_config_get" : 0
}
self.__new_nodes_this_iter += 1
#endif addr not in __allowed_nodes
#endif addr allows me
#endfor each addr in peers_status
if self.__new_nodes_this_iter > 0:
self.P(f"Found {self.__new_nodes_this_iter} new peered nodes.")
#endif nodes_data is not None
self.__maybe_process_netmon(current_network)
#endif signature == "NET_MON_01"

elif signature == "UPDATE_MONITOR_01":
is_encrypted = data.get(self.const.PAYLOAD_DATA.EE_IS_ENCRYPTED, False)
encrypted_data = data.get(self.const.PAYLOAD_DATA.EE_ENCRYPTED_DATA, None)
if is_encrypted and encrypted_data is not None:
self.P("Received UPDATE_MONITOR_01 encrypted data. Decrypting...")
str_decrypted_data = self.bc.decrypt_str(
str_b64data=encrypted_data,
str_sender=sender,
)
decrypted_data = self.json_loads(str_decrypted_data)
if decrypted_data is not None:
received_pipelines = decrypted_data.get("EE_PIPELINES", [])
self.P("Decrypted data size {} with {} pipelines:\n{}".format(
len(str_decrypted_data), len(received_pipelines),
self.json_dumps([
{
k:v for k,v in x.items()
if k in ["NAME", "TYPE", "MODIFIED_BY_ADDR", "LAST_UPDATE_TIME"]
}
for x in received_pipelines],
indent=2),
))
sender_no_prefix = self.bc.maybe_remove_prefix(sender)
self.__allowed_nodes[sender_no_prefix]["pipelines"] = received_pipelines
# now we can add the pipelines to the netmon cache
else:
self.P("Failed to decrypt data.", color='r')
#endif decrypted_data is not None
else:
self.P("Received unencrypted data.")
if sender in self.__allowed_nodes:
#
self.P(f"Updated last_config_get for node '{sender}'")
self.__maybe_process_update_monitor_data(data)
#endif signature == "UPDATE_MONITOR_01"

return
Expand Down
2 changes: 1 addition & 1 deletion ver.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__VER__ = '2.0.64'
__VER__ = '2.0.65'