Skip to content

Commit

Permalink
feat: Temporarily support missing orig_message_ts header for rollout (
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Sep 20, 2021
1 parent 37c48ee commit d2153ea
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog and versioning

## Unreleased

- Handle missing `orig_message_ts` header. Since all events in the pipeline produced using an older version of arroyo may not have the header yet, temporarily support a None value for `orig_message_ts`.

## 0.0.3

- Replaces Offset in consumer and processing strategy with Position, which contains both offset and timestamp information. `stage_offsets` is now `stage_positions` and `commit_offsets` is now `commit_positions`, and now includes the timestamp.
Expand Down
13 changes: 9 additions & 4 deletions arroyo/synchronized.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ class Commit:
group: str
partition: Partition
offset: int
orig_message_ts: datetime
orig_message_ts: Optional[datetime]


class CommitCodec(Codec[KafkaPayload, Commit]):
def encode(self, value: Commit) -> KafkaPayload:
assert value.orig_message_ts is not None

return KafkaPayload(
f"{value.partition.topic.name}:{value.partition.index}:{value.group}".encode(
"utf-8"
Expand All @@ -53,9 +55,12 @@ def decode(self, value: KafkaPayload) -> Commit:
raise TypeError("payload value must be a bytes object")

headers = {k: v for (k, v) in value.headers}
orig_message_ts = datetime.strptime(
headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT
)
try:
orig_message_ts: Optional[datetime] = datetime.strptime(
headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT
)
except KeyError:
orig_message_ts = None

topic_name, partition_index, group = key.decode("utf-8").split(":", 3)
offset = int(val.decode("utf-8"))
Expand Down

0 comments on commit d2153ea

Please sign in to comment.