
Handle late arriving events #25

Closed
pnadolny13 opened this issue Mar 1, 2023 · 5 comments · Fixed by #26

pnadolny13 (Collaborator) commented Mar 1, 2023

I'm noticing that even though everything appears to function as expected on the tap side, I'm still getting very slight diffs between the source and the warehouse. It looks like we're querying near-real-time logs at the end of a sync and bookmarking the latest timestamp we receive, and I suspect some logs with earlier timestamps arrive after the query has run. Subsequent queries then assume all prior logs have been synced, so those late-arriving logs fall outside the next filter range and are never replicated. I've only seen this affect records within a few minutes (<5 minutes).

For my sync I see the following in my logs:

  1. 2023-02-28, 06:02:00 UTC - Submit Query (with filter range 2023-02-28T06:00:38 UTC - 2023-02-28T07:00:37 UTC)
  2. 2023-02-28, 06:06:20 - Get Results (679 Received)
  3. State emitted {"bookmarks": {"log": {"replication_key": "timestamp", "replication_key_value": "2023-02-28 06:01:09.040"}}}
  4. Next Run - 2023-03-01, 06:00:47 UTC "Submitting query for batch from: 2023-02-28T06:01:09 UTC - 2023-02-28T07:01:09 UTC"

This indicates to me that the replication key is being properly tracked and passed to the next run.

When I diff at the minute level right around the replication key (2023-02-28T06:01:09 UTC), I see:

  • 2023-02-27 5:59:00 - 33 missing (58% of that minute)
  • 2023-02-27 6:00:00 - 1933 missing (47% of that minute)

Potential solutions:

  1. Don't sync up to real time. We currently set the end_time of the query to end_time = datetime.now(timezone.utc).timestamp(), then overshoot it because we use a batch windowing mechanism to avoid the 10k result limit. This ends up submitting a query like the one shown above, where the end timestamp was probably somewhere around 2023-02-28T06:03 UTC but the window ends at 2023-02-28T07:00:37 UTC. This aims to capture real-time logs and overshoots the end time to do so. We might not be able to rely on the logs being "complete" in real time.
  2. Add a lookback grace period, defaulting to ~5 minutes. On the next sync we would start the query window at the replication key value minus 5 minutes.

I prefer solution 1, but it has two potential implementations: either don't overshoot the end timestamp, which could solve the problem (though I'd guess probably not), or make the tap not query up to real-time data by limiting the end time to now minus 5 minutes (or 10 minutes). Personally I'd be fine with giving CloudWatch a buffer of a few minutes to allow late-arriving logs to show up, so anyone trying to use this tap to sync in real time would be on a 5-minute delay. We could make this a configurable parameter.
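
As a rough illustration of solution 1 (not the tap's actual code), a minimal sketch of clamping the query window's end bound; the helper name and buffer constant are hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical sketch of solution 1: never let a query window extend up to
# "now". The buffer gives CloudWatch time for late-arriving logs to land
# before the window is bookmarked as complete.
DEFAULT_BUFFER_MINS = 5  # could be exposed as a configurable tap setting


def clamp_window_end(window_end: datetime, buffer_mins: int = DEFAULT_BUFFER_MINS) -> datetime:
    """Cap a batch window's end bound at (now - buffer).

    Whatever end bound the batch-windowing logic produces (even one that
    overshoots "now"), the effective bound never gets closer to real time
    than the buffer allows.
    """
    safe_upper_bound = datetime.now(timezone.utc) - timedelta(minutes=buffer_mins)
    return min(window_end, safe_upper_bound)
```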

pnadolny13 self-assigned this Mar 1, 2023
pnadolny13 (Collaborator, Author) commented:

@aaronsteers do you have any thoughts on this? Does the issue make sense to you? I wasn't able to find anything helpful on this in the CloudWatch docs.

aaronsteers commented:

@pnadolny13 - Yeah, this makes sense. Thanks for raising.

Your writeup is very well done here and very clear.

In general scenarios, we don't want to prematurely limit records by excluding the last time interval (like the last 5 minutes), because when records get updated frequently they can keep moving into the latest bucket and therefore never get synced (or, more realistically, they would be conspicuously absent for more than one sync operation where they otherwise should have been included).

In the above scenario though, updates are not a thing we need to worry about, since each line is immutable and will not be modified or be "moved" into another time window.

For this reason, I lean towards your suggestion of using an arbitrary constraint to exclude records from being emitted if they are within the defined cool-off period of 1-5 minutes.

Duplicate records are tolerated by the spec, of course, but they are expensive to manage in practical use cases. For this reason, I'd vote to not just keep a conservative bookmark (in which case the records would still come through), but actually to filter the records and keep the conservative bookmark limit.

aaronsteers commented Mar 1, 2023

Another implementation choice, if I understand the comment quoted below, would be to change the Stream behavior to use is_sorted=False. Essentially, that would turn on a host of other features that solve the fundamental problem of "the records I'm seeing in this order aren't all the records up to this time."

This makes streams non-resumable on interrupt, unless you do extra work to manage state finalization, and it uses the internal signpost feature to basically say that the bookmark is not allowed to advance past the moment in time that the sync operation began.

I can't say that this fully solves the issue, but it addresses the "overshooting the end time" you mention here:

We currently set the end_time of the query to end_time = datetime.now(timezone.utc).timestamp(), then overshoot it because we use a batch windowing mechanism to avoid the 10k result limit. This ends up submitting a query like the one shown above, where the end timestamp was probably somewhere around 2023-02-28T06:03 UTC but the window ends at 2023-02-28T07:00:37 UTC. This aims to capture real-time logs and overshoots the end time to do so.

If it fixes the issue, this is the "easiest" solution because it uses all out-of-box capabilities in the SDK. But you'll still get significant record duplication if you don't block records from emitting, because that generic behavior is built for scenarios where updates might occur; as such, the records themselves are still emitted, even past the point beyond which the signpost stops the bookmark from advancing.
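
For reference, a minimal sketch of that out-of-box approach on a singer-sdk stream; the class below is illustrative only, not tap-cloudwatch's actual stream:

```python
from singer_sdk import Stream


class CloudWatchLogStream(Stream):  # illustrative name, not the tap's real class
    name = "log"
    replication_key = "timestamp"

    # Declaring the stream unsorted means the SDK only finalizes state at the
    # end of a successful sync (so interrupted syncs are not resumable without
    # extra work), and the replication-key signpost keeps the bookmark from
    # advancing past the moment the sync operation began.
    is_sorted = False
```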

pnadolny13 (Collaborator, Author) commented:

@aaronsteers thanks for your thoughts, that was super helpful!

For this reason, I lean towards your suggestion of using an arbitrary constraint to exclude records from being emitted if they are within the defined cool-off period of 1-5 minutes.

Duplicate records are tolerated by the spec, of course, but they are expensive to manage in practical use cases. For this reason, I'd vote to not just keep a conservative bookmark (in which case the records would still come through), but actually to filter the records and keep the conservative bookmark limit.

I agree, I think this is a pretty reasonable solution and it should be relatively easy to implement. The way the CloudWatch API works is that I send a start and end timestamp for the batch of logs I want to retrieve, so I plan to make the end timestamp current_time minus 5 minutes (the default, but configurable). This should result in what you suggest, i.e. no records will be emitted past that timestamp and the bookmark will always be at least 5 minutes behind real time.
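
A rough sketch of what that could look like when submitting a batch query; the buffer value, log group name, and query string below are placeholders rather than the tap's actual configuration:

```python
from datetime import datetime, timedelta, timezone

import boto3

# Hypothetical illustration of the agreed approach: the end bound handed to
# CloudWatch Logs Insights is capped at (now - buffer), so no records newer
# than that are emitted and the bookmark stays at least that far behind
# real time.
buffer_mins = 5  # placeholder; would come from a configurable tap setting
batch_start = datetime.now(timezone.utc) - timedelta(hours=1)  # example window start
batch_end = datetime.now(timezone.utc) - timedelta(minutes=buffer_mins)

logs_client = boto3.client("logs")
query = logs_client.start_query(
    logGroupName="/example/log-group",  # placeholder log group
    startTime=int(batch_start.timestamp()),
    endTime=int(batch_end.timestamp()),
    queryString="fields @timestamp, @message | sort @timestamp asc",
    limit=10000,  # the 10k result limit that motivates the batch windowing
)
```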

aaronsteers commented Mar 1, 2023

@pnadolny13 - Sounds great. There are several ways to implement this, but if helpful, one option is to drop records by returning None from post_process() if the datetime is past the established limit. (Probably more efficient in a loop, but just wanted to mention this as a possibility if it's simpler.)
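
A minimal sketch of that post_process() option, assuming an illustrative stream class, buffer, and timestamp format (not the tap's actual code):

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

from singer_sdk import Stream


class CloudWatchLogStream(Stream):  # illustrative class, not the tap's real stream
    name = "log"
    replication_key = "timestamp"

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        """Drop records inside the cool-off window instead of emitting them."""
        cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)  # assumed buffer
        # Assumes the record's timestamp is an ISO-8601 string such as
        # "2023-02-28 06:01:09.040"; adjust parsing to the tap's real format.
        record_ts = datetime.fromisoformat(row["timestamp"]).replace(tzinfo=timezone.utc)
        if record_ts > cutoff:
            return None  # returning None tells the SDK to skip this record
        return row
```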
