Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.FileSystemType;
import org.apache.doris.fsv2.FileSystemType;
import org.apache.doris.thrift.TFileType;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -453,7 +453,7 @@ public FileSystemType getFileSystemType() {
case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
case VIEWFS:
case GFS:
fsType = FileSystemType.DFS;
fsType = FileSystemType.HDFS;
break;
case JFS:
fsType = FileSystemType.JFS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonMetadataCache;
import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fsv2.FileSystemCache;
import org.apache.doris.nereids.exceptions.NotSupportedException;

import com.github.benmanes.caffeine.cache.CacheLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fsv2.FileSystem;
import org.apache.doris.fsv2.remote.RemoteFile;

import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.fs.FileSystemProvider;
import org.apache.doris.fs.FileSystemProviderImpl;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.fsv2.FileSystemProvider;
import org.apache.doris.fsv2.FileSystemProviderImpl;
import org.apache.doris.fsv2.remote.dfs.DFSFileSystem;
import org.apache.doris.transaction.TransactionManagerFactory;

import com.google.common.annotations.VisibleForTesting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.systable.SupportedSysTables;
import org.apache.doris.datasource.systable.SysTable;
import org.apache.doris.fs.FileSystemDirectoryLister;
import org.apache.doris.fsv2.FileSystemDirectoryLister;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.FileSystemProvider;
import org.apache.doris.fs.FileSystemUtil;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.fs.remote.SwitchingFileSystem;
import org.apache.doris.fsv2.FileSystem;
import org.apache.doris.fsv2.FileSystemProvider;
import org.apache.doris.fsv2.FileSystemUtil;
import org.apache.doris.fsv2.remote.RemoteFile;
import org.apache.doris.fsv2.remote.S3FileSystem;
import org.apache.doris.fsv2.remote.SwitchingFileSystem;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fs.FileSystemDirectoryLister;
import org.apache.doris.fs.FileSystemIOException;
import org.apache.doris.fs.RemoteIterator;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.fsv2.DirectoryLister;
import org.apache.doris.fsv2.FileSystemCache;
import org.apache.doris.fsv2.FileSystemDirectoryLister;
import org.apache.doris.fsv2.FileSystemIOException;
import org.apache.doris.fsv2.RemoteIterator;
import org.apache.doris.fsv2.remote.RemoteFile;
import org.apache.doris.fsv2.remote.RemoteFileSystem;
import org.apache.doris.fsv2.remote.dfs.DFSFileSystem;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fsv2.remote.BrokerFileSystem;
import org.apache.doris.fsv2.remote.RemoteFileSystem;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.fsv2.DirectoryLister;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.fsv2.DirectoryLister;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.doris.fs.remote;

import org.apache.doris.backup.Status;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.FileSystemCache;

import java.util.List;
import java.util.Map;
Expand All @@ -36,7 +34,7 @@ public class SwitchingFileSystem implements FileSystem {
private final Map<String, String> properties;

public SwitchingFileSystem(ExternalMetaCacheMgr extMetaCacheMgr, String bindBrokerName,
Map<String, String> properties) {
Map<String, String> properties) {
this.extMetaCacheMgr = extMetaCacheMgr;
this.bindBrokerName = bindBrokerName;
this.properties = properties;
Expand Down Expand Up @@ -123,10 +121,16 @@ public Status listDirectories(String remotePath, Set<String> result) {
}

public FileSystem fileSystem(String location) {
return extMetaCacheMgr.getFsCache().getRemoteFileSystem(
// todo: This method is currently unused.
// LocationPath has already been adapted to the new V2 logic.
// We’re keeping this code commented out for now, but it will be fully removed once
// V2 is finalized and fully adopted.
/* return extMetaCacheMgr.getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, properties,
bindBrokerName), properties, bindBrokerName));
bindBrokerName), properties, bindBrokerName));*/
//
return null;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
// and modified by Doris

package org.apache.doris.fsv2;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.fsv2.remote.RemoteFile;

public interface DirectoryLister {
RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
throws FileSystemIOException;
}
117 changes: 117 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/fsv2/FileSystemCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.fsv2;

import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.fsv2.remote.RemoteFileSystem;

import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.hadoop.conf.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;

public class FileSystemCache {

private final LoadingCache<FileSystemCacheKey, RemoteFileSystem> fileSystemCache;

public FileSystemCache() {
// no need to set refreshAfterWrite, because the FileSystem is created once and never changed
CacheFactory fsCacheFactory = new CacheFactory(
OptionalLong.of(86400L),
OptionalLong.empty(),
Config.max_remote_file_system_cache_num,
false,
null);
fileSystemCache = fsCacheFactory.buildCache(this::loadFileSystem);
}

private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) throws UserException {
return FileSystemFactory.get(key.type, key.getFsProperties(), key.bindBrokerName);
}

public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
return fileSystemCache.get(key);
}

public static class FileSystemCacheKey {
private final FileSystemType type;
// eg: hdfs://nameservices1
private final String fsIdent;
private final Map<String, String> properties;
private final String bindBrokerName;
// only for creating new file system
private final Configuration conf;

public FileSystemCacheKey(Pair<FileSystemType, String> fs,
Map<String, String> properties,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still using a raw map?

String bindBrokerName,
Configuration conf) {
this.type = fs.first;
this.fsIdent = fs.second;
this.properties = properties;
this.bindBrokerName = bindBrokerName;
this.conf = conf;
}

public FileSystemCacheKey(Pair<FileSystemType, String> fs,
Map<String, String> properties, String bindBrokerName) {
this(fs, properties, bindBrokerName, null);
}

public Map<String, String> getFsProperties() {
if (conf == null) {
return properties;
}
Map<String, String> result = new HashMap<>();
conf.iterator().forEachRemaining(e -> result.put(e.getKey(), e.getValue()));
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof FileSystemCacheKey)) {
return false;
}
FileSystemCacheKey o = (FileSystemCacheKey) obj;
boolean equalsWithoutBroker = type.equals(o.type)
&& fsIdent.equals(o.fsIdent)
&& properties.equals(o.properties);
if (bindBrokerName == null) {
return equalsWithoutBroker && o.bindBrokerName == null;
}
return equalsWithoutBroker && bindBrokerName.equals(o.bindBrokerName);
}

@Override
public int hashCode() {
if (bindBrokerName == null) {
return Objects.hash(properties, fsIdent, type);
}
return Objects.hash(properties, fsIdent, type, bindBrokerName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.fsv2;

import org.apache.doris.backup.Status;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.fsv2.remote.RemoteFile;

import java.util.ArrayList;
import java.util.List;

public class FileSystemDirectoryLister implements DirectoryLister {
public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
throws FileSystemIOException {
List<RemoteFile> result = new ArrayList<>();
Status status = fs.listFiles(location, recursive, result);
if (!status.ok()) {
throw new FileSystemIOException(status.getErrCode(), status.getErrMsg());
}
return new RemoteFileRemoteIterator(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.fsv2.remote.BrokerFileSystem;
import org.apache.doris.fsv2.remote.RemoteFileSystem;

import java.util.List;
import java.util.Map;

public class FileSystemFactory {
Expand Down Expand Up @@ -52,6 +53,22 @@ public static RemoteFileSystem get(String name, Map<String, String> properties)
return new BrokerFileSystem(name, properties);
}

public static RemoteFileSystem get(FileSystemType fileSystemType, Map<String, String> properties,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this class, we should only keep get(StorageProperties storageProperties), and remove all other get method.

String bindBrokerName)
throws UserException {
if (fileSystemType == FileSystemType.BROKER) {
return new BrokerFileSystem(bindBrokerName, properties);
}
List<StorageProperties> storagePropertiesList = StorageProperties.createAll(properties);

for (StorageProperties storageProperties : storagePropertiesList) {
if (storageProperties.getStorageName().equalsIgnoreCase(fileSystemType.name())) {
return StorageTypeMapper.create(storageProperties);
}
}
throw new RuntimeException("Unsupported file system type: " + fileSystemType);
}

public static RemoteFileSystem get(BrokerDesc brokerDesc) {
if (null != brokerDesc.getStorageProperties()) {
return get(brokerDesc.getStorageProperties());
Expand Down
Loading
Loading