Skip to content

Commit

Permalink
feat: separate stream from ingest
Browse files Browse the repository at this point in the history
Closing ENG-1598
  • Loading branch information
cowan-macady authored and Septimus4 committed Aug 29, 2023
1 parent c458820 commit 1e094ee
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 251 deletions.
2 changes: 1 addition & 1 deletion examples/spaces/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "pypi"
connexion = {version = ">=2.6.0", extras = ["swagger-ui"]}
swagger-ui-bundle = ">=0.0.4"
python-dateutil = ">=2.6.0"
indykite-sdk-python = {ref = "v1.22.0", git = "https://github.com/indykite/indykite-sdk-python"}
indykite-sdk-python = {ref = "v1.27.0", git = "https://github.com/indykite/indykite-sdk-python"}
python-jose = "3.3.0"
gql = "3.4.0"
requests = ">=2.31.0"
Expand Down
380 changes: 189 additions & 191 deletions examples/spaces/Pipfile.lock

Large diffs are not rendered by default.

39 changes: 22 additions & 17 deletions indykite_sdk/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import logging
from indykite_sdk.utils.message_to_value import arg_to_value
from indykite_sdk import api_helper
import re


class ParseKwargs(argparse.Action):
Expand Down Expand Up @@ -716,6 +717,8 @@ def main():
delete_record_relation_parser = subparsers.add_parser("delete_record_relation")
delete_record_node_parser = subparsers.add_parser("delete_record_node")
stream_records_parser = subparsers.add_parser("stream_records")
edges_parser = subparsers.add_parser("edges")


# get_schema_helpers
get_schema_helpers_parser = subparsers.add_parser("get_schema_helpers")
Expand Down Expand Up @@ -925,7 +928,7 @@ def main():
state="DIGITAL_TWIN_STATE_ACTIVE",
password=PasswordCredential(
email=EmailIdentity(
email="test2314@example.com",
email="test562@example.com",
verified=True
),
value="password"
Expand All @@ -952,7 +955,7 @@ def main():
state="DIGITAL_TWIN_STATE_ACTIVE",
password=PasswordCredential(
email=EmailIdentity(
email="test2315@example.com",
email="test563@example.com",
verified=True
),
value="password"
Expand All @@ -964,7 +967,7 @@ def main():
state="DIGITAL_TWIN_STATE_ACTIVE",
password=PasswordCredential(
email=EmailIdentity(
email="test2316@example.com",
email="test564@example.com",
verified=True
),
value="password"
Expand Down Expand Up @@ -2751,14 +2754,14 @@ def main():

elif command == "ingest_record_digital_twin":
# replace with actual values
record_id = "745898"
external_id = "external-dt-id3"
record_id = "7614125"
external_id = "external-dt-id7611"
kind = "DIGITAL_TWIN_KIND_PERSON"
tenant_id = os.getenv('TENANT_ID')
type = "CarOwner"
identity_property = client_ingest.identity_property("customIdProp", "456")
identity_property = client_ingest.identity_property("customIdProp7611", "456")
identity_properties = [identity_property]
ingest_property = client_ingest.ingest_property("customProp", "741")
ingest_property = client_ingest.ingest_property("customProp17611", "741")
properties = [ingest_property]
upsert = client_ingest.upsert_data_node_digital_twin(
external_id,
Expand All @@ -2776,10 +2779,10 @@ def main():
return ingest_record_digital_twin

elif command == "ingest_record_resource":
record_id = "745899"
record_id = "74158100"
external_id = "lot-1"
type = "ParkingLot"
ingest_property = client_ingest.ingest_property("customProp", "9654")
ingest_property = client_ingest.ingest_property("customProp100", "9654")
properties = [ingest_property]
tags = []
upsert = client_ingest.upsert_data_node_resource(
Expand All @@ -2796,7 +2799,7 @@ def main():
return ingest_record_resource

elif command == "ingest_record_relation":
record_id = "745890"
record_id = "7415890"
type = "CAN_USE"
source_match = client_ingest.node_match("vehicle-1", "Vehicle")
target_match = client_ingest.node_match("lot-1", "ParkingLot")
Expand Down Expand Up @@ -2874,13 +2877,15 @@ def main():

elif command == "stream_records":
# replace with actual values
record_id = "145898"
external_id = "external-dt-id1"
record_id = "14589904"
external_id = "external-dt-id904"
tenant_id = os.getenv('TENANT_ID')
type = "Person"
tags = []
identity_properties = []
properties = []
identity_property = client_ingest.identity_property("customIdPropST904", "456")
identity_properties = [identity_property]
ingest_property = client_ingest.ingest_property("customPropST1904", "741")
properties = [ingest_property]
upsert = client_ingest.upsert_data_node_digital_twin(
external_id,
type,
Expand All @@ -2890,10 +2895,10 @@ def main():
properties)
record = client_ingest.record_upsert(record_id, upsert)

record_id2 = "145899"
external_id = "lot-1"
record_id2 = "14589905"
external_id = "lot-905"
type = "ParkingLot"
ingest_property = client_ingest.ingest_property("customProp", "9654")
ingest_property = client_ingest.ingest_property("customProp905", "9654")
properties = [ingest_property]
tags = []
upsert2 = client_ingest.upsert_data_node_resource(
Expand Down
3 changes: 2 additions & 1 deletion indykite_sdk/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ def __init__(self, token_source=None):
from .ingest_record import ingest_record, upsert_data_node_digital_twin, identity_property, ingest_property, \
upsert_data_node_resource, upsert_data_relation, relation_match, node_match, node_property_match, \
relation_property_match, ingest_record, delete_data_node, delete_data_relation, delete_data_node_property, \
delete_data_relation_property, generate_records_request, stream_records, record_upsert, record_delete
delete_data_relation_property, record_upsert, record_delete
from .stream_records import generate_records_request, stream_records
43 changes: 2 additions & 41 deletions indykite_sdk/ingest/ingest_record.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from indykite_sdk.indykite.ingest.v1beta2 import ingest_api_pb2 as pb2
from indykite_sdk.indykite.ingest.v1beta2 import model_pb2
from indykite_sdk.model.ingest_record import StreamRecordsResponse, IngestRecordResponse
from indykite_sdk.model.ingest_record import IngestRecordResponse
import sys
import indykite_sdk.utils.logger as logger
from indykite_sdk.utils.message_to_value import arg_to_value
Expand Down Expand Up @@ -38,7 +38,7 @@ def record_upsert(self, id, upsert):
"""
sys.excepthook = logger.handle_excepthook
try:
record=model_pb2.Record(
record = model_pb2.Record(
id=str(id),
upsert=upsert
)
Expand Down Expand Up @@ -339,42 +339,3 @@ def delete_data_relation_property(self, relation_property):
return delete
except Exception as exception:
return logger.logger_error(exception)


def generate_records_request(self, records):
"""Create iterator for record requests."""
for record in records:
record_request = pb2.StreamRecordsRequest(record=record)
yield record_request


def stream_records(self, record):
sys.excepthook = logger.handle_excepthook
try:
record_iterator = self.generate_records_request(record)
response_iterator = self.stub.StreamRecords(record_iterator)
responses = list(response_iterator)
res = [StreamRecordsResponse.deserialize(response) for response in responses]
return res

except Exception as exception:
return logger.logger_error(exception)


def change(self, id, data_type):
"""
create change
:param self:
:param id: change id
:param data_type Change.DataType object
:return: change
"""
sys.excepthook = logger.handle_excepthook
try:
change = model_pb2.Change(
id=str(id),
data_type=data_type
)
return change
except Exception as exception:
return logger.logger_error(exception)
38 changes: 38 additions & 0 deletions indykite_sdk/ingest/stream_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from indykite_sdk.indykite.ingest.v1beta2 import ingest_api_pb2 as pb2
from indykite_sdk.model.ingest_record import StreamRecordsResponse, IngestRecordResponse
import sys
import indykite_sdk.utils.logger as logger


def generate_records_request(self, records):
"""
Create iterator for record requests
:param self:
:param records: list of records
:return: yield record_request
"""
"""Create iterator for record requests."""
for record in records:
record_request = pb2.StreamRecordsRequest(record=record)
yield record_request


def stream_records(self, records):
"""
send records in stream
:param self:
:param records: list of records
:return: list of deserialized StreamRecordsResponses
"""
sys.excepthook = logger.handle_excepthook
try:
record_iterator = self.generate_records_request(records)
response_iterator = self.stub.StreamRecords(record_iterator)
responses = list(response_iterator)
res = [StreamRecordsResponse.deserialize(response) for response in responses]
return res

except Exception as exception:
return logger.logger_error(exception)


0 comments on commit 1e094ee

Please sign in to comment.