KAFKA-2811: add standby tasks
* added a new config param "num.standby.replicas" (the default value is 0).
* added a new abstract class AbstractTask.
* added StandbyTask as a subclass of AbstractTask.
* modified StreamTask to be a subclass of AbstractTask.
* StreamThread
  * standby tasks are created by calling StreamThread.addStandbyTasks() from onPartitionsAssigned().
  * standby tasks are destroyed by calling StreamThread.removeStandbyTasks() from onPartitionsRevoked().
  * In addStandbyTasks(), change log partitions are assigned to the restoreConsumer.
  * In removeStandbyTasks(), change log partitions are removed from the restoreConsumer.
  * StreamThread polls change log records using the restoreConsumer in the run loop with timeout=0.
  * If records are returned, StreamThread calls StandbyTask.update() to pass the records to each standby task, as sketched below.
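The last two bullets are the core of the run-loop change. A minimal sketch of that step, assuming hypothetical names (restoreConsumer, standbyTasksByPartition) for StreamThread fields this excerpt does not show:

    // Sketch only: drain standby changelog records without blocking active work.
    private void maybeUpdateStandbyTasks() {
        // timeout=0 so polling the change logs never delays stream processing
        ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
        for (TopicPartition partition : records.partitions()) {
            StandbyTask task = standbyTasksByPartition.get(partition);  // hypothetical lookup
            if (task != null)
                task.update(partition, records.records(partition));     // replay into the standby's stores
        }
    }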

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #526 from ymatsuda/standby_task
Yasuhiro Matsuda authored and Geoff Anderson committed Nov 18, 2015
1 parent 46a3ce8 commit e7630a5
Showing 12 changed files with 951 additions and 181 deletions.
streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -54,6 +54,10 @@ public class StreamingConfig extends AbstractConfig {
    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";

    /** <code>num.standby.replicas</code> */
    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";

    /** <code>buffered.records.per.partition</code> */
    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
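For context, an application would enable standby replicas like any other streaming config; a usage sketch (other required settings omitted):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamingConfig;

    Properties props = new Properties();
    // keep one extra, continuously-updated copy of each task's state on another instance
    props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    StreamingConfig config = new StreamingConfig(props);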
@@ -136,6 +140,11 @@ public class StreamingConfig extends AbstractConfig {
                        1,
                        Importance.LOW,
                        NUM_STREAM_THREADS_DOC)
                .define(NUM_STANDBY_REPLICAS_CONFIG,
                        Type.INT,
                        0,
                        Importance.LOW,
                        NUM_STANDBY_REPLICAS_DOC)
                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                        Type.INT,
                        1000,
@@ -214,6 +223,7 @@ public StreamingConfig(Map<?, ?> props) {

    public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
        Map<String, Object> props = getConsumerConfigs();
        props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
        props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
        return props;
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java (new file)
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;

public abstract class AbstractTask {
    protected final TaskId id;
    protected final ProcessorTopology topology;
    protected final ProcessorStateManager stateMgr;
    protected final Set<TopicPartition> partitions;
    protected ProcessorContext processorContext;

    protected AbstractTask(TaskId id,
                           Consumer<byte[], byte[]> restoreConsumer,
                           ProcessorTopology topology,
                           StreamingConfig config,
                           Set<TopicPartition> partitions) {
        this.id = id;
        this.topology = topology;
        this.partitions = partitions;

        // create the processor state manager
        try {
            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
            // if partitions is null, this is a standby task
            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null);
        } catch (IOException e) {
            throw new KafkaException("Error while creating the state manager", e);
        }
    }

    protected void initializeStateStores() {
        for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
            StateStore store = stateStoreSupplier.get();
            store.init(this.processorContext);
        }
    }

    public final TaskId id() {
        return id;
    }

    public final Set<TopicPartition> partitions() {
        return this.partitions;
    }

    public final ProcessorTopology topology() {
        return topology;
    }

    public final ProcessorContext context() {
        return processorContext;
    }

    public abstract void commit();

    public void close() {
        try {
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        } catch (IOException e) {
            throw new KafkaException("Error while closing the state manager in processor context", e);
        }
    }

}
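The only abstract member is commit(), so StreamTask and StandbyTask differ mainly in how they commit and feed their state. To make the contract concrete, a hypothetical minimal subclass (not part of this commit, reusing the imports above) would look like:

    public class NoOpTask extends AbstractTask {
        protected NoOpTask(TaskId id,
                           Consumer<byte[], byte[]> restoreConsumer,
                           ProcessorTopology topology,
                           StreamingConfig config,
                           Set<TopicPartition> partitions) {
            // passing partitions == null would mark this task as standby (see the constructor above)
            super(id, restoreConsumer, topology, config, partitions);
        }

        @Override
        public void commit() {
            // StreamTask and StandbyTask flush state and record offsets here
        }
    }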
streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -46,11 +46,14 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);

    private StreamThread streamThread;
    private int numStandbyReplicas;
    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
    private Set<TaskId> standbyTasks;

    @Override
    public void configure(Map<String, ?> configs) {
        numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);

        Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE);
        if (o == null) {
            KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -99,7 +102,6 @@ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        // - We try not to assign the same set of tasks to two different clients.
        // We do the assignment in one pass, so the result may not satisfy all of the above.
        // 2. Within each client, tasks are assigned to consumers in a round-robin manner.

        Map<UUID, Set<String>> consumersByClient = new HashMap<>();
        Map<UUID, ClientState<TaskId>> states = new HashMap<>();

@@ -132,7 +134,7 @@ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        // Get partition groups from the partition grouper
        Map<TaskId, Set<TopicPartition>> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata);

        states = TaskAssignor.assign(states, partitionGroups.keySet(), 0); // TODO: enable standby tasks
        states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas);
        Map<String, Assignment> assignment = new HashMap<>();

        for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
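To illustrate what the new third argument changes: each task now gets one active copy plus numStandbyReplicas standby copies, ideally on distinct clients. A toy stand-in for the internal TaskAssignor (invented and simplified, ignoring the state-reuse heuristics summarized in the comment above):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class ToyStandbyAssignment {
        // Toy model: spread one active + N standby copies of each task over clients
        // round-robin, so the copies of any one task land on distinct clients.
        static Map<Integer, List<String>> assign(int numTasks, List<String> clients, int numStandbyReplicas) {
            Map<Integer, List<String>> owners = new HashMap<>();
            for (int task = 0; task < numTasks; task++) {
                List<String> copies = new ArrayList<>();
                int copiesWanted = Math.min(1 + numStandbyReplicas, clients.size());
                for (int i = 0; i < copiesWanted; i++)
                    copies.add(clients.get((task + i) % clients.size()));
                owners.put(task, copies); // copies.get(0) is the active owner, the rest are standbys
            }
            return owners;
        }
    }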
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -35,6 +35,7 @@
import java.nio.channels.OverlappingFileLockException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProcessorStateManager {
@@ -51,13 +52,17 @@ public class ProcessorStateManager {
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Map<TopicPartition, Long> restoredOffsets;
    private final Map<TopicPartition, Long> checkpointedOffsets;
    private final boolean isStandby;
    private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks

    public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer) throws IOException {
    public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
        this.partition = partition;
        this.baseDir = baseDir;
        this.stores = new HashMap<>();
        this.restoreConsumer = restoreConsumer;
        this.restoredOffsets = new HashMap<>();
        this.isStandby = isStandby;
        this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;

        // create the state directory for this task if missing (we won't create the parent directory)
        createStateDirectory(baseDir);
@@ -103,8 +108,6 @@ public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
        if (this.stores.containsKey(store.name()))
            throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");

        // ---- register the store ---- //

        // check whether the underlying change log topic exists
        if (restoreConsumer.listTopics().containsKey(store.name())) {
            boolean partitionNotFound = true;
@@ -124,48 +127,91 @@

        this.stores.put(store.name(), store);

        if (isStandby) {
            if (store.persistent())
                restoreCallbacks.put(store.name(), stateRestoreCallback);
        } else {
            restoreActiveState(store, stateRestoreCallback);
        }
    }

    private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) {

        if (store == null)
            throw new IllegalArgumentException("Store must not be null.");

        // ---- try to restore the state from change-log ---- //

        // subscribe to the store's partition
        if (!restoreConsumer.subscription().isEmpty()) {
            throw new IllegalStateException("Restore consumer should not have subscribed to any partitions beforehand");
        }
        TopicPartition storePartition = new TopicPartition(store.name(), partition);
        restoreConsumer.assign(Collections.singletonList(storePartition));

        try {
            // calculate the end offset of the partition
            // TODO: this is a bit hacky to first seek then position to get the end offset
            restoreConsumer.seekToEnd(storePartition);
            long endOffset = restoreConsumer.position(storePartition);

            // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
            // restore the state from the beginning of the change log otherwise
            if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) {
                restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
            } else {
                restoreConsumer.seekToBeginning(storePartition);
            }

            // restore its state from changelog records; while restoring, the log end offset
            // should not change since it is only written by this thread
            while (true) {
                for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
                    stateRestoreCallback.restore(record.key(), record.value());
                }

                if (restoreConsumer.position(storePartition) == endOffset) {
                    break;
                } else if (restoreConsumer.position(storePartition) > endOffset) {
                    throw new IllegalStateException("Log end offset should not change while restoring");
                }
            }

            // record the restored offset for its change log partition
            long newOffset = restoreConsumer.position(storePartition);
            restoredOffsets.put(storePartition, newOffset);
        } finally {
            // un-assign the change log partition
            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        }
    }

    public Map<TopicPartition, Long> checkpointedOffsets() {
        Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();

        for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
            String storeName = entry.getKey();
            TopicPartition storePartition = new TopicPartition(storeName, partition);

            if (checkpointedOffsets.containsKey(storePartition)) {
                partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition));
            } else {
                partitionsAndOffsets.put(storePartition, -1L); // sentinel: no local checkpoint for this partition
            }
        }
        return partitionsAndOffsets;
    }
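The -1L acts as a "no checkpoint" sentinel. A caller such as StreamThread.addStandbyTasks() could then seed the restore consumer along these lines (a sketch under that assumption, using the consumer API seen elsewhere in this diff):

    // Sketch: position the restore consumer for a standby task's changelog partitions.
    Map<TopicPartition, Long> checkpointed = stateMgr.checkpointedOffsets();
    // (the real code would merge these with any partitions already assigned)
    restoreConsumer.assign(new ArrayList<>(checkpointed.keySet()));
    for (Map.Entry<TopicPartition, Long> entry : checkpointed.entrySet()) {
        if (entry.getValue() >= 0)
            restoreConsumer.seek(entry.getKey(), entry.getValue());   // resume from the checkpoint
        else
            restoreConsumer.seekToBeginning(entry.getKey());          // no checkpoint: replay from the start
    }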

    public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
        // restore states from changelog records
        StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());

        for (ConsumerRecord<byte[], byte[]> record : records) {
            restoreCallback.restore(record.key(), record.value());
        }
    }
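updateStandbyStates() replays records through the StateRestoreCallback the store registered earlier, so the same callback serves both one-shot restoration (active tasks) and continuous catch-up (standby tasks). A sketch of the registration side, where putInternal() is a hypothetical raw write primitive of a concrete store:

    // Sketch: a persistent store registers a restore callback; for standby tasks the
    // state manager keeps it and feeds it every batch of polled changelog records.
    stateMgr.register(store, new StateRestoreCallback() {
        @Override
        public void restore(byte[] key, byte[] value) {
            store.putInternal(key, value); // hypothetical write that bypasses change-logging
        }
    });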

public StateStore getStore(String name) {
@@ -224,6 +270,9 @@ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
            checkpoint.write(checkpointOffsets);
        }

        // un-assign the change log partition
        restoreConsumer.assign(Collections.<TopicPartition>emptyList());

        // release the state directory lock
        directoryLock.release();
    }
