Skip to content

Commit

Permalink
feat: Add optional received_p99 timestamp to commit log (#295)
Browse files Browse the repository at this point in the history
The value from the received field can be used in the future for subscription scheduling if this is provided. This is better than the `orig_message_ts` field as `received` is assigned at the very start of the pipeline when Sentry receives the event (as opposed to when Snuba gets the event). Switching to this field means any delays in ingestion will be properly accounted for when determining the window on which to schedule subscriptions.

This PR also:
- deprecates the legacy decoder since we have fully switched over to the new format
- switches orig_message_ts from datetime to float. Converting between the two in encode/decode is pointless, and it introduces the possibility of timezone issues. Simpler to just keep it a unix timestamp everywhere.
  • Loading branch information
lynnagara authored Oct 18, 2023
1 parent 2da7f70 commit aa6bd34
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 46 deletions.
41 changes: 9 additions & 32 deletions arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import json
from datetime import datetime

from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import Commit
from arroyo.types import Partition, Topic
from arroyo.utils.codecs import Codec

# Kept in decode method for backward compatibility. Will be
# remove in a future release of Arroyo
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class CommitCodec(Codec[KafkaPayload, Commit]):
def encode(self, value: Commit) -> KafkaPayload:
Expand All @@ -18,7 +13,8 @@ def encode(self, value: Commit) -> KafkaPayload:
payload = json.dumps(
{
"offset": value.offset,
"orig_message_ts": datetime.timestamp(value.orig_message_ts),
"orig_message_ts": value.orig_message_ts,
"received_p99": value.received_p99,
}
).encode("utf-8")

Expand All @@ -30,28 +26,6 @@ def encode(self, value: Commit) -> KafkaPayload:
[],
)

def decode_legacy(self, value: KafkaPayload) -> Commit:
key = value.key
if not isinstance(key, bytes):
raise TypeError("payload key must be a bytes object")

val = value.value
if not isinstance(val, bytes):
raise TypeError("payload value must be a bytes object")

headers = {k: v for (k, v) in value.headers}
orig_message_ts = datetime.strptime(
headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT
)
topic_name, partition_index, group = key.decode("utf-8").split(":", 3)
offset = int(val.decode("utf-8"))
return Commit(
group,
Partition(Topic(topic_name), int(partition_index)),
offset,
orig_message_ts,
)

def decode(self, value: KafkaPayload) -> Commit:
key = value.key
if not isinstance(key, bytes):
Expand All @@ -63,12 +37,14 @@ def decode(self, value: KafkaPayload) -> Commit:

payload = val.decode("utf-8")

if payload.isnumeric():
return self.decode_legacy(value)

decoded = json.loads(payload)
offset = decoded["offset"]
orig_message_ts = datetime.fromtimestamp(decoded["orig_message_ts"])
orig_message_ts = decoded["orig_message_ts"]

if decoded.get("received_p99"):
received_ts = decoded["received_p99"]
else:
received_ts = None

topic_name, partition_index, group = key.decode("utf-8").split(":", 3)

Expand All @@ -77,4 +53,5 @@ def decode(self, value: KafkaPayload) -> Commit:
Partition(Topic(topic_name), int(partition_index)),
offset,
orig_message_ts,
received_ts,
)
6 changes: 3 additions & 3 deletions arroyo/commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Mapping, MutableMapping, Optional

from arroyo.types import Partition
Expand Down Expand Up @@ -61,9 +60,10 @@ def did_commit(self, now: float, offsets: Mapping[Partition, int]) -> None:

@dataclass(frozen=True)
class Commit:
__slots__ = ["group", "partition", "offset", "orig_message_ts"]
__slots__ = ["group", "partition", "offset", "orig_message_ts", "received_p99"]

group: str
partition: Partition
offset: int
orig_message_ts: datetime
orig_message_ts: float
received_p99: Optional[float]
14 changes: 5 additions & 9 deletions tests/backends/test_commit.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import datetime
import time

from arroyo.backends.kafka.commit import CommitCodec
from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import Commit
from arroyo.types import Partition, Topic

Expand All @@ -12,19 +11,16 @@ def test_encode_decode() -> None:

offset_to_commit = 5

now = time.time()

commit = Commit(
"leader-a",
Partition(topic, 0),
offset_to_commit,
datetime.now(),
now,
now - 5,
)

encoded = commit_codec.encode(commit)

assert commit_codec.decode(encoded) == commit

def test_decode_legacy() -> None:
legacy = KafkaPayload(b"topic:0:leader-a", b"5", [('orig_message_ts', b'2023-09-26T21:58:14.191325Z')])
decoded = CommitCodec().decode(legacy)
assert decoded.offset == 5
assert decoded.group == "leader-a"
6 changes: 4 additions & 2 deletions tests/backends/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import itertools
import os
import pickle
import time
import uuid
from contextlib import closing
from datetime import datetime
from pickle import PickleBuffer
from typing import Any, Iterator, Mapping, MutableSequence, Optional
from unittest import mock
Expand Down Expand Up @@ -195,7 +195,9 @@ def test_consumer_stream_processor_shutdown(self) -> None:


def test_commit_codec() -> None:
commit = Commit("group", Partition(Topic("topic"), 0), 0, datetime.now())
commit = Commit(
"group", Partition(Topic("topic"), 0), 0, time.time(), time.time() - 5
)
assert commit_codec.decode(commit_codec.encode(commit)) == commit


Expand Down

0 comments on commit aa6bd34

Please sign in to comment.