Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ under the License.
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<!-- Testcontainers 2.x no longer pulls in JUnit 4 transitively,
but this module's main sources still use it (TemporaryFolder, Assume). -->
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<!-- To ensure that flink-dist is built beforehand -->
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.util.UserClassLoaderJarTestUtils;

import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
Expand Down Expand Up @@ -68,7 +69,8 @@ public class HiveITCase extends TestLogger {

@ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

@ClassRule
// Testcontainers 2.x containers are no longer JUnit 4 TestRules, so the lifecycle is driven
// manually from @BeforeClass/@AfterClass instead of via @ClassRule.
public static final HiveContainers.HiveContainer HIVE_CONTAINER =
HiveContainers.createHiveContainer(
Arrays.asList("hive_sink1", "hive_sink2", "h_table_sink1", "h_table_sink2"));
Expand All @@ -93,10 +95,16 @@ public class HiveITCase extends TestLogger {

@BeforeClass
public static void beforeClass() throws Exception {
HIVE_CONTAINER.start();
initUDFJar();
initHiveConfFile();
}

@AfterClass
public static void afterClass() {
HIVE_CONTAINER.stop();
}

private static void initUDFJar() throws Exception {
Path tmpPath = TMP_FOLDER.getRoot().toPath();
LOG.info("The current temporary path: {}", tmpPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -86,9 +85,9 @@ protected void doStart() {
}

@Override
protected void finished(Description description) {
protected void containerIsStopping(InspectContainerResponse containerInfo) {
backupLogs();
super.finished(description);
super.containerIsStopping(containerInfo);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/flink-sql-client-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<artifactId>testcontainers-kafka</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.File;
Expand Down Expand Up @@ -78,10 +78,10 @@ public class SqlClientITCase {
public static final Network NETWORK = Network.newNetwork();

@Container
public static final KafkaContainer KAFKA =
new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
public static final ConfluentKafkaContainer KAFKA =
new ConfluentKafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
.withListener(INTER_CONTAINER_KAFKA_ALIAS + ":19092")
.withLogConsumer(LOG_CONSUMER);

public final FlinkContainers flink =
Expand Down Expand Up @@ -219,7 +219,7 @@ void testMatchRecognize() throws Exception {
" 'topic' = 'test-json',",
" 'properties.bootstrap.servers' = '"
+ INTER_CONTAINER_KAFKA_ALIAS
+ ":9092',",
+ ":19092',",
" 'scan.startup.mode' = 'earliest-offset',",
" 'format' = 'json',",
" 'json.timestamp-format.standard' = 'ISO-8601'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public class SqlGatewayE2ECase extends TestLogger {
private static final String RESULT_KEY = "$RESULT";

@ClassRule public static final TemporaryFolder FOLDER = new TemporaryFolder();
@ClassRule public static final HiveContainer HIVE_CONTAINER = new HiveContainer();

public static final HiveContainer HIVE_CONTAINER = new HiveContainer();

@Rule public final FlinkResource flinkResource = buildFlinkResource();

private static NetUtils.Port hiveserver2Port;
Expand All @@ -107,6 +109,7 @@ public class SqlGatewayE2ECase extends TestLogger {

@BeforeClass
public static void beforeClass() {
HIVE_CONTAINER.start();
ENDPOINT_CONFIG.setString(
getPrefixedConfigOptionName(CATALOG_HIVE_CONF_DIR), createHiveConf().getParent());
}
Expand All @@ -115,6 +118,7 @@ public static void beforeClass() {
public static void afterClass() throws Exception {
hiveserver2Port.close();
restPort.close();
HIVE_CONTAINER.stop();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -96,9 +95,9 @@ protected void containerIsStarted(InspectContainerResponse containerInfo) {
}

@Override
protected void finished(Description description) {
protected void containerIsStopping(InspectContainerResponse containerInfo) {
backupLogs();
super.finished(description);
super.containerIsStopping(containerInfo);
}

public String getHiveMetastoreURI() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
set -Eeuo pipefail

KAFKA_VERSION="3.2.3"
CONFLUENT_VERSION="7.2.9"
CONFLUENT_MAJOR_VERSION="7.2"
CONFLUENT_VERSION="7.5.3"
CONFLUENT_MAJOR_VERSION="7.5"
# Check the Confluent Platform <> Apache Kafka compatibility matrix when updating KAFKA_VERSION
KAFKA_SQL_VERSION="universal"

Expand Down
4 changes: 2 additions & 2 deletions flink-end-to-end-tests/test-scripts/test_pyflink.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
set -Eeuo pipefail

KAFKA_VERSION="3.2.3"
CONFLUENT_VERSION="7.2.9"
CONFLUENT_MAJOR_VERSION="7.2"
CONFLUENT_VERSION="7.5.3"
CONFLUENT_MAJOR_VERSION="7.5"
# Check the Confluent Platform <> Apache Kafka compatibility matrix when updating KAFKA_VERSION
KAFKA_SQL_VERSION="universal"
SQL_JARS_DIR=${END_TO_END_DIR}/flink-sql-client-test/target/sql-jars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
*/
public class DockerImageVersions {

public static final String KAFKA = "confluentinc/cp-kafka:7.2.9";
public static final String KAFKA = "confluentinc/cp-kafka:7.5.3";

public static final String SCHEMA_REGISTRY = "confluentinc/cp-schema-registry:7.2.9";
public static final String SCHEMA_REGISTRY = "confluentinc/cp-schema-registry:7.5.3";

public static final String KINESALITE = "instructure/kinesalite:latest";

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ under the License.
<beam.version>2.43.0</beam.version>
<protoc.version>3.21.7</protoc.version>
<okhttp.version>3.14.9</okhttp.version>
<testcontainers.version>1.21.4</testcontainers.version>
<testcontainers.version>2.0.5</testcontainers.version>
<lz4.version>1.10.3</lz4.version>
<commons.io.version>2.15.1</commons.io.version>
<japicmp.skip>false</japicmp.skip>
Expand Down Expand Up @@ -286,7 +286,7 @@ under the License.

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<artifactId>testcontainers-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

Expand Down