From 2e9cb0ce084b1b77ad8f3d8d3f19b4288db7f739 Mon Sep 17 00:00:00 2001 From: Yun Kim <35776586+Yun-Kim@users.noreply.github.com> Date: Thu, 22 Dec 2022 06:43:05 -0500 Subject: [PATCH] fix(botocore): keep newlines in json data for kinesis records [backport #4700 to 1.7] (#4818) ## Description Backports #4700 to 1.7. During our serialization & de-serialization of kinesis records, any `\n` at the end of the record is stripped by the json library. Customer pipelines rely on this `\n` delimiter to distinguish between different records. This change appends the line break to the end of the record string if it was originally there. ## Checklist - [ ] Followed the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) when writing a release note. - [ ] Add additional sections for `feat` and `fix` pull requests. - [ ] [Library documentation](https://github.com/DataDog/dd-trace-py/tree/1.x/docs) and/or [Datadog's documentation site](https://github.com/DataDog/documentation/) is updated. Link to doc PR in description. ## Motivation Add support for Databricks which relies on the `\n` delimiter to be present at the end of the record. Look at the related issue for more details. ## Design If the incoming record has a `\n` at the end of the string, we add the `\n` into the string post serialization since it was stripped during the de-serialization process by the json library. ## Testing strategy - two unit tests have been added ``` pytest -k test_kinesis_put_records_newline_base64_trace_injection tests/contrib/botocore/test.py pytest -k test_kinesis_put_records_newline_json_trace_injection tests/contrib/botocore/test.py ``` ## Relevant issue(s) Fixes #4317 ## Testing strategy ## Reviewer Checklist - [x] Title is accurate. - [x] Description motivates each change. - [x] No unnecessary changes were introduced in this PR. - [x] Avoid breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Tests provided or description of manual testing performed is included in the code or PR. - [x] Release note has been added and follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines), or else `changelog/no-changelog` label added. - [x] All relevant GitHub issues are correctly linked. - [x] Backports are identified and tagged with Mergifyio. Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Brett Langdon Co-authored-by: Tahir H. Butt Co-authored-by: Yun Kim <35776586+Yun-Kim@users.noreply.github.com> Co-authored-by: Munir Abdinur ## Description ## Checklist - [ ] Followed the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) when writing a release note. - [ ] Add additional sections for `feat` and `fix` pull requests. - [ ] [Library documentation](https://github.com/DataDog/dd-trace-py/tree/1.x/docs) and/or [Datadog's documentation site](https://github.com/DataDog/documentation/) is updated. Link to doc PR in description. ## Motivation ## Design ## Testing strategy ## Relevant issue(s) ## Testing strategy ## Reviewer Checklist - [ ] Title is accurate. - [ ] Description motivates each change. - [ ] No unnecessary changes were introduced in this PR. - [ ] Avoid breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [ ] Tests provided or description of manual testing performed is included in the code or PR. - [ ] Release note has been added and follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines), or else `changelog/no-changelog` label added. - [ ] All relevant GitHub issues are correctly linked. - [ ] Backports are identified and tagged with Mergifyio. Co-authored-by: Harvinder Ghotra Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Brett Langdon Co-authored-by: Tahir H. Butt Co-authored-by: Munir Abdinur --- ddtrace/contrib/botocore/patch.py | 25 +++- ...-newline-strip-issue-3a2419b6f29fcab8.yaml | 4 + tests/contrib/botocore/test.py | 118 ++++++++++++++++++ 3 files changed, 143 insertions(+), 4 deletions(-) create mode 100644 releasenotes/notes/fix-kinesis-newline-strip-issue-3a2419b6f29fcab8.yaml diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index d0074db0875..5586f5402db 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -10,6 +10,7 @@ from typing import Dict from typing import List from typing import Optional +from typing import Tuple import botocore.client import botocore.exceptions @@ -45,6 +46,8 @@ MAX_KINESIS_DATA_SIZE = 1 << 20 # 1MB MAX_EVENTBRIDGE_DETAIL_SIZE = 1 << 18 # 256KB +LINE_BREAK = "\n" + log = get_logger(__name__) @@ -176,8 +179,17 @@ def inject_trace_to_eventbridge_detail(params, span): entry["Detail"] = detail_json +def get_json_from_str(data_str): + # type: (str) -> Tuple[str, Optional[Dict[str, Any]]] + data_obj = json.loads(data_str) + + if data_str.endswith(LINE_BREAK): + return LINE_BREAK, data_obj + return "", data_obj + + def get_kinesis_data_object(data): - # type: (str) -> Optional[Dict[str, Any]] + # type: (str) -> Tuple[str, Optional[Dict[str, Any]]] """ :data: the data from a kinesis stream @@ -190,13 +202,14 @@ def get_kinesis_data_object(data): # check if data is a json string try: - return json.loads(data) + return get_json_from_str(data) except ValueError: pass # check if data is a base64 encoded json string try: - return json.loads(base64.b64decode(data).decode("ascii")) + data_str = base64.b64decode(data).decode("ascii") + return get_json_from_str(data_str) except ValueError: raise TraceInjectionDecodingError("Unable to parse kinesis streams data string") @@ -216,11 +229,15 @@ def inject_trace_to_kinesis_stream_data(record, span): return data = record["Data"] - data_obj = get_kinesis_data_object(data) + line_break, data_obj = get_kinesis_data_object(data) data_obj["_datadog"] = {} HTTPPropagator.inject(span.context, data_obj["_datadog"]) data_json = json.dumps(data_obj) + # if original string had a line break, add it back + if line_break: + data_json += line_break + # check if data size will exceed max size with headers data_size = len(data_json) if data_size >= MAX_KINESIS_DATA_SIZE: diff --git a/releasenotes/notes/fix-kinesis-newline-strip-issue-3a2419b6f29fcab8.yaml b/releasenotes/notes/fix-kinesis-newline-strip-issue-3a2419b6f29fcab8.yaml new file mode 100644 index 00000000000..db5a811b567 --- /dev/null +++ b/releasenotes/notes/fix-kinesis-newline-strip-issue-3a2419b6f29fcab8.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + botocore: Before this change, the botocore integration stripped newlines from the JSON string encoded in the data blob of Amazon Kinesis records. This change includes a terminating newline if it is present in the decoded data. diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 41fb67825ec..7210e948f95 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1795,6 +1795,124 @@ def test_kinesis_put_records_base64_trace_injection(self): client.delete_stream(StreamName=stream_name) + @mock_kinesis + def test_kinesis_put_records_newline_json_trace_injection(self): + client = self.session.create_client("kinesis", region_name="us-east-1") + + stream_name = "test" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + partition_key = "1234" + data = [ + {"Data": json.dumps({"Hello": "World"}) + "\n", "PartitionKey": partition_key}, + {"Data": json.dumps({"foo": "bar"}) + "\n", "PartitionKey": partition_key}, + ] + + Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(client) + client.put_records(StreamName=stream_name, Records=data) + + # check if the appropriate span was generated + spans = self.get_spans() + assert spans + span = spans[0] + assert len(spans) == 1 + assert span.get_tag("aws.region") == "us-east-1" + assert span.get_tag("aws.operation") == "PutRecords" + assert span.get_tag("params.MessageBody") is None + assert_is_measured(span) + assert_span_http_status_code(span, 200) + assert span.service == "test-botocore-tracing.kinesis" + assert span.resource == "kinesis.putrecords" + records = span.get_tag("params.Records") + assert records is None + + resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON") + shard_iterator = resp["ShardIterator"] + + # ensure headers are present in received message + resp = client.get_records(ShardIterator=shard_iterator) + assert len(resp["Records"]) == 2 + records = resp["Records"] + record = records[0] + data_str = record["Data"].decode("ascii") + assert data_str.endswith("\n") + data = json.loads(data_str) + headers = data["_datadog"] + assert headers is not None + assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) + assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) + + record = records[1] + data_str = record["Data"].decode("ascii") + assert data_str.endswith("\n") + data = json.loads(data_str) + assert "_datadog" not in data + + client.delete_stream(StreamName=stream_name) + + @mock_kinesis + def test_kinesis_put_records_newline_base64_trace_injection(self): + client = self.session.create_client("kinesis", region_name="us-east-1") + + stream_name = "test" + client.create_stream(StreamName=stream_name, ShardCount=1) + stream = client.describe_stream(StreamName=stream_name)["StreamDescription"] + shard_id = stream["Shards"][0]["ShardId"] + + partition_key = "1234" + sample_string = json.dumps({"Hello": "World"}) + "\n" + sample_string_bytes = sample_string.encode("ascii") + base64_bytes = base64.b64encode(sample_string_bytes) + data_str = base64_bytes.decode("ascii") + data = [ + {"Data": data_str, "PartitionKey": partition_key}, + {"Data": data_str, "PartitionKey": partition_key}, + ] + + Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(client) + client.put_records(StreamName=stream_name, Records=data) + + # check if the appropriate span was generated + spans = self.get_spans() + assert spans + span = spans[0] + assert len(spans) == 1 + assert span.get_tag("aws.region") == "us-east-1" + assert span.get_tag("aws.operation") == "PutRecords" + assert span.get_tag("params.MessageBody") is None + assert_is_measured(span) + assert_span_http_status_code(span, 200) + assert span.service == "test-botocore-tracing.kinesis" + assert span.resource == "kinesis.putrecords" + records = span.get_tag("params.Records") + assert records is None + + resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON") + shard_iterator = resp["ShardIterator"] + + # ensure headers are present in received message + resp = client.get_records(ShardIterator=shard_iterator) + assert len(resp["Records"]) == 2 + records = resp["Records"] + record = records[0] + data_str = record["Data"].decode("ascii") + assert data_str.endswith("\n") + data = json.loads(data_str) + headers = data["_datadog"] + assert headers is not None + assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) + assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) + + record = records[1] + data_str = base64.b64decode(record["Data"]).decode("ascii") + assert data_str.endswith("\n") + data = json.loads(data_str) + assert "_datadog" not in data + + client.delete_stream(StreamName=stream_name) + @unittest.skipIf(PY2, "Skipping for Python 2.7 since older moto doesn't support secretsmanager") def test_secretsmanager(self): from moto import mock_secretsmanager