Skip to content

Commit

Permalink
Add more RocksDB option (stats and compaction threads) also more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Henry Cai committed May 23, 2016
1 parent a2286ee commit eed1726
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 7 deletions.
Expand Up @@ -34,9 +34,9 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
this.windowName = windowName;

if (windowSizeMs * 2 > retentionPeriodMs)
if (windowSizeMs > retentionPeriodMs)
throw new TopologyBuilderException("The retention period of the join window "
+ windowName + " must at least two times its window size.");
+ windowName + " must be no smaller than its window size.");
}

@Override
Expand Down
Expand Up @@ -17,16 +17,14 @@

package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.WindowStore;

import java.util.Iterator;
import org.apache.kafka.streams.state.WindowStoreIterator;

class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {

Expand Down Expand Up @@ -76,14 +74,15 @@ public void process(K key, V1 value) {
long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);

Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo);
WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo);
while (iter.hasNext()) {
needOuterJoin = false;
context().forward(key, joiner.apply(value, iter.next().value));
}

if (needOuterJoin)
context().forward(key, joiner.apply(value, null));
iter.close();
}
}

Expand Down
Expand Up @@ -217,11 +217,13 @@ private void restoreActiveState(String topicName, StateRestoreCallback stateRest
// TODO: this is a bit hacky to first seek then position to get the end offset
restoreConsumer.seekToEnd(singleton(storePartition));
long endOffset = restoreConsumer.position(storePartition);
log.info("end offset for {} is {}", storePartition, endOffset);

// restore from the checkpointed offset of the change log if it is persistent and the offset exists;
// restore the state from the beginning of the change log otherwise
if (checkpointedOffsets.containsKey(storePartition)) {
restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
log.info("restored offset for {} is {}", storePartition, checkpointedOffsets.get(storePartition));
} else {
restoreConsumer.seekToBeginning(singleton(storePartition));
}
Expand Down
Expand Up @@ -52,6 +52,9 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -122,6 +125,7 @@ static File makeStateDir(String applicationId, String baseDirName) {
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
log.info("Thread {}, assigned: {}", Thread.currentThread().getName(), assignment);
try {
addStreamTasks(assignment);
addStandbyTasks();
Expand All @@ -134,6 +138,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> assignment) {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
log.info("Thread {}, revoked: {}", Thread.currentThread().getName(), assignment);
try {
commitAll();
lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
Expand Down Expand Up @@ -370,6 +375,7 @@ private void runLoop() {
requiresPoll = true;
}
maybeCommit();

maybeUpdateStandbyTasks();

maybeClean();
Expand Down Expand Up @@ -759,7 +765,12 @@ public StreamsMetricsImpl(Metrics metrics) {

@Override
public void recordLatency(Sensor sensor, long startNs, long endNs) {
sensor.record((endNs - startNs) / 1000000, endNs);
if (endNs - startNs > 1000 * 5000000L ) {
log.warn("{}, long latency {} for sensor {}",
Thread.currentThread().getName(),
(endNs - startNs) / 1000 / 1000000L, sensor.name());
}
sensor.record((endNs - startNs) / 1000000, endNs / 1000000);
}

/**
Expand Down
Expand Up @@ -62,6 +62,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private static final int TTL_SECONDS = TTL_NOT_USED;
private static final int MAX_WRITE_BUFFERS = 5;
private static final int MIN_WRITE_BUFFER_NUMBER_TO_MERGE = 2;
private static final int MAX_BACKGROUND_COMPACTIONS = 4;
private static final String DB_FILE_DIR = "rocksdb";

private final String name;
Expand All @@ -86,6 +87,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private StoreChangeLogger<byte[], byte[]> changeLogger;
private StoreChangeLogger.ValueGetter<byte[], byte[]> getter;

public static List<RocksDBStore> stores = new ArrayList<>();

public KeyValueStore<K, V> enableLogging() {
loggingEnabled = true;

Expand All @@ -103,6 +106,7 @@ public RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
}

public RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
stores.add(this);
this.name = name;
this.parentDir = parentDir;
this.keySerde = keySerde;
Expand All @@ -114,6 +118,7 @@ public RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> v
tableConfig.setBlockSize(BLOCK_SIZE);

options = new Options();
options.createStatistics();
options.setTableFormatConfig(tableConfig);
options.setMaxBytesForLevelBase(MAX_BYTES_FOR_LEVEL_BASE);
options.setTargetFileSizeBase(TARGET_FILE_SIZE_BASE);
Expand All @@ -122,6 +127,7 @@ public RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> v
options.setCompactionStyle(COMPACTION_STYLE);
options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
options.setMinWriteBufferNumberToMerge(MIN_WRITE_BUFFER_NUMBER_TO_MERGE);
options.setMaxBackgroundCompactions(MAX_BACKGROUND_COMPACTIONS);
options.setCreateIfMissing(true);
options.setErrorIfExists(false);

Expand Down

0 comments on commit eed1726

Please sign in to comment.