Skip to content

Commit

Permalink
Added column type enum (#1060)
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Nov 8, 2018
1 parent 31d0bc7 commit 7539679
Show file tree
Hide file tree
Showing 22 changed files with 713 additions and 494 deletions.
Expand Up @@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
Expand All @@ -21,6 +21,7 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
Expand Down Expand Up @@ -88,34 +89,21 @@ public static String toString(Entry<Key, Value> entry) {
} else {
long ts = key.getTimestamp();
String type = "";

if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.TX_DONE_PREFIX) {
type = "TX_DONE";
}
if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.DEL_LOCK_PREFIX) {
type = "DEL_LOCK";
}
if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.LOCK_PREFIX) {
type = "LOCK";
}
if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.DATA_PREFIX) {
type = "DATA";
ColumnType colType = ColumnType.from(ts);

switch (colType) {
case RLOCK:
if (ReadLockUtil.isDelete(ts)) {
type = "DEL_RLOCK";
} else {
type = "RLOCK";
}
ts = ReadLockUtil.decodeTs(ts);
break;
default:
type = colType.toString();
break;
}
if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.WRITE_PREFIX) {
type = "WRITE";
}
if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.ACK_PREFIX) {
type = "ACK";
}
if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) {
if (ReadLockUtil.isDelete(ts)) {
type = "DEL_RLOCK";
} else {
type = "RLOCK";
}
ts = ReadLockUtil.decodeTs(ts);
}


StringBuilder sb = new StringBuilder();

Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
Expand Down Expand Up @@ -125,10 +126,10 @@ public void next() throws IOException {
private boolean consumeData() throws IOException {
while (source.hasTop()
&& curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
ColumnType colType = ColumnType.from(source.getTopKey());
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;

if (colType == ColumnConstants.DATA_PREFIX) {
if (colType == ColumnType.DATA) {
if (ts >= truncationTime && !rolledback.contains(ts)) {
return false;
}
Expand Down Expand Up @@ -173,132 +174,147 @@ private void readColMetadata() throws IOException {
return;
}

while (source.hasTop()
loop: while (source.hasTop()
&& curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {

long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
ColumnType colType = ColumnType.from(source.getTopKey());
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;

if (colType == ColumnConstants.TX_DONE_PREFIX) {
keys.add(source.getTopKey(), source.getTopValue());
completeTxs.add(ts);
} else if (colType == ColumnConstants.WRITE_PREFIX) {
boolean keep = false;
boolean complete = completeTxs.contains(ts);
byte[] val = source.getTopValue().get();
long timePtr = WriteValue.getTimestamp(val);

if (WriteValue.isPrimary(val) && !complete) {
keep = true;
switch (colType) {
case TX_DONE: {
keys.add(source.getTopKey(), source.getTopValue());
completeTxs.add(ts);
break;
}
case WRITE: {
boolean keep = false;
boolean complete = completeTxs.contains(ts);
byte[] val = source.getTopValue().get();
long timePtr = WriteValue.getTimestamp(val);

if (!oldestSeen) {
if (firstWrite == -1) {
firstWrite = ts;
if (WriteValue.isPrimary(val) && !complete) {
keep = true;
}

if (ts < gcTimestamp) {
oldestSeen = true;
truncationTime = timePtr;
if (!(WriteValue.isDelete(val) && isFullMajc)) {
if (!oldestSeen) {
if (firstWrite == -1) {
firstWrite = ts;
}

if (ts < gcTimestamp) {
oldestSeen = true;
truncationTime = timePtr;
if (!(WriteValue.isDelete(val) && isFullMajc)) {
keep = true;
}
} else {
keep = true;
}
} else {
keep = true;
}
}

if (timePtr > invalidationTime) {
invalidationTime = timePtr;
}
if (timePtr > invalidationTime) {
invalidationTime = timePtr;
}

if (keep) {
keys.add(source.getTopKey(), val);
} else if (complete) {
completeTxs.remove(ts);
if (keep) {
keys.add(source.getTopKey(), val);
} else if (complete) {
completeTxs.remove(ts);
}
break;
}
} else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
boolean keep = false;
long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
boolean complete = completeTxs.contains(txDoneTs);
case DEL_LOCK: {
boolean keep = false;
long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
boolean complete = completeTxs.contains(txDoneTs);

byte[] val = source.getTopValue().get();
byte[] val = source.getTopValue().get();

if (!complete && DelLockValue.isPrimary(val)) {
keep = true;
}
if (!complete && DelLockValue.isPrimary(val)) {
keep = true;
}

if (DelLockValue.isRollback(val)) {
rolledback.add(ts);
keep |= !isFullMajc;
}
if (DelLockValue.isRollback(val)) {
rolledback.add(ts);
keep |= !isFullMajc;
}

if (ts > invalidationTime) {
invalidationTime = ts;
}
if (ts > invalidationTime) {
invalidationTime = ts;
}

if (keep) {
keys.add(source.getTopKey(), source.getTopValue());
} else if (complete) {
completeTxs.remove(txDoneTs);
if (keep) {
keys.add(source.getTopKey(), source.getTopValue());
} else if (complete) {
completeTxs.remove(txDoneTs);
}
break;
}
} else if (colType == ColumnConstants.RLOCK_PREFIX) {
boolean keep = false;
long rlts = ReadLockUtil.decodeTs(ts);
boolean isDelete = ReadLockUtil.isDelete(ts);
case RLOCK: {
boolean keep = false;
long rlts = ReadLockUtil.decodeTs(ts);
boolean isDelete = ReadLockUtil.isDelete(ts);

if (isDelete) {
lastReadLockDeleteTs = rlts;
}
if (isDelete) {
lastReadLockDeleteTs = rlts;
}

if (rlts > invalidationTime) {
if (isFullMajc) {
if (isDelete) {
if (DelReadLockValue.isRollback(source.getTopValue().get())) {
// can drop rolled back read lock delete markers on any full majc, do not need to
// consider gcTimestamp
keep = false;
if (rlts > invalidationTime) {
if (isFullMajc) {
if (isDelete) {
if (DelReadLockValue.isRollback(source.getTopValue().get())) {
// can drop rolled back read lock delete markers on any full majc, do not need to
// consider gcTimestamp
keep = false;
} else {
long rlockCommitTs =
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
keep = rlockCommitTs >= gcTimestamp;
}
} else {
long rlockCommitTs =
DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
keep = rlockCommitTs >= gcTimestamp;
keep = lastReadLockDeleteTs != rlts;
}
} else {
keep = lastReadLockDeleteTs != rlts;
// can drop deleted read lock entries.. keep the delete entry.
keep = isDelete || lastReadLockDeleteTs != rlts;
}
} else {
// can drop deleted read lock entries.. keep the delete entry.
keep = isDelete || lastReadLockDeleteTs != rlts;
}
}

if (keep) {
keys.add(source.getTopKey(), source.getTopValue());
}
} else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime) {
keys.add(source.getTopKey(), source.getTopValue());
if (keep) {
keys.add(source.getTopKey(), source.getTopValue());
}
break;
}
} else if (colType == ColumnConstants.DATA_PREFIX) {
// can stop looking
break;
} else if (colType == ColumnConstants.ACK_PREFIX) {
if (!sawAck) {
if (ts >= firstWrite) {
case LOCK: {
if (ts > invalidationTime) {
keys.add(source.getTopKey(), source.getTopValue());
}
sawAck = true;
break;
}
} else {
throw new IllegalArgumentException(" unknown colType " + String.format("%x", colType));
case DATA: {
// can stop looking
break loop;
}
case ACK: {
if (!sawAck) {
if (ts >= firstWrite) {
keys.add(source.getTopKey(), source.getTopValue());
}
sawAck = true;
}
break;
}

default:
throw new IllegalArgumentException(" unknown colType " + colType);

}

source.next();
}

keys.copyTo(keysFiltered, (timestamp -> {
long colType = timestamp & ColumnConstants.PREFIX_MASK;
if (colType == ColumnConstants.TX_DONE_PREFIX) {
if (ColumnType.from(timestamp) == ColumnType.TX_DONE) {
return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK);
} else {
return true;
Expand Down

0 comments on commit 7539679

Please sign in to comment.