diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index f34798eeb8..2ec19b605b 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -336,7 +336,7 @@ fi KAFKA_JDK_COMPATIBILITY_OPTS="" if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then - KAFKA_JDK_COMPATIBILITY_OPTS="${KAFKA_JDK_COMPATIBILITY_OPTS} --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED" + KAFKA_JDK_COMPATIBILITY_OPTS="${KAFKA_JDK_COMPATIBILITY_OPTS} --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true" fi if [[ "$JAVA_MAJOR_VERSION" -ge "16" ]]; then KAFKA_JDK_COMPATIBILITY_OPTS="${KAFKA_JDK_COMPATIBILITY_OPTS} --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED" diff --git a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java index 197557c68b..9878c0f8b3 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java @@ -21,8 +21,12 @@ import kafka.server.KafkaConfig; public class ConfigUtils { + public static final String ACCESS_KEY_NAME = "KAFKA_S3_ACCESS_KEY"; + public static final String SECRET_KEY_NAME = "KAFKA_S3_SECRET_KEY"; public static Config to(KafkaConfig s) { + String accessKey = System.getenv(ACCESS_KEY_NAME); + String secretKey = System.getenv(SECRET_KEY_NAME); return new Config() .nodeId(s.nodeId()) .nodeEpoch(s.nodeEpoch()) @@ -58,7 +62,9 @@ public static Config to(KafkaConfig s) { .networkBaselineBandwidth(s.s3NetworkBaselineBandwidthProp()) .refillPeriodMs(s.s3RefillPeriodMsProp()) .objectRetentionTimeInSecond(s.s3ObjectRetentionTimeInSecond()) - .failoverEnable(s.s3FailoverEnable()); + .failoverEnable(s.s3FailoverEnable()) + .accessKey(accessKey) + .secretKey(secretKey); } } diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index 858e4f95ea..9f923808ac 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -49,9 +49,6 @@ import java.util.concurrent.TimeUnit; public class DefaultS3Client implements Client { - public static final String ACCESS_KEY_NAME = "KAFKA_S3_ACCESS_KEY"; - public static final String SECRET_KEY_NAME = "KAFKA_S3_SECRET_KEY"; - private final static Logger LOGGER = LoggerFactory.getLogger(DefaultS3Client.class); private final Config config; private final StreamMetadataManager metadataManager; @@ -88,8 +85,8 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) { config.networkBaselineBandwidth(), config.refillPeriodMs(), config.networkBaselineBandwidth()); networkOutboundLimiter = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, config.networkBaselineBandwidth(), config.refillPeriodMs(), config.networkBaselineBandwidth()); - String accessKey = System.getenv(ACCESS_KEY_NAME); - String secretKey = System.getenv(SECRET_KEY_NAME); + String accessKey = this.config.accessKey(); + String secretKey = this.config.secretKey(); S3Operator s3Operator = DefaultS3Operator.builder().endpoint(endpoint).region(region).bucket(bucket).accessKey(accessKey).secretKey(secretKey) .inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).readWriteIsolate(true).build(); S3Operator compactionS3Operator = DefaultS3Operator.builder().endpoint(endpoint).region(region).bucket(bucket).accessKey(accessKey).secretKey(secretKey)