Skip to content

Commit

Permalink
feat: update ingest methods
Browse files Browse the repository at this point in the history
ENG-1602
  • Loading branch information
cowan-macady committed Aug 23, 2023
1 parent 60d00be commit 1b02eff
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 76 deletions.
23 changes: 15 additions & 8 deletions indykite_sdk/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ def main():
else:
print("Invalid customer id")
time.sleep(180)
client_config2 = ConfigClient(False, client_config.token_source)
client_config2 = ConfigClient(client_config.token_source)
print(client_config2.token_source.token.access_token)
# read_customer_by_name method: to get customer info from customer name
customer_name = args.customer_name
Expand Down Expand Up @@ -2767,7 +2767,8 @@ def main():
tenant_id,
identity_properties,
properties)
ingest_record_digital_twin = client_ingest.ingest_record_upsert(record_id, upsert)
record = client_ingest.record_upsert(record_id, upsert)
ingest_record_digital_twin = client_ingest.ingest_record(record)
if ingest_record_digital_twin:
api_helper.print_response(ingest_record_digital_twin)
else:
Expand All @@ -2786,7 +2787,8 @@ def main():
type,
tags,
properties)
ingest_record_resource = client_ingest.ingest_record_upsert(record_id, upsert)
record = client_ingest.record_upsert(record_id, upsert)
ingest_record_resource = client_ingest.ingest_record(record)
if ingest_record_resource:
api_helper.print_response(ingest_record_resource)
else:
Expand All @@ -2804,7 +2806,8 @@ def main():
upsert = client_ingest.upsert_data_relation(
match,
properties)
ingest_record_relation = client_ingest.ingest_record_upsert(record_id, upsert)
record = client_ingest.record_upsert(record_id, upsert)
ingest_record_relation = client_ingest.ingest_record(record)
if ingest_record_relation:
api_helper.print_response(ingest_record_relation)
else:
Expand All @@ -2815,7 +2818,8 @@ def main():
record_id = "745890"
node = client_ingest.node_match("vehicle-1", "Vehicle")
delete = client_ingest.delete_data_node(node)
delete_record_node = client_ingest.ingest_record_delete(id=record_id, delete=delete)
record = client_ingest.record_delete(record_id, delete)
delete_record_node = client_ingest.ingest_record(record)
if delete_record_node:
api_helper.print_response(delete_record_node)
else:
Expand All @@ -2829,7 +2833,8 @@ def main():
target_match = client_ingest.node_match("lot-1", "ParkingLot")
relation = client_ingest.relation_match(source_match, target_match, type)
delete = client_ingest.delete_data_relation(relation)
delete_record_relation = client_ingest.ingest_record_delete(id=record_id, delete=delete)
record = client_ingest.record_delete(record_id, delete)
delete_record_relation = client_ingest.ingest_record(record)
if delete_record_relation:
api_helper.print_response(delete_record_relation)
else:
Expand All @@ -2842,7 +2847,8 @@ def main():
key = "nodePropertyName"
node_property = client_ingest.node_property_match(match, key)
delete = client_ingest.delete_data_node_property(node_property)
delete_record_node_property = client_ingest.ingest_record_delete(id=record_id, delete=delete)
record = client_ingest.record_delete(record_id, delete)
delete_record_node_property = client_ingest.ingest_record(record)
if delete_record_node_property:
api_helper.print_response(delete_record_node_property)
else:
Expand All @@ -2858,7 +2864,8 @@ def main():
key = "relationPropertyName"
relation_property = client_ingest.relation_property_match(match, key)
delete = client_ingest.delete_data_relation_property(relation_property)
delete_record_relation_property = client_ingest.ingest_record_delete(id=record_id, delete=delete)
record = client_ingest.record_delete(record_id, delete)
delete_record_relation_property = client_ingest.ingest_record(record)
if delete_record_relation_property:
api_helper.print_response(delete_record_relation_property)
else:
Expand Down
6 changes: 3 additions & 3 deletions indykite_sdk/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self, token_source=None):
return logger_error(exception)

# Imported methods
from .ingest_record import ingest_record_upsert, upsert_data_node_digital_twin, identity_property, ingest_property, \
from .ingest_record import ingest_record, upsert_data_node_digital_twin, identity_property, ingest_property, \
upsert_data_node_resource, upsert_data_relation, relation_match, node_match, node_property_match, \
relation_property_match, ingest_record_delete, delete_data_node, delete_data_relation, delete_data_node_property, \
delete_data_relation_property, generate_records_request, stream_records, record_upsert
relation_property_match, ingest_record, delete_data_node, delete_data_relation, delete_data_node_property, \
delete_data_relation_property, generate_records_request, stream_records, record_upsert, record_delete
98 changes: 45 additions & 53 deletions indykite_sdk/ingest/ingest_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,70 @@
from indykite_sdk.utils.message_to_value import arg_to_value


def ingest_record_upsert(self, id, upsert):
def ingest_record(self, record):
"""
data ingestion
:param self:
:param id: id record for client ref
:param upsert: UpsertData object
:param record: Record object
:return: record_error
"""
sys.excepthook = logger.handle_excepthook
try:
response = self.stub.IngestRecord(
pb2.IngestRecordRequest(
record=model_pb2.Record(
id=str(id),
upsert=upsert
)
record=record
)
)
if not response:
return None

return IngestRecordResponse.deserialize(response)

except Exception as exception:
return logger.logger_error(exception)


def record_upsert(self, id, upsert):
"""
create record
:param self:
:param id: id record for client ref
:param upsert: UpsertData object
:return: record
"""
sys.excepthook = logger.handle_excepthook
try:
record=model_pb2.Record(
id=str(id),
upsert=upsert
)
if not record:
return None
return record
except Exception as exception:
return logger.logger_error(exception)


def record_delete(self, id, delete):
"""
create record
:param self:
:param id: id record for client ref
:param delete: DeleteData object
:return: record
"""
sys.excepthook = logger.handle_excepthook
try:
record = model_pb2.Record(
id=str(id),
delete=delete
)
if not record:
return None
return record
except Exception as exception:
return logger.logger_error(exception)


def upsert_data_node_digital_twin(self,
external_id,
type,
Expand Down Expand Up @@ -304,51 +341,6 @@ def delete_data_relation_property(self, relation_property):
return logger.logger_error(exception)


def ingest_record_delete(self, id, delete):
"""
IngestRecord delete
:param self:
:param id: record id
:param delete: DeleteData object (node, relation, node_property or relation_property)
:return: record_error
"""
sys.excepthook = logger.handle_excepthook
try:
response = self.stub.IngestRecord(
pb2.IngestRecordRequest(
record=model_pb2.Record(
id=str(id),
delete=delete
)
)
)
if not response:
return None
return IngestRecordResponse.deserialize(response)

except Exception as exception:
return logger.logger_error(exception)


def record_upsert(self, id, upsert):
"""
create record
:param self:
:param id: id record for client ref
:param upsert: UpsertData object
:return: record
"""
sys.excepthook = logger.handle_excepthook
try:
record = model_pb2.Record(
id=str(id),
upsert=upsert
)
return record
except Exception as exception:
return logger.logger_error(exception)


def generate_records_request(self, records):
"""Create iterator for record requests."""
for record in records:
Expand Down
43 changes: 31 additions & 12 deletions tests/test_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ def test_ingest_record_digital_twin_success():
identity_properties,
properties)
assert isinstance(upsert, model_pb2.UpsertData)

response = client.ingest_record_upsert(record_id, upsert)
record = client.record_upsert(record_id, upsert)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)


Expand All @@ -45,7 +46,9 @@ def test_ingest_record_resource_success():
[],
properties)
assert isinstance(upsert, model_pb2.UpsertData)
response = client.ingest_record_upsert(record_id, upsert)
record = client.record_upsert(record_id, upsert)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)


Expand All @@ -67,13 +70,17 @@ def test_ingest_record_relation_success():
match,
properties)
assert isinstance(upsert, model_pb2.UpsertData)
response = client.ingest_record_upsert(record_id, upsert)
record = client.record_upsert(record_id, upsert)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)

node = client.node_match("vehicle-1", "Vehicle")
delete = client.delete_data_node(node)
assert isinstance(delete, model_pb2.DeleteData)
response = client.ingest_record_delete(id=record_id, delete=delete)
record = client.record_delete(record_id, delete)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)


Expand All @@ -93,12 +100,16 @@ def test_ingest_record_relation_delete():
match,
properties)
assert isinstance(upsert, model_pb2.UpsertData)
response = client.ingest_record_upsert(record_id, upsert)
record = client.record_upsert(record_id, upsert)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)

delete = client.delete_data_relation(match)
assert isinstance(delete, model_pb2.DeleteData)
response = client.ingest_record_delete(id=record_id, delete=delete)
record = client.record_delete(record_id, delete)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)


Expand All @@ -118,14 +129,18 @@ def test_delete_record_node_property():
match,
properties)
assert isinstance(upsert, model_pb2.UpsertData)
response = client.ingest_record_upsert(record_id, upsert)
record = client.record_upsert(record_id, upsert)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)

key = "nodePropertyName"
node_property = client.node_property_match(source_match, key)
delete = client.delete_data_node_property(node_property)
assert isinstance(delete, model_pb2.DeleteData)
response = client.ingest_record_delete(id=record_id, delete=delete)
record = client.record_delete(record_id, delete)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)


Expand All @@ -145,14 +160,17 @@ def test_delete_record_relation_property():
match,
properties)
assert isinstance(upsert, model_pb2.UpsertData)
response = client.ingest_record_upsert(record_id, upsert)
record = client.record_upsert(record_id, upsert)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)

key = "relationPropertyName"
relation_property = client.relation_property_match(match, key)
delete = client.delete_data_relation_property(relation_property)
assert isinstance(delete, model_pb2.DeleteData)
response = client.ingest_record_delete(id=record_id, delete=delete)
record = client.record_delete(record_id, delete)
assert isinstance(record, model_pb2.Record)
response = client.ingest_record(record)
assert isinstance(response, IngestRecordResponse)


Expand Down Expand Up @@ -282,6 +300,7 @@ def test_ingest_record_delete_error(capsys):
record_id = "745890"
node = client.node_match("vehicle-11111111111", "Vehicle111111111")
delete = client.delete_data_node(node)
delete_record_node = client.ingest_record_delete(id=record_id, delete=delete)
record = client.record_delete(record_id, delete)
delete_record_node = client.ingest_record(record)
captured = capsys.readouterr()
assert "ERROR" in captured.err

0 comments on commit 1b02eff

Please sign in to comment.