diff --git a/build.sh b/build.sh index 552b6d48b8e917..985cc05f63c5a3 100755 --- a/build.sh +++ b/build.sh @@ -641,7 +641,8 @@ if [[ "${BUILD_FE}" -eq 1 ]]; then modules+=("fe-extension-spi") modules+=("fe-extension-loader") modules+=("fe-core") - # Filesystem SPI plugin modules (loaded at runtime as plugins) + # Filesystem API and SPI plugin modules (loaded at runtime as plugins) + modules+=("fe-filesystem/fe-filesystem-api") modules+=("fe-filesystem/fe-filesystem-spi") for _fs_mod in s3 oss cos obs azure hdfs local broker; do if [[ -d "${DORIS_HOME}/fe/fe-filesystem/fe-filesystem-${_fs_mod}" ]]; then @@ -987,7 +988,7 @@ if [[ "${BUILD_FE}" -eq 1 ]]; then (cd "${DORIS_HOME}/fe" && "${MVN_CMD}" dependency:copy-dependencies \ -pl "fe-filesystem/fe-filesystem-${fs_module}" \ -DoutputDirectory="${fs_plugin_target}" \ - -DexcludeArtifactIds="fe-filesystem-spi,fe-extension-spi" \ + -DexcludeArtifactIds="fe-filesystem-api,fe-filesystem-spi,fe-extension-spi" \ --no-transfer-progress -q 2>/dev/null) || true done unset FS_PLUGIN_DIR fs_module fs_plugin_target fs_module_dir diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 36e10387f89507..e6340eb9bb2e94 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -69,7 +69,14 @@ under the License. fe-common ${project.version} - + + + ${project.groupId} + fe-filesystem-api + ${project.version} + + ${project.groupId} fe-filesystem-spi diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java index ae06d0d8ef8995..08abb011d2ce2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java @@ -32,11 +32,11 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.property.storage.BrokerProperties; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.Location; import org.apache.doris.foundation.fs.FsStorageType; import org.apache.doris.fs.FileSystemDescriptor; import org.apache.doris.fs.FileSystemFactory; @@ -139,7 +139,7 @@ public class Repository implements Writable, GsonPostProcessable { /** SPI filesystem for I/O operations; transient — rebuilt in {@link #gsonPostProcess()}. * Null for BROKER repositories, which resolve a live broker endpoint per I/O call. */ - private transient org.apache.doris.filesystem.spi.FileSystem spiFs; + private transient org.apache.doris.filesystem.FileSystem spiFs; public FileSystemDescriptor getFileSystemDescriptor() { return fileSystemDescriptor; @@ -275,7 +275,7 @@ public String getErrorMsg() { * instance that must be closed by calling {@link #releaseSpiFs}. * */ - private org.apache.doris.filesystem.spi.FileSystem acquireSpiFs() throws IOException { + private org.apache.doris.filesystem.FileSystem acquireSpiFs() throws IOException { if (spiFs != null) { return spiFs; } @@ -298,7 +298,7 @@ private org.apache.doris.filesystem.spi.FileSystem acquireSpiFs() throws IOExcep * Releases an SPI FileSystem acquired via {@link #acquireSpiFs}. * Closes broker per-call instances; leaves the shared non-broker instance open. */ - private void releaseSpiFs(org.apache.doris.filesystem.spi.FileSystem fs) { + private void releaseSpiFs(org.apache.doris.filesystem.FileSystem fs) { if (fs != spiFs) { try { fs.close(); @@ -309,7 +309,7 @@ private void releaseSpiFs(org.apache.doris.filesystem.spi.FileSystem fs) { } /** Uploads a local file to a remote path via the SPI filesystem. */ - private static void spiUploadFile(org.apache.doris.filesystem.spi.FileSystem fs, + private static void spiUploadFile(org.apache.doris.filesystem.FileSystem fs, String localFilePath, String remotePath) throws IOException { DorisOutputFile outputFile = fs.newOutputFile(Location.of(remotePath)); try (java.io.InputStream in = Files.newInputStream(Paths.get(localFilePath)); @@ -338,7 +338,7 @@ public Status initRepository() { } String repoInfoFilePath = assembleRepoInfoFilePath(); - org.apache.doris.filesystem.spi.FileSystem fs; + org.apache.doris.filesystem.FileSystem fs; try { fs = acquireSpiFs(); } catch (IOException e) { @@ -460,7 +460,7 @@ public boolean ping() { String path = location + "/" + joinPrefix(PREFIX_REPO, name) + "/" + FILE_REPO_INFO; try { URI checkUri = new URI(path); - org.apache.doris.filesystem.spi.FileSystem fs = acquireSpiFs(); + org.apache.doris.filesystem.FileSystem fs = acquireSpiFs(); try { boolean exists = fs.exists(Location.of(checkUri.normalize().toString())); if (!exists) { @@ -489,7 +489,7 @@ public Status listSnapshots(List snapshotNames) { // eg. __palo_repository_repo_name/__ss_* String listPath = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name), PREFIX_SNAPSHOT_DIR) + "*"; - org.apache.doris.filesystem.spi.FileSystem fs; + org.apache.doris.filesystem.FileSystem fs; try { fs = acquireSpiFs(); } catch (IOException e) { @@ -610,7 +610,7 @@ public Status upload(String localFilePath, String remoteFilePath) { Preconditions.checkState(!Strings.isNullOrEmpty(md5sum)); String finalRemotePath = assembleFileNameWithSuffix(remoteFilePath, md5sum); - org.apache.doris.filesystem.spi.FileSystem fs; + org.apache.doris.filesystem.FileSystem fs; try { fs = acquireSpiFs(); } catch (IOException e) { @@ -647,7 +647,7 @@ public Status upload(String localFilePath, String remoteFilePath) { // remoteFilePath must be a file(not dir) and does not contain checksum public Status download(String remoteFilePath, String localFilePath) { - org.apache.doris.filesystem.spi.FileSystem fs; + org.apache.doris.filesystem.FileSystem fs; try { fs = acquireSpiFs(); } catch (IOException e) { @@ -822,7 +822,7 @@ private List getSnapshotInfo(String snapshotName, String timestamp) { LOG.debug("assemble infoFilePath: {}, snapshot: {}", infoFilePath, snapshotName); } try { - org.apache.doris.filesystem.spi.FileSystem fs = acquireSpiFs(); + org.apache.doris.filesystem.FileSystem fs = acquireSpiFs(); List results; try { results = fs.listFiles(Location.of(infoFilePath + "*")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AzureResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AzureResource.java index 0a5c2592be14d8..9626d6a8dcff75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AzureResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AzureResource.java @@ -98,7 +98,7 @@ protected static void pingAzure(String bucketName, String rootPath, Arrays.fill(contentData, (byte) 'A'); try { - org.apache.doris.filesystem.spi.FileSystem fileSystem = + org.apache.doris.filesystem.FileSystem fileSystem = FileSystemFactory.getFileSystem(newProperties); Preconditions.checkState(fileSystem instanceof ObjFileSystem, "Expected object-storage filesystem for Azure resource"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java index d471ecf9826101..8ca46edc1a8d52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java @@ -22,8 +22,8 @@ import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.util.DatasourcePrintableMap; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.Location; import org.apache.doris.fs.FileSystemFactory; import com.google.common.base.Preconditions; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java index a226dcffa172b1..15bd252643094f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java @@ -134,7 +134,7 @@ protected static void pingS3(String bucketName, String rootPath, Map properties = buildProperties(objInfo); - org.apache.doris.filesystem.spi.FileSystem fileSystem = + org.apache.doris.filesystem.FileSystem fileSystem = FileSystemFactory.getFileSystem(properties); DorisInputFile inputFile = fileSystem.newInputFile(Location.of(remotePath)); try (InputStream in = inputFile.newStream()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 844fa0bf0a7939..2753af917bb462 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -27,9 +27,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HiveExternalMetaCache; import org.apache.doris.datasource.property.storage.BrokerProperties; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.Location; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TBrokerCloseReaderRequest; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java index 3809e012d8ccc0..ff1f0c87a068a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java @@ -20,7 +20,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.storage.AzurePropertyUtils; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileSystemType; +import org.apache.doris.filesystem.FileSystemType; import org.apache.doris.foundation.property.StoragePropertiesException; import org.apache.doris.fs.SchemaTypeMapper; import org.apache.doris.thrift.TFileType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java index f937d46b2a63d4..4ff3d85f3b9860 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java @@ -21,10 +21,10 @@ import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.hive.HiveExternalMetaCache.FileCacheValue; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemTransferUtil; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemTransferUtil; +import org.apache.doris.filesystem.Location; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -419,7 +419,7 @@ && isValidBase(fileSystem, dirPath, base, validWriteIdList)) { return fileCacheValue; } - private static String locationName(org.apache.doris.filesystem.spi.Location location) { + private static String locationName(org.apache.doris.filesystem.Location location) { String uri = location.uri(); int idx = uri.lastIndexOf('/'); return idx >= 0 ? uri.substring(idx + 1) : uri; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java index 03ba5ef66f579a..f310059ac3d342 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -25,10 +25,10 @@ import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.datasource.NameMapping; import org.apache.doris.datasource.statistics.CommonStatistics; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemUtil; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemUtil; +import org.apache.doris.filesystem.Location; import org.apache.doris.filesystem.spi.ObjFileSystem; import org.apache.doris.foundation.util.PathUtils; import org.apache.doris.fs.SpiSwitchingFileSystem; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java index 8137fd6aa654b3..4cc8d9aafa33e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java @@ -44,11 +44,11 @@ import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.datasource.metacache.MetaCacheEntry; import org.apache.doris.datasource.metacache.MetaCacheEntryDef; -import org.apache.doris.filesystem.spi.BlockInfo; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemIOException; -import org.apache.doris.filesystem.spi.RemoteIterator; +import org.apache.doris.filesystem.BlockInfo; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemIOException; +import org.apache.doris.filesystem.RemoteIterator; import org.apache.doris.fs.DirectoryLister; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.fs.FileSystemDirectoryLister; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java index f223d7f59f157a..b99541c2bde508 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java @@ -22,7 +22,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.statistics.CommonStatistics; -import org.apache.doris.filesystem.spi.FileSystem; +import org.apache.doris.filesystem.FileSystem; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateFileIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateFileIO.java index e3459f69789775..8d42726421d491 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateFileIO.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateFileIO.java @@ -18,8 +18,8 @@ package org.apache.doris.datasource.iceberg.fileio; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.Location; import org.apache.doris.fs.FileSystemFactory; import org.apache.iceberg.DataFile; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateInputFile.java index 9931ff0d454d3f..ea5f36bb6e4a50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateInputFile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateInputFile.java @@ -17,7 +17,7 @@ package org.apache.doris.datasource.iceberg.fileio; -import org.apache.doris.filesystem.spi.DorisInputFile; +import org.apache.doris.filesystem.DorisInputFile; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.io.InputFile; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateOutputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateOutputFile.java index df916215b7d5c2..ab3315d6e3f981 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateOutputFile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateOutputFile.java @@ -17,9 +17,9 @@ package org.apache.doris.datasource.iceberg.fileio; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.Location; import com.google.common.io.CountingOutputStream; import org.apache.iceberg.io.InputFile; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateSeekableInputStream.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateSeekableInputStream.java index ce7e1f867b40d2..b44ac1b1c8057b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateSeekableInputStream.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/fileio/DelegateSeekableInputStream.java @@ -17,7 +17,7 @@ package org.apache.doris.datasource.iceberg.fileio; -import org.apache.doris.filesystem.spi.DorisInputStream; +import org.apache.doris.filesystem.DorisInputStream; import org.apache.iceberg.io.SeekableInputStream; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java index c3fb959ad9d3a3..4302f3397892cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java @@ -21,10 +21,10 @@ package org.apache.doris.fs; import org.apache.doris.catalog.TableIf; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemIOException; -import org.apache.doris.filesystem.spi.RemoteIterator; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemIOException; +import org.apache.doris.filesystem.RemoteIterator; public interface DirectoryLister { RemoteIterator listFiles(FileSystem fs, boolean recursive, TableIf table, String location) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index 36e9f2051bf43d..2ff465cbedec12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -20,7 +20,7 @@ import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileSystem; +import org.apache.doris.filesystem.FileSystem; import com.github.benmanes.caffeine.cache.LoadingCache; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java index 31c997685f5776..19c411e37cc67f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java @@ -18,12 +18,12 @@ package org.apache.doris.fs; import org.apache.doris.catalog.TableIf; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemIOException; -import org.apache.doris.filesystem.spi.FileSystemTransferUtil; -import org.apache.doris.filesystem.spi.RemoteIterator; -import org.apache.doris.filesystem.spi.SimpleRemoteIterator; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemIOException; +import org.apache.doris.filesystem.FileSystemTransferUtil; +import org.apache.doris.filesystem.RemoteIterator; +import org.apache.doris.filesystem.SimpleRemoteIterator; import java.io.IOException; import java.util.List; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java index b328b6a04c0b50..a0ec9c60934c77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java @@ -18,7 +18,7 @@ package org.apache.doris.fs; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileSystemProvider; +import org.apache.doris.filesystem.FileSystemProvider; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -70,10 +70,10 @@ public static void initPluginManager(FileSystemPluginManager manager) { * Otherwise falls back to ServiceLoader discovery (unit-test / migration path). * * @param properties key-value storage configuration - * @return initialized {@code org.apache.doris.filesystem.spi.FileSystem} + * @return initialized {@code org.apache.doris.filesystem.FileSystem} * @throws IOException if no provider matches or creation fails */ - public static org.apache.doris.filesystem.spi.FileSystem getFileSystem(Map properties) + public static org.apache.doris.filesystem.FileSystem getFileSystem(Map properties) throws IOException { FileSystemPluginManager mgr = pluginManager; if (mgr != null) { @@ -101,7 +101,7 @@ public static org.apache.doris.filesystem.spi.FileSystem getFileSystem(MapThe caller is responsible for resolving the broker name to a live host:port via @@ -151,10 +151,10 @@ static void clearProviderCache() { * @param port live broker Thrift port * @param clientId FE identifier sent to broker for logging (e.g. "host:editLogPort") * @param brokerParams broker-specific params (username, password, hadoop config, ...) - * @return initialized {@code org.apache.doris.filesystem.spi.FileSystem} + * @return initialized {@code org.apache.doris.filesystem.FileSystem} * @throws IOException if the broker filesystem provider is not found or creation fails */ - public static org.apache.doris.filesystem.spi.FileSystem getBrokerFileSystem( + public static org.apache.doris.filesystem.FileSystem getBrokerFileSystem( String host, int port, String clientId, Map brokerParams) throws IOException { Map props = new HashMap<>(brokerParams); props.put("_STORAGE_TYPE_", "BROKER"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemPluginManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemPluginManager.java index 07e17cfc3be6ad..9e86b7551d1f0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemPluginManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemPluginManager.java @@ -22,8 +22,8 @@ import org.apache.doris.extension.loader.LoadFailure; import org.apache.doris.extension.loader.LoadReport; import org.apache.doris.extension.loader.PluginHandle; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java b/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java index cd0df4e72957ce..a5e750b83d0ecd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java @@ -18,7 +18,7 @@ package org.apache.doris.fs; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileSystemType; +import org.apache.doris.filesystem.FileSystemType; import org.apache.doris.thrift.TFileType; import lombok.Getter; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SpiSwitchingFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/SpiSwitchingFileSystem.java index 25cc63ebfeeb4a..159a5cff677494 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/SpiSwitchingFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SpiSwitchingFileSystem.java @@ -19,13 +19,13 @@ import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.GlobListing; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.GlobListing; +import org.apache.doris.filesystem.Location; import com.google.common.annotations.VisibleForTesting; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/StoragePropertiesConverter.java b/fe/fe-core/src/main/java/org/apache/doris/fs/StoragePropertiesConverter.java index 6534aeb6acb118..5c79aee787c9f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/StoragePropertiesConverter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/StoragePropertiesConverter.java @@ -30,8 +30,8 @@ /** * Converts legacy {@link StorageProperties} objects to the {@code Map} format - * expected by {@link org.apache.doris.filesystem.spi.FileSystemProvider#supports} and - * {@link org.apache.doris.filesystem.spi.FileSystemProvider#create}. + * expected by {@link org.apache.doris.filesystem.FileSystemProvider#supports} and + * {@link org.apache.doris.filesystem.FileSystemProvider#create}. * *

This class is the ONLY place in fe-core that knows about the mapping between * StorageProperties subtypes and filesystem property keys. All other code in fe-core diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java index 2b4d7c654bfdd3..24979236c6d03a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java @@ -21,10 +21,10 @@ package org.apache.doris.fs; import org.apache.doris.catalog.TableIf; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystemIOException; -import org.apache.doris.filesystem.spi.RemoteIterator; -import org.apache.doris.filesystem.spi.SimpleRemoteIterator; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystemIOException; +import org.apache.doris.filesystem.RemoteIterator; +import org.apache.doris.filesystem.SimpleRemoteIterator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -71,13 +71,13 @@ public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long tra } @Override - public RemoteIterator listFiles(org.apache.doris.filesystem.spi.FileSystem fs, boolean recursive, + public RemoteIterator listFiles(org.apache.doris.filesystem.FileSystem fs, boolean recursive, TableIf table, String location) throws FileSystemIOException { return listInternal(fs, recursive, table, new TransactionDirectoryListingCacheKey(transactionId, location)); } - private RemoteIterator listInternal(org.apache.doris.filesystem.spi.FileSystem fs, + private RemoteIterator listInternal(org.apache.doris.filesystem.FileSystem fs, boolean recursive, TableIf table, TransactionDirectoryListingCacheKey cacheKey) throws FileSystemIOException { @@ -100,7 +100,7 @@ private RemoteIterator listInternal(org.apache.doris.filesystem.spi.F } private RemoteIterator createListingRemoteIterator( - org.apache.doris.filesystem.spi.FileSystem fs, + org.apache.doris.filesystem.FileSystem fs, boolean recursive, TableIf table, TransactionDirectoryListingCacheKey cacheKey) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java index 6aacb93aab7bd5..b901bc2a72c495 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java @@ -19,10 +19,10 @@ import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.GlobListing; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.GlobListing; +import org.apache.doris.filesystem.Location; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties; import org.apache.doris.job.offset.Offset; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java index b7ee36ada33330..7374bc7139bc00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java @@ -25,7 +25,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.Location; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; @@ -192,7 +192,7 @@ private void deleteExistingFilesInFE(String tvfName, Map props) fsCopyProps.remove("compress_type"); StorageProperties storageProps = StorageProperties.createPrimary(fsCopyProps); - try (org.apache.doris.filesystem.spi.FileSystem fs = FileSystemFactory.getFileSystem(storageProps)) { + try (org.apache.doris.filesystem.FileSystem fs = FileSystemFactory.getFileSystem(storageProps)) { fs.delete(Location.of(parentDir), true); } catch (java.io.IOException e) { throw new UserException("Failed to delete existing files in " + parentDir + ": " + e.getMessage()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java index 2ad2d805567980..54379354cc3c98 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java @@ -25,12 +25,12 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.storage.BrokerProperties; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisInputStream; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisInputStream; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.Location; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.service.FrontendOptions; @@ -72,7 +72,7 @@ public class RepositoryTest { private SnapshotInfo info; @Mocked - private org.apache.doris.filesystem.spi.FileSystem mockFs; + private org.apache.doris.filesystem.FileSystem mockFs; @Mocked private DorisOutputFile mockOutputFile; @Mocked @@ -103,13 +103,13 @@ String getLocalHostAddress() { // acquireSpiFs() (broker path) returns the mock without a real connection. new MockUp() { @Mock - public org.apache.doris.filesystem.spi.FileSystem getFileSystem( + public org.apache.doris.filesystem.FileSystem getFileSystem( Map properties) throws IOException { return mockFs; } @Mock - public org.apache.doris.filesystem.spi.FileSystem getFileSystem( + public org.apache.doris.filesystem.FileSystem getFileSystem( StorageProperties storageProperties) throws IOException { return mockFs; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java index 9916b496e35336..cccbc63bcc86e4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -19,7 +19,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileSystemType; +import org.apache.doris.filesystem.FileSystemType; import org.apache.doris.foundation.property.StoragePropertiesException; import org.apache.doris.thrift.TFileType; diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java index 03ddbd06e9055f..35971d3d3e1f16 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java @@ -17,13 +17,13 @@ package org.apache.doris.datasource.hive; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.Location; import org.apache.doris.filesystem.local.LocalFileSystem; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.Location; import org.apache.doris.fs.SpiSwitchingFileSystem; import org.apache.doris.qe.ConnectContext; diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index eae3382f23fd82..60ad40dfcb617b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -364,7 +364,7 @@ public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdate }); if (mode != TUpdateMode.NEW) { - fs.mkdirs(org.apache.doris.filesystem.spi.Location.of(targetPath)); + fs.mkdirs(org.apache.doris.filesystem.Location.of(targetPath)); } java.nio.file.Path writeDir = java.nio.file.Paths.get( @@ -398,7 +398,7 @@ private String getWritePath() { private boolean fsExists(String path) { try { - return fs.exists(org.apache.doris.filesystem.spi.Location.of(path)); + return fs.exists(org.apache.doris.filesystem.Location.of(path)); } catch (java.io.IOException e) { return false; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/FileSystemTransferUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/FileSystemTransferUtilTest.java index 5db4c650bee118..21c777fee191b0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/FileSystemTransferUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/FileSystemTransferUtilTest.java @@ -17,11 +17,11 @@ package org.apache.doris.fs; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemTransferUtil; +import org.apache.doris.filesystem.Location; import org.apache.doris.filesystem.local.LocalFileSystemProvider; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemTransferUtil; -import org.apache.doris.filesystem.spi.Location; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/MemoryFileSystem.java b/fe/fe-core/src/test/java/org/apache/doris/fs/MemoryFileSystem.java index 86e5ec9b5098f6..fde4c1d6a73e1c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/MemoryFileSystem.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/MemoryFileSystem.java @@ -17,12 +17,12 @@ package org.apache.doris.fs; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisInputStream; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisInputStream; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.Location; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -38,13 +38,13 @@ import java.util.stream.Collectors; /** - * In-memory {@link org.apache.doris.filesystem.spi.FileSystem} implementation for unit testing. + * In-memory {@link org.apache.doris.filesystem.FileSystem} implementation for unit testing. *

* File data is stored in a {@link ConcurrentHashMap}. Directories are implicit * (any Location whose path ends with "/" is treated as a directory). * Thread-safe for concurrent read/write operations. */ -public class MemoryFileSystem implements org.apache.doris.filesystem.spi.FileSystem { +public class MemoryFileSystem implements org.apache.doris.filesystem.FileSystem { // Maps location string → file bytes (null entry = directory marker) private final ConcurrentHashMap store = new ConcurrentHashMap<>(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/MemoryFileSystemTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/MemoryFileSystemTest.java index f19b02a1ab68eb..05e9c30fe6cbe2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/MemoryFileSystemTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/MemoryFileSystemTest.java @@ -17,9 +17,9 @@ package org.apache.doris.fs; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.Location; import org.junit.Assert; import org.junit.Before; diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/SchemaTypeMapperTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/SchemaTypeMapperTest.java index e09212d06fab9b..62881603c0c46d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/SchemaTypeMapperTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/SchemaTypeMapperTest.java @@ -18,7 +18,7 @@ package org.apache.doris.fs; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.filesystem.spi.FileSystemType; +import org.apache.doris.filesystem.FileSystemType; import org.apache.doris.thrift.TFileType; import org.junit.jupiter.api.Assertions; diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java index 8c3f7c7eeade4b..f55107b27b5bc4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java @@ -21,10 +21,10 @@ package org.apache.doris.fs; import org.apache.doris.catalog.TableIf; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileSystemIOException; -import org.apache.doris.filesystem.spi.Location; -import org.apache.doris.filesystem.spi.RemoteIterator; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileSystemIOException; +import org.apache.doris.filesystem.Location; +import org.apache.doris.filesystem.RemoteIterator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -143,7 +143,7 @@ public CountingDirectoryLister(Map> fileStatuses) { } @Override - public RemoteIterator listFiles(org.apache.doris.filesystem.spi.FileSystem fs, boolean recursive, + public RemoteIterator listFiles(org.apache.doris.filesystem.FileSystem fs, boolean recursive, TableIf table, String location) throws FileSystemIOException { // No specific recursive files-only listing implementation diff --git a/fe/fe-filesystem/fe-filesystem-api/pom.xml b/fe/fe-filesystem/fe-filesystem-api/pom.xml new file mode 100644 index 00000000000000..0dabcff314a78d --- /dev/null +++ b/fe/fe-filesystem/fe-filesystem-api/pom.xml @@ -0,0 +1,59 @@ + + + + 4.0.0 + + + org.apache.doris + fe-filesystem + ${revision} + ../pom.xml + + + fe-filesystem-api + jar + Doris FE Filesystem API + + Consumer-facing API for the Doris FE filesystem abstraction layer. + Contains the core FileSystem interface, value types (Location, FileEntry, BlockInfo), + streaming types (FileIterator, DorisInputFile, DorisOutputStream), utility helpers, + and the FileSystemProvider discovery interface. + + Zero third-party external dependencies — only JDK and the Doris plugin registration + interface (fe-extension-spi). Both fe-core callers and filesystem provider + implementations depend on this artifact. + + + + + + ${project.groupId} + fe-extension-spi + ${project.version} + + + + + doris-fe-filesystem-api + + diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/BlockInfo.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/BlockInfo.java similarity index 97% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/BlockInfo.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/BlockInfo.java index 911b4e93d8fa86..fd31ffaee7c44c 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/BlockInfo.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/BlockInfo.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; /** * Represents a block location within a file (e.g., HDFS block). diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisInputFile.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisInputFile.java similarity index 98% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisInputFile.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisInputFile.java index fd607455025e9d..ad5273f307b2b8 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisInputFile.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisInputFile.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.io.IOException; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisInputStream.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisInputStream.java similarity index 97% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisInputStream.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisInputStream.java index 2939c07eb2c29f..09ee4f5351552e 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisInputStream.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisInputStream.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.io.IOException; import java.io.InputStream; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisOutputFile.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisOutputFile.java similarity index 96% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisOutputFile.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisOutputFile.java index 4eb117c9764841..a4aae2221927c3 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/DorisOutputFile.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/DorisOutputFile.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.io.IOException; import java.io.OutputStream; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileEntry.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileEntry.java similarity index 98% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileEntry.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileEntry.java index a33ba7f91f4aec..16f523ca992fa7 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileEntry.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileEntry.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.util.List; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileIterator.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileIterator.java similarity index 96% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileIterator.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileIterator.java index 9c61a954af9caa..c5ebed80d38cb4 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileIterator.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileIterator.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.io.Closeable; import java.io.IOException; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystem.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystem.java similarity index 99% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystem.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystem.java index 9fcb2537cc7443..20d1c36bb434ee 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystem.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystem.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.io.IOException; import java.util.ArrayList; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemIOException.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemIOException.java similarity index 96% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemIOException.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemIOException.java index 335123af957323..164e88bff616e1 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemIOException.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemIOException.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.io.IOException; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemProvider.java similarity index 97% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemProvider.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemProvider.java index 2517b880faad15..ed546042fc313e 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemProvider.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import org.apache.doris.extension.spi.Plugin; import org.apache.doris.extension.spi.PluginFactory; @@ -33,7 +33,7 @@ * *

Implementations must: * 1. Have a public no-arg constructor. - * 2. Register in META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider. + * 2. Register in META-INF/services/org.apache.doris.filesystem.FileSystemProvider. * 3. Have NO dependency on fe-core, fe-common, or fe-catalog. */ public interface FileSystemProvider extends PluginFactory { diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemTransferUtil.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemTransferUtil.java similarity index 99% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemTransferUtil.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemTransferUtil.java index 611e74fd90ae1d..3c67d76015294f 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemTransferUtil.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemTransferUtil.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.io.IOException; import java.io.InputStream; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemType.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemType.java similarity index 98% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemType.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemType.java index c50b1b8547d699..ac8d82f5cc0b1f 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemType.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemType.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; // TODO: [FileSystemType Unification] // There are currently multiple definitions of file system types across the codebase, including but not limited to: diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemUtil.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemUtil.java similarity index 98% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemUtil.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemUtil.java index 9eb6ac0dfa9d3b..1de7a29425b8f7 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/FileSystemUtil.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/FileSystemUtil.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.util.List; import java.util.concurrent.CompletableFuture; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/GlobListing.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/GlobListing.java similarity index 98% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/GlobListing.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/GlobListing.java index c65cc3db11f0f7..a8b82c6a250796 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/GlobListing.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/GlobListing.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.util.List; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/Location.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/Location.java similarity index 97% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/Location.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/Location.java index 9ed8210dc26e70..0ed2da147decea 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/Location.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/Location.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.util.Objects; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RemoteIterator.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/RemoteIterator.java similarity index 96% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RemoteIterator.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/RemoteIterator.java index e566963080aaa4..97d4edf672bf57 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RemoteIterator.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/RemoteIterator.java @@ -18,7 +18,7 @@ // https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java // and modified by Doris -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; public interface RemoteIterator { boolean hasNext() throws FileSystemIOException; diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/SimpleRemoteIterator.java b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/SimpleRemoteIterator.java similarity index 97% rename from fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/SimpleRemoteIterator.java rename to fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/SimpleRemoteIterator.java index 70b03c25b326b5..8b01299d131357 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/SimpleRemoteIterator.java +++ b/fe/fe-filesystem/fe-filesystem-api/src/main/java/org/apache/doris/filesystem/SimpleRemoteIterator.java @@ -18,7 +18,7 @@ // https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java // and modified by Doris -package org.apache.doris.filesystem.spi; +package org.apache.doris.filesystem; import java.util.Iterator; import java.util.Objects; diff --git a/fe/fe-filesystem/fe-filesystem-azure/src/main/java/org/apache/doris/filesystem/azure/AzureFileSystem.java b/fe/fe-filesystem/fe-filesystem-azure/src/main/java/org/apache/doris/filesystem/azure/AzureFileSystem.java index cde5888861b565..5535a214580265 100644 --- a/fe/fe-filesystem/fe-filesystem-azure/src/main/java/org/apache/doris/filesystem/azure/AzureFileSystem.java +++ b/fe/fe-filesystem/fe-filesystem-azure/src/main/java/org/apache/doris/filesystem/azure/AzureFileSystem.java @@ -17,12 +17,12 @@ package org.apache.doris.filesystem.azure; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisInputStream; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisInputStream; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.Location; import org.apache.doris.filesystem.spi.ObjFileSystem; import org.apache.doris.filesystem.spi.RemoteObject; import org.apache.doris.filesystem.spi.RemoteObjects; @@ -40,7 +40,7 @@ import java.util.List; /** - * Azure Blob Storage-backed {@link org.apache.doris.filesystem.spi.FileSystem} implementation. + * Azure Blob Storage-backed {@link org.apache.doris.filesystem.FileSystem} implementation. * *

Does not depend on fe-core, fe-common, or fe-catalog. * Azure does not support atomic directory renames; {@link #rename} is limited to single blobs. @@ -101,12 +101,12 @@ private void deleteRecursive(String prefix) throws IOException { * Azure Blob Storage does not support atomic directory renames. * Single-blob renames are supported via copy-then-delete. * - * @throws UnsupportedOperationException if the source appears to be a directory prefix + * @throws IOException if the source appears to be a directory prefix */ @Override public void rename(Location src, Location dst) throws IOException { if (src.uri().endsWith(DIR_MARKER_SUFFIX)) { - throw new UnsupportedOperationException( + throw new IOException( "Renaming directories is not supported in Azure Blob Storage."); } objStorage.copyObject(src.uri(), dst.uri()); @@ -176,7 +176,7 @@ private void fetchNextPage() throws IOException { bufferIdx = 0; for (RemoteObject obj : page.getObjectList()) { Location loc = Location.of(rebuildUri(prefix, obj.getKey())); - buffer.add(new FileEntry(loc, obj.getSize(), false, obj.modificationTime(), List.of())); + buffer.add(new FileEntry(loc, obj.getSize(), false, obj.getModificationTime(), List.of())); } if (page.isTruncated()) { continuationToken = page.getContinuationToken(); diff --git a/fe/fe-filesystem/fe-filesystem-azure/src/main/java/org/apache/doris/filesystem/azure/AzureFileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-azure/src/main/java/org/apache/doris/filesystem/azure/AzureFileSystemProvider.java index 965c409e5fc872..1fe67b914731e8 100644 --- a/fe/fe-filesystem/fe-filesystem-azure/src/main/java/org/apache/doris/filesystem/azure/AzureFileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-azure/src/main/java/org/apache/doris/filesystem/azure/AzureFileSystemProvider.java @@ -17,8 +17,8 @@ package org.apache.doris.filesystem.azure; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import java.io.IOException; import java.util.Map; @@ -26,7 +26,7 @@ /** * SPI provider for Azure Blob Storage. * - *

Registered via META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider. + *

Registered via META-INF/services/org.apache.doris.filesystem.FileSystemProvider. * *

Identified by the presence of {@code AZURE_ACCOUNT_NAME}, {@code azure.account_name}, * or an endpoint containing {@code blob.core.windows.net}. diff --git a/fe/fe-filesystem/fe-filesystem-azure/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider b/fe/fe-filesystem/fe-filesystem-azure/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider similarity index 100% rename from fe/fe-filesystem/fe-filesystem-azure/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider rename to fe/fe-filesystem/fe-filesystem-azure/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider diff --git a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerFileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerFileSystemProvider.java index 376ccacc8d1fe6..c67aa47e65e5b3 100644 --- a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerFileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerFileSystemProvider.java @@ -17,8 +17,8 @@ package org.apache.doris.filesystem.broker; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import java.io.IOException; import java.util.HashMap; @@ -33,7 +33,7 @@ * avoiding any dependency on BrokerMgr here. * *

Registered via: - * META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider + * META-INF/services/org.apache.doris.filesystem.FileSystemProvider */ public class BrokerFileSystemProvider implements FileSystemProvider { @@ -55,6 +55,9 @@ public FileSystem create(Map properties) throws IOException { if (host == null || host.isEmpty()) { throw new IOException("BROKER_HOST is required"); } + if (portStr == null || portStr.isEmpty()) { + throw new IOException("BROKER_PORT is required"); + } int port; try { port = Integer.parseInt(portStr); diff --git a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerInputFile.java b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerInputFile.java index 5dde093c47415b..eda0362196ef7b 100644 --- a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerInputFile.java +++ b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerInputFile.java @@ -17,9 +17,9 @@ package org.apache.doris.filesystem.broker; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisInputStream; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisInputStream; +import org.apache.doris.filesystem.Location; import org.apache.doris.thrift.TBrokerFD; import org.apache.doris.thrift.TBrokerOpenReaderRequest; import org.apache.doris.thrift.TBrokerOpenReaderResponse; diff --git a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerInputStream.java b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerInputStream.java index 2e3813a152f597..25c040b74ed1f5 100644 --- a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerInputStream.java +++ b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerInputStream.java @@ -17,7 +17,7 @@ package org.apache.doris.filesystem.broker; -import org.apache.doris.filesystem.spi.DorisInputStream; +import org.apache.doris.filesystem.DorisInputStream; import org.apache.doris.thrift.TBrokerCloseReaderRequest; import org.apache.doris.thrift.TBrokerFD; import org.apache.doris.thrift.TBrokerOperationStatus; @@ -56,6 +56,8 @@ class BrokerInputStream extends DorisInputStream { private int bufOffset; private int bufLen; private boolean eof; + /** True when the Thrift client has already been invalidated (e.g. on RPC failure). */ + private boolean clientInvalidated = false; BrokerInputStream(TNetworkAddress endpoint, BrokerClientPool clientPool, TPaloBrokerService.Client client, TBrokerFD fd) { @@ -141,6 +143,7 @@ private void fillBuffer() throws IOException { eof = true; } } catch (TException e) { + clientInvalidated = true; clientPool.invalidate(endpoint, client); throw new IOException("Broker pread RPC failed at offset " + position + ": " + e.getMessage(), e); } @@ -148,6 +151,10 @@ private void fillBuffer() throws IOException { @Override public void close() throws IOException { + if (clientInvalidated) { + // Client already invalidated by a prior RPC failure; broker FD will time out on the broker side. + return; + } try { TBrokerCloseReaderRequest req = new TBrokerCloseReaderRequest(TBrokerVersion.VERSION_ONE, fd); TBrokerOperationStatus opst = client.closeReader(req); diff --git a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerOutputFile.java b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerOutputFile.java index 18661c146c4d8a..b3bc318c393a51 100644 --- a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerOutputFile.java +++ b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerOutputFile.java @@ -17,8 +17,8 @@ package org.apache.doris.filesystem.broker; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.Location; import org.apache.doris.thrift.TBrokerFD; import org.apache.doris.thrift.TBrokerOpenMode; import org.apache.doris.thrift.TBrokerOpenWriterRequest; diff --git a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerSpiFileSystem.java b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerSpiFileSystem.java index dc99c7ec563ee0..1f087d3138d4fc 100644 --- a/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerSpiFileSystem.java +++ b/fe/fe-filesystem/fe-filesystem-broker/src/main/java/org/apache/doris/filesystem/broker/BrokerSpiFileSystem.java @@ -17,12 +17,12 @@ package org.apache.doris.filesystem.broker; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.Location; import org.apache.doris.thrift.TBrokerCheckPathExistRequest; import org.apache.doris.thrift.TBrokerCheckPathExistResponse; import org.apache.doris.thrift.TBrokerDeletePathRequest; diff --git a/fe/fe-filesystem/fe-filesystem-broker/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider b/fe/fe-filesystem/fe-filesystem-broker/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider similarity index 100% rename from fe/fe-filesystem/fe-filesystem-broker/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider rename to fe/fe-filesystem/fe-filesystem-broker/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider diff --git a/fe/fe-filesystem/fe-filesystem-cos/src/main/java/org/apache/doris/filesystem/cos/CosFileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-cos/src/main/java/org/apache/doris/filesystem/cos/CosFileSystemProvider.java index 0bd6bebca160fa..ad60dc354bd2a5 100644 --- a/fe/fe-filesystem/fe-filesystem-cos/src/main/java/org/apache/doris/filesystem/cos/CosFileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-cos/src/main/java/org/apache/doris/filesystem/cos/CosFileSystemProvider.java @@ -17,10 +17,10 @@ package org.apache.doris.filesystem.cos; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import org.apache.doris.filesystem.s3.S3FileSystem; import org.apache.doris.filesystem.s3.S3ObjStorage; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; import java.io.IOException; import java.util.HashMap; @@ -29,7 +29,7 @@ /** * SPI provider for Tencent Cloud COS. * - *

Registered via META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider. + *

Registered via META-INF/services/org.apache.doris.filesystem.FileSystemProvider. * *

Identified by an endpoint containing {@code myqcloud.com}. Translates COS-specific * property keys to S3-compatible keys and delegates to {@link S3FileSystem}. @@ -38,6 +38,9 @@ public class CosFileSystemProvider implements FileSystemProvider { @Override public boolean supports(Map properties) { + if ("COS".equals(properties.get("_STORAGE_TYPE_"))) { + return true; + } String endpoint = properties.get("COS_ENDPOINT"); if (endpoint == null) { endpoint = properties.get("AWS_ENDPOINT"); diff --git a/fe/fe-filesystem/fe-filesystem-cos/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider b/fe/fe-filesystem/fe-filesystem-cos/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider similarity index 100% rename from fe/fe-filesystem/fe-filesystem-cos/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider rename to fe/fe-filesystem/fe-filesystem-cos/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider diff --git a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/DFSFileSystem.java b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/DFSFileSystem.java index bbc3bd45438441..9ea99fe7b16128 100644 --- a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/DFSFileSystem.java +++ b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/DFSFileSystem.java @@ -17,11 +17,11 @@ package org.apache.doris.filesystem.hdfs; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileIterator; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.Location; import org.apache.doris.filesystem.spi.HadoopAuthenticator; -import org.apache.doris.filesystem.spi.Location; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -39,7 +39,7 @@ * SPI FileSystem implementation for HDFS, ViewFS, JFS, OFS, and OSSHdfs. * Has zero dependency on fe-core; accepts {@code Map} configuration. */ -public class DFSFileSystem implements org.apache.doris.filesystem.spi.FileSystem { +public class DFSFileSystem implements org.apache.doris.filesystem.FileSystem { private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class); @@ -122,8 +122,9 @@ public void rename(Location src, Location dst) throws IOException { @Override public FileIterator list(Location location) throws IOException { Path path = new Path(location.toString()); - FileStatus[] statuses = authenticator.doAs(() -> getHadoopFs(path).listStatus(path)); - return new HdfsFileIterator(statuses); + org.apache.hadoop.fs.RemoteIterator it = + authenticator.doAs(() -> getHadoopFs(path).listStatusIterator(path)); + return new HdfsFileIterator(it, authenticator); } @Override diff --git a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsFileIterator.java b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsFileIterator.java index b6cb6674f930b3..1ea84987e4ac0d 100644 --- a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsFileIterator.java +++ b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsFileIterator.java @@ -17,34 +17,39 @@ package org.apache.doris.filesystem.hdfs; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.Location; +import org.apache.doris.filesystem.spi.HadoopAuthenticator; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.RemoteIterator; import java.io.IOException; /** - * {@link FileIterator} backed by a Hadoop {@link FileStatus} array from a single listStatus call. + * {@link FileIterator} backed by a Hadoop {@link RemoteIterator}{@code } for lazy, + * streaming directory listing. Each {@code hasNext()} / {@code next()} call is executed inside the + * provided {@link HadoopAuthenticator} context so that Kerberos tickets remain valid throughout. */ class HdfsFileIterator implements FileIterator { - private final FileStatus[] statuses; - private int index = 0; + private final RemoteIterator delegate; + private final HadoopAuthenticator authenticator; - HdfsFileIterator(FileStatus[] statuses) { - this.statuses = statuses; + HdfsFileIterator(RemoteIterator delegate, HadoopAuthenticator authenticator) { + this.delegate = delegate; + this.authenticator = authenticator; } @Override public boolean hasNext() throws IOException { - return index < statuses.length; + return authenticator.doAs(delegate::hasNext); } @Override public FileEntry next() throws IOException { - FileStatus status = statuses[index++]; + FileStatus status = authenticator.doAs(delegate::next); Location loc = Location.of(status.getPath().toString()); return new FileEntry(loc, status.getLen(), status.isDirectory(), status.getModificationTime(), null); @@ -52,6 +57,6 @@ public FileEntry next() throws IOException { @Override public void close() throws IOException { - // no-op: statuses array is already fully loaded + // RemoteIterator has no close(); listing completes on exhaustion. } } diff --git a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsFileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsFileSystemProvider.java index b6785d52a9ef9d..47fa6326806d34 100644 --- a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsFileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsFileSystemProvider.java @@ -17,8 +17,8 @@ package org.apache.doris.filesystem.hdfs; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import java.io.IOException; import java.util.Map; diff --git a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsInputFile.java b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsInputFile.java index de4c77e1549665..c7ad1a2bac49a7 100644 --- a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsInputFile.java +++ b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsInputFile.java @@ -17,10 +17,10 @@ package org.apache.doris.filesystem.hdfs; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisInputStream; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisInputStream; +import org.apache.doris.filesystem.Location; import org.apache.doris.filesystem.spi.HadoopAuthenticator; -import org.apache.doris.filesystem.spi.Location; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; diff --git a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsOutputFile.java b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsOutputFile.java index 62991deaad9e4c..a4ae556b695927 100644 --- a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsOutputFile.java +++ b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsOutputFile.java @@ -17,9 +17,9 @@ package org.apache.doris.filesystem.hdfs; -import org.apache.doris.filesystem.spi.DorisOutputFile; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.Location; import org.apache.doris.filesystem.spi.HadoopAuthenticator; -import org.apache.doris.filesystem.spi.Location; import org.apache.hadoop.fs.Path; diff --git a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsSeekableInputStream.java b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsSeekableInputStream.java index ce4f1a7e1322b3..178c60e95d56fa 100644 --- a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsSeekableInputStream.java +++ b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/java/org/apache/doris/filesystem/hdfs/HdfsSeekableInputStream.java @@ -17,7 +17,7 @@ package org.apache.doris.filesystem.hdfs; -import org.apache.doris.filesystem.spi.DorisInputStream; +import org.apache.doris.filesystem.DorisInputStream; import org.apache.hadoop.fs.FSDataInputStream; diff --git a/fe/fe-filesystem/fe-filesystem-hdfs/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider b/fe/fe-filesystem/fe-filesystem-hdfs/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider similarity index 100% rename from fe/fe-filesystem/fe-filesystem-hdfs/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider rename to fe/fe-filesystem/fe-filesystem-hdfs/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider diff --git a/fe/fe-filesystem/fe-filesystem-local/src/main/java/org/apache/doris/filesystem/local/LocalFileSystem.java b/fe/fe-filesystem/fe-filesystem-local/src/main/java/org/apache/doris/filesystem/local/LocalFileSystem.java index be69690ad77f9e..c5c42de68b5d24 100644 --- a/fe/fe-filesystem/fe-filesystem-local/src/main/java/org/apache/doris/filesystem/local/LocalFileSystem.java +++ b/fe/fe-filesystem/fe-filesystem-local/src/main/java/org/apache/doris/filesystem/local/LocalFileSystem.java @@ -17,13 +17,13 @@ package org.apache.doris.filesystem.local; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisInputStream; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisInputStream; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.Location; import java.io.IOException; import java.io.OutputStream; @@ -173,16 +173,25 @@ public long getPos() throws IOException { @Override public void seek(long pos) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } raf.seek(pos); } @Override public int read() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } return raf.read(); } @Override public int read(byte[] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } return raf.read(b, off, len); } diff --git a/fe/fe-filesystem/fe-filesystem-local/src/main/java/org/apache/doris/filesystem/local/LocalFileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-local/src/main/java/org/apache/doris/filesystem/local/LocalFileSystemProvider.java index 09fc4734902023..592c4af4d5d7f6 100644 --- a/fe/fe-filesystem/fe-filesystem-local/src/main/java/org/apache/doris/filesystem/local/LocalFileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-local/src/main/java/org/apache/doris/filesystem/local/LocalFileSystemProvider.java @@ -17,8 +17,8 @@ package org.apache.doris.filesystem.local; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import java.io.IOException; import java.util.Map; diff --git a/fe/fe-filesystem/fe-filesystem-local/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider b/fe/fe-filesystem/fe-filesystem-local/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider similarity index 100% rename from fe/fe-filesystem/fe-filesystem-local/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider rename to fe/fe-filesystem/fe-filesystem-local/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider diff --git a/fe/fe-filesystem/fe-filesystem-obs/src/main/java/org/apache/doris/filesystem/obs/ObsFileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-obs/src/main/java/org/apache/doris/filesystem/obs/ObsFileSystemProvider.java index fd1b7e9ebc7efa..c506d72f054ef5 100644 --- a/fe/fe-filesystem/fe-filesystem-obs/src/main/java/org/apache/doris/filesystem/obs/ObsFileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-obs/src/main/java/org/apache/doris/filesystem/obs/ObsFileSystemProvider.java @@ -17,10 +17,10 @@ package org.apache.doris.filesystem.obs; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import org.apache.doris.filesystem.s3.S3FileSystem; import org.apache.doris.filesystem.s3.S3ObjStorage; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; import java.io.IOException; import java.util.HashMap; @@ -29,7 +29,7 @@ /** * SPI provider for Huawei Cloud OBS. * - *

Registered via META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider. + *

Registered via META-INF/services/org.apache.doris.filesystem.FileSystemProvider. * *

Identified by an endpoint containing {@code myhuaweicloud.com}. Translates OBS-specific * property keys to S3-compatible keys and delegates to {@link S3FileSystem}. @@ -38,6 +38,9 @@ public class ObsFileSystemProvider implements FileSystemProvider { @Override public boolean supports(Map properties) { + if ("OBS".equals(properties.get("_STORAGE_TYPE_"))) { + return true; + } String endpoint = properties.get("OBS_ENDPOINT"); if (endpoint == null) { endpoint = properties.get("AWS_ENDPOINT"); diff --git a/fe/fe-filesystem/fe-filesystem-obs/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider b/fe/fe-filesystem/fe-filesystem-obs/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider similarity index 100% rename from fe/fe-filesystem/fe-filesystem-obs/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider rename to fe/fe-filesystem/fe-filesystem-obs/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider diff --git a/fe/fe-filesystem/fe-filesystem-oss/src/main/java/org/apache/doris/filesystem/oss/OssFileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-oss/src/main/java/org/apache/doris/filesystem/oss/OssFileSystemProvider.java index 90854e7ae93d73..9edf8760a2b39f 100644 --- a/fe/fe-filesystem/fe-filesystem-oss/src/main/java/org/apache/doris/filesystem/oss/OssFileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-oss/src/main/java/org/apache/doris/filesystem/oss/OssFileSystemProvider.java @@ -17,10 +17,10 @@ package org.apache.doris.filesystem.oss; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import org.apache.doris.filesystem.s3.S3FileSystem; import org.apache.doris.filesystem.s3.S3ObjStorage; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; import java.io.IOException; import java.util.HashMap; @@ -29,7 +29,7 @@ /** * SPI provider for Alibaba Cloud OSS. * - *

Registered via META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider. + *

Registered via META-INF/services/org.apache.doris.filesystem.FileSystemProvider. * *

Identified by an endpoint containing {@code aliyuncs.com}. Translates OSS-specific * property keys to S3-compatible keys and delegates to {@link S3FileSystem}. @@ -38,6 +38,9 @@ public class OssFileSystemProvider implements FileSystemProvider { @Override public boolean supports(Map properties) { + if ("OSS".equals(properties.get("_STORAGE_TYPE_"))) { + return true; + } String endpoint = properties.get("OSS_ENDPOINT"); if (endpoint == null) { endpoint = properties.get("AWS_ENDPOINT"); diff --git a/fe/fe-filesystem/fe-filesystem-oss/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider b/fe/fe-filesystem/fe-filesystem-oss/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider similarity index 100% rename from fe/fe-filesystem/fe-filesystem-oss/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider rename to fe/fe-filesystem/fe-filesystem-oss/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider diff --git a/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3FileSystem.java b/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3FileSystem.java index e8cc3d2259f1a6..231ca7c96c491e 100644 --- a/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3FileSystem.java +++ b/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3FileSystem.java @@ -17,13 +17,13 @@ package org.apache.doris.filesystem.s3; -import org.apache.doris.filesystem.spi.DorisInputFile; -import org.apache.doris.filesystem.spi.DorisInputStream; -import org.apache.doris.filesystem.spi.DorisOutputFile; -import org.apache.doris.filesystem.spi.FileEntry; -import org.apache.doris.filesystem.spi.FileIterator; -import org.apache.doris.filesystem.spi.GlobListing; -import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.filesystem.DorisInputFile; +import org.apache.doris.filesystem.DorisInputStream; +import org.apache.doris.filesystem.DorisOutputFile; +import org.apache.doris.filesystem.FileEntry; +import org.apache.doris.filesystem.FileIterator; +import org.apache.doris.filesystem.GlobListing; +import org.apache.doris.filesystem.Location; import org.apache.doris.filesystem.spi.ObjFileSystem; import org.apache.doris.filesystem.spi.RemoteObject; import org.apache.doris.filesystem.spi.RemoteObjects; @@ -169,7 +169,7 @@ private void fetchNextPage() throws IOException { bufferIdx = 0; for (RemoteObject obj : page.getObjectList()) { Location loc = Location.of(reconstructUri(prefix, obj.getKey())); - buffer.add(new FileEntry(loc, obj.getSize(), false, obj.modificationTime(), List.of())); + buffer.add(new FileEntry(loc, obj.getSize(), false, obj.getModificationTime(), List.of())); } if (page.isTruncated()) { continuationToken = page.getContinuationToken(); @@ -399,7 +399,9 @@ public GlobListing globListWithLimit(Location path, String startAfter, long maxB List files = new ArrayList<>(); long totalSize = 0L; boolean reachLimit = false; - String currentMaxFile = ""; + // nextMatchAfterLimit: the first matching key found after the page limit was reached. + // Empty string means no such key was found yet (scanning still in progress or no more keys). + String nextMatchAfterLimit = ""; String lastMatchedKey = ""; boolean isTruncated; @@ -407,12 +409,11 @@ public GlobListing globListWithLimit(Location path, String startAfter, long maxB do { ListObjectsV2Response response = s3.getClient().listObjectsV2(request); for (S3Object obj : response.contents()) { - // Once limit reached: scan remaining objects to find the next glob-match - // so callers can determine whether more data exists. if (reachLimit) { - if (matcher.matches(Paths.get(obj.key()))) { - currentMaxFile = obj.key(); - break; + // After hitting limit: find the first matching key so callers know more data exists. + if (nextMatchAfterLimit.isEmpty() + && matcher.matches(Paths.get(obj.key()))) { + nextMatchAfterLimit = obj.key(); } continue; } @@ -433,21 +434,18 @@ public GlobListing globListWithLimit(Location path, String startAfter, long maxB if ((maxFiles > 0 && files.size() >= maxFiles) || (maxBytes > 0 && totalSize >= maxBytes)) { reachLimit = true; - break; } } - if (currentMaxFile.isEmpty()) { - currentMaxFile = lastMatchedKey; - } - isTruncated = response.isTruncated(); if (isTruncated) { request = request.toBuilder() .continuationToken(response.nextContinuationToken()) .build(); } - } while (isTruncated && !reachLimit); + // Continue paginating after limit until we find the next matching key, + // so callers can use it as a pagination cursor. + } while (isTruncated && (!reachLimit || nextMatchAfterLimit.isEmpty())); } catch (NoSuchKeyException e) { LOG.info("NoSuchKey when listing s3://{}/{}, treating as empty", bucket, listPrefix); return new GlobListing(List.of(), bucket, listPrefix, ""); @@ -455,6 +453,8 @@ public GlobListing globListWithLimit(Location path, String startAfter, long maxB throw new IOException("Failed to list S3 objects at " + uri + ": " + e.getMessage(), e); } - return new GlobListing(files, bucket, listPrefix, currentMaxFile); + // maxFile is the next matching key after the returned page (if found), or the last returned key. + String maxFile = nextMatchAfterLimit.isEmpty() ? lastMatchedKey : nextMatchAfterLimit; + return new GlobListing(files, bucket, listPrefix, maxFile); } } diff --git a/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3FileSystemProvider.java b/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3FileSystemProvider.java index 9adb8a54098f86..ad2e3074089897 100644 --- a/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3FileSystemProvider.java +++ b/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3FileSystemProvider.java @@ -17,8 +17,8 @@ package org.apache.doris.filesystem.s3; -import org.apache.doris.filesystem.spi.FileSystem; -import org.apache.doris.filesystem.spi.FileSystemProvider; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.FileSystemProvider; import java.io.IOException; import java.util.Map; @@ -26,7 +26,7 @@ /** * SPI provider for AWS S3 and S3-compatible storage (MinIO, etc.). * - *

Registered via META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider. + *

Registered via META-INF/services/org.apache.doris.filesystem.FileSystemProvider. * *

Identified by presence of AWS_ACCESS_KEY with either AWS_ENDPOINT or AWS_REGION. * S3 is intentionally the last-resort provider; cloud-specific providers (OSS, COS, OBS) diff --git a/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3ObjStorage.java b/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3ObjStorage.java index 201be37ae24e88..a6db41676f5c84 100644 --- a/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3ObjStorage.java +++ b/fe/fe-filesystem/fe-filesystem-s3/src/main/java/org/apache/doris/filesystem/s3/S3ObjStorage.java @@ -33,6 +33,7 @@ import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.S3Configuration; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; @@ -129,28 +130,31 @@ public S3Client getClient() throws IOException { private S3Client buildClient() throws IOException { String endpointStr = properties.get(PROP_ENDPOINT); - if (endpointStr == null || endpointStr.isEmpty()) { - throw new IOException("S3 property " + PROP_ENDPOINT + " is required"); - } - if (!endpointStr.contains("://")) { - endpointStr = "https://" + endpointStr; - } String region = properties.getOrDefault(PROP_REGION, "us-east-1"); AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(); - return S3Client.builder() + S3ClientBuilder builder = S3Client.builder() .httpClient(UrlConnectionHttpClient.builder() .socketTimeout(Duration.ofSeconds(30)) .connectionTimeout(Duration.ofSeconds(30)) .build()) - .endpointOverride(URI.create(endpointStr)) .credentialsProvider(credentialsProvider) .region(Region.of(region)) .serviceConfiguration(S3Configuration.builder() .chunkedEncodingEnabled(false) .pathStyleAccessEnabled(usePathStyle) - .build()) - .build(); + .build()); + + // endpointOverride is only set for non-AWS endpoints (MinIO, COS, OSS, etc.). + // Standard AWS S3 access uses region-only routing without an explicit endpoint. + if (endpointStr != null && !endpointStr.isEmpty()) { + if (!endpointStr.contains("://")) { + endpointStr = "https://" + endpointStr; + } + builder.endpointOverride(URI.create(endpointStr)); + } + + return builder.build(); } private AwsCredentialsProvider buildCredentialsProvider() { diff --git a/fe/fe-filesystem/fe-filesystem-s3/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider b/fe/fe-filesystem/fe-filesystem-s3/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider similarity index 100% rename from fe/fe-filesystem/fe-filesystem-s3/src/main/resources/META-INF/services/org.apache.doris.filesystem.spi.FileSystemProvider rename to fe/fe-filesystem/fe-filesystem-s3/src/main/resources/META-INF/services/org.apache.doris.filesystem.FileSystemProvider diff --git a/fe/fe-filesystem/fe-filesystem-spi/pom.xml b/fe/fe-filesystem/fe-filesystem-spi/pom.xml index 84af3b6fedc145..9a9ea3c49b921b 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/pom.xml +++ b/fe/fe-filesystem/fe-filesystem-spi/pom.xml @@ -39,6 +39,13 @@ under the License. + + + ${project.groupId} + fe-filesystem-api + ${project.version} + diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/ObjFileSystem.java b/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/ObjFileSystem.java index 09210b25bcc575..951b8ae9dc889d 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/ObjFileSystem.java +++ b/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/ObjFileSystem.java @@ -17,6 +17,9 @@ package org.apache.doris.filesystem.spi; +import org.apache.doris.filesystem.FileSystem; +import org.apache.doris.filesystem.Location; + import java.io.IOException; import java.util.Comparator; import java.util.List; @@ -44,7 +47,7 @@ public ObjStorage getObjStorage() { @Override public boolean exists(Location location) throws IOException { try { - objStorage.headObject(location.withoutScheme()); + objStorage.headObject(location.uri()); return true; } catch (IOException e) { if (isNotFoundError(e)) { diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RemoteObject.java b/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RemoteObject.java index 6f05392fde663f..c0e78b368560cf 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RemoteObject.java +++ b/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RemoteObject.java @@ -53,7 +53,7 @@ public long getSize() { } /** Last-modified time in milliseconds since epoch. 0 if not available. */ - public long modificationTime() { + public long getModificationTime() { return modificationTime; } } diff --git a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RequestBody.java b/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RequestBody.java index 71232d209e3ef0..fe407969f50cf5 100644 --- a/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RequestBody.java +++ b/fe/fe-filesystem/fe-filesystem-spi/src/main/java/org/apache/doris/filesystem/spi/RequestBody.java @@ -17,7 +17,6 @@ package org.apache.doris.filesystem.spi; -import java.io.IOException; import java.io.InputStream; /** @@ -34,7 +33,7 @@ public RequestBody(InputStream content, long contentLength) { this.contentLength = contentLength; } - public InputStream content() throws IOException { + public InputStream content() { return content; } diff --git a/fe/fe-filesystem/pom.xml b/fe/fe-filesystem/pom.xml index 12763a0b424d64..77f95533a9133e 100644 --- a/fe/fe-filesystem/pom.xml +++ b/fe/fe-filesystem/pom.xml @@ -38,6 +38,7 @@ under the License. + fe-filesystem-api fe-filesystem-spi fe-filesystem-s3 fe-filesystem-oss