diff --git a/.config_startup.json b/.config_startup.json index 60b99b0d..e38751de 100644 --- a/.config_startup.json +++ b/.config_startup.json @@ -2,7 +2,7 @@ "EE_ID": "XXXXXXXXXX", "SECURED" : true, "IO_FORMATTER" : "", - "MAIN_LOOP_RESOLUTION" : 5, + "MAIN_LOOP_RESOLUTION" : 10, "SYSTEM_TEMPERATURE_CHECK" : false, diff --git a/plugins/business/tutorials/net_config_monitor.py b/plugins/business/tutorials/net_config_monitor.py index 847b19d1..fb17fc58 100644 --- a/plugins/business/tutorials/net_config_monitor.py +++ b/plugins/business/tutorials/net_config_monitor.py @@ -39,8 +39,8 @@ **BasePlugin.CONFIG, 'ALLOW_EMPTY_INPUTS' : True, - - 'MAX_INPUTS_QUEUE_SIZE' : 16, + 'PLUGIN_LOOP_RESOLUTION' : 50, # we force this to be 50 Hz from the standard 20 Hz + 'MAX_INPUTS_QUEUE_SIZE' : 128, # increase the queue size to 128 from std 1 'PROCESS_DELAY' : 0, @@ -80,7 +80,8 @@ def __get_active_nodes(self, netmon_current_network : dict) -> dict: if v.get("working", False) == self.const.DEVICE_STATUS_ONLINE } return active_network - + + def __get_active_nodes_summary_with_peers(self, netmon_current_network: dict): """ Looks in all whitelists and finds the nodes that is allowed by most other nodes. @@ -172,8 +173,77 @@ def __maybe_send(self): #endif have allowed nodes #endif time to send return - - + + + def __maybe_process_netmon(self, current_network : dict): + if len(current_network) == 0: + self.P("Received NET_MON_01 data without CURRENT_NETWORK data.", color='r ') + else: + self.__new_nodes_this_iter = 0 + peers_status = self.__get_active_nodes_summary_with_peers(current_network) + if self.__debug_netmon_count > 0: + # self.P(f"NetMon debug:\n{self.json_dumps(self.__get_active_nodes(current_network), indent=2)}") + self.P(f"Peers status:\n{self.json_dumps(peers_status, indent=2)}") + self.__debug_netmon_count -= 1 + for addr in peers_status: + if addr == self.ee_addr: + # its us, no need to check whitelist + continue + if peers_status[addr]["allows_me"]: + # we have found a whitelist that contains our address + if addr not in self.__allowed_nodes: + self.__allowed_nodes[addr] = { + "whitelist" : peers_status[addr]["whitelist"], + "last_config_get" : 0 + } + self.__new_nodes_this_iter += 1 + #endif addr not in __allowed_nodes + #endif addr allows me + #endfor each addr in peers_status + if self.__new_nodes_this_iter > 0: + self.P(f"Found {self.__new_nodes_this_iter} new peered nodes.") + #endif len(current_network) == 0 + return + + + def __maybe_process_update_monitor_data(self, data: dict): + sender = data.get(self.const.PAYLOAD_DATA.EE_SENDER, None) + is_encrypted = data.get(self.const.PAYLOAD_DATA.EE_IS_ENCRYPTED, False) + encrypted_data = data.get(self.const.PAYLOAD_DATA.EE_ENCRYPTED_DATA, None) + if is_encrypted and encrypted_data is not None: + self.P("Received UPDATE_MONITOR_01 encrypted data. Decrypting...") + str_decrypted_data = self.bc.decrypt_str( + str_b64data=encrypted_data, + str_sender=sender, + ) + decrypted_data = self.json_loads(str_decrypted_data) + if decrypted_data is not None: + received_pipelines = decrypted_data.get("EE_PIPELINES", []) + self.P("Decrypted data size {} with {} pipelines (speed: {:.1f} Hz):\n{}".format( + len(str_decrypted_data), len(received_pipelines), + self.actual_plugin_resolution, + self.json_dumps([ + { + k:v for k,v in x.items() + if k in ["NAME", "TYPE", "MODIFIED_BY_ADDR", "LAST_UPDATE_TIME"] + } + for x in received_pipelines], + indent=2), + )) + sender_no_prefix = self.bc.maybe_remove_prefix(sender) + self.__allowed_nodes[sender_no_prefix]["pipelines"] = received_pipelines + # now we can add the pipelines to the netmon cache + else: + self.P("Failed to decrypt data.", color='r') + #endif decrypted_data is not None + else: + self.P("Received unencrypted data.") + if sender in self.__allowed_nodes: + # + self.P(f"Updated last_config_get for node '{sender}'") + return + + def __maybe_process_received(self): data = self.dataapi_struct_data() if data is not None: @@ -190,68 +260,11 @@ def __maybe_process_received(self): return if signature == "NET_MON_01": current_network = data.get("CURRENT_NETWORK", {}) - if len(current_network) == 0: - self.P("Received NET_MON_01 data without CURRENT_NETWORK data.", color='r ') - else: - self.__new_nodes_this_iter = 0 - peers_status = self.__get_active_nodes_summary_with_peers(current_network) - if self.__debug_netmon_count > 0: - # self.P(f"NetMon debug:\n{self.json_dumps(self.__get_active_nodes(current_network), indent=2)}") - self.P(f"Peers status:\n{self.json_dumps(peers_status, indent=2)}") - self.__debug_netmon_count -= 1 - for addr in peers_status: - if addr == self.ee_addr: - # its us, no need to check whitelist - continue - if peers_status[addr]["allows_me"]: - # we have found a whitelist that contains our address - if addr not in self.__allowed_nodes: - self.__allowed_nodes[addr] = { - "whitelist" : peers_status[addr]["whitelist"], - "last_config_get" : 0 - } - self.__new_nodes_this_iter += 1 - #endif addr not in __allowed_nodes - #endif addr allows me - #endfor each addr in peers_status - if self.__new_nodes_this_iter > 0: - self.P(f"Found {self.__new_nodes_this_iter} new peered nodes.") - #endif nodes_data is not None + self.__maybe_process_netmon(current_network) #endif signature == "NET_MON_01" elif signature == "UPDATE_MONITOR_01": - is_encrypted = data.get(self.const.PAYLOAD_DATA.EE_IS_ENCRYPTED, False) - encrypted_data = data.get(self.const.PAYLOAD_DATA.EE_ENCRYPTED_DATA, None) - if is_encrypted and encrypted_data is not None: - self.P("Received UPDATE_MONITOR_01 encrypted data. Decrypting...") - str_decrypted_data = self.bc.decrypt_str( - str_b64data=encrypted_data, - str_sender=sender, - ) - decrypted_data = self.json_loads(str_decrypted_data) - if decrypted_data is not None: - received_pipelines = decrypted_data.get("EE_PIPELINES", []) - self.P("Decrypted data size {} with {} pipelines:\n{}".format( - len(str_decrypted_data), len(received_pipelines), - self.json_dumps([ - { - k:v for k,v in x.items() - if k in ["NAME", "TYPE", "MODIFIED_BY_ADDR", "LAST_UPDATE_TIME"] - } - for x in received_pipelines], - indent=2), - )) - sender_no_prefix = self.bc.maybe_remove_prefix(sender) - self.__allowed_nodes[sender_no_prefix]["pipelines"] = received_pipelines - # now we can add the pipelines to the netmon cache - else: - self.P("Failed to decrypt data.", color='r') - #endif decrypted_data is not None - else: - self.P("Received unencrypted data.") - if sender in self.__allowed_nodes: - # - self.P(f"Updated last_config_get for node '{sender}'") + self.__maybe_process_update_monitor_data(data) #endif signature == "UPDATE_MONITOR_01" return diff --git a/ver.py b/ver.py index 14093252..0d1f0f65 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.0.64' +__VER__ = '2.0.65'