/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.shell;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shell command {@code -bulkDelete}: deletes a set of paths under a common
 * base path using the {@link BulkDelete} API, issuing deletions in batches
 * of a configurable page size.
 * <p>
 * The paths to delete are taken either from the command line (after the base
 * path) or, when {@code -readFromFile} is given, from a text file containing
 * one fully-qualified path per line (blank lines and lines starting with
 * {@code #} are ignored). The file option takes precedence over the
 * command-line list.
 */
public class BulkDeleteCommand extends FsCommand {

  /** Register this command with the shell's command factory. */
  public static void registerCommands(CommandFactory factory) {
    factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
  }

  private static final Logger LOG =
      LoggerFactory.getLogger(BulkDeleteCommand.class);

  public static final String NAME = "bulkDelete";

  /**
   * File name parameter to be specified at command line.
   */
  public static final String READ_FROM_FILE = "readFromFile";

  /**
   * Page size parameter specified at command line.
   */
  public static final String PAGE_SIZE = "pageSize";

  public static final String USAGE = "[-" + READ_FROM_FILE + " <fileName>] [-"
      + PAGE_SIZE + " <numberOfPaths>] <basePath> [<pathToDelete> ...]";

  public static final String DESCRIPTION = "Deletes the set of files under the given <basePath>.\n"
      + "If a list of paths is provided at command line then the paths are deleted directly.\n"
      + "User can also point to the file where the paths are listed as full object names using the \"readFromFile\""
      + "parameter. The presence of a file name takes precedence over the list of objects.\n"
      + "Page size refers to the size of each bulk delete batch."
      + "Users can specify the page size using \"pageSize\" command parameter."
      + "Default value is 1.\n";

  /** Optional file containing one path per line; null when not supplied. */
  private String fileName;

  /** Number of paths per bulk delete batch; always >= 1 after option parsing. */
  private int pageSize;

  /**
   * Making the class stateful as the PathData initialization for all args
   * is not needed; only the base path becomes a PathData, the remaining
   * arguments are kept as raw strings here.
   */
  LinkedList<String> childArgs;

  protected BulkDeleteCommand() {
    this.childArgs = new LinkedList<>();
    // keep the default consistent with the Configuration constructor so a
    // pageSize of 0 can never reach deleteInBatches (it would loop forever)
    this.pageSize = 1;
  }

  protected BulkDeleteCommand(Configuration conf) {
    super(conf);
    this.childArgs = new LinkedList<>();
    this.pageSize = 1;
  }

  /**
   * Processes the command line options and initialize the variables.
   *
   * @param args the command line arguments
   * @throws IOException in case of wrong arguments passed
   */
  @Override
  protected void processOptions(LinkedList<String> args) throws IOException {
    CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
    cf.addOptionWithValue(READ_FROM_FILE);
    cf.addOptionWithValue(PAGE_SIZE);
    cf.parse(args);
    fileName = cf.getOptValue(READ_FROM_FILE);
    String pageSizeValue = cf.getOptValue(PAGE_SIZE);
    if (pageSizeValue != null) {
      try {
        pageSize = Integer.parseInt(pageSizeValue);
      } catch (NumberFormatException e) {
        // surface a shell-friendly error instead of an uncaught runtime exception
        throw new IOException("Invalid " + PAGE_SIZE + " value: " + pageSizeValue, e);
      }
      if (pageSize < 1) {
        // a non-positive batch size would make deleteInBatches spin forever
        throw new IOException(PAGE_SIZE + " must be at least 1; got: " + pageSize);
      }
    } else {
      pageSize = 1;
    }
  }

  /**
   * Processes the command line arguments and stores the child arguments in a list.
   *
   * @param args strings to expand into {@link PathData} objects
   * @return the base path of the bulk delete command.
   * @throws IOException if the wrong number of arguments specified
   */
  @Override
  protected LinkedList<PathData> expandArguments(LinkedList<String> args)
      throws IOException {
    // without a file of paths we need the base path plus at least one child
    if (fileName == null && args.size() < 2) {
      throw new IOException("Invalid Number of Arguments. Expected :" + USAGE);
    }
    LinkedList<PathData> pathData = new LinkedList<>();
    pathData.add(new PathData(args.get(0), getConf()));
    args.remove(0);
    this.childArgs = args;
    return pathData;
  }

  /**
   * Deletes the objects using the bulk delete api.
   *
   * @param bulkDelete BulkDelete object exposing the API
   * @param paths list of paths to be deleted in the base path
   * @throws IOException on error in execution of the delete command
   */
  void deleteInBatches(BulkDelete bulkDelete, List<Path> paths)
      throws IOException {
    Batch<Path> batches = new Batch<>(paths, pageSize);
    while (batches.hasNext()) {
      try {
        // each entry in the result is a (path, error) pair for a failed delete
        List<Map.Entry<Path, String>> result = bulkDelete.bulkDelete(batches.next());
        if (!result.isEmpty()) {
          LOG.warn("Number of failed deletions:{}", result.size());
          for (Map.Entry<Path, String> singleResult : result) {
            LOG.info("{}: {}", singleResult.getKey(), singleResult.getValue());
          }
        }
      } catch (IllegalArgumentException e) {
        // raised for paths outside the base path; rethrow as a shell error
        throw new IOException("Exception while deleting: ", e);
      }
    }
  }

  /**
   * Builds the list of paths to delete (from the file or from the child
   * arguments) and executes the bulk delete against the base path.
   *
   * @param args single-element list holding the base path
   * @throws IOException on failure to read the path file or to delete
   */
  @Override
  protected void processArguments(LinkedList<PathData> args) throws IOException {
    PathData basePath = args.get(0);
    LOG.info("Deleting files under:{}", basePath);
    List<Path> pathList = new ArrayList<>();
    if (fileName != null) {
      LOG.info("Reading from file:{}", fileName);
      FileSystem localFile = FileSystem.get(getConf());
      // try-with-resources: the reader must be closed even on read failure
      try (BufferedReader br = new BufferedReader(new InputStreamReader(
          localFile.open(new Path(fileName)), StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
          line = line.trim();
          // skip blank lines and comment lines
          if (!line.isEmpty() && !line.startsWith("#")) {
            pathList.add(new Path(line));
          }
        }
      }
    } else {
      pathList.addAll(childArgs.stream()
          .map(Path::new)
          .collect(Collectors.toList()));
    }
    LOG.debug("Deleting:{}", pathList);
    // BulkDelete is Closeable; close it to release any held resources
    try (BulkDelete bulkDelete = basePath.fs.createBulkDelete(basePath.path)) {
      deleteInBatches(bulkDelete, pathList);
    }
  }

  /**
   * Batch class for deleting files in batches, once initialized the inner
   * list can't be modified.
   *
   * @param <T> template type for batches
   */
  private static class Batch<T> {
    private final List<T> data;
    private final int batchSize;
    private int currentLocation;

    Batch(List<T> data, int batchSize) {
      this.data = Collections.unmodifiableList(data);
      this.batchSize = batchSize;
      this.currentLocation = 0;
    }

    /**
     * @return If there is a next batch present
     */
    boolean hasNext() {
      return currentLocation < data.size();
    }

    /**
     * @return Compute and return a new batch
     */
    List<T> next() {
      List<T> ret = new ArrayList<>();
      int i = 0;
      while (i < batchSize && currentLocation < data.size()) {
        ret.add(data.get(currentLocation));
        i++;
        currentLocation++;
      }
      return ret;
    }
  }
}
Command `hadoop fs -bulkdelete base_path` + +This is the command line implementation of the bulkdelete API. The user can specify a base path and the underlying +paths for deletion. The list of paths can also be read from a file with each line containing +a separate path for deletion. + +```bash +hadoop fs -bulkdelete <base path> <path1> <path2> ... +hadoop fs -bulkdelete -pageSize 10 <base path> <path1> <path2> ... +hadoop fs -bulkdelete -readFromFile <local file> <base path> +``` + +### `bulkDelete(paths)` diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java new file mode 100644 index 0000000000000..0f6b70d0e0463 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.shell;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.test.GenericTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * Tests for the {@code -bulkDelete} shell command: option parsing,
 * argument expansion, and end-to-end deletion against the local filesystem.
 */
public class TestBulkDeleteCommand extends AbstractHadoopTestBase {
  private static Configuration conf;
  private static FsShell shell;
  private static LocalFileSystem lfs;
  private static Path testRootDir;

  @BeforeClass
  public static void setup() throws IOException {
    conf = new Configuration();
    shell = new FsShell(conf);
    lfs = FileSystem.getLocal(conf);
    testRootDir = lfs.makeQualified(new Path(GenericTestUtils.getTempPath(
        "testFsShellBulkDelete")));
    // start from a clean directory for every run
    lfs.delete(testRootDir, true);
    lfs.mkdirs(testRootDir);
    lfs.setWorkingDirectory(testRootDir);
  }

  @Test
  public void testDefaults() throws IOException {
    // no options: childArgs must stay empty after option processing
    LinkedList<String> options = new LinkedList<>();
    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
    bulkDeleteCommand.processOptions(options);
    Assertions.assertThat(bulkDeleteCommand.childArgs).
        describedAs("Children arguments should be empty").isEmpty();
  }

  @Test
  public void testArguments() throws IOException, URISyntaxException {
    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
    LinkedList<String> arguments = new LinkedList<>();
    String arg1 = "file:///file/name/1";
    String arg2 = "file:///file/name/1/2";
    arguments.add(arg1);
    arguments.add(arg2);
    LinkedList<PathData> pathData = bulkDeleteCommand.expandArguments(arguments);
    Assertions.assertThat(pathData).
        describedAs("Only one root path must be present").hasSize(1);
    Assertions.assertThat(pathData.get(0).path.toUri().getPath()).
        describedAs("Base path of the command should match").isEqualTo(new URI(arg1).getPath());
    Assertions.assertThat(bulkDeleteCommand.childArgs).
        describedAs("Only one other argument was passed to the command").
        hasSize(1);
    Assertions.assertThat(bulkDeleteCommand.childArgs.get(0)).
        describedAs("Children arguments must match").isEqualTo(arg2);
  }

  @Test
  public void testWrongArguments() throws Exception {
    // a single argument with no -readFromFile must be rejected
    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
    LinkedList<String> arguments = new LinkedList<>();
    String arg1 = "file:///file/name/1";
    arguments.add(arg1);
    intercept(IOException.class, () -> bulkDeleteCommand.expandArguments(arguments));
  }

  @Test
  public void testLocalFileDeletion() throws IOException {
    String deletionDir = "toDelete";
    String baseFileName = "file_";
    Path baseDir = new Path(testRootDir, deletionDir);
    List<String> listOfPaths = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      Path p = new Path(baseDir, baseFileName + i);
      ContractTestUtils.touch(lfs, p);
      ContractTestUtils.assertIsFile(lfs, p);
    }
    RemoteIterator<LocatedFileStatus> remoteIterator = lfs.listFiles(baseDir, false);
    while (remoteIterator.hasNext()) {
      listOfPaths.add(remoteIterator.next().getPath().toUri().toString());
    }
    List<String> finalCommandList = new ArrayList<>();
    finalCommandList.add("-bulkDelete");
    finalCommandList.add(baseDir.toUri().toString());
    finalCommandList.addAll(listOfPaths);
    // the shell must report success as well as actually delete the files
    Assertions.assertThat(shell.run(finalCommandList.toArray(new String[0])))
        .as("Exit code of the bulk delete command").isEqualTo(0);
    Assertions.assertThat(lfs.listFiles(baseDir, false).hasNext())
        .as("All the files should have been deleted under the path:"
            + baseDir).isEqualTo(false);
  }

  @Test
  public void testLocalFileDeletionWithFileName() throws IOException {
    String deletionDir = "toDelete";
    String baseFileName = "file_";
    Path baseDir = new Path(testRootDir, deletionDir);
    Path fileWithDeletePaths = new Path(testRootDir, "fileWithDeletePaths");
    FSDataOutputStream fsDataOutputStream = lfs.create(fileWithDeletePaths, true);
    // write UTF-8 explicitly: the command reads this file back as UTF-8
    try (BufferedWriter br = new BufferedWriter(
        new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8))) {
      for (int i = 0; i < 100; i++) {
        Path p = new Path(baseDir, baseFileName + i);
        ContractTestUtils.touch(lfs, p);
        ContractTestUtils.assertIsFile(lfs, p);
        br.write(p.toUri().toString());
        br.newLine();
      }
      br.flush(); // flush the file to write the contents
    }
    List<String> finalCommandList = new ArrayList<>();
    finalCommandList.add("-bulkDelete");
    finalCommandList.add("-readFromFile");
    finalCommandList.add(fileWithDeletePaths.toUri().toString());
    finalCommandList.add(baseDir.toUri().toString());
    Assertions.assertThat(shell.run(finalCommandList.toArray(new String[0])))
        .as("Exit code of the bulk delete command").isEqualTo(0);
    Assertions.assertThat(lfs.listFiles(baseDir, false).hasNext())
        .as("All the files should have been deleted under the path:"
            + baseDir).isEqualTo(false);
  }

  @Test
  public void testWrongArgumentsWithNonChildFile() throws Exception {
    // a path that is not under the base path must fail during processing
    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
    LinkedList<String> arguments = new LinkedList<>();
    String arg1 = "file:///file/name/1";
    String arg2 = "file:///file/name";
    arguments.add(arg1);
    arguments.add(arg2);
    LinkedList<PathData> pathData = bulkDeleteCommand.expandArguments(arguments);
    intercept(IOException.class, () -> bulkDeleteCommand.processArguments(pathData));
  }
}