diff --git a/DeviceManager/ImportHandler.py b/DeviceManager/ImportHandler.py
index 96168eb..3796e7f 100644
--- a/DeviceManager/ImportHandler.py
+++ b/DeviceManager/ImportHandler.py
@@ -24,6 +24,7 @@
 
 LOGGER = Log().color_log()
 
+
 class ImportHandler:
 
     kafka = KafkaInstanceHandler()
@@ -34,33 +35,36 @@ def __init__(self):
     def drop_sequences():
         db.session.execute("DROP SEQUENCE template_id")
         db.session.execute("DROP SEQUENCE attr_id")
-        LOGGER.info(f" Removed sequences")
-
+        LOGGER.info(f" Removed sequences")
 
     def replace_ids_by_import_ids(my_json):
         new_json = json.loads(my_json)
         return json.dumps(new_json).replace('\"id\":', '\"import_id\":')
 
-
     def restore_template_sequence():
         max_template_id = 1
-        current_max_template_id = db.session.query(func.max(DeviceTemplate.id)).scalar()
+        current_max_template_id = db.session.query(
+            func.max(DeviceTemplate.id)).scalar()
         if current_max_template_id is not None:
             max_template_id = current_max_template_id + 1
-        db.session.execute("CREATE SEQUENCE template_id START {}".format(str(max_template_id)))
+        db.session.execute(
+            "CREATE SEQUENCE template_id START {}".format(str(max_template_id)))
 
     def restore_attr_sequence():
         max_attr_id = 1
-        current_max_attr_id = db.session.query(func.max(DeviceAttr.id)).scalar()
+        current_max_attr_id = db.session.query(
+            func.max(DeviceAttr.id)).scalar()
         if current_max_attr_id is not None:
             max_attr_id = current_max_attr_id + 1
-        db.session.execute("CREATE SEQUENCE attr_id START {}".format(str(max_attr_id)))
+        db.session.execute(
+            "CREATE SEQUENCE attr_id START {}".format(str(max_attr_id)))
 
     def restore_sequences():
         ImportHandler.restore_template_sequence()
         ImportHandler.restore_attr_sequence()
-        LOGGER.info(f" Restored sequences")
+        LOGGER.info(f" Restored sequences")
+
     @classmethod
     def notifies_deletion_to_kafka(cls, device, tenant):
         data = serialize_full_device(device, tenant)
         kafka_handler_instance = cls.kafka.getInstance(cls.kafka.kafkaNotifier)
@@ -76,24 +80,21 @@ def delete_records(tenant):
         for device in devices:
             db.session.delete(device)
             ImportHandler.notifies_deletion_to_kafka(device, tenant)
-        LOGGER.info(f" Deleted devices")
+        LOGGER.info(f" Deleted devices")
 
         templates = db.session.query(DeviceTemplate)
         for template in templates:
             db.session.delete(template)
-        LOGGER.info(f" Deleted templates")
-
+        LOGGER.info(f" Deleted templates")
 
     def clear_db_config(tenant):
         ImportHandler.drop_sequences()
         ImportHandler.delete_records(tenant)
         db.session.flush()
 
-
     def restore_db_config():
         ImportHandler.restore_sequences()
 
-
     def save_templates(json_data, json_payload):
         saved_templates = []
         for template in json_data['templates']:
@@ -102,22 +103,20 @@ def save_templates(json_data, json_payload):
                 if(json['import_id'] == template["id"]):
                     load_attrs(json['attrs'], loaded_template, DeviceAttr, db)
                     db.session.add(loaded_template)
-                    saved_templates.append(loaded_template)
-
-        LOGGER.info(f" Saved templates")
-        return saved_templates
+                    saved_templates.append(loaded_template)
+        LOGGER.info(f" Saved templates")
+        return saved_templates
 
     def set_templates_on_device(loaded_device, json, saved_templates):
         loaded_device.templates = []
         for template_id in json.get('templates', []):
             for saved_template in saved_templates:
                 if(template_id == saved_template.id):
-                    loaded_device.templates.append(saved_template)
+                    loaded_device.templates.append(saved_template)
 
         auto_create_template(json, loaded_device)
 
-
     def save_devices(json_data, json_payload, saved_templates):
         saved_devices = []
         for device in json_data['devices']:
@@ -125,21 +124,21 @@ def save_devices(json_data, json_payload, saved_templates):
             loaded_device = Device(**device)
             for json in json_payload['devices']:
                 if(json['id'] == device["id"]):
-                    ImportHandler.set_templates_on_device(loaded_device, json, saved_templates)
+                    ImportHandler.set_templates_on_device(
+                        loaded_device, json, saved_templates)
 
             db.session.add(loaded_device)
-            saved_devices.append(loaded_device)
-
-        LOGGER.info(f" Saved devices")
-        return saved_devices
+            saved_devices.append(loaded_device)
+        LOGGER.info(f" Saved devices")
+        return saved_devices
 
     def notifies_creation_to_kafka(cls, saved_devices, tenant):
         kafka_handler_instance = cls.kafka.getInstance(cls.kafka.kafkaNotifier)
         for orm_device in saved_devices:
             full_device = serialize_full_device(orm_device, tenant)
-            kafka_handler_instance.create(full_device, meta={"service": tenant})
-
+            kafka_handler_instance.create(
+                full_device, meta={"service": tenant})
 
     @staticmethod
     def import_data(data, token, content_type):
@@ -164,18 +163,21 @@ def import_data(data, token, content_type):
         tenant = init_tenant_context(token, db)
 
         ImportHandler.clear_db_config(tenant)
-
+
         original_req_data = copy.copy(data)
 
         original_payload = json.loads(original_req_data)
 
         data = ImportHandler.replace_ids_by_import_ids(data)
 
-        json_data, json_payload = parse_payload(content_type, data, import_schema)
+        json_data, json_payload = parse_payload(
+            content_type, data, import_schema)
 
-        saved_templates = ImportHandler.save_templates(json_data, json_payload)
+        saved_templates = ImportHandler.save_templates(
+            json_data, json_payload)
 
-        saved_devices = ImportHandler.save_devices(json_data, original_payload, saved_templates)
+        saved_devices = ImportHandler.save_devices(
+            json_data, original_payload, saved_templates)
 
         ImportHandler.restore_db_config()
 
@@ -186,11 +188,12 @@ def import_data(data, token, content_type):
         except IntegrityError as e:
             LOGGER.error(f' {e}')
             db.session.rollback()
-            raise HTTPRequestError(400, 'Template attribute constraints are violated by the request')
+            raise HTTPRequestError(
+                400, 'Template attribute constraints are violated by the request')
         except Exception as e:
             LOGGER.error(f' {e}')
             db.session.rollback()
-            raise HTTPRequestError(400, 'Failed to import data')
+            raise HTTPRequestError(400, 'Failed to import data')
         finally:
             db.session.close()
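
For reviewers, a minimal sketch of how the reformatted `ImportHandler.import_data(data, token, content_type)` entry point might be driven from a Flask view. Only the `import_data` signature, its `@staticmethod` decorator, and the fact that it raises `HTTPRequestError(400, ...)` come from the patch above; the blueprint name, route, header handling, and the import path for `HTTPRequestError` are illustrative assumptions, not part of this change.

```python
# Illustrative sketch only -- not part of this patch.
# Assumes: a Flask app, that HTTPRequestError is importable from DeviceManager.utils,
# and that import_data returns a JSON-serializable result.
from flask import Blueprint, request, jsonify

from DeviceManager.ImportHandler import ImportHandler
from DeviceManager.utils import HTTPRequestError  # assumed import path

import_blueprint = Blueprint('import', __name__)  # hypothetical blueprint


@import_blueprint.route('/import', methods=['POST'])
def import_devices_and_templates():
    """Hypothetical endpoint that forwards the raw request body to ImportHandler."""
    try:
        result = ImportHandler.import_data(
            request.get_data(as_text=True),            # raw body, parsed by parse_payload
            request.headers.get('Authorization', ''),  # token consumed by init_tenant_context
            request.headers.get('Content-Type', ''),   # content type for parse_payload
        )
        return jsonify(result), 200
    except HTTPRequestError as error:
        # import_data rolls back the session and raises HTTPRequestError(400, ...)
        # on constraint violations or any other failure (see the except blocks above).
        return jsonify({'message': str(error)}), 400
```

Passing the raw body through unchanged matters because `import_data` keeps its own copy (`copy.copy(data)`) before rewriting `id` keys to `import_id`.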