fix(botocore): keep newlines in json data for kinesis records [backport #4700 to 1.7] (#4818)

## Description
Backports #4700 to 1.7.

<!-- If this is a breaking change, explain why it is necessary. Breaking
changes must append `!` after the type/scope. See
https://ddtrace.readthedocs.io/en/stable/contributing.html for more
details. -->

During serialization and deserialization of Kinesis records, a trailing `\n`
is lost: `json.loads` ignores it and `json.dumps` does not emit it. Customer
pipelines rely on this `\n` delimiter to distinguish between different
records. This change appends the line break to the end of the record
string if it was originally there.
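
The loss of the delimiter can be reproduced with the standard `json` module alone. The snippet below is a minimal sketch, not code from this change; the sample payload and variable names are illustrative assumptions.

```python
import json

# A newline-delimited record, as a downstream consumer might expect it.
original = json.dumps({"Hello": "World"}) + "\n"

# json.loads tolerates trailing whitespace and json.dumps never emits it,
# so a plain round trip silently drops the delimiter.
round_tripped = json.dumps(json.loads(original))

print(original.endswith("\n"))
print(round_tripped.endswith("\n"))

# Concatenating round-tripped records loses the record boundary,
# so splitting on newlines no longer separates them.
stream = round_tripped + round_tripped
print(len(stream.splitlines()))
```

Any consumer that splits a batch on `\n` would then see two records fused into one line.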

## Checklist
- [ ] Followed the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines)
when writing a release note.
- [ ] Add additional sections for `feat` and `fix` pull requests.
- [ ] [Library
documentation](https://github.com/DataDog/dd-trace-py/tree/1.x/docs)
and/or [Datadog's documentation
site](https://github.com/DataDog/documentation/) is updated. Link to doc
PR in description.

<!-- Copy and paste the relevant snippet based on the type of pull
request -->

<!-- START feat -->

## Motivation
<!-- Expand on why the change is required, include relevant context for
reviewers -->

Add support for Databricks, which relies on the `\n` delimiter being
present at the end of each record. See the related issue for more
details.

## Design 
<!-- Include benefits from the change as well as possible drawbacks and
trade-offs -->
If the incoming record ends with `\n`, we append the `\n` to the string
after serialization, since the json library dropped it during
deserialization.
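
The design can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the integration's code: `parse_preserving_newline`, `add_metadata`, and the `_metadata` key are hypothetical names standing in for the parsing helper and the trace-context injection.

```python
import base64
import json

LINE_BREAK = "\n"


def parse_preserving_newline(data_str):
    """Return (trailing_line_break, parsed_object) for a JSON string."""
    obj = json.loads(data_str)
    return (LINE_BREAK if data_str.endswith(LINE_BREAK) else ""), obj


def add_metadata(data_str, metadata):
    """Hypothetical stand-in for injection: add a key, keep the newline."""
    try:
        # First assume the data is a plain JSON string.
        line_break, obj = parse_preserving_newline(data_str)
    except ValueError:
        # Otherwise assume it is base64-encoded JSON.
        decoded = base64.b64decode(data_str).decode("ascii")
        line_break, obj = parse_preserving_newline(decoded)

    obj["_metadata"] = metadata
    # Re-append the delimiter only if the input carried one.
    return json.dumps(obj) + line_break


print(repr(add_metadata('{"a": 1}\n', {"k": "v"})))
print(repr(add_metadata('{"a": 1}', {"k": "v"})))
```

Returning the line break alongside the parsed object keeps the decision about the delimiter next to the only place where the original string is still available.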

## Testing strategy
<!-- Describe the automated tests and/or the steps for manual testing.

<!-- END feat -->

- Two unit tests have been added and can be run with:

```
pytest -k test_kinesis_put_records_newline_base64_trace_injection tests/contrib/botocore/test.py
pytest -k test_kinesis_put_records_newline_json_trace_injection tests/contrib/botocore/test.py
```

<!-- START fix -->

## Relevant issue(s)
<!-- Link the pull request to any issues related to the fix. Use
keywords for links to automate closing the issues once the pull request
is merged. -->

Fixes #4317

## Testing strategy
<!-- Describe any added regression tests and/or the manual testing
performed. -->

<!-- END fix -->

## Reviewer Checklist
- [x] Title is accurate.
- [x] Description motivates each change.
- [x] No unnecessary changes were introduced in this PR.
- [x] Avoid breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [x] Tests provided or description of manual testing performed is
included in the code or PR.
- [x] Release note has been added and follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines),
or else `changelog/no-changelog` label added.
- [x] All relevant GitHub issues are correctly linked.
- [x] Backports are identified and tagged with Mergifyio.

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Brett Langdon <brett.langdon@datadoghq.com>
Co-authored-by: Tahir H. Butt <tahir.butt@datadoghq.com>
Co-authored-by: Yun Kim <35776586+Yun-Kim@users.noreply.github.com>
Co-authored-by: Munir Abdinur <munir.abdinur@datadoghq.com>

Co-authored-by: Harvinder Ghotra <ghotra.harvinder@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Brett Langdon <brett.langdon@datadoghq.com>
Co-authored-by: Tahir H. Butt <tahir.butt@datadoghq.com>
Co-authored-by: Munir Abdinur <munir.abdinur@datadoghq.com>
6 people committed Dec 22, 2022
1 parent 3ac20fa commit 2e9cb0c
Showing 3 changed files with 143 additions and 4 deletions.
25 changes: 21 additions & 4 deletions ddtrace/contrib/botocore/patch.py
@@ -10,6 +10,7 @@
 from typing import Dict
 from typing import List
 from typing import Optional
+from typing import Tuple

 import botocore.client
 import botocore.exceptions
@@ -45,6 +46,8 @@
 MAX_KINESIS_DATA_SIZE = 1 << 20 # 1MB
 MAX_EVENTBRIDGE_DETAIL_SIZE = 1 << 18 # 256KB

+LINE_BREAK = "\n"
+
 log = get_logger(__name__)


@@ -176,8 +179,17 @@ def inject_trace_to_eventbridge_detail(params, span):
         entry["Detail"] = detail_json


+def get_json_from_str(data_str):
+    # type: (str) -> Tuple[str, Optional[Dict[str, Any]]]
+    data_obj = json.loads(data_str)
+
+    if data_str.endswith(LINE_BREAK):
+        return LINE_BREAK, data_obj
+    return "", data_obj
+
+
 def get_kinesis_data_object(data):
-    # type: (str) -> Optional[Dict[str, Any]]
+    # type: (str) -> Tuple[str, Optional[Dict[str, Any]]]
     """
     :data: the data from a kinesis stream
@@ -190,13 +202,14 @@ def get_kinesis_data_object(data):

     # check if data is a json string
     try:
-        return json.loads(data)
+        return get_json_from_str(data)
     except ValueError:
         pass

     # check if data is a base64 encoded json string
     try:
-        return json.loads(base64.b64decode(data).decode("ascii"))
+        data_str = base64.b64decode(data).decode("ascii")
+        return get_json_from_str(data_str)
     except ValueError:
         raise TraceInjectionDecodingError("Unable to parse kinesis streams data string")

@@ -216,11 +229,15 @@ def inject_trace_to_kinesis_stream_data(record, span):
         return

     data = record["Data"]
-    data_obj = get_kinesis_data_object(data)
+    line_break, data_obj = get_kinesis_data_object(data)
     data_obj["_datadog"] = {}
     HTTPPropagator.inject(span.context, data_obj["_datadog"])
     data_json = json.dumps(data_obj)

+    # if original string had a line break, add it back
+    if line_break:
+        data_json += line_break
+
     # check if data size will exceed max size with headers
     data_size = len(data_json)
     if data_size >= MAX_KINESIS_DATA_SIZE:
@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    botocore: Before this change, the botocore integration stripped newlines from the JSON string encoded in the data blob of Amazon Kinesis records. This change includes a terminating newline if it is present in the decoded data.
118 changes: 118 additions & 0 deletions tests/contrib/botocore/test.py
@@ -1795,6 +1795,124 @@ def test_kinesis_put_records_base64_trace_injection(self):

         client.delete_stream(StreamName=stream_name)

+    @mock_kinesis
+    def test_kinesis_put_records_newline_json_trace_injection(self):
+        client = self.session.create_client("kinesis", region_name="us-east-1")
+
+        stream_name = "test"
+        client.create_stream(StreamName=stream_name, ShardCount=1)
+        stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
+        shard_id = stream["Shards"][0]["ShardId"]
+
+        partition_key = "1234"
+        data = [
+            {"Data": json.dumps({"Hello": "World"}) + "\n", "PartitionKey": partition_key},
+            {"Data": json.dumps({"foo": "bar"}) + "\n", "PartitionKey": partition_key},
+        ]
+
+        Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(client)
+        client.put_records(StreamName=stream_name, Records=data)
+
+        # check if the appropriate span was generated
+        spans = self.get_spans()
+        assert spans
+        span = spans[0]
+        assert len(spans) == 1
+        assert span.get_tag("aws.region") == "us-east-1"
+        assert span.get_tag("aws.operation") == "PutRecords"
+        assert span.get_tag("params.MessageBody") is None
+        assert_is_measured(span)
+        assert_span_http_status_code(span, 200)
+        assert span.service == "test-botocore-tracing.kinesis"
+        assert span.resource == "kinesis.putrecords"
+        records = span.get_tag("params.Records")
+        assert records is None
+
+        resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON")
+        shard_iterator = resp["ShardIterator"]
+
+        # ensure headers are present in received message
+        resp = client.get_records(ShardIterator=shard_iterator)
+        assert len(resp["Records"]) == 2
+        records = resp["Records"]
+        record = records[0]
+        data_str = record["Data"].decode("ascii")
+        assert data_str.endswith("\n")
+        data = json.loads(data_str)
+        headers = data["_datadog"]
+        assert headers is not None
+        assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
+        assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
+
+        record = records[1]
+        data_str = record["Data"].decode("ascii")
+        assert data_str.endswith("\n")
+        data = json.loads(data_str)
+        assert "_datadog" not in data
+
+        client.delete_stream(StreamName=stream_name)
+
+    @mock_kinesis
+    def test_kinesis_put_records_newline_base64_trace_injection(self):
+        client = self.session.create_client("kinesis", region_name="us-east-1")
+
+        stream_name = "test"
+        client.create_stream(StreamName=stream_name, ShardCount=1)
+        stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
+        shard_id = stream["Shards"][0]["ShardId"]
+
+        partition_key = "1234"
+        sample_string = json.dumps({"Hello": "World"}) + "\n"
+        sample_string_bytes = sample_string.encode("ascii")
+        base64_bytes = base64.b64encode(sample_string_bytes)
+        data_str = base64_bytes.decode("ascii")
+        data = [
+            {"Data": data_str, "PartitionKey": partition_key},
+            {"Data": data_str, "PartitionKey": partition_key},
+        ]
+
+        Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(client)
+        client.put_records(StreamName=stream_name, Records=data)
+
+        # check if the appropriate span was generated
+        spans = self.get_spans()
+        assert spans
+        span = spans[0]
+        assert len(spans) == 1
+        assert span.get_tag("aws.region") == "us-east-1"
+        assert span.get_tag("aws.operation") == "PutRecords"
+        assert span.get_tag("params.MessageBody") is None
+        assert_is_measured(span)
+        assert_span_http_status_code(span, 200)
+        assert span.service == "test-botocore-tracing.kinesis"
+        assert span.resource == "kinesis.putrecords"
+        records = span.get_tag("params.Records")
+        assert records is None
+
+        resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON")
+        shard_iterator = resp["ShardIterator"]
+
+        # ensure headers are present in received message
+        resp = client.get_records(ShardIterator=shard_iterator)
+        assert len(resp["Records"]) == 2
+        records = resp["Records"]
+        record = records[0]
+        data_str = record["Data"].decode("ascii")
+        assert data_str.endswith("\n")
+        data = json.loads(data_str)
+        headers = data["_datadog"]
+        assert headers is not None
+        assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
+        assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
+
+        record = records[1]
+        data_str = base64.b64decode(record["Data"]).decode("ascii")
+        assert data_str.endswith("\n")
+        data = json.loads(data_str)
+        assert "_datadog" not in data
+
+        client.delete_stream(StreamName=stream_name)
+
     @unittest.skipIf(PY2, "Skipping for Python 2.7 since older moto doesn't support secretsmanager")
     def test_secretsmanager(self):
         from moto import mock_secretsmanager