Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACCUMULO-4629 add exact deletes #588

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Expand Up @@ -41,6 +41,7 @@ public class Constants {
public static final String ZTABLE_COMPACT_ID = "/compact-id";
public static final String ZTABLE_COMPACT_CANCEL_ID = "/compact-cancel-id";
public static final String ZTABLE_NAMESPACE = "/namespace";
public static final String ZTABLE_EXACT_DELETE = "/exact-delete";

public static final String ZNAMESPACES = "/namespaces";
public static final String ZNAMESPACE_NAME = "/name";
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.impl.TableOperationsHelper;
import org.apache.accumulo.core.client.rfile.RFile.ScannerOptions;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.client.summary.Summarizer;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
Expand All @@ -56,6 +57,7 @@ public class NewTableConfiguration {
private TimeType timeType = DEFAULT_TIME_TYPE;

private boolean limitVersion = true;
private boolean exactDelete = false;

private Map<String,String> properties = Collections.emptyMap();
private Map<String,String> samplerProps = Collections.emptyMap();
Expand Down Expand Up @@ -156,6 +158,31 @@ public NewTableConfiguration enableSampling(SamplerConfiguration samplerConfigur
return this;
}

/**
 * Chooses how delete markers are applied to the table being created. With {@code false}, which
 * is the default, a delete hides every version in a column whose timestamp is less than or equal
 * to the timestamp of the delete. With {@code true}, a delete hides only those versions in a
 * column that carry exactly the same timestamp as the delete.
 *
 * @param b
 *          {@code true} to request exact deletes, {@code false} for the default interpretation
 * @see TableOperations#isExactDeleteEnabled(String)
 * @see ScannerOptions#withExactDeletes()
 * @since 2.0.0
 * @return this
 */
public NewTableConfiguration setExactDeleteEnabled(boolean b) {
  this.exactDelete = b;
  return this;
}

/**
 * Reports whether exact deletes were requested for the new table.
 *
 * @since 2.0.0
 * @return the value handed to {@link #setExactDeleteEnabled(boolean)}, or {@code false} when
 *         {@link #setExactDeleteEnabled(boolean)} has not been called
 */
public boolean isExactDeleteEnabled() {
  return this.exactDelete;
}

/**
* Enables creating summary statistics using {@link Summarizer}'s for the new table.
*
Expand Down
Expand Up @@ -1015,4 +1015,14 @@ void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> pred
*/
List<SummarizerConfiguration> listSummarizers(String tableName)
throws AccumuloException, TableNotFoundException, AccumuloSecurityException;

/**
 * Determines if exact deletes were enabled for a table when it was created. By default exact
 * deletes are not enabled.
 *
 * @param tableName
 *          the name of the table to check
 * @return true if exact deletes were enabled for the table when it was created, false otherwise
 * @see NewTableConfiguration#setExactDeleteEnabled(boolean)
 * @since 2.0.0
 */
boolean isExactDeleteEnabled(String tableName)
    throws AccumuloException, TableNotFoundException, AccumuloSecurityException;
}
Expand Up @@ -158,12 +158,14 @@ public IteratorEnvironment cloneWithSamplingEnabled() {
private ScannerOptions options;
private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
private AccumuloConfiguration config;
private boolean exactDeletes;

public OfflineIterator(ScannerOptions options, ClientContext context,
Authorizations authorizations, Text table, Range range) {
Authorizations authorizations, Text table, Range range, boolean exactDeletes) {
this.options = new ScannerOptions(options);
this.context = context;
this.range = range;
this.exactDeletes = exactDeletes;

if (this.options.fetchedColumns.size() > 0) {
this.range = range.bound(this.options.fetchedColumns.first(),
Expand Down Expand Up @@ -383,7 +385,7 @@ private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<
defaultSecurityLabel = cv.getExpression();

SortedKeyValueIterator<Key,Value> visFilter = IteratorUtil.setupSystemScanIterators(multiIter,
new HashSet<>(options.fetchedColumns), authorizations, defaultSecurityLabel);
new HashSet<>(options.fetchedColumns), authorizations, defaultSecurityLabel, exactDeletes);

return iterEnv.getTopLevelIterator(
IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf,
Expand Down
Expand Up @@ -38,6 +38,7 @@ public class OfflineScanner extends ScannerOptions implements Scanner {
private ClientContext context;
private Authorizations authorizations;
private Text tableId;
private boolean exactDeletes;

public OfflineScanner(ClientContext context, Table.ID tableId, Authorizations authorizations) {
checkArgument(context != null, "context is null");
Expand All @@ -49,6 +50,7 @@ public OfflineScanner(ClientContext context, Table.ID tableId, Authorizations au
this.authorizations = authorizations;
this.batchSize = Constants.SCAN_BATCH_SIZE;
this.timeOut = Integer.MAX_VALUE;
this.exactDeletes = Tables.isExactDeleteEnabled(context, tableId);
}

@Deprecated
Expand Down Expand Up @@ -95,7 +97,7 @@ public void disableIsolation() {

@Override
public Iterator<Entry<Key,Value>> iterator() {
return new OfflineIterator(this, context, authorizations, tableId, range);
return new OfflineIterator(this, context, authorizations, tableId, range, exactDeletes);
}

@Override
Expand Down
Expand Up @@ -235,7 +235,8 @@ public void create(String tableName, NewTableConfiguration ntc)
checkArgument(ntc != null, "ntc is null");

List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)),
ByteBuffer.wrap(ntc.getTimeType().name().getBytes(UTF_8)));
ByteBuffer.wrap(ntc.getTimeType().name().getBytes(UTF_8)),
ByteBuffer.wrap(Boolean.toString(ntc.isExactDeleteEnabled()).getBytes(UTF_8)));

Map<String,String> opts = ntc.getProperties();

Expand Down Expand Up @@ -1929,4 +1930,14 @@ public List<SummarizerConfiguration> listSummarizers(String tableName)
public ImportSourceArguments addFilesTo(String tableName) {
return new BulkImport(tableName, context);
}

/**
 * Resolves {@code tableName} to its table id and reports whether exact deletes are enabled for
 * that table.
 */
@Override
public boolean isExactDeleteEnabled(String tableName) throws TableNotFoundException {
  checkArgument(tableName != null, "tableName is null");
  return Tables.isExactDeleteEnabled(context, Tables.getTableId(context, tableName));
}
}
Expand Up @@ -237,6 +237,16 @@ public static TableState getTableState(ClientContext context, Table.ID tableId,

}

/**
 * Reads the exact-delete flag stored for a table under its ZooKeeper node.
 *
 * @param context
 *          client context used to obtain the instance id and the ZooKeeper cache
 * @param tableId
 *          id of the table to look up
 * @return true only when the data stored at the table's exact-delete path is the text "true"
 *         (ignoring case); false otherwise, including when nothing is stored at that path
 */
public static boolean isExactDeleteEnabled(ClientContext context, Table.ID tableId) {
  String path = ZooUtil.getRoot(context.getInstanceID()) + Constants.ZTABLES + "/"
      + tableId.canonicalID() + Constants.ZTABLE_EXACT_DELETE;
  ZooCache zc = getZooCache(context);

  byte[] val = zc.get(path);

  // Nothing stored at the path: fall back to the documented default (exact deletes disabled)
  // rather than failing with a NullPointerException while decoding the bytes.
  if (val == null) {
    return false;
  }

  return Boolean.parseBoolean(new String(val, UTF_8));
}

public static String qualified(String tableName) {
return qualified(tableName, Namespace.DEFAULT);
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.function.Predicate;

import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.client.summary.Summarizer;
Expand Down Expand Up @@ -169,6 +170,18 @@ public interface ScannerOptions {
*/
ScannerOptions withBounds(Range range);

/**
 * This setting determines how delete markers are interpreted. By default, deletes hide everything
 * in a column where the timestamp is less than or equal to the delete. With this setting, only
 * versions of the column with the same timestamp as a delete are hidden.
 *
 * @see NewTableConfiguration#setExactDeleteEnabled(boolean)
 * @see TableOperations#isExactDeleteEnabled(String)
 * @since 2.0.0
 * @return this
 */
ScannerOptions withExactDeletes();

/**
* Construct the {@link Scanner} with iterators specified in a tables properties. Properties for
* a table can be obtained by calling {@link TableOperations#getProperties(String)}
Expand Down
Expand Up @@ -89,6 +89,7 @@ static class Opts {
boolean useSystemIterators = true;
public HashMap<String,String> tableConfig;
Range bounds;
public boolean exactDeletes = false;
}

// This cache exist as a hack to avoid leaking decompressors. When the RFile code is not given a
Expand Down Expand Up @@ -180,6 +181,11 @@ public void indexWeightChanged() {}
"Set authorizations and specified not to use system iterators");
}

if (opts.exactDeletes && !opts.useSystemIterators) {
throw new IllegalArgumentException(
"Requested exact deletes and specified not to use system iterators");
}

this.opts = opts;
if (null != opts.tableConfig && opts.tableConfig.size() > 0) {
ConfigurationCopy tableCC = new ConfigurationCopy(DefaultConfiguration.getInstance());
Expand Down Expand Up @@ -377,7 +383,7 @@ public Iterator<Entry<Key,Value>> iterator() {
SortedSet<Column> cols = this.getFetchedColumns();
families = LocalityGroupUtil.families(cols);
iterator = IteratorUtil.setupSystemScanIterators(iterator, cols, getAuthorizations(),
EMPTY_BYTES);
EMPTY_BYTES, opts.exactDeletes);
}

try {
Expand Down
Expand Up @@ -149,4 +149,10 @@ public ScannerOptions withBounds(Range range) {
this.opts.bounds = range;
return this;
}

/** Turns on the exact-delete flag in this builder's options and returns the builder. */
@Override
public ScannerOptions withExactDeletes() {
  opts.exactDeletes = true;
  return this;
}
}
Expand Up @@ -449,8 +449,8 @@ public static List<IteratorSetting> decodeIteratorSettings(byte[] enc) {

public static SortedKeyValueIterator<Key,Value> setupSystemScanIterators(
SortedKeyValueIterator<Key,Value> source, Set<Column> cols, Authorizations auths,
byte[] defaultVisibility) throws IOException {
DeletingIterator delIter = new DeletingIterator(source, false);
byte[] defaultVisibility, boolean exactDeletes) throws IOException {
SortedKeyValueIterator<Key,Value> delIter = DeletingIterator.wrap(source, false, exactDeletes);
ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
SortedKeyValueIterator<Key,Value> colFilter = ColumnQualifierFilter.wrap(cfsi, cols);
return VisibilityFilter.wrap(colFilter, auths, defaultVisibility);
Expand Down
Expand Up @@ -30,6 +30,8 @@
import org.apache.accumulo.core.iterators.ServerWrappingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

import com.google.common.annotations.VisibleForTesting;

public class DeletingIterator extends ServerWrappingIterator {
private boolean propogateDeletes;
private Key workKey = new Key();
Expand All @@ -39,13 +41,13 @@ public DeletingIterator deepCopy(IteratorEnvironment env) {
return new DeletingIterator(this, env);
}

public DeletingIterator(DeletingIterator other, IteratorEnvironment env) {
private DeletingIterator(DeletingIterator other, IteratorEnvironment env) {
super(other.source.deepCopy(env));
propogateDeletes = other.propogateDeletes;
}

public DeletingIterator(SortedKeyValueIterator<Key,Value> iterator, boolean propogateDeletes)
throws IOException {
@VisibleForTesting
DeletingIterator(SortedKeyValueIterator<Key,Value> iterator, boolean propogateDeletes) {
super(iterator);
this.propogateDeletes = propogateDeletes;
}
Expand Down Expand Up @@ -105,4 +107,13 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
IteratorEnvironment env) {
throw new UnsupportedOperationException();
}

/**
 * Creates the delete-handling iterator for a source.
 *
 * @param iterator
 *          the source iterator to wrap
 * @param propogateDeletes
 *          handed unchanged to the iterator that is created
 * @param exactDeletes
 *          when true an {@link ExactDeletingIterator} is created, otherwise a
 *          {@link DeletingIterator}
 * @return the newly created wrapping iterator
 */
public static SortedKeyValueIterator<Key,Value> wrap(SortedKeyValueIterator<Key,Value> iterator,
    boolean propogateDeletes, boolean exactDeletes) {
  if (!exactDeletes) {
    return new DeletingIterator(iterator, propogateDeletes);
  }
  return new ExactDeletingIterator(iterator, propogateDeletes);
}
}
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.accumulo.core.iterators.system;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.ServerWrappingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

/**
 * Delete-handling iterator in which a delete marker suppresses only the entries that share its
 * row, column family, column qualifier, column visibility and timestamp.
 *
 * <p>
 * When constructed with {@code propogateDeletes} set to false, delete markers are skipped along
 * with the entries they match. When it is true, delete markers are left in the stream, and
 * advancing past one skips the entries it matches.
 */
public class ExactDeletingIterator extends ServerWrappingIterator {
  private boolean propogateDeletes;
  // Scratch key holding a copy of the delete marker currently being skipped.
  private Key workKey = new Key();

  @Override
  public ExactDeletingIterator deepCopy(IteratorEnvironment env) {
    return new ExactDeletingIterator(this, env);
  }

  /**
   * Copy constructor used by {@link #deepCopy(IteratorEnvironment)}; deep copies the source of
   * {@code other} and carries over its delete propagation setting.
   */
  public ExactDeletingIterator(ExactDeletingIterator other, IteratorEnvironment env) {
    super(other.source.deepCopy(env));
    this.propogateDeletes = other.propogateDeletes;
  }

  /**
   * @param iterator
   *          the source to read from
   * @param propogateDeletes
   *          true to leave delete markers in the stream, false to drop them
   */
  public ExactDeletingIterator(SortedKeyValueIterator<Key,Value> iterator,
      boolean propogateDeletes) {
    super(iterator);
    this.propogateDeletes = propogateDeletes;
  }

  /**
   * Advances the source. If the current top is a delete marker, the marker and every entry it
   * matches are stepped over; otherwise the source moves forward by one entry. In both cases any
   * delete markers that must not be exposed are then skipped.
   */
  @Override
  public void next() throws IOException {
    if (source.getTopKey().isDeleted()) {
      skipRowColumnTime();
    } else {
      source.next();
    }
    findTop();
  }

  /**
   * Returns the range to hand to the source. The same {@code range} instance is returned when no
   * change is needed; {@link #seek} relies on that identity to detect an adjustment.
   *
   * <p>
   * The start is moved when it is inclusive and not a delete, or exclusive and a delete. In those
   * cases the new range starts, inclusively, at a copy of the start key flagged as deleted.
   * NOTE(review): presumably this is needed because a delete marker sorts ahead of the entries it
   * matches; confirm against Key ordering.
   */
  private static Range adjustRange(Range range) {
    Key start = range.getStartKey();
    if (start == null) {
      return range;
    }

    // Equal flags are exactly the two cases in which the original XOR test was false.
    if (range.isStartKeyInclusive() == start.isDeleted()) {
      return range;
    }

    Key deleteStart = new Key(start);
    deleteStart.setDeleted(true);
    return new Range(deleteStart, true, range.getEndKey(), range.isEndKeyInclusive());
  }

  /**
   * Seeks the source using a range whose start may have been widened by
   * {@link #adjustRange(Range)}, then positions this iterator on the first entry that may be
   * returned.
   */
  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
      throws IOException {
    Range adjusted = adjustRange(range);

    source.seek(adjusted, columnFamilies, inclusive);
    findTop();

    boolean startWasMoved = adjusted != range;
    if (!propogateDeletes || !startWasMoved) {
      return;
    }

    // Delete markers are kept in this mode, so anything the widened seek exposed ahead of the
    // requested start has to be stepped over here.
    while (source.hasTop() && range.beforeStartKey(source.getTopKey())) {
      next();
    }
  }

  /** Skips delete markers, and the entries they match, unless deletes are being propagated. */
  private void findTop() throws IOException {
    if (propogateDeletes) {
      return;
    }

    while (source.hasTop() && source.getTopKey().isDeleted()) {
      skipRowColumnTime();
    }
  }

  /**
   * Steps over the source's current top key and all following keys that equal it on row, column
   * family, column qualifier, column visibility and timestamp.
   */
  private void skipRowColumnTime() throws IOException {
    // Copy the top key before advancing the source.
    workKey.set(source.getTopKey());
    source.next();

    while (source.hasTop()
        && source.getTopKey().equals(workKey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME)) {
      source.next();
    }
  }

  /** Not supported; instances are configured through the constructors. */
  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
      IteratorEnvironment env) {
    throw new UnsupportedOperationException();
  }
}