Skip to content

Commit

Permalink
fixes #946 Added mem efficient col buffer for GCiter (#952)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkosh44 authored and keith-turner committed Oct 26, 2017
1 parent d123d42 commit 51dc912
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.accumulo.iterators;

import java.lang.IllegalArgumentException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.function.LongPredicate;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;

/**
* This class buffers Keys that all have the same row+column. Internally
* it only stores one Key, a list of timestamps and a list of values. At iteration
* time it materializes each Key+Value.
*/
class ColumnBuffer {

private Key key;
private ArrayList<Long> timeStamps;
private ArrayList<byte[]> values;

public ColumnBuffer() {

this.key = null;
this.timeStamps = new ArrayList<>();
this.values = new ArrayList<>();
}

/**
* @param timestamp Timestamp to be added to buffer
* @param v Value to be added to buffer
*/
private void add(long timestamp, byte[] v) {

timeStamps.add(timestamp);
values.add(v);
}

/**
* When empty, the first key added sets the row+column. After this all keys
* added must have the same row+column.
*
* @param k Key to be added to buffer
* @param v Value to be added to buffer
*/
public void add(Key k, byte[] vByte) throws IllegalArgumentException {
vByte = Arrays.copyOf(vByte, vByte.length);

if (key == null) {
key = new Key(k);
add(k.getTimestamp(), vByte);
} else if (key.equals(k, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
add(k.getTimestamp(), vByte);
} else {
throw new IllegalArgumentException();
}
}

/**
* When empty, the first key added sets the row+column. After this all keys
* added must have the same row+column.
*
* @param k Key to be added to buffer
* @param v Value to be added to buffer
*/
public void add(Key k, Value v) throws IllegalArgumentException {
add(k, v.get());
}

/**
* Clears the dest ColumnBuffer and inserts all entries in dest where the timestamp passes
* the timestampTest.
*
* @param dest Destination ColumnBuffer
* @param timestampTest Test to determine which timestamps get added to dest
*/
public void copyTo(ColumnBuffer dest, LongPredicate timestampTest) {
dest.clear();

if (key != null) {
dest.key = new Key(key);
}

for (int i = 0; i < timeStamps.size(); i++) {
long time = timeStamps.get(i);
if (timestampTest.test(time)) {
dest.add(time, values.get(i));
}
}
}

public void clear() {
timeStamps.clear();
values.clear();
key = null;
}

/**
* @return the size of the current buffer
*/
public int size() {
return timeStamps.size();
}

/**
* @param pos Position of the Key that will be retrieved
* @return The key at a given position
*/
public Key getKey(int pos) {
Key tmpKey = new Key(key);
tmpKey.setTimestamp(timeStamps.get(pos));
return tmpKey;
}

/**
* @param pos Position of the Value that will be retrieved
* @return The value at a given position
*/
public Value getValue(int pos) {
return new Value(values.get(pos));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.function.LongPredicate;

import com.google.common.annotations.VisibleForTesting;
import org.apache.accumulo.core.client.IteratorSetting;
Expand All @@ -44,18 +45,6 @@
*/
public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Value> {

private static class KeyValue extends SimpleImmutableEntry<Key, Value> {
private static final long serialVersionUID = 1L;

public KeyValue(Key key, Value value) {
super(new Key(key), new Value(value));
}

public KeyValue(Key key, byte[] value) {
super(new Key(key), new Value(value));
}
}

@VisibleForTesting
static final String GC_TIMESTAMP_OPT = "timestamp.gc";

Expand All @@ -65,8 +54,8 @@ public KeyValue(Key key, byte[] value) {
private Long gcTimestamp;
private SortedKeyValueIterator<Key, Value> source;

private ArrayList<KeyValue> keys = new ArrayList<>();
private ArrayList<KeyValue> keysFiltered = new ArrayList<>();
private ColumnBuffer keys = new ColumnBuffer();
private ColumnBuffer keysFiltered = new ColumnBuffer();
private HashSet<Long> completeTxs = new HashSet<>();
private HashSet<Long> rolledback = new HashSet<>();
private Key curCol = new Key();
Expand All @@ -77,11 +66,11 @@ public KeyValue(Key key, byte[] value) {
@Override
public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
IteratorEnvironment env) throws IOException {

if (env.getIteratorScope() == IteratorScope.scan) {
throw new IllegalArgumentException();
}
this.source = source;

isFullMajc = env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction();

String oats = options.get(GC_TIMESTAMP_OPT);
Expand All @@ -96,6 +85,7 @@ public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String>
}
}


@Override
public boolean hasTop() {
return position < keysFiltered.size() || source.hasTop();
Expand Down Expand Up @@ -191,7 +181,7 @@ private void readColMetadata() throws IOException {
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;

if (colType == ColumnConstants.TX_DONE_PREFIX) {
keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
keys.add(source.getTopKey(), source.getTopValue());
completeTxs.add(ts);
} else if (colType == ColumnConstants.WRITE_PREFIX) {
boolean keep = false;
Expand Down Expand Up @@ -224,7 +214,7 @@ private void readColMetadata() throws IOException {
}

if (keep) {
keys.add(new KeyValue(source.getTopKey(), val));
keys.add(source.getTopKey(), val);
} else if (complete) {
completeTxs.remove(ts);
}
Expand All @@ -249,21 +239,21 @@ private void readColMetadata() throws IOException {
}

if (keep) {
keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
keys.add(source.getTopKey(), source.getTopValue());
} else if (complete) {
completeTxs.remove(txDoneTs);
}
} else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime) {
keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
keys.add(source.getTopKey(), source.getTopValue());
}
} else if (colType == ColumnConstants.DATA_PREFIX) {
// can stop looking
break;
} else if (colType == ColumnConstants.ACK_PREFIX) {
if (!sawAck) {
if (ts >= firstWrite) {
keys.add(new KeyValue(source.getTopKey(), source.getTopValue()));
keys.add(source.getTopKey(), source.getTopValue());
}
sawAck = true;
}
Expand All @@ -274,22 +264,20 @@ private void readColMetadata() throws IOException {
source.next();
}

for (KeyValue kv : keys) {
long colType = kv.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
keys.copyTo(keysFiltered, (timestamp -> {
long colType = timestamp & ColumnConstants.PREFIX_MASK;
if (colType == ColumnConstants.TX_DONE_PREFIX) {
if (completeTxs.contains(kv.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK)) {
keysFiltered.add(kv);
}
return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK);
} else {
keysFiltered.add(kv);
return true;
}
}
}));
}

@Override
public Key getTopKey() {
if (position < keysFiltered.size()) {
return keysFiltered.get(position).getKey();
return keysFiltered.getKey(position);
} else {
return source.getTopKey();
}
Expand All @@ -298,7 +286,7 @@ public Key getTopKey() {
@Override
public Value getTopValue() {
if (position < keysFiltered.size()) {
return keysFiltered.get(position).getValue();
return keysFiltered.getValue(position);
} else {
return source.getTopValue();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.accumulo.iterators;

import java.lang.IllegalArgumentException;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.junit.Assert;
import org.junit.Test;

public class ColumnBufferTest {

@Test
public void testDifferentKeys() {
ColumnBuffer columnBuffer = new ColumnBuffer();
columnBuffer.add(new Key("row1"), new Value());
try {
columnBuffer.add(new Key("row2"), new Value());
Assert.fail();
} catch (IllegalArgumentException e) {

}
}
}

0 comments on commit 51dc912

Please sign in to comment.