diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java index 8b253c6638..c885cca95f 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java +++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java @@ -32,7 +32,6 @@ import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler; import org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler; import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler; -import org.apache.uniffle.storage.handler.impl.UploadedHdfsClientReadHandler; import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest; import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest; import org.apache.uniffle.storage.util.StorageType; diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/UploadedHdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/UploadedHdfsClientReadHandler.java deleted file mode 100644 index 2094f1f548..0000000000 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/UploadedHdfsClientReadHandler.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.storage.handler.impl; - -import java.io.IOException; -import java.util.Comparator; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.roaringbitmap.longlong.Roaring64NavigableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.uniffle.common.ShuffleDataResult; -import org.apache.uniffle.common.util.Constants; -import org.apache.uniffle.storage.util.ShuffleStorageUtils; - -public class UploadedHdfsClientReadHandler extends HdfsClientReadHandler { - - private static final Logger LOG = LoggerFactory.getLogger(UploadedHdfsClientReadHandler.class); - - public UploadedHdfsClientReadHandler( - String appId, - int shuffleId, - int partitionId, - int indexReadLimit, - int partitionNumPerRange, - int partitionNum, - int readBufferSize, - Roaring64NavigableMap expectBlockIds, - Roaring64NavigableMap processBlockIds, - String storageBasePath, - Configuration hadoopConf) { - super(appId, - shuffleId, - partitionId, - indexReadLimit, - partitionNumPerRange, - partitionNum, - readBufferSize, - expectBlockIds, - processBlockIds, - storageBasePath, - hadoopConf); - } - - @Override - protected void init(String fullShufflePath) { - FileSystem fs; - Path baseFolder = new Path(fullShufflePath); - try { - fs = ShuffleStorageUtils.getFileSystemForPath(baseFolder, hadoopConf); - } catch (IOException ioe) { - LOG.warn("Can't get FileSystem for {}", baseFolder); - return; - } - FileStatus[] indexFiles; - String failedGetIndexFileMsg = "Can't list index file in " + baseFolder; - - try { - indexFiles = fs.listStatus(baseFolder, - file -> file.getName().endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX)); - } catch (Exception e) { - LOG.warn(failedGetIndexFileMsg, e); - return; - } - - if (indexFiles != null && indexFiles.length != 0) { - for (FileStatus status : indexFiles) { - LOG.info("Find index file for shuffleId[" + shuffleId + "], partitionId[" - + partitionId + "] " + status.getPath()); - String fileNamePrefix = getFileNamePrefix(status.getPath().toUri().toString()); - try { - HdfsShuffleReadHandler handler = new UploadedStorageHdfsShuffleReadHandler( - appId, shuffleId, partitionId, fileNamePrefix, readBufferSize, - expectBlockIds, processBlockIds, hadoopConf); - readHandlers.add(handler); - } catch (Exception e) { - LOG.warn("Can't create ShuffleReaderHandler for " + fileNamePrefix, e); - } - } - readHandlers.sort(Comparator.comparing(HdfsShuffleReadHandler::getFilePrefix)); - } - } - - @Override - public ShuffleDataResult readShuffleData() { - // init lazily like LocalFileClientRead - if (readHandlers.isEmpty()) { - String fullShufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, - ShuffleStorageUtils.getUploadShuffleDataPath(appId, shuffleId, partitionId)); - init(fullShufflePath); - String combinePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, - ShuffleStorageUtils.getCombineDataPath(appId, shuffleId)); - init(combinePath); - } - return super.readShuffleData(); - } -} diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/UploadedStorageHdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/UploadedStorageHdfsShuffleReadHandler.java deleted file mode 100644 index 34ef0e99ac..0000000000 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/UploadedStorageHdfsShuffleReadHandler.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.storage.handler.impl; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; - -import org.apache.hadoop.conf.Configuration; -import org.roaringbitmap.longlong.Roaring64NavigableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.uniffle.common.ShuffleIndexResult; - -public class UploadedStorageHdfsShuffleReadHandler extends HdfsShuffleReadHandler { - - private static final Logger LOG = LoggerFactory.getLogger(UploadedStorageHdfsShuffleReadHandler.class); - - private final int partitionId; - private long dataFileOffset; - - public UploadedStorageHdfsShuffleReadHandler( - String appId, - int shuffleId, - int partitionId, - String filePrefix, - int readBufferSize, - Roaring64NavigableMap expectBlockIds, - Roaring64NavigableMap processBlockIds, - Configuration conf) throws IOException { - super(appId, shuffleId, partitionId, filePrefix, readBufferSize, expectBlockIds, processBlockIds, conf); - this.partitionId = partitionId; - } - - @Override - protected byte[] readShuffleData(long offset, int expectedLength) { - byte[] data = dataReader.read(dataFileOffset + offset, expectedLength); - if (data.length != expectedLength) { - LOG.warn("Fail to read expected[{}] data, actual[{}] from file {}.data", - expectedLength, data.length, filePrefix); - return new byte[0]; - } - return data; - } - - @Override - protected ShuffleIndexResult readShuffleIndex() { - long start = System.currentTimeMillis(); - try { - byte[] indexData = indexReader.read(); - - ByteBuffer byteBuffer = ByteBuffer.wrap(indexData); - ShuffleIndexHeader shuffleIndexHeader = ShuffleIndexHeader.extractHeader(byteBuffer); - if (shuffleIndexHeader == null) { - LOG.error("Fail to read index from {}.index", filePrefix); - return new ShuffleIndexResult(); - } - - int indexFileOffset = shuffleIndexHeader.getHeaderLen(); - int indexPartitionLen = 0; - long dataFileOffset = 0; - for (ShuffleIndexHeader.Entry entry : shuffleIndexHeader.getIndexes()) { - int partitionId = entry.getPartitionId(); - indexPartitionLen = (int) entry.getPartitionIndexLength(); - if (partitionId != this.partitionId) { - indexFileOffset += entry.getPartitionIndexLength(); - dataFileOffset += entry.getPartitionDataLength(); - continue; - } - - if ((indexFileOffset + indexPartitionLen) > indexData.length) { - LOG.error("Index of partition {} is invalid, offset = {}, length = {} in {}.index", - partitionId, indexFileOffset, indexPartitionLen, filePrefix); - } - - LOG.info("Read index files {}.index for {} ms", filePrefix, System.currentTimeMillis() - start); - this.dataFileOffset = dataFileOffset; - return new ShuffleIndexResult( - Arrays.copyOfRange(indexData, indexFileOffset, indexFileOffset + indexPartitionLen)); - } - } catch (Exception e) { - LOG.info("Fail to read index files {}.index", filePrefix); - } - return new ShuffleIndexResult(); - } -} diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleUploadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleUploadHandlerRequest.java deleted file mode 100644 index a8d03abe1c..0000000000 --- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleUploadHandlerRequest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.storage.request; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.uniffle.storage.util.StorageType; - -/** - * CreateShuffleUploadHandlerRequest is used to hold the parameters to create remote storage for shuffle uploader. - * For now only HDFS is supported, but COS, OZONE and other storage system will be supported in the future. - */ -public class CreateShuffleUploadHandlerRequest { - private final StorageType remoteStorageType; - private final String shuffleKey; - private final String remoteStorageBasePath; - private final String hdfsFilePrefix; - private final Configuration hadoopConf; - private final int bufferSize; - private final boolean combineUpload; - - public StorageType getRemoteStorageType() { - return remoteStorageType; - } - - public String getShuffleKey() { - return shuffleKey; - } - - public String getRemoteStorageBasePath() { - return remoteStorageBasePath; - } - - public Configuration getHadoopConf() { - return hadoopConf; - } - - public String getHdfsFilePrefix() { - return hdfsFilePrefix; - } - - public int getBufferSize() { - return bufferSize; - } - - public boolean getCombineUpload() { - return combineUpload; - } - - public static class Builder { - private StorageType remoteStorageType; - private String shuffleKey; - private String remoteStorageBasePath; - private String hdfsFilePrefix; - private Configuration hadoopConf; - private int bufferSize; - private boolean combineUpload; - - public Builder() { - // use HDFS by default, we may use COS, OZONE in the future - this.remoteStorageType = StorageType.HDFS; - this.bufferSize = 4096; - this.combineUpload = true; - } - - public Builder remoteStorageType(StorageType remoteStorageType) { - this.remoteStorageType = remoteStorageType; - return this; - } - - public Builder shuffleKey(String shuffleKey) { - this.shuffleKey = shuffleKey; - return this; - } - - public Builder remoteStorageBasePath(String remoteStorageBasePath) { - this.remoteStorageBasePath = remoteStorageBasePath; - return this; - } - - public Builder hdfsFilePrefix(String hdfsFilePrefix) { - this.hdfsFilePrefix = hdfsFilePrefix; - return this; - } - - public Builder hadoopConf(Configuration hadoopConf) { - this.hadoopConf = hadoopConf; - return this; - } - - public Builder bufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public Builder combineUpload(boolean combineUpload) { - this.combineUpload = combineUpload; - return this; - } - - public CreateShuffleUploadHandlerRequest build() throws IllegalArgumentException { - validate(); - return new CreateShuffleUploadHandlerRequest(this); - } - - private void validate() throws IllegalArgumentException { - if (remoteStorageType == null) { - throw new IllegalArgumentException("Remote storage type must be set"); - } - - // We only support HDFS at present, so only check HDFS related parameters - if (remoteStorageType == StorageType.HDFS) { - if (remoteStorageBasePath == null || remoteStorageBasePath.isEmpty()) { - throw new IllegalArgumentException("Remote storage path must be set"); - } - - if (hdfsFilePrefix == null || hdfsFilePrefix.isEmpty()) { - throw new IllegalArgumentException("File prefix must be set"); - } - - if (hadoopConf == null) { - throw new IllegalArgumentException("Hadoop conf must be set"); - } - - if (bufferSize <= 1024) { - throw new IllegalArgumentException("Buffer size must be larger than 1K"); - } - } - } - - } - - private CreateShuffleUploadHandlerRequest(Builder builder) { - this.remoteStorageType = builder.remoteStorageType; - this.shuffleKey = builder.shuffleKey; - this.remoteStorageBasePath = builder.remoteStorageBasePath; - this.hdfsFilePrefix = builder.hdfsFilePrefix; - this.hadoopConf = builder.hadoopConf; - this.bufferSize = builder.bufferSize; - this.combineUpload = builder.combineUpload; - } -} diff --git a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleUploadResult.java b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleUploadResult.java deleted file mode 100644 index 00162d8886..0000000000 --- a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleUploadResult.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.storage.util; - -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -public class ShuffleUploadResult { - private String shuffleKey; - private long size; - private List partitions; - - public ShuffleUploadResult() { - this.size = 0; - partitions = null; - } - - public ShuffleUploadResult(long size, List partitions) { - this.size = size; - this.partitions = partitions; - } - - public String getShuffleKey() { - return shuffleKey; - } - - public void setShuffleKey(String shuffleKey) { - this.shuffleKey = shuffleKey; - } - - public long getSize() { - return size; - } - - public void setSize(long size) { - this.size = size; - } - - public List getPartitions() { - return partitions; - } - - public void setPartitions(List partitions) { - this.partitions = partitions; - } - - public static ShuffleUploadResult merge(List results) { - if (results == null || results.isEmpty()) { - return null; - } - - long size = results.stream().map(ShuffleUploadResult::getSize).reduce(0L, Long::sum); - List partitions = results - .stream() - .map(ShuffleUploadResult::getPartitions) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - return new ShuffleUploadResult(size, partitions); - } - -}