Skip to content
Permalink
Browse files
Merge branch 'gerrit/neo'
Change-Id: I493e78c38f1a41557fdb4b960b58dbdc20b62104
  • Loading branch information
AliSolaiman committed Jan 25, 2022
2 parents 90c4a93 + 4cced3e commit f2b2ac1f5272dc33b2ba773ac52865cc185710bd
Showing 5 changed files with 138 additions and 25 deletions.
@@ -52,6 +52,7 @@
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -66,6 +67,8 @@
import org.apache.asterix.transaction.management.service.logging.LogReader;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
@@ -813,10 +816,10 @@ public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeli
RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
filterFields == null ? 0 : filterFields.length, recordType, metaType);
// fix pk fields
int diff = upsertOutRecDesc.getFieldCount() - primaryIndexInfo.rDesc.getFieldCount();
int start = 1 + (dataset.hasMetaPart() ? 2 : 1) + (filterFields == null ? 0 : filterFields.length);
int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
pkFieldsInCommitOp[i] = diff + i;
pkFieldsInCommitOp[i] = start++;
}
CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
@@ -827,19 +830,26 @@ public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeli

private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields,
ARecordType itemType, ARecordType metaItemType) throws Exception {
ITypeTraits[] outputTypeTraits =
new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
// 1 boolean field at the beginning to indicate whether the operation was upsert or delete
int numOutFields = 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields + inputRecordDesc.getFieldCount();
ITypeTraits[] outputTypeTraits = new ITypeTraits[numOutFields];
ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[numOutFields];

// add the previous record first
ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
int f = 0;
outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
// add the upsert indicator boolean field
outputSerDes[f] = serdeProvider.getSerializerDeserializer(BuiltinType.AINT8);
outputTypeTraits[f] = typeTraitProvider.getTypeTrait(BuiltinType.AINT8);
f++;
// add the previous record
outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType);
outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType);
f++;
// add the previous meta second
if (dataset.hasMetaPart()) {
outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
outputSerDes[f] = serdeProvider.getSerializerDeserializer(metaItemType);
outputTypeTraits[f] = typeTraitProvider.getTypeTrait(metaItemType);
f++;
}
// add the previous filter third
@@ -854,10 +864,8 @@ private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, D
}
}
fieldIdx = i;
outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
f++;
}
for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
@@ -19,7 +19,6 @@
package org.apache.asterix.test.dataflow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@@ -45,6 +44,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
import org.apache.asterix.test.common.TestHelper;
import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -53,18 +53,28 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.MultiComparator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -84,7 +94,7 @@ public class SearchCursorComponentSwitchTest {
private static final boolean[] UNIQUE_META_FIELDS = null;
private static final int[] KEY_INDEXES = { 0 };
private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
private static final List<Integer> KEY_INDICATORS_LIST = List.of(Index.RECORD_INDICATOR);
private static final int TOTAL_NUM_OF_RECORDS = 2000;
private static final int RECORDS_PER_COMPONENT = 1000;
private static final int DATASET_ID = 101;
@@ -102,6 +112,7 @@ public class SearchCursorComponentSwitchTest {
private static IIndexDataflowHelper indexDataflowHelper;
private static ITransactionContext txnCtx;
private static LSMPrimaryInsertOperatorNodePushable insertOp;
private static LSMPrimaryUpsertOperatorNodePushable upsertOp;

@BeforeClass
public static void setUp() throws Exception {
@@ -143,6 +154,8 @@ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningSt
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
upsertOp = nc.getUpsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, storageManager, null, false).getLeft();
}

@After
@@ -201,6 +214,63 @@ public void testCursorSwitchSucceed() {
}
}

// Verifies that a search cursor that switches from a memory component to its freshly
// flushed disk component mid-scan returns every record exactly once (no duplicates,
// no misses). totalNumRecords is sized just past SWITCH_COMPONENT_CYCLE so the scan is
// guaranteed to cross the cycle boundary and exercise the component-switch path.
@Test
public void testCursorSwitchSucceedWithNoDuplicates() {
try {
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearSearchCallbacks();
RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
int totalNumRecords = LSMIndexSearchCursor.SWITCH_COMPONENT_CYCLE + 2;
// copy each generated tuple so the same records can be upserted twice below
ITupleReference[] upsertTuples = new ITupleReference[totalNumRecords];
for (int j = 0; j < totalNumRecords; j++) {
ITupleReference tuple = tupleGenerator.next();
upsertTuples[j] = TupleUtils.copyTuple(tuple);
}

// upsert and flush the tuples to create a disk component
upsert(tupleAppender, totalNumRecords, upsertTuples, true);
// upsert but don't flush the tuples to create a memory component
upsert(tupleAppender, totalNumRecords, upsertTuples, false);

// do the search operation
// build a standalone accessor/cursor pair so the test can drive the scan manually
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(lsmBtree.getHarness(),
lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE), LSMBTreeSearchCursor::new);
IIndexCursor searchCursor = accessor.createSearchCursor(false);
// null comparators + null low/high keys => full-range scan over the primary index
MultiComparator lowKeySearchCmp =
BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
MultiComparator highKeySearchCmp =
BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
RangePredicate rangePredicate =
new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp, null, null);

accessor.search(searchCursor, rangePredicate);

int count = 0;
while (searchCursor.hasNext()) {
searchCursor.next();
count++;
// flush the memory component to disk so that we make the switch to it when we hit the switch cycle
if (count == 1) {
StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
}
}

// close/destroy collect failures instead of throwing so the count check still runs
Throwable failure = ResourceReleaseUtils.close(searchCursor, null);
failure = CleanupUtils.destroy(failure, searchCursor, accessor);
Assert.assertEquals("Records count not matching", totalNumRecords, count);
if (failure != null) {
Assert.fail(failure.getMessage());
}
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

@Test
public void testCursorSwitchFails() {
try {
@@ -268,4 +338,17 @@ private void searchAndAssertCount(TestNodeController nc, IHyracksTaskContext ctx
emptyTupleOp.close();
Assert.assertEquals(numOfRecords, countOp.getCount());
}

// Pushes totalNumRecords tuples from upsertTuples through the shared upsert operator
// pipeline. When flush is true, the in-memory component is flushed before the operator
// is closed — presumably producing a disk component holding the upserted records
// (matches the caller's "upsert and flush ... to create a disk component" usage).
private void upsert(FrameTupleAppender tupleAppender, int totalNumRecords, ITupleReference[] upsertTuples,
boolean flush) throws Exception {
upsertOp.open();
for (int j = 0; j < totalNumRecords; j++) {
// appender accumulates tuples and forwards full frames into the upsert operator
DataflowUtils.addTupleToFrame(tupleAppender, upsertTuples[j], upsertOp);
}
// push any remaining partially-filled frame downstream before closing
tupleAppender.write(upsertOp, true);
if (flush) {
StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
}
upsertOp.close();
}
}
@@ -246,10 +246,15 @@ private void replaceMemoryComponentWithDiskComponentIfNeeded() throws HyracksDat
rangeCursors[i].close();
btreeAccessors[i].reset(btree, iap);
btreeAccessors[i].search(rangeCursors[i], reusablePred);
pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
// consume the element that we restarted the search at since before the switch it was consumed
if (rangeCursors[i].hasNext()) {
rangeCursors[i].next();
switchedElements[i].reset(rangeCursors[i].getTuple());
}
}
}
switchRequest[i] = false;
switchedElements[i] = null;
// any failed switch makes further switches pointless
switchPossible = switchPossible && operationalComponents.get(i).getType() == LSMComponentType.DISK;
}
@@ -274,14 +279,18 @@ protected int replaceFrom() throws HyracksDataException {
if (replaceFrom < 0) {
replaceFrom = i;
}
// we return the outputElement to the priority queue if it came from this component

PriorityQueueElement element;
if (outputElement != null && outputElement.getCursorIndex() == i) {
pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
needPushElementIntoQueue = false;
outputElement = null;
canCallProceed = true;
// there should be no element from this cursor in the queue since the element was polled
if (findElement(outputPriorityQueue, i) != null) {
throw new IllegalStateException("found element in the queue from the cursor of output element");
}
element = outputElement;
} else {
element = findElement(outputPriorityQueue, i);
}
PriorityQueueElement element = remove(outputPriorityQueue, i);

// if this cursor is still active (has an element)
// then we copy the search key to restart the operation after
// replacing the component
@@ -341,6 +350,18 @@ private PriorityQueueElement remove(PriorityQueue<PriorityQueueElement> outputPr
return null;
}

/**
 * Linearly scans the priority queue for an element produced by the cursor with the
 * given index.
 *
 * @param outputPriorityQueue the queue of per-cursor elements to scan
 * @param cursorIndex the cursor whose queued element is sought
 * @return the first matching element encountered, or {@code null} if the cursor has
 *         no element in the queue
 */
private PriorityQueueElement findElement(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) {
    // PriorityQueue iteration order is unspecified, but at most one element per cursor
    // is expected here, so a plain linear scan suffices.
    for (PriorityQueueElement candidate : outputPriorityQueue) {
        if (candidate.getCursorIndex() == cursorIndex) {
            return candidate;
        }
    }
    return null;
}

@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
@@ -193,7 +193,7 @@ public void replace(ILSMIndexOperationContext ctx) {
for (int i = 0; i < count; i++) {
ILSMComponent removed = ctx.getComponentHolder().remove(swapIndexes[i]);
if (removed.getType() == LSMComponentType.MEMORY) {
LOGGER.info("Removed a memory component from the search operation");
LOGGER.debug("Removed memory component {} from the search operation", removed);
} else {
throw new IllegalStateException("Disk components can't be removed from the search operation");
}
@@ -40,7 +40,7 @@
import org.apache.hyracks.storage.common.MultiComparator;

public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
protected static final int SWITCH_COMPONENT_CYCLE = 100;
public static final int SWITCH_COMPONENT_CYCLE = 100;
protected final ILSMIndexOperationContext opCtx;
protected final boolean returnDeletedTuples;
protected PriorityQueueElement outputElement;
@@ -119,6 +119,7 @@ public void doClose() throws HyracksDataException {
needPushElementIntoQueue = false;
for (int i = 0; i < switchRequest.length; i++) {
switchRequest[i] = false;
switchedElements[i] = null;
}
try {
if (outputPriorityQueue != null) {

0 comments on commit f2b2ac1

Please sign in to comment.