Fix #1365 2.1 Upgrade processing for #1043 ~del (#1366)
* Fix #1365 2.1 Upgrade processing for #1043 ~del
hkeebler committed Sep 27, 2019
1 parent a600898 commit 840fe43
Showing 7 changed files with 363 additions and 19 deletions.
@@ -65,12 +65,16 @@ public interface Ample {
* derived from the table id.
*/
public enum DataLevel {
-    ROOT(null), METADATA(RootTable.NAME), USER(MetadataTable.NAME);
+    ROOT(null, null),
+    METADATA(RootTable.NAME, RootTable.ID),
+    USER(MetadataTable.NAME, MetadataTable.ID);

     private final String table;
+    private final TableId id;

-    private DataLevel(String table) {
+    private DataLevel(String table, TableId id) {
       this.table = table;
+      this.id = id;
}

/**
@@ -81,6 +85,15 @@ public String metaTable() {
throw new UnsupportedOperationException();
return table;
}

/**
* @return The Id of the Accumulo table in which this data level stores its metadata.
*/
public TableId tableId() {
if (id == null)
throw new UnsupportedOperationException();
return id;
}
}

/**
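The new tableId() accessor mirrors metaTable(): both resolve for METADATA and USER, and both throw for ROOT, whose metadata lives in ZooKeeper rather than in an Accumulo table. A minimal usage sketch, assuming the constants shown above:

    // USER-level metadata is stored in the metadata table, so both accessors resolve
    String table = Ample.DataLevel.USER.metaTable(); // MetadataTable.NAME
    TableId id = Ample.DataLevel.USER.tableId();     // MetadataTable.ID

    // ROOT has no backing table, so tableId() throws, just like metaTable()
    try {
      Ample.DataLevel.ROOT.tableId();
    } catch (UnsupportedOperationException e) {
      // expected for the root level
    }
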
@@ -270,6 +270,14 @@ public static String decodeRow(String row) {
return row.substring(encoded_prefix_length);
}

/**
* Value to indicate that the row has been skewed/encoded.
*/
public static class SkewedKeyValue {
public static final String STR_NAME = "skewed";
public static final Value NAME = new Value(STR_NAME);
}

}

/**
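The SkewedKeyValue marker is what lets scans tell delete entries written in the new format apart from pre-upgrade ones, which carried an empty value. A small sketch of the distinction (the comparison values are illustrative):

    // new-format candidate rows carry the "skewed" marker as their value
    Value marker = DeletesSection.SkewedKeyValue.NAME;
    marker.equals(new Value(DeletesSection.SkewedKeyValue.STR_NAME)); // true
    // old-format candidate rows were written with an empty value
    marker.equals(new Value(new byte[] {}));                          // false
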
@@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -35,13 +36,12 @@
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.AmpleImpl;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.ServerContext;
@@ -51,7 +51,6 @@
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;

public class ServerAmpleImpl extends AmpleImpl implements Ample {

@@ -134,7 +133,7 @@ public void deleteGcCandidates(DataLevel level, Collection<String> paths) {

try (BatchWriter writer = context.createBatchWriter(level.metaTable())) {
for (String path : paths) {
-        Mutation m = new Mutation(MetadataSchema.DeletesSection.encodeRow(path));
+        Mutation m = new Mutation(DeletesSection.encodeRow(path));
m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
writer.addMutation(m);
}
@@ -155,9 +154,9 @@ public Iterator<String> getGcCandidates(DataLevel level, String continuePoint) {

return candidates.iterator();
} else if (level == DataLevel.METADATA || level == DataLevel.USER) {
-      Range range = MetadataSchema.DeletesSection.getRange();
+      Range range = DeletesSection.getRange();
if (continuePoint != null && !continuePoint.isEmpty()) {
-        String continueRow = MetadataSchema.DeletesSection.encodeRow(continuePoint);
+        String continueRow = DeletesSection.encodeRow(continuePoint);
range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true,
range.getEndKey(), range.isEndKeyInclusive());
}
@@ -169,10 +168,9 @@ public Iterator<String> getGcCandidates(DataLevel level, String continuePoint) {
throw new RuntimeException(e);
}
scanner.setRange(range);

-      return Iterators.transform(scanner.iterator(),
-          entry -> MetadataSchema.DeletesSection.decodeRow(entry.getKey().getRow().toString()));
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .filter(entry -> entry.getValue().equals(DeletesSection.SkewedKeyValue.NAME))
+          .map(entry -> DeletesSection.decodeRow(entry.getKey().getRow().toString())).iterator();
} else {
throw new IllegalArgumentException();
}
@@ -196,9 +194,8 @@ private BatchWriter createWriter(TableId tableId) {
public static Mutation createDeleteMutation(ServerContext context, TableId tableId,
String pathToRemove) {
Path path = context.getVolumeManager().getFullPath(tableId, pathToRemove);
-    Mutation delFlag =
-        new Mutation(new Text(MetadataSchema.DeletesSection.encodeRow(path.toString())));
-    delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
+    Mutation delFlag = new Mutation(new Text(DeletesSection.encodeRow(path.toString())));
+    delFlag.put(EMPTY_TEXT, EMPTY_TEXT, DeletesSection.SkewedKeyValue.NAME);
return delFlag;
}

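Taken together, the two ServerAmpleImpl changes close the loop: createDeleteMutation stamps every new delete entry with the skewed marker, and getGcCandidates only returns rows carrying that marker, so pre-upgrade entries stay invisible to the garbage collector until Upgrader9to10 rewrites them. A sketch of the entry the rewritten helper produces, with a made-up table id and path:

    // row: DeletesSection.encodeRow(full path), value: "skewed" (the ids here are hypothetical)
    Mutation delFlag = ServerAmpleImpl.createDeleteMutation(context, TableId.of("2"),
        "/t-0001/A000001.rf");
    // the GC scan's filter entry.getValue().equals(SkewedKeyValue.NAME) will now accept it
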
@@ -38,8 +38,8 @@ public class UpgradeCoordinator {
private boolean haveUpgradedZooKeeper = false;
private boolean startedMetadataUpgrade = false;
private int currentVersion;
-  private Map<Integer,Upgrader> upgraders =
-      Map.of(ServerConstants.SHORTEN_RFILE_KEYS, new Upgrader8to9());
+  private Map<Integer,Upgrader> upgraders = Map.of(ServerConstants.SHORTEN_RFILE_KEYS,
+      new Upgrader8to9(), ServerConstants.CRYPTO_CHANGES, new Upgrader9to10());

public UpgradeCoordinator(ServerContext ctx) {
int currentVersion = ServerUtil.getAccumuloPersistentVersion(ctx.getVolumeManager());
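The upgraders map is keyed by the persistent data version an upgrader migrates from, so registering Upgrader9to10 under ServerConstants.CRYPTO_CHANGES wires the new delete-entry processing into the normal upgrade path. A rough sketch of how such a version-keyed map could be driven; the loop below is an illustration only, since the coordinator actually runs the ZooKeeper and metadata phases at different points of startup:

    int version = ServerUtil.getAccumuloPersistentVersion(ctx.getVolumeManager());
    while (version < ServerConstants.DATA_VERSION) { // assumed target constant
      Upgrader upgrader = upgraders.get(version);
      upgrader.upgradeZookeeper(ctx); // early, before metadata tables are online
      upgrader.upgradeMetadata(ctx);  // later, once metadata is writable
      version++;
    }
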
@@ -20,26 +20,42 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET_GC_CANDIDATES;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -51,6 +67,7 @@
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.metadata.RootGcCandidates;
import org.apache.accumulo.server.metadata.ServerAmpleImpl;
import org.apache.accumulo.server.metadata.TabletMutatorBase;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -74,6 +91,14 @@ public class Upgrader9to10 implements Upgrader {
public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs";
public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET + "/current_logs";
public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir";
public static final Value UPGRADED = MetadataSchema.DeletesSection.SkewedKeyValue.NAME;
public static final String OLD_DELETE_PREFIX = "~del";

/**
   * This percentage was taken from the SimpleGarbageCollector; if nothing else is going on
   * during upgrade, it could be larger.
*/
static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f;

@Override
public void upgradeZookeeper(ServerContext ctx) {
@@ -82,6 +107,8 @@ public void upgradeZookeeper(ServerContext ctx) {

@Override
public void upgradeMetadata(ServerContext ctx) {
upgradeFileDeletes(ctx, Ample.DataLevel.METADATA);
upgradeFileDeletes(ctx, Ample.DataLevel.USER);

}

@@ -352,4 +379,78 @@ static Map<String,DataFileValue> cleanupRootTabletFiles(VolumeManager fs, String
}
}

public void upgradeFileDeletes(ServerContext ctx, Ample.DataLevel level) {

String tableName = level.metaTable();
AccumuloClient c = ctx;

// find all deletes
try (BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig())) {
log.info("looking for candidates in table {}", tableName);
Iterator<String> oldCandidates = getOldCandidates(ctx, tableName);
int t = 0; // no waiting first time through
while (oldCandidates.hasNext()) {
// give it some time for memory to clean itself up if needed
sleepUninterruptibly(t, TimeUnit.SECONDS);
List<String> deletes = readCandidatesThatFitInMemory(oldCandidates);
log.info("found {} deletes to upgrade", deletes.size());
for (String olddelete : deletes) {
// create new formatted delete
log.trace("upgrading delete entry for {}", olddelete);
writer.addMutation(ServerAmpleImpl.createDeleteMutation(ctx, level.tableId(), olddelete));
}
writer.flush();
        // if nothing was thrown, the new entries were written, so delete the old ones
log.info("upgrade processing completed so delete old entries");
for (String olddelete : deletes) {
log.trace("deleting old entry for {}", olddelete);
writer.addMutation(deleteOldDeleteMutation(olddelete));
}
writer.flush();
t = 3;
}
} catch (TableNotFoundException | MutationsRejectedException e) {
throw new RuntimeException(e);
}
}

private Iterator<String> getOldCandidates(ServerContext ctx, String tableName)
throws TableNotFoundException {
Range range = MetadataSchema.DeletesSection.getRange();
Scanner scanner = ctx.createScanner(tableName, Authorizations.EMPTY);
scanner.setRange(range);
return StreamSupport.stream(scanner.spliterator(), false)
.filter(entry -> !entry.getValue().equals(UPGRADED))
.map(entry -> entry.getKey().getRow().toString().substring(OLD_DELETE_PREFIX.length()))
.iterator();
}

private List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) {
List<String> result = new ArrayList<>();
// Always read at least one. If memory doesn't clean up fast enough at least
// some progress is made.
while (candidates.hasNext()) {
result.add(candidates.next());
if (almostOutOfMemory(Runtime.getRuntime()))
break;
}
return result;
}

private Mutation deleteOldDeleteMutation(final String delete) {
Mutation m = new Mutation(OLD_DELETE_PREFIX + delete);
m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
return m;
}

private boolean almostOutOfMemory(Runtime runtime) {
if (runtime.totalMemory() - runtime.freeMemory()
> CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory()) {
log.info("List of delete candidates has exceeded the memory"
+ " threshold. Attempting to delete what has been gathered so far.");
return true;
} else
return false;
}

}
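
upgradeFileDeletes is deliberately conservative: each batch of new-format entries is written and flushed before the corresponding old rows are deleted, so a crash between the two flushes leaves both forms present rather than losing a candidate, and any rows still lacking the UPGRADED value are simply picked up again on the next pass. Batch size is bounded by the heap check in almostOutOfMemory rather than a fixed count; a worked sketch of that guard with assumed JVM numbers:

    // used heap = total - free; stop gathering candidates past 50% of max
    Runtime rt = Runtime.getRuntime();
    long used = rt.totalMemory() - rt.freeMemory(); // e.g. 600 MB in use
    long max = rt.maxMemory();                      // e.g. 1024 MB ceiling
    boolean flushNow = used > 0.50f * max;          // 600 MB > 512 MB, so flush this batch
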
@@ -57,6 +57,7 @@
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.server.metadata.ServerAmpleImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
@@ -213,7 +214,11 @@ public void testInvalidDelete() throws Exception {
try (BatchWriter bw = c.createBatchWriter(MetadataTable.NAME)) {
bw.addMutation(createDelMutation("", "", "", ""));
bw.addMutation(createDelMutation("", "testDel", "test", "valueTest"));
bw.addMutation(createDelMutation("/", "", "", ""));
// path is invalid but value is expected - only way the invalid entry will come through
// processing and
// show up to produce error in output to allow while loop to end
bw.addMutation(
createDelMutation("/", "", "", MetadataSchema.DeletesSection.SkewedKeyValue.STR_NAME));
}

ProcessInfo gc = cluster.exec(SimpleGarbageCollector.class);
@@ -304,7 +309,8 @@ private void addEntries(AccumuloClient client) throws Exception {
for (int i = 0; i < 100000; ++i) {
String longpath = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
+ "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj";
-      Mutation delFlag = createDelMutation(String.format("/%020d/%s", i, longpath), "", "", "");
+      Mutation delFlag = ServerAmpleImpl.createDeleteMutation(getServerContext(),
+          MetadataTable.ID, String.format("/%020d/%s", i, longpath));
bw.addMutation(delFlag);
}
}
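Both test changes follow from the new filter in getGcCandidates: an entry whose value is not the skewed marker is no longer a candidate at all. The invalid-path entry therefore has to carry SkewedKeyValue.STR_NAME to reach the collector and produce the expected error, and addEntries now builds its bulk candidates through ServerAmpleImpl.createDeleteMutation so they are written in the new format. A condensed contrast, restating the lines above:

    // filtered out by the GC scan: an empty value marks a pre-upgrade entry
    bw.addMutation(createDelMutation("/", "", "", ""));
    // accepted by the GC scan: carries the "skewed" marker value
    bw.addMutation(
        createDelMutation("/", "", "", MetadataSchema.DeletesSection.SkewedKeyValue.STR_NAME));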
