From 2eec217c36c87c153bf34a5c2794680f2cde8d6c Mon Sep 17 00:00:00 2001 From: SeungMin Date: Fri, 14 Nov 2025 15:09:36 +0900 Subject: [PATCH 1/2] [fileio] Delegate create()/getDefault() in HadoopSecuredFileSystem (#6602) --- .../fs/hadoop/HadoopSecuredFileSystem.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java index 7d0b4023835c..d229579ce5d6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java @@ -88,6 +88,41 @@ public FSDataOutputStream create( progressable)); } + @Override + public FSDataOutputStream create(Path path, boolean overwrite) throws IOException { + return runSecuredWithIOException(() -> fileSystem.create(path, overwrite)); + } + + @Override + public FSDataOutputStream create( + Path path, boolean overwrite, int bufferSize, short replication, long blockSize) + throws IOException { + return runSecuredWithIOException( + () -> fileSystem.create(path, overwrite, bufferSize, replication, blockSize)); + } + + @Override + public short getDefaultReplication(Path f) { + return runSecured(() -> fileSystem.getDefaultReplication(f)); + } + + @Deprecated + @Override + public short getDefaultReplication() { + return runSecured(() -> fileSystem.getDefaultReplication()); + } + + @Override + public long getDefaultBlockSize(Path f) { + return runSecured(() -> fileSystem.getDefaultBlockSize(f)); + } + + @Deprecated + @Override + public long getDefaultBlockSize() { + return runSecured(() -> fileSystem.getDefaultBlockSize()); + } + @Override public boolean exists(Path f) throws IOException { return runSecuredWithIOException(() -> fileSystem.exists(f)); From 6bd995538a6218d472c160af2f70000084b40fe0 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sat, 28 Feb 2026 15:34:04 +0800 Subject: [PATCH 2/2] [test] Use new version of kafka docker image --- .../paimon/flink/DockerImageVersions.java | 26 +++++++++++++++++++ .../cdc/kafka/KafkaActionITCaseBase.java | 2 +- .../flink/kafka/KafkaTableTestBase.java | 2 +- 3 files changed, 28 insertions(+), 2 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java new file mode 100644 index 000000000000..15dcc78aa99c --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +/** Docker image versions. */ +public class DockerImageVersions { + + public static final String KAFKA = "confluentinc/cp-kafka:7.9.2"; + public static final String SCHEMA_REGISTRY = "confluentinc/cp-schema-registry:7.9.2"; +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java index ee5a653521e6..f36dd2bf8703 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java @@ -18,13 +18,13 @@ package org.apache.paimon.flink.action.cdc.kafka; +import org.apache.paimon.flink.DockerImageVersions; import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.util.DockerImageVersions; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java index eddb8ba897dd..2b21c0a9e0a2 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java @@ -18,12 +18,12 @@ package org.apache.paimon.flink.kafka; +import org.apache.paimon.flink.DockerImageVersions; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.util.DockerImageVersions; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.NewTopic;