[NO ISSUE][STO] Fix search when switching from memory to disk component
- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- When searching the index and switching from the memory components
  to the disk components, keep the states of the queue and of the
  cursors on the switched-to disk components the same as they were on
  the memory components. If a cursor was the one that produced the
  outputElement, do not push the next element from that cursor into
  the queue, since there should not be an element from this cursor in
  the queue already. Restart the search operation at the elements the
  cursors were positioned on and consume them, since they were already
  consumed before the switch (a sketch of this restart-and-consume
  idea follows the change list below).

- Add a test case.
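A minimal sketch of the restart-and-consume idea, using a plain Java
TreeMap in place of a disk component; every name here is hypothetical
and simplified, not the actual Hyracks cursor code:

    import java.util.Iterator;
    import java.util.TreeMap;

    class ComponentSwitchSketch {
        // Reopen the scan on the switched-to disk component at the key the
        // old memory-component cursor was positioned on, then consume that
        // first element: it was already handed to the caller before the switch.
        static Iterator<Integer> restartAt(TreeMap<Integer, String> diskComponent, int lastSeenKey) {
            Iterator<Integer> restarted = diskComponent.tailMap(lastSeenKey, true).keySet().iterator();
            if (restarted.hasNext()) {
                restarted.next(); // skip the already-returned element, avoiding a duplicate
            }
            return restarted;
        }

        public static void main(String[] args) {
            TreeMap<Integer, String> disk = new TreeMap<>();
            for (int k = 0; k < 5; k++) {
                disk.put(k, "v" + k);
            }
            Iterator<Integer> it = restartAt(disk, 2);
            while (it.hasNext()) {
                System.out.println(it.next()); // prints 3 then 4, no duplicates
            }
        }
    }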

Change-Id: I647641f6044c1edf1477049be1c5d1b697f404c1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14885
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
AliSolaiman committed Jan 25, 2022
1 parent f4c503a commit 4cced3e30c31d8330e69f0a10f5b0a46ee13d7bf
Showing 5 changed files with 138 additions and 25 deletions.
@@ -52,6 +52,7 @@
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -66,6 +67,8 @@
import org.apache.asterix.transaction.management.service.logging.LogReader;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
@@ -813,10 +816,10 @@ public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeli
RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
filterFields == null ? 0 : filterFields.length, recordType, metaType);
// fix pk fields
int diff = upsertOutRecDesc.getFieldCount() - primaryIndexInfo.rDesc.getFieldCount();
int start = 1 + (dataset.hasMetaPart() ? 2 : 1) + (filterFields == null ? 0 : filterFields.length);
int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
pkFieldsInCommitOp[i] = diff + i;
pkFieldsInCommitOp[i] = start++;
}
CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
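As a hedged aside on the hunk above: the new start offset counts the fields
that the upsert output now prepends before the input fields. A small
self-contained illustration of the same computation; the assumption that the
primary keys lead the input record descriptor is inferred from the diff, not
stated in it:

    public class PkOffsetSketch {
        // The upsert output frame prepends [indicator, prevRecord, prevMeta?,
        // prevFilters...] before the input fields; assuming the primary keys
        // lead the input record descriptor, they shift right by that prefix.
        static int[] pkFieldsInCommitOp(boolean hasMetaPart, int numFilterFields, int numPrimaryKeys) {
            int start = 1 + (hasMetaPart ? 2 : 1) + numFilterFields;
            int[] pkFields = new int[numPrimaryKeys];
            for (int i = 0; i < pkFields.length; i++) {
                pkFields[i] = start++;
            }
            return pkFields;
        }

        public static void main(String[] args) {
            // meta part present, one filter field, two primary keys -> {4, 5}
            System.out.println(java.util.Arrays.toString(pkFieldsInCommitOp(true, 1, 2)));
        }
    }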
@@ -827,19 +830,26 @@ public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeli

private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields,
ARecordType itemType, ARecordType metaItemType) throws Exception {
ITypeTraits[] outputTypeTraits =
new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
// 1 boolean field at the beginning to indicate whether the operation was upsert or delete
int numOutFields = 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields + inputRecordDesc.getFieldCount();
ITypeTraits[] outputTypeTraits = new ITypeTraits[numOutFields];
ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[numOutFields];

// add the previous record first
ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
int f = 0;
outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
// add the upsert indicator boolean field
outputSerDes[f] = serdeProvider.getSerializerDeserializer(BuiltinType.AINT8);
outputTypeTraits[f] = typeTraitProvider.getTypeTrait(BuiltinType.AINT8);
f++;
// add the previous record
outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType);
outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType);
f++;
// add the previous meta second
if (dataset.hasMetaPart()) {
outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
outputSerDes[f] = serdeProvider.getSerializerDeserializer(metaItemType);
outputTypeTraits[f] = typeTraitProvider.getTypeTrait(metaItemType);
f++;
}
// add the previous filter third
@@ -854,10 +864,8 @@ private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, D
}
}
fieldIdx = i;
outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
f++;
}
for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
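For orientation, here is a sketch with made-up field labels of the order
getUpsertOutRecDesc now builds: the indicator first (serialized as AINT8,
though the code comment calls it a boolean field), then the previous record,
the previous meta if the dataset has one, the previous filter fields, and
finally all fields of the input descriptor. This is an illustration of the
layout, not the real descriptor code:

    import java.util.ArrayList;
    import java.util.List;

    public class UpsertOutLayoutSketch {
        static List<String> layout(boolean hasMetaPart, int numFilterFields, int numInputFields) {
            List<String> fields = new ArrayList<>();
            fields.add("upsertIndicator(int8)"); // upsert vs. delete; exact encoding is an assumption
            fields.add("prevRecord");
            if (hasMetaPart) {
                fields.add("prevMeta");
            }
            for (int i = 0; i < numFilterFields; i++) {
                fields.add("prevFilter" + i);
            }
            for (int j = 0; j < numInputFields; j++) {
                fields.add("inputField" + j);
            }
            return fields;
        }

        public static void main(String[] args) {
            System.out.println(layout(true, 1, 3));
        }
    }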
@@ -19,7 +19,6 @@
package org.apache.asterix.test.dataflow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@@ -45,6 +44,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
import org.apache.asterix.test.common.TestHelper;
import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -53,18 +53,28 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.MultiComparator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -84,7 +94,7 @@ public class SearchCursorComponentSwitchTest {
private static final boolean[] UNIQUE_META_FIELDS = null;
private static final int[] KEY_INDEXES = { 0 };
private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
private static final List<Integer> KEY_INDICATORS_LIST = List.of(Index.RECORD_INDICATOR);
private static final int TOTAL_NUM_OF_RECORDS = 2000;
private static final int RECORDS_PER_COMPONENT = 1000;
private static final int DATASET_ID = 101;
@@ -102,6 +112,7 @@ public class SearchCursorComponentSwitchTest {
private static IIndexDataflowHelper indexDataflowHelper;
private static ITransactionContext txnCtx;
private static LSMPrimaryInsertOperatorNodePushable insertOp;
private static LSMPrimaryUpsertOperatorNodePushable upsertOp;

@BeforeClass
public static void setUp() throws Exception {
@@ -143,6 +154,8 @@ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningSt
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
upsertOp = nc.getUpsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, storageManager, null, false).getLeft();
}

@After
@@ -201,6 +214,63 @@ public void testCursorSwitchSucceed() {
}
}

@Test
public void testCursorSwitchSucceedWithNoDuplicates() {
try {
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearSearchCallbacks();
RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
int totalNumRecords = LSMIndexSearchCursor.SWITCH_COMPONENT_CYCLE + 2;
ITupleReference[] upsertTuples = new ITupleReference[totalNumRecords];
for (int j = 0; j < totalNumRecords; j++) {
ITupleReference tuple = tupleGenerator.next();
upsertTuples[j] = TupleUtils.copyTuple(tuple);
}

// upsert and flush the tuples to create a disk component
upsert(tupleAppender, totalNumRecords, upsertTuples, true);
// upsert but don't flush the tuples to create a memory component
upsert(tupleAppender, totalNumRecords, upsertTuples, false);

// do the search operation
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(lsmBtree.getHarness(),
lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE), LSMBTreeSearchCursor::new);
IIndexCursor searchCursor = accessor.createSearchCursor(false);
MultiComparator lowKeySearchCmp =
BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
MultiComparator highKeySearchCmp =
BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
RangePredicate rangePredicate =
new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp, null, null);

accessor.search(searchCursor, rangePredicate);

int count = 0;
while (searchCursor.hasNext()) {
searchCursor.next();
count++;
// flush the memory component to disk so that we make the switch to it when we hit the switch cycle
if (count == 1) {
StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
}
}

Throwable failure = ResourceReleaseUtils.close(searchCursor, null);
failure = CleanupUtils.destroy(failure, searchCursor, accessor);
Assert.assertEquals("Records count not matching", totalNumRecords, count);
if (failure != null) {
Assert.fail(failure.getMessage());
}
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

@Test
public void testCursorSwitchFails() {
try {
@@ -268,4 +338,17 @@ private void searchAndAssertCount(TestNodeController nc, IHyracksTaskContext ctx
emptyTupleOp.close();
Assert.assertEquals(numOfRecords, countOp.getCount());
}

private void upsert(FrameTupleAppender tupleAppender, int totalNumRecords, ITupleReference[] upsertTuples,
boolean flush) throws Exception {
upsertOp.open();
for (int j = 0; j < totalNumRecords; j++) {
DataflowUtils.addTupleToFrame(tupleAppender, upsertTuples[j], upsertOp);
}
tupleAppender.write(upsertOp, true);
if (flush) {
StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
}
upsertOp.close();
}
}
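A note on the timing that testCursorSwitchSucceedWithNoDuplicates relies on,
as a hedged standalone sketch: assuming the cursor only polls for a pending
memory-to-disk switch every SWITCH_COMPONENT_CYCLE calls (suggested by how
the constant is used, not verified here), flushing after the first record and
scanning SWITCH_COMPONENT_CYCLE + 2 records guarantees the scan crosses one
poll point after the flush:

    public class SwitchTimingSketch {
        static final int SWITCH_COMPONENT_CYCLE = 100; // matches LSMIndexSearchCursor

        public static void main(String[] args) {
            int totalNumRecords = SWITCH_COMPONENT_CYCLE + 2;
            boolean flushDone = false;
            boolean switchHappened = false;
            for (int call = 1; call <= totalNumRecords; call++) {
                if (call == 1) {
                    flushDone = true; // the test flushes after consuming the first record
                }
                if (call % SWITCH_COMPONENT_CYCLE == 0 && flushDone) {
                    switchHappened = true; // cursor would swap memory for disk here
                }
            }
            System.out.println("switch exercised mid-scan: " + switchHappened); // true
        }
    }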
@@ -236,10 +236,15 @@ private void replaceMemoryComponentWithDiskComponentIfNeeded() throws HyracksDat
rangeCursors[i].close();
btreeAccessors[i].reset(btree, iap);
btreeAccessors[i].search(rangeCursors[i], reusablePred);
pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
// consume the element that we restarted the search at since before the switch it was consumed
if (rangeCursors[i].hasNext()) {
rangeCursors[i].next();
switchedElements[i].reset(rangeCursors[i].getTuple());
}
}
}
switchRequest[i] = false;
switchedElements[i] = null;
// any failed switch makes further switches pointless
switchPossible = switchPossible && operationalComponents.get(i).getType() == LSMComponentType.DISK;
}
@@ -264,14 +269,18 @@ private int replaceFrom() throws HyracksDataException {
if (replaceFrom < 0) {
replaceFrom = i;
}
// we return the outputElement to the priority queue if it came from this component

PriorityQueueElement element;
if (outputElement != null && outputElement.getCursorIndex() == i) {
pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
needPushElementIntoQueue = false;
outputElement = null;
canCallProceed = true;
// there should be no element from this cursor in the queue since the element was polled
if (findElement(outputPriorityQueue, i) != null) {
throw new IllegalStateException("found element in the queue from the cursor of output element");
}
element = outputElement;
} else {
element = findElement(outputPriorityQueue, i);
}
PriorityQueueElement element = remove(outputPriorityQueue, i);

// if this cursor is still active (has an element)
// then we copy the search key to restart the operation after
// replacing the component
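The branch above chooses where each cursor's saved element comes from during
the switch. A minimal standalone sketch of that invariant, with a hypothetical
Element record standing in for PriorityQueueElement: a cursor's element is
either the current outputElement or sits in the priority queue, never both:

    import java.util.Iterator;
    import java.util.PriorityQueue;

    public class SwitchElementSketch {
        record Element(int cursorIndex, int key) {}

        static Element elementFor(int cursorIndex, Element outputElement, PriorityQueue<Element> queue) {
            if (outputElement != null && outputElement.cursorIndex() == cursorIndex) {
                // the element was already polled; a queue copy would mean a duplicate
                if (findElement(queue, cursorIndex) != null) {
                    throw new IllegalStateException("found element in the queue from the cursor of output element");
                }
                return outputElement;
            }
            return findElement(queue, cursorIndex);
        }

        static Element findElement(PriorityQueue<Element> queue, int cursorIndex) {
            // linear scan; the queue holds at most one element per cursor
            Iterator<Element> it = queue.iterator();
            while (it.hasNext()) {
                Element e = it.next();
                if (e.cursorIndex() == cursorIndex) {
                    return e;
                }
            }
            return null;
        }

        public static void main(String[] args) {
            PriorityQueue<Element> queue = new PriorityQueue<>((a, b) -> Integer.compare(a.key(), b.key()));
            queue.add(new Element(1, 7));
            Element out = new Element(0, 3); // cursor 0 produced the output element
            System.out.println(elementFor(0, out, queue)); // the outputElement itself
            System.out.println(elementFor(1, out, queue)); // pulled from the queue
        }
    }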
@@ -331,6 +340,18 @@ private PriorityQueueElement remove(PriorityQueue<PriorityQueueElement> outputPr
return null;
}

private PriorityQueueElement findElement(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) {
// Scans the PQ for the component's element
Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
while (it.hasNext()) {
PriorityQueueElement e = it.next();
if (e.getCursorIndex() == cursorIndex) {
return e;
}
}
return null;
}

@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
@@ -193,7 +193,7 @@ public void replace(ILSMIndexOperationContext ctx) {
for (int i = 0; i < count; i++) {
ILSMComponent removed = ctx.getComponentHolder().remove(swapIndexes[i]);
if (removed.getType() == LSMComponentType.MEMORY) {
LOGGER.info("Removed a memory component from the search operation");
LOGGER.debug("Removed memory component {} from the search operation", removed);
} else {
throw new IllegalStateException("Disk components can't be removed from the search operation");
}
@@ -40,7 +40,7 @@
import org.apache.hyracks.storage.common.MultiComparator;

public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
protected static final int SWITCH_COMPONENT_CYCLE = 100;
public static final int SWITCH_COMPONENT_CYCLE = 100;
protected final ILSMIndexOperationContext opCtx;
protected final boolean returnDeletedTuples;
protected PriorityQueueElement outputElement;
@@ -119,6 +119,7 @@ public void doClose() throws HyracksDataException {
needPushElementIntoQueue = false;
for (int i = 0; i < switchRequest.length; i++) {
switchRequest[i] = false;
switchedElements[i] = null;
}
try {
if (outputPriorityQueue != null) {
