
Addressing Out Of Order Processing via a datastore solution #265

Open
aarengee opened this issue Jun 17, 2022 · 6 comments

@aarengee commented Jun 17, 2022

Problem and context

I can explain an out-of-order processing problem with an example. Assume the following is the timeline of events in a Kafka topic:

| Message | Timestamp | Key (here, order id) | Record |
|---------|-----------|----------------------|-----------------|
| M1 | 10:00 pm | 123 | ORDER_CREATED |
| M2 | 10:01 pm | 123 | ORDER_MODIFIED |
| M3 | 10:02 pm | 123 | ORDER_CANCELLED |
| M4 | 10:01 pm | 456 | ORDER_CANCELLED |

Out-of-order processing can happen when a consumer:

  • consumes M1 and commits it
  • consumes M2, fails to process it, and pushes it to the DLQ
  • consumes M3 and commits it

Now the DLQ cannot simply be replayed: if it is, M2 might get processed, and that's an issue because you can't have a cancelled order modified. This could be handled in the consumer, but then every rule describing an acceptable sequence of events would need to be written in the consumer, which might not be feasible.

Kafka guarantees that messages pushed to the same partition will always be delivered in the order they were pushed. Here, messages for a single order are pushed to the same partition because the key of the record is the order_id. Kafka wants you to segment your data this way; after all, this is what allows in-order processing.
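As a side note, keyed co-location works because the default partitioner derives the partition from a hash of the key. A toy illustration (Clojure's `hash` standing in for Kafka's murmur2 on the serialized key; `partition-for` is an illustrative name, not a Kafka API):

```clojure
(defn partition-for
  "Toy model of keyed partitioning: the same key always maps to the
  same partition."
  [k num-partitions]
  (mod (Math/abs (hash k)) num-partitions))

(partition-for "123" 6) ;=> the same partition every time for order 123
```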

However, developers do not always push change events for the same resource to the same partition; some instead generate a unique key for each Kafka record. The use case is to even out the partitions and avoid having one partition completely blocked when a single message for an order causes a problem.

Solution

A universal solution for all consumers is to remember the last seen timestamp for each order. For the example above, that store would look like:

| Last seen timestamp | Key |
|---------------------|-----|
| 10:02 pm | 123 |
| 10:01 pm | 456 |

Every new message that comes in makes an entry in this table, and based on its staleness we take an action. For example, since M2 (timestamp 10:01 pm) arrived after M3, it can be skipped or handled differently after comparison with this table (the table shows that we already processed a later message, so M2 is a stale event).
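A minimal in-memory sketch of that check, with the store as a plain map of key → last-seen timestamp (the actual proposal uses Redis; `stale?` and `record-message` are illustrative names):

```clojure
(defn stale?
  "True when `ts` is older than the last timestamp seen for key `k`."
  [store k ts]
  (if-let [last-seen (get store k)]
    (< ts last-seen)
    false))

(defn record-message
  "Upserts the timestamp for `k` when the message is fresh."
  [store k ts]
  (if (stale? store k ts)
    store
    (assoc store k ts)))

;; Replaying M2 (10:01 pm) after M3 (10:02 pm) is detected as stale
;; (timestamps here are illustrative numbers for 10:00/10:01/10:02 pm):
(-> {}
    (record-message 123 2200)  ; M1 at 10:00 pm
    (record-message 123 2202)  ; M3 at 10:02 pm
    (stale? 123 2201))         ; M2 at 10:01 pm => true
```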

Proposal

We are proposing an optional middleware through which a user can read from the message-metadata, in the mapper function, whether or not the message is stale (i.e. behind the latest processed message).

The function would take in a message, extract the key (userid/orderid/driverid) from it, and upsert the (id, timestamp) pair into the store. Every time a new message is processed, it would compare timestamps and add an is-stale? key to the message-metadata persistent map. If the key is not present in the store (i.e. the message is seen for the first time), or the message is fresh (i.e. its timestamp is greater than the one currently in the store), it will simply upsert the (id, timestamp) entry into the store.
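A rough sketch of what such a middleware could look like, assuming Carmine as the Redis client and a message shaped as a Clojure map; the namespace, key prefix, and `[:metadata :is-stale?]` path are illustrative, not a committed API:

```clojure
(ns ziggurat.middleware.out-of-order
  (:require [taoensso.carmine :as car]))

(def redis-conn {:pool {} :spec {:host "13.90.78.6" :port 6379}})

(defn wrap-stale-check
  "Wraps a mapper-fn: looks up the last-seen timestamp for the message's
  key in Redis, attaches :is-stale? (true / false / :failed) to the
  message metadata, and upserts the timestamp when the message is fresh."
  [handler-fn id-path ts-path ttl-seconds]
  (fn [message]
    (let [id     (get-in message id-path)
          ts     (get-in message ts-path)
          k      (str "ziggurat-ooo:" id)
          status (try
                   (let [last-seen (some-> (car/wcar redis-conn (car/get k))
                                           Long/parseLong)]
                     (if (and last-seen (< ts last-seen))
                       true                              ; stale event
                       (do (car/wcar redis-conn
                                     (car/setex k ttl-seconds (str ts)))
                           false)))                      ; fresh, store updated
                   (catch Exception _ :failed))]         ; redis unreachable
      (handler-fn (assoc-in message [:metadata :is-stale?] status)))))
```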

We would be using Redis as a data store. We would need five configs from the user:

| Config | Purpose | Sample Value |
|--------|---------|--------------|
| ZIGGURAT_STREAM_OOO_ENABLED | Enable/disable the feature | true |
| ZIGGURAT_STREAM_OOO_UNIQUE_ID_PATH | How to extract the key from a message in this stream | [:key :order :id] |
| ZIGGURAT_STREAM_OOO_PRODUCED_TIMESTAMP_PATH | How to extract the timestamp from a message in this stream | [:key :timestamp] |
| ZIGGURAT_STREAM_OOO_REDIS_HOSTNAME | Hostname of Redis (port is 6379 by default) | 13.90.78.6 |
| ZIGGURAT_STREAM_OOO_REDIS_TTL | TTL of each record, in seconds | 86400 |
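Purely for illustration, the same configs as a config.edn fragment; the :stream-ooo key and its nesting are my assumption about how this could slot into ziggurat's config, not a finalised schema:

```clojure
{:ziggurat {:stream-ooo {:enabled                 true
                         :unique-id-path          [:key :order :id]
                         :produced-timestamp-path [:key :timestamp]
                         :redis-hostname          "13.90.78.6"
                         :redis-ttl               86400}}}
```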

For any issues, do reach out.

Thanks,
Rijul

@punit-kulal

> However, developers do not always push change events for the same resource to the same partition; some instead generate a unique key for each Kafka record. The use case is to even out the partitions and avoid having one partition completely blocked when a single message for an order causes a problem.

Won't other messages (not necessarily of the same order id) be blocked in the partition even if we use a unique key? Shouldn't we be offloading such poison messages into a DLQ (which ziggurat already offers) according to a timeout? That would ensure the blocking can be scoped to the expected SLA.

Also, what happens if there's a timeout or degradation on the Redis end (although not common)? Will the message be treated as successfully processed or not?

@punit-kulal

Also, what advantage does native ziggurat support for OOO offer, apart from DRY?

@aarengee (Author)

> Also, what happens if there's a timeout or degradation on the Redis end (although not common)? Will the message be treated as successfully processed or not?

We are thinking of guaranteeing that is-stale? will deliver one of three statuses: true / false / failed. The third status implies that we couldn't determine the staleness of the message, in which case it is up to the user to choose the behaviour.
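A minimal sketch of how a mapper-fn could branch on those three statuses (`handle-order` and `handle-via-api` are hypothetical stand-ins for real business logic; :skip/:success follow ziggurat's usual mapper return conventions):

```clojure
;; Hypothetical stand-ins for real business logic:
(defn handle-order [message] :success)
(defn handle-via-api [message] :success)

(defn mapper-fn
  [message]
  (case (get-in message [:metadata :is-stale?])
    true    :skip                     ; stale event: drop it
    :failed (handle-via-api message)  ; staleness unknown: user-chosen fallback
    (handle-order message)))          ; fresh or seen for the first time
```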

> Also, what advantage does native ziggurat support for OOO offer, apart from DRY?

DRY is definitely one of the motivations, and a big one. But another is a use case I often see in our org.

A lot of services using ziggurat are written so that we treat the Kafka messages as a notification log/ticker: when we get a message, we call the API of the same service to fetch the information needed to process it. We do this even when the message already contains everything the actor needs to process it.

This causes a performance problem: the service first constructs the message and sends it to the Kafka event log, but the actor calls the service again, forcing it to reconstruct the message by recomputing the same information.

Why do developers do this?

Mostly to protect against out-of-order processing when retrying messages or replaying them from the DLQ. Calling the originating/producer service always ensures that you get the latest state of the resource. Two solutions to this are:

  • Provide a way, inside the mapper function, to distinguish retried/replayed messages (i.e. DLQ messages) from messages coming in on the Kafka topic. We can then write code in the mapper fn to treat these messages separately (ignore them, or call the service only for them).
  • OR the actor maintains a store of <ID, last_seen_timestamp> and, for messages older than last_seen_timestamp, the message is ignored / the service is called. People don't implement this because it means integrating with a database and maintaining state in an otherwise stateless system; and if the ID of the message is, say, an order id, you have to purge old entries as well.

The idea is for ziggurat to provide this small middleware so that the only things devs need to do are create and maintain the infra and fill in the configs.

@punit-kulal commented Jun 20, 2022

> We are thinking of guaranteeing that is-stale? will deliver one of three statuses: true / false / failed. The third status implies that we couldn't determine the staleness of the message, in which case it is up to the user to choose the behaviour.

I meant while updating the timestamp in Redis. If the set request times out, the timestamp in the remote store itself will be incorrect, and future messages won't know that the Redis entry is technically stale, unless we have unlimited retries, which in turn results in mapper time spikes and hence lag.

In principle, I think this arises because we are now keeping the state of truth in two places:

  1. The actual source of truth.
  2. The Redis timestamp for the is-stale feature.

And the two places don't have a transactional mechanism between them.

@prathik commented Aug 2, 2022

Which timestamp are we talking about here? Timestamps might not be strictly increasing (two events can share the same millisecond) and are subject to clock skew. It could therefore be better to use the offset number and the partition key as a combination.
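A minimal sketch of that variant, assuming the store keys on the (record key, partition) pair, since offsets only order records within a single partition; the function names and message shape are illustrative:

```clojure
(defn stale-by-offset?
  "True when this record's offset is behind the last processed offset
  for the same (record-key, partition) pair; offsets are strictly
  increasing within a partition, so no clock is involved."
  [store record-key partition offset]
  (< offset (get store [record-key partition] -1)))

(defn record-offset
  "Upserts the highest processed offset for the (record-key, partition) pair."
  [store record-key partition offset]
  (update store [record-key partition] (fnil max -1) offset))
```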

@aarengee (Author) commented Aug 2, 2022

> Which timestamp are we talking about here?

Server time

Yes, it is subject to the problem you are mentioning; and yes, same-millisecond messages for the same user are a very real possibility.
