Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,7 @@ public enum Property {
"The listening port for the garbage collector's monitor service", "1.3.5"),
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
"The number of threads used to delete RFiles and write-ahead logs", "1.3.5"),
@Deprecated(since = "2.1.1", forRemoval = true)
GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN,
"Do not use the Trash, even if it is configured.", "1.5.0"),
@Deprecated(since = "2.1.0", forRemoval = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public MiniAccumuloClusterImpl(MiniAccumuloConfigImpl config) throws IOException
conf.set("dfs.support.append", "true");
conf.set("dfs.datanode.synconclose", "true");
conf.set("dfs.datanode.data.dir.perm", MiniDFSUtil.computeDatanodeDirectoryPermission());
config.getHadoopConfOverrides().forEach((k, v) -> conf.set(k, v));
String oldTestBuildData = System.setProperty("test.build.data", dfs.getAbsolutePath());
miniDFS.set(new MiniDFSCluster.Builder(conf).build());
if (oldTestBuildData == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class MiniAccumuloConfigImpl {

private File dir = null;
private String rootPassword = null;
private Map<String,String> hadoopConfOverrides = new HashMap<>();
private Map<String,String> siteConfig = new HashMap<>();
private Map<String,String> configuredSiteConig = new HashMap<>();
private Map<String,String> clientProps = new HashMap<>();
Expand Down Expand Up @@ -836,4 +837,24 @@ public void preStartConfigUpdate() {
this.preStartConfigProcessor.accept(this);
}
}

/**
* Add server-side Hadoop configuration properties
*
* @param overrides properties
* @since 2.1.1
*/
public void setHadoopConfOverrides(Map<String,String> overrides) {
hadoopConfOverrides.putAll(overrides);
}

/**
* Get server-side Hadoop configuration properties
*
* @return map of properties set in prior call to {@link #setHadoopConfOverrides(Map)}
* @since 2.1.1
*/
public Map<String,String> getHadoopConfOverrides() {
return new HashMap<>(hadoopConfOverrides);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.accumulo.core.volume.VolumeConfiguration;
import org.apache.accumulo.core.volume.VolumeImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -354,7 +355,10 @@ public void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String pool
@Override
public boolean moveToTrash(Path path) throws IOException {
FileSystem fs = getFileSystemByPath(path);
String key = CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
log.trace("{}: {}", key, fs.getConf().get(key));
Trash trash = new Trash(fs, fs.getConf());
log.trace("Hadoop Trash is enabled for {}: {}", path, trash.isEnabled());
return trash.moveToTrash(path);
}

Expand Down
10 changes: 8 additions & 2 deletions server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,15 @@ boolean inSafeMode() {
boolean moveToTrash(Path path) throws IOException {
final VolumeManager fs = context.getVolumeManager();
if (!isUsingTrash()) {
log.trace("Accumulo Trash is disabled. Skipped for {}", path);
return false;
}
try {
return fs.moveToTrash(path);
boolean success = fs.moveToTrash(path);
log.trace("Accumulo Trash enabled, moving to trash succeeded?: {}", success);
return success;
} catch (FileNotFoundException ex) {
log.error("Error moving {} to trash", path, ex);
return false;
}
}
Expand All @@ -478,7 +482,9 @@ boolean moveToTrash(Path path) throws IOException {
* @return true if trash is used
*/
boolean isUsingTrash() {
return !config.getBoolean(Property.GC_TRASH_IGNORE);
@SuppressWarnings("removal")
Property p = Property.GC_TRASH_IGNORE;
return !config.getBoolean(p);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ long getStartDelay() {
* @return true if trash is used
*/
boolean isUsingTrash() {
return !getConfiguration().getBoolean(Property.GC_TRASH_IGNORE);
@SuppressWarnings("removal")
Property p = Property.GC_TRASH_IGNORE;
return !getConfiguration().getBoolean(p);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class SimpleGarbageCollectorTest {
private SimpleGarbageCollector gc;
private ConfigurationCopy systemConfig;
private static SiteConfiguration siteConfig = SiteConfiguration.empty().build();
@SuppressWarnings("removal")
private final Property GC_TRASH_IGNORE = Property.GC_TRASH_IGNORE;

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -97,7 +99,7 @@ private ConfigurationCopy createSystemConfig() {
conf.put(Property.GC_CYCLE_START.getKey(), "1");
conf.put(Property.GC_CYCLE_DELAY.getKey(), "20");
conf.put(Property.GC_DELETE_THREADS.getKey(), "2");
conf.put(Property.GC_TRASH_IGNORE.getKey(), "false");
conf.put(GC_TRASH_IGNORE.getKey(), "false");

return new ConfigurationCopy(conf);
}
Expand Down Expand Up @@ -132,7 +134,7 @@ public void testMoveToTrash_UsingTrash_VolMgrFailure() throws Exception {

@Test
public void testMoveToTrash_NotUsingTrash() throws Exception {
systemConfig.set(Property.GC_TRASH_IGNORE.getKey(), "true");
systemConfig.set(GC_TRASH_IGNORE.getKey(), "true");
Path path = createMock(Path.class);
assertFalse(gc.moveToTrash(path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ public void finishReplacement(AccumuloConfiguration acuTableConf, VolumeManager
// up later
for (Path path : oldDatafiles) {
Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile)) {
@SuppressWarnings("removal")
Property p = Property.GC_TRASH_IGNORE;
if (acuTableConf.getBoolean(p) || !fs.moveToTrash(deleteFile)) {
fs.deleteRecursively(deleteFile);
}
}
Expand Down
Comment thread
dlmarion marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// base class for ITs that test our legacy trash property and Hadoop's trash policy with accumulo gc
public class GarbageCollectorTrashBase extends ConfigurableMacBase {
Comment thread
dlmarion marked this conversation as resolved.

private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorTrashBase.class);

protected ArrayList<StoredTabletFile> getFilesForTable(ServerContext ctx, AccumuloClient client,
String tableName) {
String tid = client.tableOperations().tableIdMap().get(tableName);
TabletsMetadata tms =
ctx.getAmple().readTablets().forTable(TableId.of(tid)).fetch(ColumnType.FILES).build();
ArrayList<StoredTabletFile> files = new ArrayList<>();
tms.forEach(tm -> {
files.addAll(tm.getFiles());
});
LOG.debug("Tablet files: {}", files);
return files;
}

protected ArrayList<StoredTabletFile> loadData(ServerContext ctx, AccumuloClient client,
String tableName) throws Exception {
// create some files
for (int i = 0; i < 5; i++) {
ReadWriteIT.ingest(client, 10, 10, 10, 0, tableName);
client.tableOperations().flush(tableName);
}
return getFilesForTable(ctx, client, tableName);
}

protected boolean userTrashDirExists(FileSystem fs) {
return !fs.getTrashRoots(false).isEmpty();
}

protected void makeTrashDir(FileSystem fs) throws IOException {
if (!userTrashDirExists(fs)) {
Path homeDir = fs.getHomeDirectory();
Path trashDir = new Path(homeDir, ".Trash");
Comment thread
ctubbsii marked this conversation as resolved.
assertTrue(fs.mkdirs(trashDir));
}
assertTrue(userTrashDirExists(fs));

}

protected void waitForFilesToBeGCd(final ArrayList<StoredTabletFile> files) throws Exception {
Wait.waitFor(() -> files.stream().noneMatch(stf -> {
try {
return super.getCluster().getMiniDfs().getFileSystem().exists(stf.getPath());
} catch (IOException e) {
throw new UncheckedIOException("error", e);
}
}));
}

protected long countFilesInTrash(FileSystem fs, TableId tid)
throws FileNotFoundException, IOException {
Collection<FileStatus> dirs = fs.getTrashRoots(true);
if (dirs.isEmpty()) {
return -1;
}
long count = 0;
Iterator<FileStatus> iter = dirs.iterator();
while (iter.hasNext()) {
FileStatus stat = iter.next();
LOG.debug("Trash root: {}", stat.getPath());
RemoteIterator<LocatedFileStatus> riter = fs.listFiles(stat.getPath(), true);
while (riter.hasNext()) {
LocatedFileStatus lfs = riter.next();
if (lfs.isDirectory()) {
continue;
}
TabletFile tf = new TabletFile(lfs.getPath());
LOG.debug("File in trash: {}, tableId: {}", lfs.getPath(), tf.getTableId());
if (tid.equals(tf.getTableId())) {
count++;
}
}
}
return count;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test;

// verify trash is not used with Hadoop default configuration as Trash is not
// enabled by default.
public class GarbageCollectorTrashDefaultIT extends GarbageCollectorTrashBase {
Comment thread
dlmarion marked this conversation as resolved.

@Override
protected Duration defaultTimeout() {
return Duration.ofMinutes(5);
}

@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
// By default Hadoop trash is disabled - fs.trash.interval defaults to 0
Comment thread
dlmarion marked this conversation as resolved.
Map<String,String> hadoopOverrides = new HashMap<>();
hadoopOverrides.put(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, "0");
cfg.setHadoopConfOverrides(hadoopOverrides);
cfg.useMiniDFS(true);

cfg.setProperty(Property.GC_CYCLE_START, "1");
cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
@SuppressWarnings("removal")
Property p = Property.GC_TRASH_IGNORE;
cfg.setProperty(p, "false"); // default, use trash if configured
cfg.setProperty(Property.GC_PORT, "0");
cfg.setProperty(Property.TSERV_MAXMEM, "5K");
cfg.setProperty(Property.TABLE_MAJC_RATIO, "5.0");
cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
}

@Test
public void testTrashHadoopDisabledAccumuloEnabled() throws Exception {
String table = this.getUniqueNames(1)[0];
final FileSystem fs = super.getCluster().getFileSystem();
super.makeTrashDir(fs);
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
ArrayList<StoredTabletFile> files = super.loadData(super.getServerContext(), c, table);
assertFalse(files.isEmpty());
c.tableOperations().compact(table, new CompactionConfig());
TableId tid = TableId.of(c.tableOperations().tableIdMap().get(table));
// The default value for fs.trash.interval is 0, which means that
// trash is disabled in the Hadoop configuration. Enabling trash in
// Accumulo (GC_TRASH_IGNORE = false) still requires enabling trash in Hadoop
super.waitForFilesToBeGCd(files);
assertEquals(0, super.countFilesInTrash(fs, tid));
}

}

}
Loading