Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor #13666

Merged
merged 7 commits into from Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion checkstyle/import-control.xml
Expand Up @@ -229,10 +229,12 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.server.common"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.timeline"/>
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>
</subpackage>

Expand Down
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.runtime;

/**
* The base event type used by all events processed in the
* coordinator runtime.
*/
public interface CoordinatorEvent extends EventAccumulator.Event<Integer> {
/**
dajac marked this conversation as resolved.
Show resolved Hide resolved
* Runs the event.
*/
void run();
dajac marked this conversation as resolved.
Show resolved Hide resolved

/**
* Completes the event with the provided exception.
*
* @param exception An exception to complete the event with.
*/
void complete(Throwable exception);
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.runtime;

import java.util.concurrent.RejectedExecutionException;

/**
* A {{@link CoordinatorEvent}} processor.
*/
public interface CoordinatorEventProcessor extends AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any plans to make other coordinator event processors besides the multi-threaded one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I use a different implementation in unit tests (without threads) and I plan to use another one for simulation tests at some point.

/**
dajac marked this conversation as resolved.
Show resolved Hide resolved
* Enqueues a new {{@link CoordinatorEvent}}.
*
* @param event The event.
* @throws RejectedExecutionException If the event processor is closed.
*/
void enqueue(CoordinatorEvent event) throws RejectedExecutionException;
}
Expand Up @@ -26,6 +26,7 @@
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -114,10 +115,10 @@ public EventAccumulator(
*
* @param event An {{@link Event}}.
*/
public void add(T event) {
public void add(T event) throws RejectedExecutionException {
lock.lock();
try {
if (closed) throw new IllegalStateException("Can't accept an event because the accumulator is closed.");
if (closed) throw new RejectedExecutionException("Can't accept an event because the accumulator is closed.");

K key = event.key();
Queue<T> queue = queues.get(key);
Expand Down
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.runtime;

import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}}
* which guarantees that events sharing a partition key are not processed concurrently.
*/
public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {

/**
* The logger.
*/
private final Logger log;

/**
* The accumulator.
*/
private final EventAccumulator<Integer, CoordinatorEvent> accumulator;

/**
* The processing threads.
*/
private final List<EventProcessorThread> threads;

/**
* The lock for protecting access to the resources during the
* shutting down.
*/
private final ReentrantLock lock;

/**
* A boolean indicated whether the event processor is shutting down.
*/
private volatile boolean shuttingDown;

/**
* Constructor.
*
* @param logContext The log context.
* @param threadPrefix The thread prefix.
* @param numThreads The number of threads.
*/
public MultiThreadedEventProcessor(
LogContext logContext,
String threadPrefix,
int numThreads
) {
this.log = logContext.logger(MultiThreadedEventProcessor.class);
this.shuttingDown = false;
this.lock = new ReentrantLock();
this.accumulator = new EventAccumulator<>();
this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
new EventProcessorThread(
threadPrefix + threadId
)
).collect(Collectors.toList());
this.threads.forEach(EventProcessorThread::start);
}

/**
* The event processor thread. The thread pulls events from the
* accumulator and runs them.
*/
private class EventProcessorThread extends Thread {
private final Logger log;

EventProcessorThread(
String name
) {
super(name);
log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class);
setDaemon(false);
}

private void handleEvents() {
while (!shuttingDown) {
dajac marked this conversation as resolved.
Show resolved Hide resolved
CoordinatorEvent event = accumulator.poll();
dajac marked this conversation as resolved.
Show resolved Hide resolved
if (event != null) {
try {
log.debug("Executing event: {}.", event);
event.run();
} catch (Throwable t) {
log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
event.complete(t);
} finally {
accumulator.done(event);
}
}
}
}

private void drainEvents() {
CoordinatorEvent event = accumulator.poll(0, TimeUnit.MILLISECONDS);
while (event != null) {
try {
log.debug("Draining event: {}.", event);
event.complete(new RejectedExecutionException("EventProcessor is closed."));
} catch (Throwable t) {
log.error("Failed to reject event {} due to: {}.", event, t.getMessage(), t);
} finally {
accumulator.done(event);
}

event = accumulator.poll(0, TimeUnit.MILLISECONDS);
}
}

@Override
public void run() {
log.info("Starting");

try {
handleEvents();
} catch (Throwable t) {
log.error("Exiting with exception.", t);
}

// The accumulator is drained and all the pending events are rejected
// when the event processor is shutdown.
if (shuttingDown) {
log.info("Shutting down. Draining the remaining events.");
try {
drainEvents();
} catch (Throwable t) {
log.error("Draining threw exception.", t);
}
log.info("Shutdown completed");
}
}
}

/**
* Enqueues a new {{@link CoordinatorEvent}}.
*
* @param event The event.
* @throws RejectedExecutionException If the event processor is closed.
*/
@Override
public void enqueue(CoordinatorEvent event) throws RejectedExecutionException {
accumulator.add(event);
}

/**
* Begins the shutdown of the event processor.
*/
public void beginShutdown() {
lock.lock();
try {
if (shuttingDown) {
log.debug("Event processor is already shutting down.");
return;
}

log.info("Shutting down event processor.");
// The accumulator must be closed first to ensure that new events are
// rejected before threads are notified to shutdown and start to drain
// the accumulator.
accumulator.close();
shuttingDown = true;
} finally {
lock.unlock();
}
}

/**
* Closes the event processor.
*/
@Override
public void close() throws InterruptedException {
lock.lock();
try {
beginShutdown();
for (Thread t : threads) {
t.join();
}
log.info("Event processor closed.");
} finally {
lock.unlock();
}
}
}
Expand Up @@ -112,7 +112,7 @@ public void testBasicOperations() {
}

@Test
public void testKeyConcurrentProcessingAndOrdering() {
public void testKeyConcurrentAndOrderingGuarantees() {
EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>();

MockEvent event0 = new MockEvent(1, 0);
Expand Down