fix import (#158)
Mariane Previde committed Aug 27, 2020
1 parent 15a9108 commit 2243789
Showing 1 changed file with 35 additions and 32 deletions.
DeviceManager/ImportHandler.py — 67 changes: 35 additions & 32 deletions
@@ -24,6 +24,7 @@

LOGGER = Log().color_log()


class ImportHandler:

    kafka = KafkaInstanceHandler()
@@ -34,33 +35,36 @@ def __init__(self):
    def drop_sequences():
        db.session.execute("DROP SEQUENCE template_id")
        db.session.execute("DROP SEQUENCE attr_id")

        LOGGER.info(f" Removed sequences")

    def replace_ids_by_import_ids(my_json):
        new_json = json.loads(my_json)
        return json.dumps(new_json).replace('\"id\":', '\"import_id\":')

    def restore_template_sequence():
        max_template_id = 1
        current_max_template_id = db.session.query(
            func.max(DeviceTemplate.id)).scalar()
        if current_max_template_id is not None:
            max_template_id = current_max_template_id + 1
        db.session.execute(
            "CREATE SEQUENCE template_id START {}".format(str(max_template_id)))

    def restore_attr_sequence():
        max_attr_id = 1
        current_max_attr_id = db.session.query(
            func.max(DeviceAttr.id)).scalar()
        if current_max_attr_id is not None:
            max_attr_id = current_max_attr_id + 1
        db.session.execute(
            "CREATE SEQUENCE attr_id START {}".format(str(max_attr_id)))

    def restore_sequences():
        ImportHandler.restore_template_sequence()
        ImportHandler.restore_attr_sequence()
        LOGGER.info(f" Restored sequences")

    @classmethod
    def notifies_deletion_to_kafka(cls, device, tenant):
        data = serialize_full_device(device, tenant)
        kafka_handler_instance = cls.kafka.getInstance(cls.kafka.kafkaNotifier)
@@ -76,24 +80,21 @@ def delete_records(tenant):
        for device in devices:
            db.session.delete(device)
            ImportHandler.notifies_deletion_to_kafka(device, tenant)
        LOGGER.info(f" Deleted devices")

        templates = db.session.query(DeviceTemplate)
        for template in templates:
            db.session.delete(template)

        LOGGER.info(f" Deleted templates")

    def clear_db_config(tenant):
        ImportHandler.drop_sequences()
        ImportHandler.delete_records(tenant)
        db.session.flush()

    def restore_db_config():
        ImportHandler.restore_sequences()

    def save_templates(json_data, json_payload):
        saved_templates = []
        for template in json_data['templates']:
@@ -102,44 +103,42 @@ def save_templates(json_data, json_payload):
                if(json['import_id'] == template["id"]):
                    load_attrs(json['attrs'], loaded_template, DeviceAttr, db)
            db.session.add(loaded_template)
            saved_templates.append(loaded_template)

        LOGGER.info(f" Saved templates")
        return saved_templates

    def set_templates_on_device(loaded_device, json, saved_templates):
        loaded_device.templates = []
        for template_id in json.get('templates', []):
            for saved_template in saved_templates:
                if(template_id == saved_template.id):
                    loaded_device.templates.append(saved_template)

        auto_create_template(json, loaded_device)

    def save_devices(json_data, json_payload, saved_templates):
        saved_devices = []
        for device in json_data['devices']:
            device.pop('templates', None)
            loaded_device = Device(**device)
            for json in json_payload['devices']:
                if(json['id'] == device["id"]):
                    ImportHandler.set_templates_on_device(
                        loaded_device, json, saved_templates)

            db.session.add(loaded_device)
            saved_devices.append(loaded_device)

        LOGGER.info(f" Saved devices")
        return saved_devices

    def notifies_creation_to_kafka(cls, saved_devices, tenant):
        kafka_handler_instance = cls.kafka.getInstance(cls.kafka.kafkaNotifier)
        for orm_device in saved_devices:
            full_device = serialize_full_device(orm_device, tenant)
            kafka_handler_instance.create(
                full_device, meta={"service": tenant})

    @staticmethod
    def import_data(data, token, content_type):
@@ -164,18 +163,21 @@ def import_data(data, token, content_type):
            tenant = init_tenant_context(token, db)

            ImportHandler.clear_db_config(tenant)

            original_req_data = copy.copy(data)

            original_payload = json.loads(original_req_data)

            data = ImportHandler.replace_ids_by_import_ids(data)

            json_data, json_payload = parse_payload(
                content_type, data, import_schema)

            saved_templates = ImportHandler.save_templates(
                json_data, json_payload)

            saved_devices = ImportHandler.save_devices(
                json_data, original_payload, saved_templates)

            ImportHandler.restore_db_config()

@@ -186,11 +188,12 @@ def import_data(data, token, content_type):
        except IntegrityError as e:
            LOGGER.error(f' {e}')
            db.session.rollback()
            raise HTTPRequestError(
                400, 'Template attribute constraints are violated by the request')
        except Exception as e:
            LOGGER.error(f' {e}')
            db.session.rollback()
            raise HTTPRequestError(400, 'Failed to import data')
        finally:
            db.session.close()

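For readers skimming the diff, the snippet below is a minimal, self-contained sketch (not part of this commit) of the remapping that replace_ids_by_import_ids applies to the raw request body before parse_payload runs; the sample payload is invented purely for illustration.

import json


def replace_ids_by_import_ids(my_json):
    # Standalone restatement of ImportHandler.replace_ids_by_import_ids above:
    # parse the raw body, then rename every '"id":' key to '"import_id":'
    # in the re-serialized JSON.
    new_json = json.loads(my_json)
    return json.dumps(new_json).replace('\"id\":', '\"import_id\":')


# Hypothetical import payload, invented purely for illustration.
payload = '{"templates": [{"id": 1, "label": "SensorModel", "attrs": []}]}'
print(replace_ids_by_import_ids(payload))
# {"templates": [{"import_id": 1, "label": "SensorModel", "attrs": []}]}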
