Replace Timeouts for Sources #3031

Merged
merged 8 commits into from
Apr 27, 2021
cgardens
Contributor

What

  • I started a project yesterday to understand what it would take to get rid of the timeouts in our workers (for all jobs, and for both sources and destinations). As I did this, I realized there might be some low-hanging fruit here: we could solve all of the timeout problems the community has reported without investing in a lot of inter-process / inter-Docker heartbeating.
  • Essentially, if we heartbeat whenever we receive a new record from the input stream, we can reasonably say that we should wait a little longer before killing the process. This isn't perfect: in cases where a long query prevents data from arriving for a long time, we could still kill the process prematurely. It should, however, let us avoid killing a process while data is still flowing (e.g. the case of the user who had 80% of his data replicated after 10 hours). We can configure the solution in this PR to be strictly better than what we have now.

How

  • Every time a new record is received, emit a heartbeat.
  • In the logic for killing a source, only kill the source if the process is already dead or if the process is alive but we haven't received a record in a long time.
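
The two steps above can be sketched roughly as follows. This is a minimal illustration, not the code merged in this PR: the class names `SketchHeartbeatMonitor` and `SketchSourceReaper`, the `shouldKill` helper, and the injectable clock are all assumptions made for the example.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch: class and method names are illustrative only.
class SketchHeartbeatMonitor {
  private final Duration freshness;       // how long a beat counts as fresh
  private final Supplier<Instant> clock;  // injectable clock, which eases testing
  private final AtomicReference<Instant> lastBeat = new AtomicReference<>(null);

  SketchHeartbeatMonitor(Duration freshness, Supplier<Instant> clock) {
    this.freshness = freshness;
    this.clock = clock;
  }

  // Called once for every record read from the source's output.
  void beat() {
    lastBeat.set(clock.get());
  }

  // True only if a beat has happened and it is younger than the freshness window.
  boolean isBeating() {
    Instant last = lastBeat.get();
    return last != null && Duration.between(last, clock.get()).compareTo(freshness) < 0;
  }
}

class SketchSourceReaper {
  // Kill if the process is already dead, or alive but without a fresh heartbeat.
  static boolean shouldKill(boolean processAlive, SketchHeartbeatMonitor monitor) {
    return !processAlive || !monitor.isBeating();
  }
}
```

A reader would then call `beat()` once per message, for example from a `.peek(...)` stage on the message stream, and consult `shouldKill` when deciding whether to stop the source.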

Follow up

  • I've tuned this such that the numbers are relatively low. This may be a mistake. Need to think a little more about how this should be tuned when it rolls out.

Thought Process

  • I was trying to get the most bang for our buck without requiring connector contributors to do anything. There are, however, categories of problems that this approach cannot solve, e.g. if the initial query in a source takes a really long time and no data is sent until that query completes on the server side of the source. With this approach we would continue to kill the process prematurely. To solve this, the source itself would have to emit a heartbeat message (perhaps by adding a new type of message to the protocol).
  • This approach doesn't work at all with destinations as they are implemented now. If we did want to add this logic to destinations, we would need some way to get the heartbeat messages out of the destination. We could read the destination's STDOUT and detect emitted heartbeat messages (assuming we added heartbeat messages to the protocol).

Anyway, I think this solves the problem as it is reported now, so I think we should move forward with this. Then, based on future complaints around timeouts and heartbeating, we can decide whether we want to introduce a more invasive approach where connectors are responsible for doing their own heartbeating and / or we add heartbeat messages to the protocol.
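
For the more invasive option, reading a destination's STDOUT for heartbeat messages might look roughly like the sketch below. Everything here is an assumption: the protocol as of this PR has no heartbeat message, so the `HEARTBEAT` type, the one-JSON-object-per-line shape, and the `StdoutHeartbeatScanner` name are hypothetical, and the regex match stands in for real JSON parsing.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Iterator;
import java.util.regex.Pattern;

// Hypothetical sketch: no HEARTBEAT message type exists in the protocol today.
class StdoutHeartbeatScanner {
  // Simplification: a regex instead of a JSON parser, assuming one message per line.
  private static final Pattern HEARTBEAT =
      Pattern.compile("\"type\"\\s*:\\s*\"HEARTBEAT\"");

  // Reads lines until the stream ends, invoking onHeartbeat for each heartbeat line.
  // Returns how many heartbeat lines were seen.
  static long countHeartbeats(BufferedReader stdout, Runnable onHeartbeat) {
    long count = 0;
    Iterator<String> lines = stdout.lines().iterator();
    while (lines.hasNext()) {
      if (HEARTBEAT.matcher(lines.next()).find()) {
        onHeartbeat.run();
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    String fakeStdout = "{\"type\":\"LOG\"}\n{\"type\": \"HEARTBEAT\"}\n";
    long seen = countHeartbeats(
        new BufferedReader(new StringReader(fakeStdout)),
        () -> System.out.println("beat"));
    System.out.println("heartbeats seen: " + seen);
  }
}
```

In a worker, the `onHeartbeat` callback would be the place to call the monitor's `beat()` method.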

Contributor

@michel-tricot michel-tricot left a comment

Beautiful!

@@ -82,6 +94,7 @@ public void start(StandardTapConfig input, Path jobRoot) throws Exception {
LineGobbler.gobble(tapProcess.getErrorStream(), LOGGER::error);

messageIterator = streamFactory.create(IOs.newBufferedReader(tapProcess.getInputStream()))
.peek(message -> heartbeatMonitor.beat())
Contributor

❤️

this.heartBeatFreshMagnitude = heartBeatFreshMagnitude;
this.heartBeatFreshTimeUnit = heartBeatFreshTimeUnit;
this.instantSupplier = instantSupplier;
this.lastBeat = new AtomicReference<>(null);
Contributor

Shouldn't we initialize it to now?

Contributor Author

My inclination is that if beat has never been called, it is more intuitive for isBeating to return false. If we initialize this to now, then for the period between now and becoming stale, the monitor will incorrectly say that beating is happening.
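
The difference between the two initializations can be illustrated with a small sketch. The `InitChoiceMonitor` class and its `initializeToNow` flag are hypothetical and exist only to put the two options side by side; the real monitor takes no such flag.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch comparing a null initial beat with an initial beat of "now".
class InitChoiceMonitor {
  private final Duration freshness;
  private final Supplier<Instant> clock;
  private final AtomicReference<Instant> lastBeat;

  InitChoiceMonitor(Duration freshness, Supplier<Instant> clock, boolean initializeToNow) {
    this.freshness = freshness;
    this.clock = clock;
    // The flag exists only for this comparison.
    this.lastBeat = new AtomicReference<>(initializeToNow ? clock.get() : null);
  }

  void beat() {
    lastBeat.set(clock.get());
  }

  boolean isBeating() {
    Instant last = lastBeat.get();
    return last != null && Duration.between(last, clock.get()).compareTo(freshness) < 0;
  }

  public static void main(String[] args) {
    Duration window = Duration.ofSeconds(30);
    InitChoiceMonitor nullInit = new InitChoiceMonitor(window, Instant::now, false);
    InitChoiceMonitor nowInit = new InitChoiceMonitor(window, Instant::now, true);
    // Neither monitor has seen a record yet.
    System.out.println("null init beating: " + nullInit.isBeating());
    System.out.println("now init beating:  " + nowInit.isBeating());
  }
}
```

With the null initialization, `isBeating()` stays false until the first real `beat()`; with the "now" initialization, it reports true for one freshness window even though no record has arrived.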

@cgardens cgardens merged commit 13ee577 into master Apr 27, 2021
@cgardens cgardens deleted the cgardens/naive_heartbeat branch April 27, 2021 03:55
cgardens added a commit that referenced this pull request Apr 27, 2021
4 participants