New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12648: minimum changes for error handling namedTopologies #10544
Conversation
@ableegoldman This is what I think the error handling should look like. We can try to categorize more but I think we should make this best effort and we can add to it as we go. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ableegoldman this is a work in progress but this is general idea. WDYT?
there are a couple things I will improve also we need to think about what other exceptions we should consider are tightly tied to a named topology.
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) { | |||
|
|||
private void handleStreamsUncaughtException(final Throwable throwable, | |||
final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { | |||
if (throwable instanceof NamedTopologyStreamsException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should consider exceptions that were not StreamExceptions as well I think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) { | |||
|
|||
private void handleStreamsUncaughtException(final Throwable throwable, | |||
final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { | |||
if (throwable instanceof NamedTopologyStreamsException) { | |||
String name = ((NamedTopologyStreamsException) throwable).getTopologyName(); | |||
((StreamThread) Thread.currentThread()).deprioritizeNamedTopology(name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thread is guaranteed to be handled on the dying thread. Though we might want to call this on the new thread
EDIT: we def will need to track the failed topologies in KafkaStreams
, though tracking successes across restarts will be an issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine to lose some state after a restart, presumably we'll just start up with all queries assumed to be good and then re-de-prioritize any that are still failing. We can iterate on this and let ksql persist this info in the command topic and give Streams hints when starting up that a query may still be failing, but just tracking it in-memory seems good enough for a first pass
@@ -1198,7 +1210,7 @@ int process(final int maxNumRecords, final Time time) { | |||
task.recordProcessBatchTime(now - then); | |||
} | |||
} | |||
|
|||
reprioritizeTasks(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if processing succeeds
@@ -43,6 +43,8 @@ | |||
private final Map<TaskId, Task> allTasksPerId = new TreeMap<>(); | |||
private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId); | |||
private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values()); | |||
private final Collection<Task> misbehavingTasks = new HashSet<>(); | |||
private final Collection<Task> taskJail = new HashSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
working title
misbehavingTasks.add(task); | ||
} | ||
} | ||
for (Task task: misbehavingTasks) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think suspending will work. however I am not sure it will survive thread restarts. I may need to store the topology names that are in each state to repopulate each of these lists in new threads
changed the obvious ones to attribute to a named topology. This might be all we need, we can always add more if they come up.
Committer Checklist (excluded from commit message)