Skip to content

Commit

Permalink
Merge #5974 #5979 #5985
Browse files Browse the repository at this point in the history
5974: Add more metrics for processing latency r=deepthidevaki a=deepthidevaki

## Description

Since we have been speculating about several root causes/solutions for performance bottlenecks, I thought it would be good to added following metrics:
* Time taken for processing a record
* Latency to write a record to the log
* Latency to commit a record

## Related issues

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [ ] The changes are backwards compatibility with previous versions
* [ ] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions. You can trigger a backport by assigning labels (e.g. `backport stable/0.25`) to the PR, in case that fails you need to create backports manually.

Testing:
* [ ] There are unit/integration tests that verify all acceptance criterias of the issue
* [ ] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


5979: Fixes outdated topology when no new leader is assigned r=npepinpe a=npepinpe

## Description

This PR fixes a bug in the gateway topology. The topology manager keeps track of who is leader and follower for each partition. This information is gossiped by all nodes in the cluster. Normally, when a node which was leader for partition 1 sends that it is now follower, another node will send that it is leader. There's an edge case, however, when no other node sends that it is the leader. In this case, we end up with a topology where a node is both leader and follower. This means that we report the wrong topology and that the gateway will keep trying to route requests to the node. The case where no new node becomes leader can happen due to network partitioning, for example, and is an expected case we should be able to tolerate.

This PR adds more test coverage and fixes the issue by removing the old partition leader if, when adding a new follower, they have the same ID.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #2501 

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [x] The changes are backwards compatibility with previous versions
* [x] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions. You can trigger a backport by assigning labels (e.g. `backport stable/0.25`) to the PR, in case that fails you need to create backports manually.

Testing:
* [x] There are unit/integration tests that verify all acceptance criterias of the issue
* [x] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


5985: Introduce single data directory configuration field r=npepinpe a=npepinpe

## Description

This PR deprecates `zeebe.broker.data.directories` configuration setting - a comma separated list of paths - in favour of `zeebe.broker.data.directory`, a single string. This is to avoid confusion on the user's end, as we don't support multiple directories since a long time, and configuring multiple ones simply does nothing. The old setting is deprecated as of 0.26, and marked for removal in 0.27.0.

## Related issues

closes #2789 

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [x] The changes are backwards compatibility with previous versions
* [ ] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions. You can trigger a backport by assigning labels (e.g. `backport stable/0.25`) to the PR, in case that fails you need to create backports manually.

Testing:
* [x] There are unit/integration tests that verify all acceptance criterias of the issue
* [x] New tests are written to ensure backwards compatibility with further versions
* [x] The behavior is tested manually
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Nicolas Pépin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
3 people committed Dec 14, 2020
4 parents cfc2150 + 05dc428 + 87cddba + 16f3d2b commit c4ab987
Show file tree
Hide file tree
Showing 25 changed files with 578 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static Atomix fromConfiguration(
.withMembershipProvider(discoveryProvider);

final DataCfg dataConfiguration = configuration.getData();
final String rootDirectory = dataConfiguration.getDirectories().get(0);
final String rootDirectory = dataConfiguration.getDirectory();
IoUtil.ensureDirectoryExists(new File(rootDirectory), "Zeebe data directory");

final RaftPartitionGroup partitionGroup =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.zeebe.broker.Loggers;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
Expand All @@ -32,8 +31,14 @@ public final class DataCfg implements ConfigurationEntry {
private static final Duration DEFAULT_DISK_USAGE_MONITORING_DELAY = Duration.ofSeconds(1);
private static final double DISABLED_DISK_USAGE_WATERMARK = 1.0;

// Hint: do not use Collections.singletonList as this does not support replaceAll
private List<String> directories = Arrays.asList(DEFAULT_DIRECTORY);
/**
* @deprecated this field is deprecated in favour of {@code directory}, and will be removed in
* 0.27.0
*/
@Deprecated(since = "0.26.0")
private List<String> directories;

private String directory = DEFAULT_DIRECTORY;

private DataSize logSegmentSize = DEFAULT_DATA_SIZE;

Expand All @@ -50,7 +55,17 @@ public final class DataCfg implements ConfigurationEntry {

@Override
public void init(final BrokerCfg globalConfig, final String brokerBase) {
directories.replaceAll(d -> ConfigurationUtil.toAbsolutePath(d, brokerBase));
directory = ConfigurationUtil.toAbsolutePath(directory, brokerBase);

// fallback to directories if it's still specified
if (directories != null) {
directories = LIST_SANITIZER.apply(directories);
if (!directories.isEmpty()) {
directories.replaceAll(d -> ConfigurationUtil.toAbsolutePath(d, brokerBase));
directory = directories.get(0);
}
}

if (!diskUsageMonitoringEnabled) {
LOG.info(
"Disk usage watermarks are disabled, setting all watermarks to {}",
Expand All @@ -61,12 +76,33 @@ public void init(final BrokerCfg globalConfig, final String brokerBase) {
rocksdb.init(globalConfig, brokerBase);
}

/**
* @deprecated this method is deprecated in favour of {@link #getDirectory()}, and will be removed
* in 0.27.0
*/
@Deprecated(since = "0.26.0")
public List<String> getDirectories() {
return directories;
}

/**
* @deprecated this method is deprecated in favour of {@link #setDirectory(String)}}, and will be
* removed in 0.27.0
*/
@Deprecated(since = "0.26.0")
public void setDirectories(final List<String> directories) {
this.directories = LIST_SANITIZER.apply(directories);
if (!this.directories.isEmpty()) {
directory = directories.get(0);
}
}

public String getDirectory() {
return directory;
}

public void setDirectory(final String directory) {
this.directory = directory;
}

public long getLogSegmentSizeInBytes() {
Expand Down Expand Up @@ -126,8 +162,8 @@ public void setDiskUsageCommandWatermark(final double diskUsageCommandWatermark)
}

public long getFreeDiskSpaceCommandWatermark() {
final var directory = new File(getDirectories().get(0));
return Math.round(directory.getTotalSpace() * (1 - diskUsageCommandWatermark));
final var directoryFile = new File(getDirectory());
return Math.round(directoryFile.getTotalSpace() * (1 - diskUsageCommandWatermark));
}

public double getDiskUsageReplicationWatermark() {
Expand All @@ -139,8 +175,8 @@ public void setDiskUsageReplicationWatermark(final double diskUsageReplicationWa
}

public long getFreeDiskSpaceReplicationWatermark() {
final var directory = new File(getDirectories().get(0));
return Math.round(directory.getTotalSpace() * (1 - diskUsageReplicationWatermark));
final var directoryFile = new File(getDirectory());
return Math.round(directoryFile.getTotalSpace() * (1 - diskUsageReplicationWatermark));
}

public Duration getDiskUsageMonitoringInterval() {
Expand All @@ -162,8 +198,8 @@ public void setRocksdb(final RocksdbCfg rocksdb) {
@Override
public String toString() {
return "DataCfg{"
+ "directories="
+ directories
+ "directory="
+ directory
+ ", logSegmentSize="
+ logSegmentSize
+ ", snapshotPeriod="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DiskSpaceUsageMonitor extends Actor {
public DiskSpaceUsageMonitor(final DataCfg dataCfg) {
monitoringDelay = dataCfg.getDiskUsageMonitoringInterval();
minFreeDiskSpaceRequired = dataCfg.getFreeDiskSpaceCommandWatermark();
final var directory = new File(dataCfg.getDirectories().get(0));
final var directory = new File(dataCfg.getDirectory());
freeDiskSpaceSupplier = directory::getUsableSpace;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
Expand Down Expand Up @@ -67,7 +66,9 @@ public final class BrokerCfgTest {
"zeebe.broker.experimental.detectReprocessingInconsistency";
private static final String ZEEBE_BROKER_EXPERIMENTAL_DISABLEEXPLICITRAFTFLUSH =
"zeebe.broker.experimental.disableExplicitRaftFlush";
private static final String ZEEBE_BROKER_DATA_DIRECTORY = "zeebe.broker.data.directory";

@Deprecated(since = "0.26.0")
private static final String ZEEBE_BROKER_DATA_DIRECTORIES = "zeebe.broker.data.directories";

private static final String ZEEBE_BROKER_NETWORK_HOST = "zeebe.broker.network.host";
Expand Down Expand Up @@ -350,31 +351,45 @@ public void shouldClearContactPointFromEnvironment() {
}

@Test
public void shouldUseDefaultDirectories() {
assertDefaultDirectories(DEFAULT_DIRECTORY);
public void shouldUseDefaultDirectory() {
// given
final String expectedDataDirectory = Paths.get(BROKER_BASE, DEFAULT_DIRECTORY).toString();

// then
assertWithDefaultConfigurations(
config -> assertThat(config.getData().getDirectory()).isEqualTo(expectedDataDirectory));
}

@Test
public void shouldUseSpecifiedDirectories() {
assertDirectories("directories", "data1", "data2", "data3");
public void shouldUseSpecifiedDirectory() {
// given
final BrokerCfg config = TestConfigReader.readConfig("directory", environment);
final String expectedDataDirectory = Paths.get(BROKER_BASE, "foo").toString();

// then
assertThat(config.getData().getDirectory()).isEqualTo(expectedDataDirectory);
}

@Test
public void shouldUseDirectoriesFromEnvironment() {
environment.put(ZEEBE_BROKER_DATA_DIRECTORIES, "foo,bar");
assertDefaultDirectories("foo", "bar");
public void shouldUseDirectoryFromEnvironment() {
// given
final String expectedDataDirectory = Paths.get(BROKER_BASE, "foo").toString();
environment.put(ZEEBE_BROKER_DATA_DIRECTORY, "foo");

// then
assertWithDefaultConfigurations(
config -> assertThat(config.getData().getDirectory()).isEqualTo(expectedDataDirectory));
}

@Test
public void shouldUseDirectoriesFromEnvironmentWithSpecifiedDirectories() {
public void shouldOverrideDirectoryWithFirstDirectories() {
// given
final String expectedDataDirectory = Paths.get(BROKER_BASE, "foo").toString();
environment.put(ZEEBE_BROKER_DATA_DIRECTORIES, "foo,bar");
assertDirectories("directories", "foo", "bar");
}

@Test
public void shouldUseSingleDirectoryFromEnvironment() {
environment.put(ZEEBE_BROKER_DATA_DIRECTORIES, "hello");
assertDirectories("directories", "hello");
// then
assertWithDefaultConfigurations(
config -> assertThat(config.getData().getDirectory()).isEqualTo(expectedDataDirectory));
}

@Test
Expand Down Expand Up @@ -821,24 +836,6 @@ private void assertContactPoints(final String configFileName, final List<String>
assertThat(cfg.getInitialContactPoints()).containsExactlyElementsOf(contactPoints);
}

private void assertDefaultDirectories(final String... directories) {
assertDirectories("default", directories);
assertDirectories("empty", directories);
}

private void assertDirectories(final String configFileName, final String... directories) {
assertDirectories(configFileName, Arrays.asList(directories));
}

private void assertDirectories(final String configFileName, final List<String> directories) {
final DataCfg cfg = TestConfigReader.readConfig(configFileName, environment).getData();
final List<String> expected =
directories.stream()
.map(d -> Paths.get(BROKER_BASE, d).toString())
.collect(Collectors.toList());
assertThat(cfg.getDirectories()).containsExactlyElementsOf(expected);
}

private void assertDefaultEmbeddedGatewayEnabled(final boolean enabled) {
assertEmbeddedGatewayEnabled("default", enabled);
assertEmbeddedGatewayEnabled("empty", enabled);
Expand Down
23 changes: 11 additions & 12 deletions broker/src/test/java/io/zeebe/broker/test/EmbeddedBrokerRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -71,7 +70,7 @@ public final class EmbeddedBrokerRule extends ExternalResource {
protected final SpringBrokerBridge springBrokerBridge = new SpringBrokerBridge();
protected long startTime;
private File newTemporaryFolder;
private List<String> dataDirectories;
private String dataDirectory;

@SafeVarargs
public EmbeddedBrokerRule(final Consumer<BrokerCfg>... configurators) {
Expand Down Expand Up @@ -244,7 +243,7 @@ public void startBroker(final PartitionListener... listeners) {
});
}

dataDirectories = broker.getBrokerContext().getBrokerConfiguration().getData().getDirectories();
dataDirectory = broker.getBrokerContext().getBrokerConfiguration().getData().getDirectory();
}

public void configureBroker(final BrokerCfg brokerCfg) {
Expand All @@ -269,17 +268,17 @@ public void configureBroker(final BrokerCfg brokerCfg) {
}

public void purgeSnapshots() {
for (final String dataDirectoryName : dataDirectories) {
final File dataDirectory = new File(dataDirectoryName);
final File directory = new File(dataDirectory);

final File[] partitionDirectories =
dataDirectory.listFiles((d, f) -> new File(d, f).isDirectory());
final File[] partitionDirectories = directory.listFiles((d, f) -> new File(d, f).isDirectory());
if (partitionDirectories == null) {
return;
}

for (final File partitionDirectory : partitionDirectories) {
final File stateDirectory = new File(partitionDirectory, STATE_DIRECTORY);
if (stateDirectory.exists()) {
deleteSnapshots(stateDirectory);
}
for (final File partitionDirectory : partitionDirectories) {
final File stateDirectory = new File(partitionDirectory, STATE_DIRECTORY);
if (stateDirectory.exists()) {
deleteSnapshots(stateDirectory);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions broker/src/test/resources/system/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ zeebe:
host: 0.0.0.0

data:
# Specify a list of directories in which data is stored.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_DIRECTORIES.
directories: [ data ]
# Specify the directory in which data is stored.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_DIRECTORY.
directory: data
# The size of data log segment files.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_LOGSEGMENTSIZE.
logSegmentSize: 512MB
Expand Down
4 changes: 0 additions & 4 deletions broker/src/test/resources/system/directories.yaml

This file was deleted.

4 changes: 4 additions & 0 deletions broker/src/test/resources/system/directory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
zeebe:
broker:
data:
directory: foo
6 changes: 3 additions & 3 deletions dist/src/main/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ zeebe:
host: 0.0.0.0

data:
# Specify a list of directories in which data is stored.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_DIRECTORIES.
directories: [ data ]
# Specify a directory in which data is stored.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_DIRECTORY.
directory: data
# The size of data log segment files.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_LOGSEGMENTSIZE.
logSegmentSize: 512MB
Expand Down
11 changes: 3 additions & 8 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,9 @@
# ├── runtime
# └── snapshots

# Specify a list of directories in which data is stored. Using multiple
# directories makes sense in case the machine which is running Zeebe has
# multiple disks which are used in a JBOD (just a bunch of disks) manner. This
# allows to get greater throughput in combination with a higher io thread count
# since writes to different disks can potentially be done in parallel.
#
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_DIRECTORIES.
# directories: [ data ]
# Specify the directory in which data is stored.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_DIRECTORY.
# directory: data

# The size of data log segment files.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_LOGSEGMENTSIZE.
Expand Down
11 changes: 3 additions & 8 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,9 @@
# ├── runtime
# └── snapshots

# Specify a list of directories in which data is stored. Using multiple
# directories makes sense in case the machine which is running Zeebe has
# multiple disks which are used in a JBOD (just a bunch of disks) manner. This
# allows to get greater throughput in combination with a higher io thread count
# since writes to different disks can potentially be done in parallel.
#
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_DIRECTORIES.
# directories: [ data ]
# Specify the directory in which data is stored.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_DIRECTORY.
# directory: data

# The size of data log segment files.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_DATA_LOGSEGMENTSIZE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ public final class StreamProcessorMetrics {
Histogram.build()
.namespace(NAMESPACE)
.name("stream_processor_latency")
.help("Latency of processing in seconds")
.help(
"Time between a record is written until it is picked up for processing (in seconds)")
.labelNames("recordType", "partition")
.register();
private static final Histogram PROCESSING_DURATION =
Histogram.build()
.namespace(NAMESPACE)
.name("stream_processor_processing_duration")
.help("Time for processing a record (in seconds)")
.labelNames("recordType", "partition")
.register();

Expand Down Expand Up @@ -64,6 +72,13 @@ public void processingLatency(
.observe((processed - written) / 1000f);
}

public void processingDuration(
final RecordType recordType, final long started, final long processed) {
PROCESSING_DURATION
.labels(recordType.name(), partitionIdLabel)
.observe((processed - started) / 1000f);
}

public void eventProcessed() {
event("processed");
}
Expand Down

0 comments on commit c4ab987

Please sign in to comment.