Skip to content

Commit

Permalink
Merge branch 'master' into feature/oauth2
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>

Conflicts:
	connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPublisherActor.java
	connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActorTest.java
  • Loading branch information
yufei-cai committed Nov 16, 2021
2 parents a5be32d + 9d77b68 commit b0b5554
Show file tree
Hide file tree
Showing 126 changed files with 3,213 additions and 732 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.things.model.ThingId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
11 changes: 11 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<slf4j.version>1.7.31</slf4j.version>
<logback.version>1.2.6</logback.version>
<logstash-logback-encoder.version>6.6</logstash-logback-encoder.version>
<fluency.version>2.6.0</fluency.version>
<janino.version>2.7.8</janino.version>

<!-- ### Metrics and Tracing -->
Expand Down Expand Up @@ -407,6 +408,16 @@
<artifactId>janino</artifactId>
<version>${janino.version}</version>
</dependency>
<dependency>
<groupId>org.komamitsu</groupId>
<artifactId>fluency-core</artifactId>
<version>${fluency.version}</version>
</dependency>
<dependency>
<groupId>org.komamitsu</groupId>
<artifactId>fluency-fluentd</artifactId>
<version>${fluency.version}</version>
</dependency>

<dependency>
<groupId>io.kamon</groupId>
Expand Down
39 changes: 39 additions & 0 deletions connectivity/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ jmh-generator-annprocess). jmh-generator-annprocess overwrites the whole META-IN
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</dependency>

<!-- fluency for logging connection logs into fluentd / fluentbit -->
<dependency>
<groupId>org.komamitsu</groupId>
<artifactId>fluency-core</artifactId>
</dependency>
<dependency>
<groupId>org.komamitsu</groupId>
<artifactId>fluency-fluentd</artifactId>
</dependency>

<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
Expand Down Expand Up @@ -328,6 +339,34 @@ jmh-generator-annprocess). jmh-generator-annprocess overwrites the whole META-IN
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-webjars</id>
<phase>generate-test-resources</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.webjars.npm</groupId>
<artifactId>protobufjs</artifactId>
<version>6.11.2</version>
<type>jar</type>
<includes>META-INF/resources/webjars/protobufjs/6.11.2/dist/protobuf.js</includes>
<fileMappers>
<org.codehaus.plexus.components.io.filemappers.FlattenFileMapper/>
</fileMappers>
</artifactItem>
</artifactItems>
<outputDirectory>${project.build.testOutputDirectory}/unpacked-test-webjars/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@
@IndexSubclasses
public interface ConnectionConfigProvider {

CompletionStage<Config> getConnectivityConfigOverwrites(ConnectionId connectionId);

/**
* Loads a {@link ConnectivityConfig} by a connection ID.
* Loads specific overwrites of connectivity for a given connection ID.
*
* @param connectionId the connection id for which to load the {@link ConnectivityConfig}
* @return the future connectivity config
* @param connectionId the connection id for which to load config overwrites
* @return the future config overwrites
*/
CompletionStage<ConnectivityConfig> getConnectivityConfig(ConnectionId connectionId);
CompletionStage<Config> getConnectivityConfigOverwrites(ConnectionId connectionId);

/**
* Register the given {@code subscriber} for changes to the {@link ConnectivityConfig} of the given {@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public int hashCode() {
@Override
public String toString() {
return getClass().getSimpleName() + " [" +
", initTimeout=" + initTimeout +
"initTimeout=" + initTimeout +
", connectingMinTimeout=" + connectingMinTimeout +
", connectingMaxTimeout=" + connectingMaxTimeout +
", disconnectingMaxTimeout=" + disconnectingMaxTimeout +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.komamitsu.fluency.Fluency;
import org.komamitsu.fluency.fluentd.FluencyBuilderForFluentd;

import com.typesafe.config.Config;

/**
* Default implementation of {@link FluencyLoggerPublisherConfig}.
*/
@Immutable
public final class DefaultFluencyLoggerPublisherConfig implements FluencyLoggerPublisherConfig {

private static final String CONFIG_PATH = "fluency";

private final String host;
private final int port;
private final boolean sslEnabled;
private final Duration connectionTimeout;
private final Duration readTimeout;
private final long maxBufferSize;
private final long bufferChunkInitialSize;
private final long bufferChunkRetentionSize;
private final Duration bufferChunkRetentionTime;
private final Duration flushAttemptInterval;
@Nullable private final String fileBackupDir;
private final Duration waitUntilBufferFlushed;
private final Duration waitUntilFlusherTerminated;
private final boolean jvmHeapBufferMode;
private final int senderMaxRetryCount;
private final Duration senderBaseRetryInterval;
private final Duration senderMaxRetryInterval;
private final boolean ackResponseMode;
private final Duration waitUntilAllBufferFlushedDurationOnClose;

private DefaultFluencyLoggerPublisherConfig(final ConfigWithFallback config) {
host = config.getString(ConfigValue.HOST.getConfigPath());
port = config.getPositiveIntOrThrow(ConfigValue.PORT);
sslEnabled = config.getBoolean(ConfigValue.SSL_ENABLED.getConfigPath());
connectionTimeout = config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.CONNECTION_TIMEOUT);
readTimeout = config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.READ_TIMEOUT);
maxBufferSize = config.getPositiveLongOrThrow(ConfigValue.MAX_BUFFER_SIZE);
bufferChunkInitialSize = config.getPositiveLongOrThrow(ConfigValue.BUFFER_CHUNK_INITIAL_SIZE);
bufferChunkRetentionSize = config.getPositiveLongOrThrow(ConfigValue.BUFFER_CHUNK_RETENTION_SIZE);
bufferChunkRetentionTime =
config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.BUFFER_CHUNK_RETENTION_TIME);
flushAttemptInterval = config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.FLUSH_ATTEMPT_INTERVAL);
fileBackupDir = config.getStringOrNull(ConfigValue.FILE_BACKUP_DIR);
waitUntilBufferFlushed = config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.WAIT_UNTIL_BUFFER_FLUSHED);
waitUntilFlusherTerminated =
config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.WAIT_UNTIL_FLUSHER_TERMINATED);
jvmHeapBufferMode = config.getBoolean(ConfigValue.JVM_HEAP_BUFFER_MODE.getConfigPath());
senderMaxRetryCount = config.getPositiveIntOrThrow(ConfigValue.SENDER_MAX_RETRY_COUNT);
senderBaseRetryInterval =
config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.SENDER_BASE_RETRY_INTERVAL);
senderMaxRetryInterval = config.getNonNegativeAndNonZeroDurationOrThrow(ConfigValue.SENDER_MAX_RETRY_INTERVAL);
ackResponseMode = config.getBoolean(ConfigValue.ACK_RESPONSE_MODE.getConfigPath());
waitUntilAllBufferFlushedDurationOnClose =
config.getDuration(ConfigValue.WAIT_UNTIL_BUFFER_FLUSHED_DURATION_ON_CLOSE.getConfigPath());
}

/**
* Returns {@link FluencyLoggerPublisherConfig}.
*
* @param config is supposed to provide the settings of the connection config at {@value #CONFIG_PATH}.
* @return the instance.
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static FluencyLoggerPublisherConfig of(final Config config) {
return new DefaultFluencyLoggerPublisherConfig(
ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
}

@Override
public Fluency buildFluencyLoggerPublisher() {
final FluencyBuilderForFluentd fluencyBuilderForFluentd = new FluencyBuilderForFluentd();

fluencyBuilderForFluentd.setSslEnabled(sslEnabled);
fluencyBuilderForFluentd.setConnectionTimeoutMilli((int) connectionTimeout.toMillis());
fluencyBuilderForFluentd.setReadTimeoutMilli((int) readTimeout.toMillis());
fluencyBuilderForFluentd.setMaxBufferSize(maxBufferSize);
fluencyBuilderForFluentd.setBufferChunkInitialSize((int) bufferChunkInitialSize);
fluencyBuilderForFluentd.setBufferChunkRetentionSize((int) bufferChunkRetentionSize);
fluencyBuilderForFluentd.setBufferChunkRetentionTimeMillis((int) bufferChunkRetentionTime.toMillis());
fluencyBuilderForFluentd.setFlushAttemptIntervalMillis((int) flushAttemptInterval.toMillis());
fluencyBuilderForFluentd.setWaitUntilBufferFlushed((int) waitUntilBufferFlushed.toSeconds());
fluencyBuilderForFluentd.setWaitUntilFlusherTerminated((int) waitUntilFlusherTerminated.toSeconds());
fluencyBuilderForFluentd.setFileBackupDir(fileBackupDir);
fluencyBuilderForFluentd.setJvmHeapBufferMode(jvmHeapBufferMode);
fluencyBuilderForFluentd.setSenderMaxRetryCount(senderMaxRetryCount);
fluencyBuilderForFluentd.setSenderBaseRetryIntervalMillis((int) senderBaseRetryInterval.toMillis());
fluencyBuilderForFluentd.setSenderMaxRetryIntervalMillis((int) senderMaxRetryInterval.toMillis());
fluencyBuilderForFluentd.setAckResponseMode(ackResponseMode);

return fluencyBuilderForFluentd.build(host, port);
}

@Override
public Duration getWaitUntilAllBufferFlushedDurationOnClose() {
return waitUntilAllBufferFlushedDurationOnClose;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DefaultFluencyLoggerPublisherConfig that = (DefaultFluencyLoggerPublisherConfig) o;
return port == that.port && sslEnabled == that.sslEnabled && maxBufferSize == that.maxBufferSize &&
bufferChunkInitialSize == that.bufferChunkInitialSize &&
bufferChunkRetentionSize == that.bufferChunkRetentionSize &&
jvmHeapBufferMode == that.jvmHeapBufferMode &&
senderMaxRetryCount == that.senderMaxRetryCount && ackResponseMode == that.ackResponseMode &&
Objects.equals(host, that.host) &&
Objects.equals(connectionTimeout, that.connectionTimeout) &&
Objects.equals(readTimeout, that.readTimeout) &&
Objects.equals(bufferChunkRetentionTime, that.bufferChunkRetentionTime) &&
Objects.equals(flushAttemptInterval, that.flushAttemptInterval) &&
Objects.equals(fileBackupDir, that.fileBackupDir) &&
Objects.equals(waitUntilBufferFlushed, that.waitUntilBufferFlushed) &&
Objects.equals(waitUntilFlusherTerminated, that.waitUntilFlusherTerminated) &&
Objects.equals(senderBaseRetryInterval, that.senderBaseRetryInterval) &&
Objects.equals(senderMaxRetryInterval, that.senderMaxRetryInterval) &&
Objects.equals(waitUntilAllBufferFlushedDurationOnClose, that.waitUntilAllBufferFlushedDurationOnClose);
}

@Override
public int hashCode() {
return Objects.hash(host, port, sslEnabled, connectionTimeout, readTimeout, maxBufferSize,
bufferChunkInitialSize,
bufferChunkRetentionSize, bufferChunkRetentionTime, flushAttemptInterval, fileBackupDir,
waitUntilBufferFlushed, waitUntilFlusherTerminated, jvmHeapBufferMode, senderMaxRetryCount,
senderBaseRetryInterval, senderMaxRetryInterval, ackResponseMode,
waitUntilAllBufferFlushedDurationOnClose);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"host=" + host +
", port=" + port +
", sslEnabled=" + sslEnabled +
", connectionTimeout=" + connectionTimeout +
", readTimeout=" + readTimeout +
", maxBufferSize=" + maxBufferSize +
", bufferChunkInitialSize=" + bufferChunkInitialSize +
", bufferChunkRetentionSize=" + bufferChunkRetentionSize +
", bufferChunkRetentionTime=" + bufferChunkRetentionTime +
", flushAttemptInterval=" + flushAttemptInterval +
", fileBackupDir=" + fileBackupDir +
", waitUntilBufferFlushed=" + waitUntilBufferFlushed +
", waitUntilFlusherTerminated=" + waitUntilFlusherTerminated +
", jvmHeapBufferMode=" + jvmHeapBufferMode +
", senderMaxRetryCount=" + senderMaxRetryCount +
", senderBaseRetryInterval=" + senderBaseRetryInterval +
", senderMaxRetryInterval=" + senderMaxRetryInterval +
", ackResponseMode=" + ackResponseMode +
", waitUntilAllBufferFlushedDurationOnClose=" + waitUntilAllBufferFlushedDurationOnClose +
"]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.supervision.DefaultExponentialBackOffConfig;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.config.DittoConfigError;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand All @@ -35,12 +37,18 @@ final class DefaultKafkaConsumerConfig implements KafkaConsumerConfig {
private final ConnectionThrottlingConfig throttlingConfig;
private final ExponentialBackOffConfig restartBackOffConfig;
private final Config alpakkaConfig;
private final Duration metricCollectingInterval;

private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
throttlingConfig = ConnectionThrottlingConfig.of(kafkaConsumerScopedConfig);
restartBackOffConfig =
DefaultExponentialBackOffConfig.of(getConfigOrEmpty(kafkaConsumerScopedConfig, RESTART_PATH));
alpakkaConfig = getConfigOrEmpty(kafkaConsumerScopedConfig, ALPAKKA_PATH);
metricCollectingInterval =
kafkaConsumerScopedConfig.getDuration(ConfigValue.METRIC_COLLECTING_INTERVAL.getConfigPath());
if (metricCollectingInterval.isNegative() || metricCollectingInterval.isZero()) {
throw new DittoConfigError("The Kafka consumer metric collecting interval has to be positive.");
}
}

/**
Expand Down Expand Up @@ -73,6 +81,11 @@ public Config getAlpakkaConfig() {
return alpakkaConfig;
}

@Override
public Duration getMetricCollectingInterval() {
return metricCollectingInterval;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -84,12 +97,13 @@ public boolean equals(final Object o) {
final DefaultKafkaConsumerConfig that = (DefaultKafkaConsumerConfig) o;
return Objects.equals(throttlingConfig, that.throttlingConfig) &&
Objects.equals(restartBackOffConfig, that.restartBackOffConfig) &&
Objects.equals(alpakkaConfig, that.alpakkaConfig);
Objects.equals(alpakkaConfig, that.alpakkaConfig) &&
Objects.equals(metricCollectingInterval, that.metricCollectingInterval);
}

@Override
public int hashCode() {
return Objects.hash(throttlingConfig, restartBackOffConfig, alpakkaConfig);
return Objects.hash(throttlingConfig, restartBackOffConfig, alpakkaConfig, metricCollectingInterval);
}

@Override
Expand All @@ -98,6 +112,7 @@ public String toString() {
"throttlingConfig=" + throttlingConfig +
", restartBackOffConfig=" + restartBackOffConfig +
", alpakkaConfig=" + alpakkaConfig +
", metricCollectingInterval=" + metricCollectingInterval +
"]";
}

Expand Down
Loading

0 comments on commit b0b5554

Please sign in to comment.