diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java index 7d0b4023835c..d229579ce5d6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java @@ -88,6 +88,41 @@ public FSDataOutputStream create( progressable)); } + @Override + public FSDataOutputStream create(Path path, boolean overwrite) throws IOException { + return runSecuredWithIOException(() -> fileSystem.create(path, overwrite)); + } + + @Override + public FSDataOutputStream create( + Path path, boolean overwrite, int bufferSize, short replication, long blockSize) + throws IOException { + return runSecuredWithIOException( + () -> fileSystem.create(path, overwrite, bufferSize, replication, blockSize)); + } + + @Override + public short getDefaultReplication(Path f) { + return runSecured(() -> fileSystem.getDefaultReplication(f)); + } + + @Deprecated + @Override + public short getDefaultReplication() { + return runSecured(() -> fileSystem.getDefaultReplication()); + } + + @Override + public long getDefaultBlockSize(Path f) { + return runSecured(() -> fileSystem.getDefaultBlockSize(f)); + } + + @Deprecated + @Override + public long getDefaultBlockSize() { + return runSecured(() -> fileSystem.getDefaultBlockSize()); + } + @Override public boolean exists(Path f) throws IOException { return runSecuredWithIOException(() -> fileSystem.exists(f)); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java new file mode 100644 index 000000000000..15dcc78aa99c --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +/** Docker image versions. */ +public class DockerImageVersions { + + public static final String KAFKA = "confluentinc/cp-kafka:7.9.2"; + public static final String SCHEMA_REGISTRY = "confluentinc/cp-schema-registry:7.9.2"; +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java index ee5a653521e6..f36dd2bf8703 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java @@ -18,13 +18,13 @@ package org.apache.paimon.flink.action.cdc.kafka; +import org.apache.paimon.flink.DockerImageVersions; import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.util.DockerImageVersions; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java index eddb8ba897dd..2b21c0a9e0a2 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java @@ -18,12 +18,12 @@ package org.apache.paimon.flink.kafka; +import org.apache.paimon.flink.DockerImageVersions; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.util.DockerImageVersions; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.NewTopic;