diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java index d87975aa2..e202816f3 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/format/FluoFormatter.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -21,6 +21,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.NotificationUtil; import org.apache.fluo.accumulo.util.ReadLockUtil; import org.apache.fluo.accumulo.values.DelLockValue; @@ -88,34 +89,21 @@ public static String toString(Entry entry) { } else { long ts = key.getTimestamp(); String type = ""; - - if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.TX_DONE_PREFIX) { - type = "TX_DONE"; - } - if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.DEL_LOCK_PREFIX) { - type = "DEL_LOCK"; - } - if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.LOCK_PREFIX) { - type = "LOCK"; - } - if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.DATA_PREFIX) { - type = "DATA"; + ColumnType colType = ColumnType.from(ts); + + switch (colType) { + case RLOCK: + if (ReadLockUtil.isDelete(ts)) { + type = "DEL_RLOCK"; + } else { + type = "RLOCK"; + } + ts = ReadLockUtil.decodeTs(ts); + break; + default: + type = colType.toString(); + break; } - if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.WRITE_PREFIX) { - type = "WRITE"; - } - if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.ACK_PREFIX) { - type = "ACK"; - } - if ((ts & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) { - if (ReadLockUtil.isDelete(ts)) { - type = "DEL_RLOCK"; - } else { - type = "RLOCK"; - } - ts = ReadLockUtil.decodeTs(ts); - } - StringBuilder sb = new StringBuilder(); diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java index d07f59ee6..ad4a8aaa8 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.ReadLockUtil; import org.apache.fluo.accumulo.util.ZookeeperUtil; import org.apache.fluo.accumulo.values.DelLockValue; @@ -125,10 +126,10 @@ public void next() throws IOException { private boolean consumeData() throws IOException { while (source.hasTop() && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { - long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(source.getTopKey()); long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; - if (colType == ColumnConstants.DATA_PREFIX) { + if (colType == ColumnType.DATA) { if (ts >= truncationTime && !rolledback.contains(ts)) { return false; } @@ -173,132 +174,147 @@ private void readColMetadata() throws IOException { return; } - while (source.hasTop() + loop: while (source.hasTop() && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { - long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(source.getTopKey()); long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; - if (colType == ColumnConstants.TX_DONE_PREFIX) { - keys.add(source.getTopKey(), source.getTopValue()); - completeTxs.add(ts); - } else if (colType == ColumnConstants.WRITE_PREFIX) { - boolean keep = false; - boolean complete = completeTxs.contains(ts); - byte[] val = source.getTopValue().get(); - long timePtr = WriteValue.getTimestamp(val); - - if (WriteValue.isPrimary(val) && !complete) { - keep = true; + switch (colType) { + case TX_DONE: { + keys.add(source.getTopKey(), source.getTopValue()); + completeTxs.add(ts); + break; } + case WRITE: { + boolean keep = false; + boolean complete = completeTxs.contains(ts); + byte[] val = source.getTopValue().get(); + long timePtr = WriteValue.getTimestamp(val); - if (!oldestSeen) { - if (firstWrite == -1) { - firstWrite = ts; + if (WriteValue.isPrimary(val) && !complete) { + keep = true; } - if (ts < gcTimestamp) { - oldestSeen = true; - truncationTime = timePtr; - if (!(WriteValue.isDelete(val) && isFullMajc)) { + if (!oldestSeen) { + if (firstWrite == -1) { + firstWrite = ts; + } + + if (ts < gcTimestamp) { + oldestSeen = true; + truncationTime = timePtr; + if (!(WriteValue.isDelete(val) && isFullMajc)) { + keep = true; + } + } else { keep = true; } - } else { - keep = true; } - } - if (timePtr > invalidationTime) { - invalidationTime = timePtr; - } + if (timePtr > invalidationTime) { + invalidationTime = timePtr; + } - if (keep) { - keys.add(source.getTopKey(), val); - } else if (complete) { - completeTxs.remove(ts); + if (keep) { + keys.add(source.getTopKey(), val); + } else if (complete) { + completeTxs.remove(ts); + } + break; } - } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) { - boolean keep = false; - long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get()); - boolean complete = completeTxs.contains(txDoneTs); + case DEL_LOCK: { + boolean keep = false; + long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get()); + boolean complete = completeTxs.contains(txDoneTs); - byte[] val = source.getTopValue().get(); + byte[] val = source.getTopValue().get(); - if (!complete && DelLockValue.isPrimary(val)) { - keep = true; - } + if (!complete && DelLockValue.isPrimary(val)) { + keep = true; + } - if (DelLockValue.isRollback(val)) { - rolledback.add(ts); - keep |= !isFullMajc; - } + if (DelLockValue.isRollback(val)) { + rolledback.add(ts); + keep |= !isFullMajc; + } - if (ts > invalidationTime) { - invalidationTime = ts; - } + if (ts > invalidationTime) { + invalidationTime = ts; + } - if (keep) { - keys.add(source.getTopKey(), source.getTopValue()); - } else if (complete) { - completeTxs.remove(txDoneTs); + if (keep) { + keys.add(source.getTopKey(), source.getTopValue()); + } else if (complete) { + completeTxs.remove(txDoneTs); + } + break; } - } else if (colType == ColumnConstants.RLOCK_PREFIX) { - boolean keep = false; - long rlts = ReadLockUtil.decodeTs(ts); - boolean isDelete = ReadLockUtil.isDelete(ts); + case RLOCK: { + boolean keep = false; + long rlts = ReadLockUtil.decodeTs(ts); + boolean isDelete = ReadLockUtil.isDelete(ts); - if (isDelete) { - lastReadLockDeleteTs = rlts; - } + if (isDelete) { + lastReadLockDeleteTs = rlts; + } - if (rlts > invalidationTime) { - if (isFullMajc) { - if (isDelete) { - if (DelReadLockValue.isRollback(source.getTopValue().get())) { - // can drop rolled back read lock delete markers on any full majc, do not need to - // consider gcTimestamp - keep = false; + if (rlts > invalidationTime) { + if (isFullMajc) { + if (isDelete) { + if (DelReadLockValue.isRollback(source.getTopValue().get())) { + // can drop rolled back read lock delete markers on any full majc, do not need to + // consider gcTimestamp + keep = false; + } else { + long rlockCommitTs = + DelReadLockValue.getCommitTimestamp(source.getTopValue().get()); + keep = rlockCommitTs >= gcTimestamp; + } } else { - long rlockCommitTs = - DelReadLockValue.getCommitTimestamp(source.getTopValue().get()); - keep = rlockCommitTs >= gcTimestamp; + keep = lastReadLockDeleteTs != rlts; } } else { - keep = lastReadLockDeleteTs != rlts; + // can drop deleted read lock entries.. keep the delete entry. + keep = isDelete || lastReadLockDeleteTs != rlts; } - } else { - // can drop deleted read lock entries.. keep the delete entry. - keep = isDelete || lastReadLockDeleteTs != rlts; } - } - if (keep) { - keys.add(source.getTopKey(), source.getTopValue()); - } - } else if (colType == ColumnConstants.LOCK_PREFIX) { - if (ts > invalidationTime) { - keys.add(source.getTopKey(), source.getTopValue()); + if (keep) { + keys.add(source.getTopKey(), source.getTopValue()); + } + break; } - } else if (colType == ColumnConstants.DATA_PREFIX) { - // can stop looking - break; - } else if (colType == ColumnConstants.ACK_PREFIX) { - if (!sawAck) { - if (ts >= firstWrite) { + case LOCK: { + if (ts > invalidationTime) { keys.add(source.getTopKey(), source.getTopValue()); } - sawAck = true; + break; } - } else { - throw new IllegalArgumentException(" unknown colType " + String.format("%x", colType)); + case DATA: { + // can stop looking + break loop; + } + case ACK: { + if (!sawAck) { + if (ts >= firstWrite) { + keys.add(source.getTopKey(), source.getTopValue()); + } + sawAck = true; + } + break; + } + + default: + throw new IllegalArgumentException(" unknown colType " + colType); + } source.next(); } keys.copyTo(keysFiltered, (timestamp -> { - long colType = timestamp & ColumnConstants.PREFIX_MASK; - if (colType == ColumnConstants.TX_DONE_PREFIX) { + if (ColumnType.from(timestamp) == ColumnType.TX_DONE) { return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK); } else { return true; diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java index dd4de54b0..167eff71e 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/OpenReadLockIterator.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -26,17 +26,10 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.ReadLockUtil; -import static org.apache.fluo.accumulo.util.ColumnConstants.ACK_PREFIX; -import static org.apache.fluo.accumulo.util.ColumnConstants.DATA_PREFIX; -import static org.apache.fluo.accumulo.util.ColumnConstants.DEL_LOCK_PREFIX; -import static org.apache.fluo.accumulo.util.ColumnConstants.LOCK_PREFIX; -import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX; import static org.apache.fluo.accumulo.util.ColumnConstants.TIMESTAMP_MASK; -import static org.apache.fluo.accumulo.util.ColumnConstants.TX_DONE_PREFIX; -import static org.apache.fluo.accumulo.util.ColumnConstants.WRITE_PREFIX; public class OpenReadLockIterator implements SortedKeyValueIterator { @@ -47,35 +40,43 @@ public class OpenReadLockIterator implements SortedKeyValueIterator private void findTop() throws IOException { while (source.hasTop()) { - long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(source.getTopKey()); - if (colType == TX_DONE_PREFIX || colType == WRITE_PREFIX || colType == DEL_LOCK_PREFIX) { - source.skipToPrefix(source.getTopKey(), RLOCK_PREFIX); - continue; - } else if (colType == RLOCK_PREFIX) { - if (ReadLockUtil.isDelete(source.getTopKey())) { - lastDelete.set(source.getTopKey()); - } else { - if (lastDelete.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { - long ts1 = ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() & TIMESTAMP_MASK); - long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() & TIMESTAMP_MASK); - - if (ts1 != ts2) { + switch (colType) { + case TX_DONE: + case WRITE: + case DEL_LOCK: { + source.skipToPrefix(source.getTopKey(), ColumnType.RLOCK); + break; + } + case RLOCK: { + if (ReadLockUtil.isDelete(source.getTopKey())) { + lastDelete.set(source.getTopKey()); + } else { + if (lastDelete.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { + long ts1 = ReadLockUtil.decodeTs(source.getTopKey().getTimestamp() & TIMESTAMP_MASK); + long ts2 = ReadLockUtil.decodeTs(lastDelete.getTimestamp() & TIMESTAMP_MASK); + + if (ts1 != ts2) { + // found a read lock that is not suppressed by a delete read lock entry + return; + } + } else { // found a read lock that is not suppressed by a delete read lock entry return; } - } else { - // found a read lock that is not suppressed by a delete read lock entry - return; } + source.next(); + break; + } + case DATA: + case LOCK: + case ACK: { + source.skipColumn(source.getTopKey()); + break; } - source.next(); - continue; - } else if (colType == DATA_PREFIX || colType == LOCK_PREFIX || colType == ACK_PREFIX) { - source.skipColumn(source.getTopKey()); - continue; - } else { - throw new IllegalArgumentException("Unknown column type " + source.getTopKey()); + default: + throw new IllegalArgumentException("Unknown column type " + source.getTopKey()); } } } diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java index b6f9a4856..f6de3f849 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/PrewriteIterator.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -28,6 +28,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.ReadLockUtil; import org.apache.fluo.accumulo.values.DelReadLockValue; import org.apache.fluo.accumulo.values.WriteValue; @@ -102,9 +103,9 @@ public void seek(Range range, Collection columnFamilies, boolean i Key endKey = new Key(range.getStartKey()); if (checkAck) { - endKey.setTimestamp(ColumnConstants.DATA_PREFIX | ColumnConstants.TIMESTAMP_MASK); + endKey.setTimestamp(ColumnType.DATA.first()); } else { - endKey.setTimestamp(ColumnConstants.ACK_PREFIX | ColumnConstants.TIMESTAMP_MASK); + endKey.setTimestamp(ColumnType.ACK.first()); } // Tried seeking directly to WRITE_PREFIX, however this did not work well because of how @@ -120,114 +121,126 @@ public void seek(Range range, Collection columnFamilies, boolean i while (source.hasTop() && seekRange.getStartKey().equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { - long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(source.getTopKey()); long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; - if (colType == ColumnConstants.TX_DONE_PREFIX) { - // tried to make 1st seek go to WRITE_PREFIX, but this did not allow the DeleteIterator to - // be removed from the stack so it was slower. - source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.WRITE_PREFIX); - } else if (colType == ColumnConstants.WRITE_PREFIX) { - long timePtr = WriteValue.getTimestamp(source.getTopValue().get()); - - if (timePtr > invalidationTime) { - invalidationTime = timePtr; + switch (colType) { + case TX_DONE: { + // tried to make 1st seek go to WRITE_PREFIX, but this did not allow the DeleteIterator to + // be removed from the stack so it was slower. + source.skipToPrefix(seekRange.getStartKey(), ColumnType.WRITE); + break; } + case WRITE: { + long timePtr = WriteValue.getTimestamp(source.getTopValue().get()); - if (ts >= snaptime) { - hasTop = true; - return; - } - - source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.DEL_LOCK_PREFIX); - } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) { - if (ts > invalidationTime) { - invalidationTime = ts; + if (timePtr > invalidationTime) { + invalidationTime = timePtr; + } if (ts >= snaptime) { hasTop = true; return; } - } - if (readlock) { - source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX); - } else { - source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.RLOCK_PREFIX); + source.skipToPrefix(seekRange.getStartKey(), ColumnType.DEL_LOCK); + break; } - } else if (colType == ColumnConstants.RLOCK_PREFIX) { + case DEL_LOCK: { + if (ts > invalidationTime) { + invalidationTime = ts; - long lastDeleteTs = -1; - long rlts = ReadLockUtil.decodeTs(ts); + if (ts >= snaptime) { + hasTop = true; + return; + } + } - if (!readlock) { - while (rlts > invalidationTime && colType == ColumnConstants.RLOCK_PREFIX) { - if (ReadLockUtil.isDelete(ts)) { - // ignore rolled back read locks, these should never prevent a write lock - if (!DelReadLockValue.isRollback(source.getTopValue().get())) { - if (rlts >= snaptime) { - hasTop = true; - return; - } else { - long rlockCommitTs = - DelReadLockValue.getCommitTimestamp(source.getTopValue().get()); - if (rlockCommitTs > snaptime) { + if (readlock) { + source.skipToPrefix(seekRange.getStartKey(), ColumnType.LOCK); + } else { + source.skipToPrefix(seekRange.getStartKey(), ColumnType.RLOCK); + } + break; + } + case RLOCK: { + long lastDeleteTs = -1; + long rlts = ReadLockUtil.decodeTs(ts); + + if (!readlock) { + while (rlts > invalidationTime && colType == ColumnType.RLOCK) { + if (ReadLockUtil.isDelete(ts)) { + // ignore rolled back read locks, these should never prevent a write lock + if (!DelReadLockValue.isRollback(source.getTopValue().get())) { + if (rlts >= snaptime) { hasTop = true; return; + } else { + long rlockCommitTs = + DelReadLockValue.getCommitTimestamp(source.getTopValue().get()); + if (rlockCommitTs > snaptime) { + hasTop = true; + return; + } } } - } - lastDeleteTs = rlts; - } else { - if (rlts != lastDeleteTs) { - // this read lock is active - hasTop = true; - return; + lastDeleteTs = rlts; + } else { + if (rlts != lastDeleteTs) { + // this read lock is active + hasTop = true; + return; + } + } + + source.next(); + if (source.hasTop()) { + colType = ColumnType.from(source.getTopKey()); + ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; + rlts = ReadLockUtil.decodeTs(ts); + } else { + break; } } + } - source.next(); - if (source.hasTop()) { - colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK; - ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; - rlts = ReadLockUtil.decodeTs(ts); - } else { - break; - } + if (source.hasTop() && (colType == ColumnType.RLOCK)) { + source.skipToPrefix(seekRange.getStartKey(), ColumnType.LOCK); } + break; } + case LOCK: { + if (ts > invalidationTime) { + // nothing supersedes this lock, therefore the column is locked + hasTop = true; + return; + } - if (source.hasTop() && (colType == ColumnConstants.RLOCK_PREFIX)) { - source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.LOCK_PREFIX); - } - } else if (colType == ColumnConstants.LOCK_PREFIX) { - if (ts > invalidationTime) { - // nothing supersedes this lock, therefore the column is locked - hasTop = true; - return; + if (checkAck) { + source.skipToPrefix(seekRange.getStartKey(), ColumnType.ACK); + } else { + // only ack and data left and not interested in either so stop looking + return; + } + break; } - - if (checkAck) { - source.skipToPrefix(seekRange.getStartKey(), ColumnConstants.ACK_PREFIX); - } else { - // only ack and data left and not interested in either so stop looking + case DATA: { + // can stop looking return; } - } else if (colType == ColumnConstants.DATA_PREFIX) { - // can stop looking - return; - } else if (colType == ColumnConstants.ACK_PREFIX) { - if (checkAck && ts > ntfyTimestamp) { - hasTop = true; - return; - } else { - // nothing else to look at in this column - return; + case ACK: { + if (checkAck && ts > ntfyTimestamp) { + hasTop = true; + return; + } else { + // nothing else to look at in this column + return; + } } - } else { - throw new IllegalArgumentException(); + default: + throw new IllegalArgumentException(); } } } diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java index cae383f8b..7df373f5a 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -29,6 +29,7 @@ import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.values.WriteValue; public class RollbackCheckIterator implements SortedKeyValueIterator { @@ -91,60 +92,69 @@ public void seek(Range range, Collection columnFamilies, boolean i hasTop = false; while (source.hasTop() && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { - long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(source.getTopKey()); long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; - if (colType == ColumnConstants.TX_DONE_PREFIX) { - source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX); - continue; - } else if (colType == ColumnConstants.WRITE_PREFIX) { - long timePtr = WriteValue.getTimestamp(source.getTopValue().get()); - - if (timePtr > invalidationTime) { - invalidationTime = timePtr; + switch (colType) { + case TX_DONE: + source.skipToPrefix(curCol, ColumnType.WRITE); + continue; + case WRITE: { + long timePtr = WriteValue.getTimestamp(source.getTopValue().get()); + + if (timePtr > invalidationTime) { + invalidationTime = timePtr; + } + + if (lockTime == timePtr) { + hasTop = true; + return; + } + + if (lockTime > timePtr) { + source.skipToPrefix(curCol, ColumnType.DEL_LOCK); + continue; + } + break; } - - if (lockTime == timePtr) { - hasTop = true; - return; + case DEL_LOCK: { + if (ts > invalidationTime) { + invalidationTime = ts; + } + + if (ts == lockTime) { + hasTop = true; + return; + } + + if (lockTime > ts) { + source.skipToPrefix(curCol, ColumnType.LOCK); + continue; + } + break; } - - if (lockTime > timePtr) { - source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX); + case RLOCK: { + source.skipToPrefix(curCol, ColumnType.LOCK); continue; } - - } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) { - if (ts > invalidationTime) { - invalidationTime = ts; + case LOCK: { + if (ts > invalidationTime) { + // nothing supersedes this lock, therefore the column is locked + hasTop = true; + return; + } + break; } - - if (ts == lockTime) { - hasTop = true; + case DATA: { + // can stop looking return; } - - if (lockTime > ts) { - source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX); - continue; - } - - } else if (colType == ColumnConstants.RLOCK_PREFIX) { - source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX); - continue; - } else if (colType == ColumnConstants.LOCK_PREFIX) { - if (ts > invalidationTime) { - // nothing supersedes this lock, therefore the column is locked - hasTop = true; - return; + case ACK: { + // do nothing if ACK + break; } - } else if (colType == ColumnConstants.DATA_PREFIX) { - // can stop looking - return; - } else if (colType == ColumnConstants.ACK_PREFIX) { - // do nothing if ACK - } else { - throw new IllegalArgumentException(); + default: + throw new IllegalArgumentException(); } source.next(); diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java index 27e63ef4c..4b5ab9d0a 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/SnapshotIterator.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -33,6 +33,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.values.WriteValue; public class SnapshotIterator implements SortedKeyValueIterator { @@ -91,87 +92,95 @@ private void findTop() throws IOException { while (source.hasTop() && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { - long colType = source.getTopKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(source.getTopKey()); long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; - if (colType == ColumnConstants.TX_DONE_PREFIX) { - source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX); - continue; - } else if (colType == ColumnConstants.WRITE_PREFIX) { - long timePtr = WriteValue.getTimestamp(source.getTopValue().get()); - - if (timePtr > invalidationTime) { - invalidationTime = timePtr; + switch (colType) { + case TX_DONE: { + source.skipToPrefix(curCol, ColumnType.WRITE); + continue; } + case WRITE: { + long timePtr = WriteValue.getTimestamp(source.getTopValue().get()); - if (dataPointer == -1) { - if (ts <= snaptime) { - dataPointer = timePtr; - source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX); - continue; + if (timePtr > invalidationTime) { + invalidationTime = timePtr; + } + + if (dataPointer == -1) { + if (ts <= snaptime) { + dataPointer = timePtr; + source.skipToPrefix(curCol, ColumnType.DEL_LOCK); + continue; + } else { + source.skipToTimestamp(curCol, ColumnType.WRITE.enode(snaptime)); + continue; + } + } + break; + } + case DEL_LOCK: { + if (ts > invalidationTime) { + invalidationTime = ts; + } + if (returnReadLockPresent) { + source.skipToPrefix(curCol, ColumnType.RLOCK); } else { - source.skipToTimestamp(curCol, ColumnConstants.WRITE_PREFIX | snaptime); - continue; + source.skipToPrefix(curCol, ColumnType.LOCK); } + continue; } - } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) { - if (ts > invalidationTime) { - invalidationTime = ts; + case RLOCK: { + if (returnReadLockPresent) { + rememberReadLock(source.getTopKey(), source.getTopValue()); + } + + source.skipToPrefix(curCol, ColumnType.LOCK); + continue; } - if (returnReadLockPresent) { - source.skipToPrefix(curCol, ColumnConstants.RLOCK_PREFIX); - } else { - source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX); + case LOCK: { + if (ts > invalidationTime && ts <= snaptime) { + // nothing supersedes this lock, therefore the column is locked + return; + } else { + if (dataPointer == -1) { + source.skipColumn(curCol); + continue outer; + } else { + source.skipToTimestamp(curCol, ColumnType.DATA.enode(dataPointer)); + continue; + } + } } - continue; + case DATA: { + if (dataPointer == ts) { + // found data for this column + return; + } - } else if (colType == ColumnConstants.RLOCK_PREFIX) { - if (returnReadLockPresent) { - rememberReadLock(source.getTopKey(), source.getTopValue()); - } + if (ts < dataPointer || dataPointer == -1) { + source.skipColumn(curCol); + continue outer; + } - source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX); - continue; - } else if (colType == ColumnConstants.LOCK_PREFIX) { - if (ts > invalidationTime && ts <= snaptime) { - // nothing supersedes this lock, therefore the column is locked - return; - } else { + if (ts > dataPointer) { + source.skipToTimestamp(curCol, ColumnType.DATA.enode(dataPointer)); + continue; + } + break; + } + case ACK: { if (dataPointer == -1) { source.skipColumn(curCol); continue outer; } else { - source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer); + source.skipToTimestamp(curCol, ColumnType.DATA.enode(dataPointer)); continue; } } - } else if (colType == ColumnConstants.DATA_PREFIX) { - if (dataPointer == ts) { - // found data for this column - return; - } - - if (ts < dataPointer || dataPointer == -1) { - source.skipColumn(curCol); - continue outer; - } - - if (ts > dataPointer) { - source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer); - continue; - } - } else if (colType == ColumnConstants.ACK_PREFIX) { - if (dataPointer == -1) { - source.skipColumn(curCol); - continue outer; - } else { - source.skipToTimestamp(curCol, ColumnConstants.DATA_PREFIX | dataPointer); - continue; - } - } else { - throw new IllegalArgumentException(); + default: + throw new IllegalArgumentException(); } - // TODO handle case where dataPointer >=0, but no data was found source.next(); } @@ -220,8 +229,7 @@ public void seek(Range range, Collection columnFamilies, boolean i if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE && !range.isStartKeyInclusive()) { - if ((range.getStartKey().getTimestamp() - & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) { + if (ColumnType.from(range.getStartKey()) == ColumnType.RLOCK) { Key currCol = new Key(range.getStartKey()); currCol.setTimestamp(Long.MAX_VALUE); newRange = new Range(currCol, true, range.getEndKey(), range.isEndKeyInclusive()); diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java index 3279ec2f9..534c82da3 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java @@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; /** * The purpose of this iterator is to make seeking within a columns timestamp range efficient. @@ -80,10 +80,8 @@ public void skipToTimestamp(Key curCol, long timestamp) throws IOException { } } - public void skipToPrefix(Key curCol, long prefix) throws IOException { - // first possible timestamp in sorted order for this prefix - long timestamp = prefix | ColumnConstants.TIMESTAMP_MASK; - skipToTimestamp(curCol, timestamp); + public void skipToPrefix(Key curCol, ColumnType colType) throws IOException { + skipToTimestamp(curCol, colType.first()); } public void skipColumn(Key curCol) throws IOException { diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java index 7063adcde..16b9f1ee7 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java @@ -21,20 +21,11 @@ * Constants used extract data from columns */ public class ColumnConstants { - - public static final long PREFIX_MASK = 0xe000000000000000L; - public static final long TX_DONE_PREFIX = 0x6000000000000000L; - public static final long WRITE_PREFIX = 0x4000000000000000L; - public static final long DEL_LOCK_PREFIX = 0x2000000000000000L; - public static final long RLOCK_PREFIX = 0x0000000000000000L; - public static final long LOCK_PREFIX = 0xe000000000000000L; - public static final long ACK_PREFIX = 0xc000000000000000L; - public static final long DATA_PREFIX = 0xa000000000000000L; - public static final long TIMESTAMP_MASK = 0x1fffffffffffffffL; + public static final long PREFIX_MASK = -1L << (64 - ColumnType.BITS); + public static final long TIMESTAMP_MASK = -1L >>> ColumnType.BITS; public static final Bytes NOTIFY_CF = Bytes.of("ntfy"); public static final String NOTIFY_LOCALITY_GROUP_NAME = "notify"; public static final Bytes GC_CF = Bytes.of("gc"); private ColumnConstants() {} - } diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java new file mode 100644 index 000000000..b16fe6dc3 --- /dev/null +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnType.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.accumulo.util; + +import com.google.common.base.Preconditions; +import org.apache.accumulo.core.data.Key; + +/** + * Abstracts how the Fluo column type is encoded in Accumulo timestamps. + */ +public enum ColumnType { + + TX_DONE, WRITE, DEL_LOCK, RLOCK, LOCK, ACK, DATA; + + private long prefix; + + /** + * @return The first possible timestamp in sorted order. + */ + public long first() { + return prefix | ColumnConstants.TIMESTAMP_MASK; + } + + /** + * @return The timestamp with this column type encoded into the high order bits. + */ + public long enode(long timestamp) { + Preconditions.checkArgument((timestamp >>> (64 - BITS)) == 0); + return prefix | timestamp; + } + + // The number of leftmost bits in in the timestamp reserved for encoding the column type + static final int BITS = 3; + private static final byte TX_DONE_PREFIX = 0x03; + private static final byte WRITE_PREFIX = 0x02; + private static final byte DEL_LOCK_PREFIX = 0x01; + private static final byte RLOCK_PREFIX = 0x00; + private static final byte LOCK_PREFIX = 0x07; + private static final byte ACK_PREFIX = 0x06; + private static final byte DATA_PREFIX = 0x05; + + static { + TX_DONE.prefix = (long) TX_DONE_PREFIX << (64 - BITS); + WRITE.prefix = (long) WRITE_PREFIX << (64 - BITS); + DEL_LOCK.prefix = (long) DEL_LOCK_PREFIX << (64 - BITS); + RLOCK.prefix = (long) RLOCK_PREFIX << (64 - BITS); + LOCK.prefix = (long) LOCK_PREFIX << (64 - BITS); + ACK.prefix = (long) ACK_PREFIX << (64 - BITS); + DATA.prefix = (long) DATA_PREFIX << (64 - BITS); + } + + public static ColumnType from(Key k) { + return from(k.getTimestamp()); + } + + public static ColumnType from(long timestamp) { + byte prefix = (byte) (timestamp >>> (64 - BITS)); + switch (prefix) { + case TX_DONE_PREFIX: + return TX_DONE; + case WRITE_PREFIX: + return WRITE; + case DEL_LOCK_PREFIX: + return DEL_LOCK; + case RLOCK_PREFIX: + return RLOCK; + case LOCK_PREFIX: + return LOCK; + case ACK_PREFIX: + return ACK; + case DATA_PREFIX: + return DATA; + default: + throw new IllegalArgumentException("Unknown prefix : " + Integer.toHexString(prefix)); + } + } +} diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java index 417fd8109..7358e1ee7 100644 --- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java +++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/SnapshotIteratorTest.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -26,7 +26,7 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.SortedMapIterator; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.junit.Assert; import org.junit.Test; @@ -320,6 +320,6 @@ public void testNegativeTime() { @Test(expected = IllegalArgumentException.class) public void testNonZeroPrefix() { - SnapshotIterator.setSnaptime(null, ColumnConstants.DATA_PREFIX | 6); + SnapshotIterator.setSnaptime(null, ColumnType.DATA.enode(6)); } } diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java index dbdcd1b6c..ad595e06d 100644 --- a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java +++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/iterators/TestData.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -29,7 +29,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.fluo.accumulo.format.FluoFormatter; import org.apache.fluo.accumulo.iterators.CountingIterator.Counter; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.ReadLockUtil; import org.apache.fluo.accumulo.values.DelLockValue; import org.apache.fluo.accumulo.values.DelReadLockValue; @@ -101,28 +101,28 @@ public TestData addIfInRange(String key, String value, Range range) { switch (ct) { case "ACK": - ts |= ColumnConstants.ACK_PREFIX; + ts = ColumnType.ACK.enode(ts); break; case "TX_DONE": - ts |= ColumnConstants.TX_DONE_PREFIX; + ts = ColumnType.TX_DONE.enode(ts); break; case "WRITE": - ts |= ColumnConstants.WRITE_PREFIX; + ts = ColumnType.WRITE.enode(ts); long writeTs = Long.parseLong(value.split("\\s+")[0]); val = WriteValue.encode(writeTs, value.contains("PRIMARY"), value.contains("DELETE")); break; case "LOCK": - ts |= ColumnConstants.LOCK_PREFIX; + ts = ColumnType.LOCK.enode(ts); String rc[] = value.split("\\s+"); val = LockValue.encode(Bytes.of(rc[0]), new Column(rc[1], rc[2]), value.contains("WRITE"), value.contains("DELETE"), value.contains("TRIGGER"), 42l); break; case "DATA": - ts |= ColumnConstants.DATA_PREFIX; + ts = ColumnType.DATA.enode(ts); val = value.getBytes(); break; case "DEL_LOCK": - ts |= ColumnConstants.DEL_LOCK_PREFIX; + ts = ColumnType.DEL_LOCK.enode(ts); if (value.contains("ROLLBACK") || value.contains("ABORT")) { val = DelLockValue.encodeRollback(value.contains("PRIMARY"), true); } else { @@ -132,11 +132,11 @@ public TestData addIfInRange(String key, String value, Range range) { break; case "RLOCK": ts = ReadLockUtil.encodeTs(ts, false); - ts |= ColumnConstants.RLOCK_PREFIX; + ts = ColumnType.RLOCK.enode(ts); break; case "DEL_RLOCK": ts = ReadLockUtil.encodeTs(ts, true); - ts |= ColumnConstants.RLOCK_PREFIX; + ts = ColumnType.RLOCK.enode(ts); if (value.contains("ROLLBACK") || value.contains("ABORT")) { val = DelReadLockValue.encodeRollback(); diff --git a/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java new file mode 100644 index 000000000..3b0b09bb5 --- /dev/null +++ b/modules/accumulo/src/test/java/org/apache/fluo/accumulo/util/ColumnTypeTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.accumulo.util; + +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import org.apache.accumulo.core.data.Key; +import org.junit.Test; + +import static org.apache.fluo.accumulo.util.ColumnType.ACK; +import static org.apache.fluo.accumulo.util.ColumnType.DATA; +import static org.apache.fluo.accumulo.util.ColumnType.DEL_LOCK; +import static org.apache.fluo.accumulo.util.ColumnType.LOCK; +import static org.apache.fluo.accumulo.util.ColumnType.RLOCK; +import static org.apache.fluo.accumulo.util.ColumnType.TX_DONE; +import static org.apache.fluo.accumulo.util.ColumnType.WRITE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ColumnTypeTest { + + private static final long TIMESTAMP_MASK = 0x1fffffffffffffffL; + + + // map of expected prefixes for a column type + private static final Map EPM; + + static { + Builder builder = ImmutableMap.builder(); + builder.put(0x6000000000000000L, TX_DONE); + builder.put(0x4000000000000000L, WRITE); + builder.put(0x2000000000000000L, DEL_LOCK); + builder.put(0x0000000000000000L, RLOCK); + builder.put(0xe000000000000000L, LOCK); + builder.put(0xc000000000000000L, ACK); + builder.put(0xa000000000000000L, DATA); + EPM = builder.build(); + } + + @Test + public void testPrefix() { + for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L << 48}) { + EPM.forEach((prefix, colType) -> assertEquals(prefix | l, colType.enode(l))); + } + } + + @Test + public void testFirst() { + EPM.forEach((prefix, colType) -> assertEquals(prefix | TIMESTAMP_MASK, colType.first())); + for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L << 48}) { + EPM.forEach((prefix, colType) -> { + Key k1 = new Key("r", "f", "q"); + k1.setTimestamp(prefix | l); + Key k2 = new Key("r", "f", "q"); + k2.setTimestamp(colType.first()); + assertTrue(k1.compareTo(k2) > 0); + }); + } + } + + @Test + public void testFrom() { + for (long l : new long[] {0, 2, 13, 19 * 19L, 1L << 50, 1L << 50 + 1L << 48}) { + EPM.forEach((prefix, colType) -> { + assertEquals(ColumnType.from(prefix | l), colType); + Key k = new Key("r", "f", "q"); + k.setTimestamp(prefix | l); + assertEquals(ColumnType.from(k), colType); + }); + } + } + + @Test + public void testCoverage() { + EnumSet expected = EnumSet.allOf(ColumnType.class); + HashSet actual = new HashSet<>(EPM.values()); + assertEquals(expected, actual); + } +} diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java index ea47bbfbe..54d2fa037 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java @@ -36,6 +36,7 @@ import org.apache.fluo.accumulo.iterators.OpenReadLockIterator; import org.apache.fluo.accumulo.iterators.PrewriteIterator; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.ReadLockUtil; import org.apache.fluo.accumulo.values.DelLockValue; import org.apache.fluo.accumulo.values.DelReadLockValue; @@ -50,7 +51,6 @@ import org.apache.fluo.core.util.FluoCondition; import org.apache.fluo.core.util.SpanUtil; -import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK; import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG; /** @@ -99,7 +99,7 @@ private static class LockInfo { public LockInfo(Entry kve) { long rawTs = kve.getKey().getTimestamp(); this.entry = kve; - if ((rawTs & ColumnConstants.PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) { + if (ColumnType.from(rawTs) == ColumnType.RLOCK) { this.lockTs = ReadLockUtil.decodeTs(rawTs); ReadLockValue rlv = new ReadLockValue(kve.getValue().get()); this.prow = rlv.getPrimaryRow(); @@ -221,11 +221,11 @@ private static void rollback(Environment env, long startTs, PrimaryRowColumn prc if (lockInfo.isReadLock) { mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(), k.getColumnVisibilityParsed(), - ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(lockInfo.lockTs, true), + ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(lockInfo.lockTs, true)), DelReadLockValue.encodeRollback()); } else { mut.put(k.getColumnFamilyData().toArray(), k.getColumnQualifierData().toArray(), - k.getColumnVisibilityParsed(), ColumnConstants.DEL_LOCK_PREFIX | lockInfo.lockTs, + k.getColumnVisibilityParsed(), ColumnType.DEL_LOCK.enode(lockInfo.lockTs), DelLockValue.encodeRollback(false, true)); } } @@ -241,7 +241,7 @@ private static boolean rollbackPrimary(Environment env, long startTs, PrimaryRow ConditionalFlutation delLockMutation = new ConditionalFlutation(env, prc.prow, new FluoCondition(env, prc.pcol).setIterators(iterConf).setValue(lockValue)); - delLockMutation.put(prc.pcol, ColumnConstants.DEL_LOCK_PREFIX | prc.startTs, + delLockMutation.put(prc.pcol, ColumnType.DEL_LOCK.enode(prc.startTs), DelLockValue.encodeRollback(true, true)); ConditionalWriter cw = null; @@ -312,7 +312,7 @@ static List> getOpenReadLocks(Environment env, for (Column col : e1.getValue()) { Key start = SpanUtil.toKey(new RowColumn(e1.getKey(), col)); Key end = new Key(start); - end.setTimestamp(ColumnConstants.LOCK_PREFIX | ColumnConstants.TIMESTAMP_MASK); + end.setTimestamp(ColumnType.LOCK.first()); ranges.add(new Range(start, true, end, false)); } } @@ -329,7 +329,7 @@ static List> getOpenReadLocks(Environment env, List> ret = new ArrayList<>(); for (Entry entry : bscanner) { - if ((entry.getKey().getTimestamp() & PREFIX_MASK) == ColumnConstants.RLOCK_PREFIX) { + if (ColumnType.from(entry.getKey()) == ColumnType.RLOCK) { ret.add(entry); } } diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java index f9f1e85a3..37dc45b8f 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -32,7 +32,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; @@ -176,16 +176,20 @@ private void scan(Map> ret, List> lo Bytes row = rowConverter.apply(entry.getKey().getRowData()); Column col = columnConverter.apply(entry.getKey()); - long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; - - if (colType == ColumnConstants.LOCK_PREFIX) { - locks.add(entry); - } else if (colType == ColumnConstants.DATA_PREFIX) { - ret.computeIfAbsent(row, k -> new HashMap<>()).put(col, Bytes.of(entry.getValue().get())); - } else if (colType == ColumnConstants.RLOCK_PREFIX) { - readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col); - } else { - throw new IllegalArgumentException("Unexpected column type " + colType); + ColumnType colType = ColumnType.from(entry.getKey()); + switch (colType) { + case LOCK: + locks.add(entry); + break; + case DATA: + ret.computeIfAbsent(row, k -> new HashMap<>()).put(col, + Bytes.of(entry.getValue().get())); + break; + case RLOCK: + readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()).add(col); + break; + default: + throw new IllegalArgumentException("Unexpected column type " + colType); } } } finally { diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java index 5db760258..f8f83f722 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -31,7 +31,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.fluo.accumulo.iterators.SnapshotIterator; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; @@ -172,9 +172,7 @@ public void resolveLock(Entry lockEntry) { while (iterator.hasNext()) { Entry entry = iterator.next(); - long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; - - if (colType == ColumnConstants.LOCK_PREFIX) { + if (ColumnType.from(entry.getKey()) == ColumnType.LOCK) { locks.add(entry); locksSeen.accept(lockEntry); } @@ -220,18 +218,19 @@ public Entry getNext() { Entry entry = iterator.next(); - long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; - - if (colType == ColumnConstants.LOCK_PREFIX) { - resolveLock(entry); - continue mloop; - } else if (colType == ColumnConstants.DATA_PREFIX) { - stats.incrementEntriesReturned(1); - return entry; - } else if (colType == ColumnConstants.RLOCK_PREFIX) { - return entry; - } else { - throw new IllegalArgumentException("Unexpected column type " + colType); + ColumnType colType = ColumnType.from(entry.getKey()); + + switch (colType) { + case LOCK: + resolveLock(entry); + continue mloop; + case DATA: + stats.incrementEntriesReturned(1); + return entry; + case RLOCK: + return entry; + default: + throw new IllegalArgumentException("Unexpected column type " + colType); } } } diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java index ae87a24e8..3cbbe5cd7 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java @@ -52,6 +52,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.fluo.accumulo.iterators.PrewriteIterator; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.ReadLockUtil; import org.apache.fluo.accumulo.values.DelLockValue; import org.apache.fluo.accumulo.values.DelReadLockValue; @@ -87,8 +88,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.fluo.accumulo.util.ColumnConstants.PREFIX_MASK; -import static org.apache.fluo.accumulo.util.ColumnConstants.RLOCK_PREFIX; import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG; import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK; @@ -278,7 +277,7 @@ private Map getImpl(Bytes row, Set columns, continue; } - if ((kve.getKey().getTimestamp() & PREFIX_MASK) == RLOCK_PREFIX) { + if (ColumnType.from(kve.getKey()) == ColumnType.RLOCK) { if (readLockCols == null) { readLockCols = readLocksSeen.computeIfAbsent(row, k -> new HashSet<>()); } @@ -407,14 +406,14 @@ private ConditionalFlutation prewrite(ConditionalFlutation cm, Bytes row, Column } if (isWrite(val) && !isDelete(val)) { - cm.put(col, ColumnConstants.DATA_PREFIX | startTs, val.toArray()); + cm.put(col, ColumnType.DATA.enode(startTs), val.toArray()); } if (isReadLock(val)) { - cm.put(col, ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, false), + cm.put(col, ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, false)), ReadLockValue.encode(primaryRow, primaryColumn, getTransactorID())); } else { - cm.put(col, ColumnConstants.LOCK_PREFIX | startTs, LockValue.encode(primaryRow, primaryColumn, + cm.put(col, ColumnType.LOCK.enode(startTs), LockValue.encode(primaryRow, primaryColumn, isWrite(val), isDelete(val), isTriggerRow, getTransactorID())); } @@ -668,11 +667,10 @@ private boolean checkForAckCollision(ConditionalMutation cm) { if (notification.getColumn().equals(col)) { // check to see if ACK exist after notification Key startKey = SpanUtil.toKey(notification.getRowColumn()); - startKey.setTimestamp( - ColumnConstants.ACK_PREFIX | (Long.MAX_VALUE & ColumnConstants.TIMESTAMP_MASK)); + startKey.setTimestamp(ColumnType.ACK.first()); Key endKey = SpanUtil.toKey(notification.getRowColumn()); - endKey.setTimestamp(ColumnConstants.ACK_PREFIX | (notification.getTimestamp() + 1)); + endKey.setTimestamp(ColumnType.ACK.enode(notification.getTimestamp() + 1)); Range range = new Range(startKey, endKey); @@ -1112,11 +1110,10 @@ public Collection createMutations(CommitData cd) { m = new Flutation(env, row); for (Entry entry : updates.get(row).entrySet()) { if (isReadLock(entry.getValue())) { - m.put(entry.getKey(), - ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true), + m.put(entry.getKey(), ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, true)), DelReadLockValue.encodeRollback()); } else { - m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs, + m.put(entry.getKey(), ColumnType.DEL_LOCK.enode(startTs), DelLockValue.encodeRollback(false, true)); } } @@ -1134,9 +1131,9 @@ public Collection createMutations(CommitData cd) { // mark transaction as complete for garbage collection purposes Flutation m = new Flutation(env, cd.prow); - m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs, + m.put(cd.pcol, ColumnType.DEL_LOCK.enode(startTs), DelLockValue.encodeRollback(startTs, true, true)); - m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY); + m.put(cd.pcol, ColumnType.TX_DONE.enode(startTs), EMPTY); return Collections.singletonList(m); } @@ -1392,7 +1389,7 @@ public Collection createMutations(CommitData cd) { Flutation m = new Flutation(env, cd.prow); // mark transaction as complete for garbage collection purposes - m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY); + m.put(cd.pcol, ColumnType.TX_DONE.enode(commitTs), EMPTY); afterFlushMutations.add(m); if (weakNotification != null) { diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java index 849e2b5a3..2b6126dcd 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxInfo.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -22,6 +22,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.fluo.accumulo.iterators.RollbackCheckIterator; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.values.DelLockValue; import org.apache.fluo.accumulo.values.WriteValue; import org.apache.fluo.api.data.Bytes; @@ -51,43 +52,50 @@ public static TxInfo getTransactionInfo(Environment env, Bytes prow, Column pcol return txInfo; } - long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(entry.getKey()); long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; - if (colType == ColumnConstants.LOCK_PREFIX) { - if (ts == startTs) { - txInfo.status = TxStatus.LOCKED; - txInfo.lockValue = entry.getValue().get(); - } else { - txInfo.status = TxStatus.UNKNOWN; // locked by another tx + switch (colType) { + case LOCK: { + if (ts == startTs) { + txInfo.status = TxStatus.LOCKED; + txInfo.lockValue = entry.getValue().get(); + } else { + txInfo.status = TxStatus.UNKNOWN; // locked by another tx + } + break; } - } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) { - DelLockValue dlv = new DelLockValue(entry.getValue().get()); - - if (ts != startTs) { - // expect this to always be false, must be a bug in the iterator - throw new IllegalStateException(prow + " " + pcol + " (" + ts + " != " + startTs + ") "); + case DEL_LOCK: { + DelLockValue dlv = new DelLockValue(entry.getValue().get()); + + if (ts != startTs) { + // expect this to always be false, must be a bug in the iterator + throw new IllegalStateException(prow + " " + pcol + " (" + ts + " != " + startTs + ") "); + } + + if (dlv.isRollback()) { + txInfo.status = TxStatus.ROLLED_BACK; + } else { + txInfo.status = TxStatus.COMMITTED; + txInfo.commitTs = dlv.getCommitTimestamp(); + } + break; } + case WRITE: { + long timePtr = WriteValue.getTimestamp(entry.getValue().get()); - if (dlv.isRollback()) { - txInfo.status = TxStatus.ROLLED_BACK; - } else { - txInfo.status = TxStatus.COMMITTED; - txInfo.commitTs = dlv.getCommitTimestamp(); - } - } else if (colType == ColumnConstants.WRITE_PREFIX) { - long timePtr = WriteValue.getTimestamp(entry.getValue().get()); + if (timePtr != startTs) { + // expect this to always be false, must be a bug in the iterator + throw new IllegalStateException( + prow + " " + pcol + " (" + timePtr + " != " + startTs + ") "); + } - if (timePtr != startTs) { - // expect this to always be false, must be a bug in the iterator - throw new IllegalStateException( - prow + " " + pcol + " (" + timePtr + " != " + startTs + ") "); + txInfo.status = TxStatus.COMMITTED; + txInfo.commitTs = ts; + break; } - - txInfo.status = TxStatus.COMMITTED; - txInfo.commitTs = ts; - } else { - throw new IllegalStateException("unexpected col type returned " + colType); + default: + throw new IllegalStateException("unexpected col type returned " + colType); } return txInfo; diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java index bbf1d83d8..9ea513042 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java +++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.ReadLockUtil; import org.apache.fluo.accumulo.values.DelLockValue; import org.apache.fluo.accumulo.values.DelReadLockValue; @@ -56,19 +56,18 @@ public static void commitColumn(Environment env, boolean isTrigger, boolean isPr boolean isWrite, boolean isDelete, boolean isReadlock, long startTs, long commitTs, Set observedColumns, Mutation m) { if (isReadlock) { - Flutation.put(env, m, col, - ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true), + Flutation.put(env, m, col, ColumnType.RLOCK.enode(ReadLockUtil.encodeTs(startTs, true)), DelReadLockValue.encodeCommit(commitTs)); } else if (isWrite) { - Flutation.put(env, m, col, ColumnConstants.WRITE_PREFIX | commitTs, + Flutation.put(env, m, col, ColumnType.WRITE.enode(commitTs), WriteValue.encode(startTs, isPrimary, isDelete)); } else { - Flutation.put(env, m, col, ColumnConstants.DEL_LOCK_PREFIX | startTs, + Flutation.put(env, m, col, ColumnType.DEL_LOCK.enode(startTs), DelLockValue.encodeCommit(commitTs, isPrimary)); } if (isTrigger) { - Flutation.put(env, m, col, ColumnConstants.ACK_PREFIX | startTs, TransactionImpl.EMPTY); + Flutation.put(env, m, col, ColumnType.ACK.enode(startTs), TransactionImpl.EMPTY); } } diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java index 546cf35ca..3b475bf5e 100644 --- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java +++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -26,6 +26,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.curator.framework.CuratorFramework; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.LongUtil; import org.apache.fluo.accumulo.util.ZookeeperUtil; import org.apache.fluo.accumulo.values.DelLockValue; @@ -638,12 +639,12 @@ private boolean wasRolledBackPrimary(long startTs, String rolledBackRow) Scanner scanner = aClient.createScanner(getCurTableName(), Authorizations.EMPTY); for (Entry entry : scanner) { - long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(entry.getKey()); long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; String row = entry.getKey().getRowData().toString(); byte[] val = entry.getValue().get(); - if (row.equals(rolledBackRow) && colType == ColumnConstants.DEL_LOCK_PREFIX && ts == startTs + if (row.equals(rolledBackRow) && colType == ColumnType.DEL_LOCK && ts == startTs && DelLockValue.isPrimary(val)) { sawExpected = true; } diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java index b99b16d6f..03ff986f8 100644 --- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java +++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -27,6 +27,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.accumulo.format.FluoFormatter; import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.util.ZookeeperPath; import org.apache.fluo.accumulo.util.ZookeeperUtil; import org.apache.fluo.api.client.TransactionBase; @@ -308,10 +309,10 @@ private void verify(long oldestTs) throws TableNotFoundException { numWrites = 0; } - long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + ColumnType colType = ColumnType.from(entry.getKey()); long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; - if (colType == ColumnConstants.WRITE_PREFIX) { + if (colType == ColumnType.WRITE) { numWrites++; if (numWrites > 1) { Assert.assertTrue("Extra write had ts " + ts + " < " + oldestTs, ts >= oldestTs); diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java index 1e959ff5f..3fa156c42 100644 --- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java +++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -19,7 +19,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; import org.apache.accumulo.core.data.Key; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.values.WriteValue; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; @@ -45,14 +45,14 @@ * FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator(); * // could also reuse column objects. * Column column = new Column("fam1", "fam2"); - * + * * fkvg.setRow("row1").setColumn(column).setValue("val2"); - * + * * for (FluoKeyValue fluoKeyValue : fkvg.getKeyValues()) * writeToAccumuloFile(fluoKeyValue); - * + * * fkvg.setRow("row2").setColumn(column).setValue("val3"); - * + * * // Each call to getKeyValues() returns the same objects populated with different data when * // possible. So subsequent calls to getKeyValues() will create less objects. Of course this * // invalidates what was returned by previous calls to getKeyValues(). @@ -187,11 +187,11 @@ public FluoKeyValueGenerator set(RowColumnValue rcv) { */ public FluoKeyValue[] getKeyValues() { FluoKeyValue kv = keyVals[0]; - kv.setKey(new Key(row, fam, qual, vis, ColumnConstants.WRITE_PREFIX | 1)); + kv.setKey(new Key(row, fam, qual, vis, ColumnType.WRITE.enode(1))); kv.getValue().set(WriteValue.encode(0, false, false)); kv = keyVals[1]; - kv.setKey(new Key(row, fam, qual, vis, ColumnConstants.DATA_PREFIX | 0)); + kv.setKey(new Key(row, fam, qual, vis, ColumnType.DATA.enode(0))); kv.getValue().set(val); return keyVals; diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java index dd813abcc..e09f20e5c 100644 --- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java +++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoMutationGenerator.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -20,7 +20,7 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; import org.apache.accumulo.core.data.Mutation; -import org.apache.fluo.accumulo.util.ColumnConstants; +import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.values.WriteValue; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; @@ -83,9 +83,8 @@ public FluoMutationGenerator put(Column col, Bytes value) { } public FluoMutationGenerator put(Column col, byte[] value) { - Flutation.put(mutation, col, ColumnConstants.DATA_PREFIX | 0, value); - Flutation.put(mutation, col, ColumnConstants.WRITE_PREFIX | 1, - WriteValue.encode(0, false, false)); + Flutation.put(mutation, col, ColumnType.DATA.enode(0), value); + Flutation.put(mutation, col, ColumnType.WRITE.enode(1), WriteValue.encode(0, false, false)); return this; }