Skip to content
Permalink
Browse files
[NO ISSUE][STO] Close datasets of flushed indexes after recovery
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- After performing redo of a flush log on any index, close its dataset
  to ensure any cached state that might have been changed during recovery
  is cleared (e.g. the component id generator).
- Fix the total number of records inserted by LSMFlushRecoveryTest.
- Update LSMFlushRecoveryTest to check for duplicate component ids.

Change-Id: I29072f475cc7b4d7d6efde415be0329fc568443e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11423
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
(cherry picked from commit 0e7e4bd)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12024
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Jun 24, 2021
1 parent 32fcc2a commit acff469e4363dde3c0aafe3c388f62aad4aa87a7
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 20 deletions.
@@ -173,15 +173,14 @@ public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACID
deleteRecoveryTemporaryFiles();

//get active partitions on this node
replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN);
replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true);
}

@Override
public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
throws IOException, ACIDException {
public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
boolean closeOnFlushRedo) throws IOException, ACIDException {
try {
Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN);
startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet);
startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet, closeOnFlushRedo);
} finally {
logReader.close();
deleteRecoveryTemporaryFiles();
@@ -277,7 +276,7 @@ private void analyzeEntityCommitLog(ILogRecord logRecord) throws IOException {
}

private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader,
long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, ACIDException {
long lowWaterMarkLSN, Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws IOException, ACIDException {
int redoCount = 0;
long txnId = 0;

@@ -299,6 +298,7 @@ private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogRe
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);

ILogRecord logRecord = null;
Set<Integer> flushRedoDatasets = new HashSet<>();
try {
logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -409,6 +409,7 @@ private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogRe
&& !index.isCurrentMutableComponentEmpty()) {
// schedule flush
redoFlush(index, logRecord);
flushRedoDatasets.add(datasetId);
redoCount++;
} else {
// TODO: update checkpoint file?
@@ -441,6 +442,11 @@ private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogRe
for (long r : resourceIdList) {
datasetLifecycleManager.close(resourcesMap.get(r).getPath());
}
if (closeOnFlushRedo) {
// close datasets of indexes to ensure any cached state that might've been changed by recovery is cleared
// e.g. when redoing a flush, the component id generator needs to be reinitialized
datasetLifecycleManager.closeDatasets(flushRedoDatasets);
}
}
}

@@ -525,7 +531,7 @@ public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boo
if (minLSN < readableSmallestLSN) {
minLSN = readableSmallestLSN;
}
replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false);
if (flush) {
appCtx.getDatasetLifecycleManager().flushAllDatasets();
}
@@ -23,7 +23,9 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;

import org.apache.asterix.app.bootstrap.TestNodeController;
@@ -62,6 +64,7 @@
import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -146,8 +149,7 @@ public void testRecovery() throws Exception {
checkComponentIds();
// insert more records
createInsertOps();
insertRecords(PARTITION_0, StorageTestUtils.RECORDS_PER_COMPONENT, StorageTestUtils.RECORDS_PER_COMPONENT,
true);
insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);

dsInfo.waitForIO();
checkComponentIds();
@@ -486,8 +488,14 @@ private void checkComponentIds(int partitionIndex) throws HyracksDataException {
List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndexes[partitionIndex].getDiskComponents();

Assert.assertEquals(primaryDiskComponents.size(), secondaryDiskComponents.size());
Set<ILSMComponentId> uniqueIds = new HashSet<>();
for (int i = 0; i < primaryDiskComponents.size(); i++) {
Assert.assertEquals(primaryDiskComponents.get(i).getId(), secondaryDiskComponents.get(i).getId());
ILSMComponentId id = primaryDiskComponents.get(i).getId();
boolean added = uniqueIds.add(id);
if (!added) {
throw new IllegalStateException("found duplicate component ids: " + id);
}
}
}

@@ -19,6 +19,7 @@
package org.apache.asterix.common.api;

import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

import org.apache.asterix.common.context.DatasetInfo;
@@ -111,6 +112,14 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
*/
List<IVirtualBufferCache> getVirtualBufferCaches(int datasetId, int ioDeviceNum);

/**
 * Attempts to close each dataset whose id appears in {@code datasetsToClose}.
 * Datasets that are not currently open are skipped rather than failing.
 *
 * @param datasetsToClose the ids of the datasets to close
 * @throws HyracksDataException if closing any of the datasets fails
 */
void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException;

/**
* Flushes then closes all open datasets
*/
@@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

@@ -473,6 +474,16 @@ private void closeDataset(DatasetResource dsr) throws HyracksDataException {
dsInfo.setOpen(false);
}

@Override
public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
    // Iterate over a snapshot of the registered datasets so that closeDataset(...)
    // may safely mutate the underlying map while we loop.
    List<DatasetResource> snapshot = new ArrayList<>(datasets.values());
    for (DatasetResource resource : snapshot) {
        boolean shouldClose = resource.isOpen() && datasetsToClose.contains(resource.getDatasetID());
        if (shouldClose) {
            closeDataset(resource);
        }
    }
}

@Override
public synchronized void closeAllDatasets() throws HyracksDataException {
ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
@@ -84,17 +84,6 @@ private ResourceType() {
*/
long getLocalMinFirstLSN() throws HyracksDataException;

/**
* Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
*
* @param partitions
* @param lowWaterMarkLSN
* @throws IOException
* @throws ACIDException
*/
void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
throws IOException, ACIDException;

/**
* Creates a temporary file to be used during recovery
*

0 comments on commit acff469

Please sign in to comment.