Skip to content

Commit

Permalink
fix: return response in stream_records()
Browse files Browse the repository at this point in the history
  • Loading branch information
martasd committed Sep 6, 2022
1 parent c415675 commit 6c4f09d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 23 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def change_password(self, token, new_password):
### Ingest records

```python
def ingest_records(self, config_id, records):
def stream_records(self, config_id, records):
record_iterator = self.generate_records_request(config_id, records)
response_iterator = self.stub.StreamRecords(record_iterator)

Expand Down
12 changes: 9 additions & 3 deletions jarvis_sdk/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,21 @@ def generate_records_request(self, config_id, records):
record_request = pb2.StreamRecordsRequest(mapping_config_id=config_id, record=record)
yield record_request

def ingest_records(self, config_id, records):
def stream_records(self, config_id, records):
record_iterator = self.generate_records_request(config_id, records)
response_iterator = self.stub.StreamRecords(record_iterator)
responses = []

try:
for response in response_iterator:
print(response)
if not response.record_error.property_errors:
print(f"Record {response.record_id} ingested successfully")
else:
print(f"Record {response.record_id} has errors: \n{response.record_error}")

responses.append(response)
except Exception as exception:
print(exception)
return None

return "Data has been ingested successfully"
return responses
67 changes: 48 additions & 19 deletions tests/test_stream_records.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pytest

from jarvis_sdk.ingest import IngestClient
from jarvis_sdk.indykite.ingest.v1beta1 import model_pb2
from jarvis_sdk.indykite.ingest.v1beta1 import model_pb2, ingest_api_pb2 as pb2
from jarvis_sdk.indykite.objects.v1beta1 import struct_pb2
from tests.helpers import data

Expand All @@ -16,31 +18,45 @@ def test_stream_record_nonexisting_config_id(capsys):
client = IngestClient()
assert client is not None

client.ingest_records(config_id, [record])
client.stream_records(config_id, [record])
captured = capsys.readouterr()

assert "Ingest mapping config gid:AAAAFBtaAlxjDE8GuIWAPEFoSPs not found" in captured.out
assert "Ingest mapping config 'gid:AAAAFBtaAlxjDE8GuIWAPEFoSPs' not found" in captured.out


def test_stream_record_wrong_external_id(capsys):
def test_stream_record_error():
config_id = data.get_config_id()
record_data = {
"playerId": struct_pb2.Value(string_value="125"),
"firstname": struct_pb2.Value(string_value="Marius"),
"gender": struct_pb2.Value(string_value="m"),
}
record = model_pb2.Record(id="2", external_id="wrongId", data=record_data)
record = model_pb2.Record(id="2", external_id="playerId", data=record_data)

client = IngestClient()
assert client is not None

client.ingest_records(config_id, [record])
captured = capsys.readouterr()
def mocked_stream_records(request_iter: pb2.StreamRecordsRequest):
for request in request_iter:
error = model_pb2.PropertyError()
error.messages.append("problem")

yield pb2.StreamRecordsResponse(
record_id=request.record.id,
record_error=model_pb2.RecordError(property_errors={"reason": error}),
)

client.stub.StreamRecords = mocked_stream_records
responses = client.stream_records(config_id, [record])
head, *tail = responses

assert "found no matching ingest mapping entity for the record external_id: wrongId" in captured.out
assert head.record_id is "2"
assert len(head.record_error.property_errors) is 1
assert head.record_error.property_errors["reason"].messages == ["problem"]
assert tail == []


def test_stream_record_success(capsys):
def test_stream_record_success():
config_id = data.get_config_id()
record_data = {
"playerId": struct_pb2.Value(string_value="125"),
Expand All @@ -52,13 +68,20 @@ def test_stream_record_success(capsys):
client = IngestClient()
assert client is not None

client.ingest_records(config_id, [record])
captured = capsys.readouterr()
def mocked_stream_records(request_iter: pb2.StreamRecordsRequest):
for request in request_iter:
yield pb2.StreamRecordsResponse(record_id=request.record.id, record_error=model_pb2.RecordError())

client.stub.StreamRecords = mocked_stream_records
responses = client.stream_records(config_id, [record])
head, tail = responses[0], responses[1:]

assert 'record_id: "2"\nrecord_error {\n}\n\n' == captured.out
assert head.record_id is "2"
assert len(head.record_error.property_errors) is 0
assert tail == []


def test_stream_multiple_records_success(capsys):
def test_stream_multiple_records_success():
config_id = data.get_config_id()
record1_data = {
"playerId": struct_pb2.Value(string_value="125"),
Expand All @@ -76,10 +99,16 @@ def test_stream_multiple_records_success(capsys):
client = IngestClient()
assert client is not None

client.ingest_records(config_id, [record1, record2])
captured = capsys.readouterr()
def mocked_stream_records(request_iter: pb2.StreamRecordsRequest):
for request in request_iter:
yield pb2.StreamRecordsResponse(record_id=request.record.id, record_error=model_pb2.RecordError())

client.stub.StreamRecords = mocked_stream_records
responses = client.stream_records(config_id, [record1, record2])
first, second, tail = responses[0], responses[1], responses[2:]

assert (
'record_id: "1"\nrecord_error {\n}\n\nrecord_id: "2"\nrecord_index: 1\nrecord_error {\n}\n\n'
== captured.out
)
assert first.record_id is "1"
assert len(first.record_error.property_errors) is 0
assert second.record_id is "2"
assert len(second.record_error.property_errors) is 0
assert tail == []

0 comments on commit 6c4f09d

Please sign in to comment.