Skip to content

Commit

Permalink
Merge branch 'master' into feat/azure-gh
Browse files Browse the repository at this point in the history
  • Loading branch information
yannick-roeder committed Feb 6, 2024
2 parents d55d28d + e1f25c2 commit c7104ca
Show file tree
Hide file tree
Showing 17 changed files with 325 additions and 435 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Change Log

## [2.6.0](https://github.com/bakdata/kafka-large-message-serde/tree/2.6.0) (2024-01-05)
[View commits](https://github.com/bakdata/kafka-large-message-serde/compare/2.5.1...2.6.0)

**Closed issues:**

- Invalid value null for configuration key serializer exception [\#44](https://github.com/bakdata/kafka-large-message-serde/issues/44)

**Merged pull requests:**

- Update dependencies and switch to Java 11 [\#46](https://github.com/bakdata/kafka-large-message-serde/pull/46) ([@philipp94831](https://github.com/philipp94831))

## [2.5.1](https://github.com/bakdata/kafka-large-message-serde/tree/2.5.1) (2023-01-05)
[View commits](https://github.com/bakdata/kafka-large-message-serde/compare/2.5.0...2.5.1)

Expand Down
5 changes: 3 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ configure<org.hildan.github.changelog.plugin.GitHubChangelogExtension> {

subprojects {
apply(plugin = "java-library")
apply(plugin = "java-test-fixtures")
apply(plugin = "io.freefair.lombok")

configure<JavaPluginExtension> {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
}

Expand Down
14 changes: 7 additions & 7 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
version=2.5.2-SNAPSHOT
version=2.6.1-SNAPSHOT
org.gradle.caching=true
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx2048m
kafkaVersion=3.3.1
confluentVersion=7.3.1
junitVersion=5.9.1
log4jVersion=2.19.0
assertJVersion=3.23.1
s3MockVersion=2.4.16
kafkaVersion=3.5.2
confluentVersion=7.5.1
junitVersion=5.10.1
log4jVersion=2.22.1
assertJVersion=3.25.1
joolVersion=0.9.14
testContainersVersion=1.19.3
10 changes: 3 additions & 7 deletions large-message-connect/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,13 @@ dependencies {
val assertJVersion: String by project
testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)

val s3MockVersion: String by project
testImplementation(group = "com.adobe.testing", name = "s3mock-junit5", version = s3MockVersion) {
exclude(group = "ch.qos.logback")
exclude(group = "org.apache.logging.log4j", module = "log4j-to-slf4j")
}
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = log4jVersion)
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
val joolVersion: String by project
testImplementation(group = "org.jooq", name = "jool-java-8", version = joolVersion)
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.3.0") {
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.5.0") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion)
testImplementation(testFixtures(project(":large-message-core")))
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -27,7 +27,6 @@
import static net.mguenther.kafka.junit.Wait.delay;
import static org.assertj.core.api.Assertions.assertThat;

import com.adobe.testing.s3mock.junit5.S3MockExtension;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -50,13 +49,10 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

class LargeMessageConverterIntegrationTest {
@RegisterExtension
static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent()
.withSecureConnection(false).build();
class LargeMessageConverterIntegrationTest extends AmazonS3IntegrationTest {
private static final String BUCKET_NAME = "testbucket";
private static final String S3_KEY_NAME = "contentkey";
private static final String TOPIC = "input";
Expand All @@ -65,30 +61,11 @@ class LargeMessageConverterIntegrationTest {
private EmbeddedKafkaCluster kafkaCluster;
private Path outputFile;

static void configureS3HTTPService() {
System.setProperty(SdkSystemSetting.SYNC_HTTP_SERVICE_IMPL.property(),
"software.amazon.awssdk.http.apache.ApacheSdkHttpService");
}

private static Properties createS3BackedProperties() {
final Properties properties = new Properties();
properties.put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG, "http://localhost:" + S3_MOCK.getHttpPort());
properties.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1");
properties.put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, "foo");
properties.put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, "bar");
properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName());
properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
properties.put(
AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/%s", BUCKET_NAME, S3_KEY_NAME));
properties.put(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
return properties;
}

@BeforeEach
void setUp() throws IOException {
this.outputFile = Files.createTempFile("test", "temp");
S3_MOCK.createS3Client().createBucket(BUCKET_NAME);
configureS3HTTPService();
final S3Client s3 = this.getS3Client();
s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
this.kafkaCluster = this.createCluster();
this.kafkaCluster.start();
}
Expand Down Expand Up @@ -126,6 +103,17 @@ private EmbeddedKafkaCluster createCluster() {
.build());
}

private Properties createS3BackedProperties() {
final Properties properties = new Properties();
properties.putAll(this.getLargeMessageConfig());
properties.put(LargeMessageSerdeConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName());
properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
properties.put(
AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/%s", BUCKET_NAME, S3_KEY_NAME));
properties.setProperty(LargeMessageConverterConfig.CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
return properties;
}

private Properties config() {
final Properties properties = new Properties();
properties.put(ConnectorConfig.NAME_CONFIG, "test");
Expand All @@ -134,7 +122,7 @@ private Properties config() {
properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, LargeMessageConverter.class.getName());
createS3BackedProperties().forEach(
this.createS3BackedProperties().forEach(
(key, value) -> properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + key, value));
return properties;
}
Expand All @@ -146,7 +134,7 @@ private Properties createProducerProperties(final boolean shouldBack) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG,
Integer.toString(shouldBack ? 0 : Integer.MAX_VALUE));
properties.putAll(createS3BackedProperties());
properties.putAll(this.createS3BackedProperties());
return properties;
}
}
Loading

0 comments on commit c7104ca

Please sign in to comment.