Skip to content
Permalink
Browse files
[NO ISSUE][STO] Limit flushes to impacted partitions
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- When requesting a flush, limit the indexes to be flushed
  to the impacted partitions.
- Invalidate cached resources on replica promotion.
- Invalidate cached resources on resource file deletion.

Change-Id: I4c1408627c8e11240c3575c4b8f190d746588867
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15683
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Mar 15, 2022
1 parent a808320 commit 662f8a942b082eef8c951bd86d0ee1c4cc2d25c8
Showing 7 changed files with 47 additions and 28 deletions.
@@ -535,7 +535,7 @@ public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boo
}
replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false);
if (flush) {
appCtx.getDatasetLifecycleManager().flushAllDatasets();
appCtx.getDatasetLifecycleManager().flushAllDatasets(partitions::contains);
}
cleanUp(partitions);
} catch (IOException | ACIDException e) {
@@ -130,6 +130,7 @@ public synchronized void promote(int partition) throws HyracksDataException {
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
localResourceRepository.cleanup(partition);
localResourceRepository.clearResourcesCache();
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
partitions.put(partition, new Object());
@@ -169,8 +170,7 @@ public boolean isPartitionOrigin(int partition) {

public void closePartitionResources(int partition) throws HyracksDataException {
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
//TODO(mhubail) we can flush only datasets of the requested partition
datasetLifecycleManager.flushAllDatasets();
datasetLifecycleManager.flushAllDatasets(p -> p == partition);
final PersistentLocalResourceRepository resourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
@@ -20,6 +20,7 @@

import java.util.List;
import java.util.Set;
import java.util.function.IntPredicate;
import java.util.function.Predicate;

import org.apache.asterix.common.context.DatasetInfo;
@@ -60,6 +61,14 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
*/
void flushAllDatasets() throws HyracksDataException;

/**
 * Synchronously flushes all open datasets, limited to the partitions
 * matching {@code partitions}
 *
 * @param partitions predicate selecting the partitions whose indexes should be flushed
 * @throws HyracksDataException
 */
void flushAllDatasets(IntPredicate partitions) throws HyracksDataException;

/**
* Schedules asynchronous flush on indexes matching the predicate {@code indexPredicate}
*
@@ -142,20 +151,14 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
*/
List<IndexInfo> getOpenIndexesInfo();

/**
* Flushes and closes all user datasets (non-metadata datasets)
*
* @throws HyracksDataException
*/
void closeUserDatasets() throws HyracksDataException;

/**
 * Flushes all opened datasets that are matching {@code replicationStrategy},
 * limited to the partitions matching {@code partitions}.
 *
 * @param replicationStrategy strategy determining which datasets are flushed
 * @param partitions predicate selecting the partitions whose indexes should be flushed
 * @throws HyracksDataException
 */
void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException;
void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException;

/**
* Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}.
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntPredicate;
import java.util.function.Predicate;

import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -362,9 +363,14 @@ public void start() {

@Override
public synchronized void flushAllDatasets() throws HyracksDataException {
    // No partition filter: delegate with an always-true predicate so every
    // open dataset is flushed across all of its partitions.
    flushAllDatasets(anyPartition -> true);
}

@Override
public synchronized void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
    // Synchronously flush every open dataset, limiting the flushed indexes to
    // the partitions accepted by the predicate. Closed datasets are skipped.
    // NOTE: the previous non-partition-aware call (flushDatasetOpenIndexes(dsr, false))
    // was leftover diff residue and has been removed; only the partition-aware
    // overload is invoked.
    for (DatasetResource dsr : datasets.values()) {
        if (dsr.getDatasetInfo().isOpen()) {
            flushDatasetOpenIndexes(dsr, partitions, false);
        }
    }
}
@@ -373,7 +379,7 @@ public synchronized void flushAllDatasets() throws HyracksDataException {
/**
 * Flushes the open indexes of the dataset identified by {@code datasetId},
 * across all partitions (unconditional {@code p -> true} predicate).
 *
 * @param datasetId  id of the dataset to flush; a no-op if the dataset is not registered
 * @param asyncFlush when {@code true}, flushes are scheduled without waiting for completion
 * @throws HyracksDataException on flush failure
 */
public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
    DatasetResource dsr = datasets.get(datasetId);
    if (dsr != null) {
        // Leftover pre-refactor call without the partition predicate removed;
        // only the partition-aware overload remains.
        flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
    }
}

@@ -407,7 +413,8 @@ private void asyncFlush(DatasetResource dsr, PrimaryIndexOperationTracker opTrac
* This method can only be called asynchronously safely if we're sure no modify operation
* will take place until the flush is scheduled
*/
private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
private void flushDatasetOpenIndexes(DatasetResource dsr, IntPredicate partitions, boolean asyncFlush)
throws HyracksDataException {
DatasetInfo dsInfo = dsr.getDatasetInfo();
if (!dsInfo.isOpen()) {
throw new IllegalStateException("flushDatasetOpenIndexes is called on a dataset that is closed");
@@ -419,6 +426,9 @@ private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) th
// ensure all in-flight flushes gets scheduled
logManager.log(waitLog);
for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
if (!partitions.test(primaryOpTracker.getPartition())) {
continue;
}
// flush each partition one by one
int numActiveOperations = primaryOpTracker.getNumActiveOperations();
if (numActiveOperations > 0) {
@@ -433,6 +443,9 @@ private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) th
if (!asyncFlush) {
List<FlushOperation> flushes = new ArrayList<>();
for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
if (!partitions.test(primaryOpTracker.getPartition())) {
continue;
}
flushes.addAll(primaryOpTracker.getScheduledFlushes());
}
LSMIndexUtil.waitFor(flushes);
@@ -443,7 +456,7 @@ private void closeDataset(DatasetResource dsr) throws HyracksDataException {
// First wait for any ongoing IO operations
DatasetInfo dsInfo = dsr.getDatasetInfo();
try {
flushDatasetOpenIndexes(dsr, false);
flushDatasetOpenIndexes(dsr, p -> true, false);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -479,16 +492,6 @@ public synchronized void closeAllDatasets() throws HyracksDataException {
}
}

@Override
public synchronized void closeUserDatasets() throws HyracksDataException {
    // Iterate over a snapshot copy: closeDataset may mutate the backing
    // datasets map, so iterating datasets.values() directly would risk a
    // ConcurrentModificationException.
    for (DatasetResource dsr : new ArrayList<>(datasets.values())) {
        if (!dsr.isMetadataDataset()) {
            closeDataset(dsr);
        }
    }
}

@Override
public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
if (stopped) {
@@ -539,10 +542,11 @@ public void dumpState(OutputStream outputStream) throws IOException {
}

@Override
public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
public void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions)
throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
flushDatasetOpenIndexes(dsr, false);
flushDatasetOpenIndexes(dsr, partitions, false);
}
}
}
@@ -27,8 +27,10 @@

import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.logging.log4j.LogManager;
@@ -53,6 +55,11 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
final File localFile = ioManager.resolve(file).getFile();
if (localFile.exists()) {
Files.delete(localFile.toPath());
ResourceReference replicaRes = ResourceReference.of(localFile.getAbsolutePath());
if (replicaRes.isMetadataResource()) {
((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
.invalidateResource(replicaRes.getRelativePath().toString());
}
LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath());
} else {
LOGGER.warn(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath());
@@ -65,7 +65,8 @@ private void syncFiles(boolean deltaRecovery) throws IOException {
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery);
// flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy,
p -> p == replica.getIdentifier().getPartition());
waitForReplicatedDatasetsIO();
fileSync.sync();
}
@@ -300,6 +300,10 @@ public void invalidateResource(String relativePath) {
resourceCache.invalidate(relativePath);
}

/**
 * Invalidates every entry in the local resource cache. Used to drop
 * potentially stale cached resources, e.g. on replica promotion
 * (see {@code clearResourcesCache()} call sites in this change).
 */
public void clearResourcesCache() {
    resourceCache.invalidateAll();
}

private static String getFileName(String path) {
return path.endsWith(File.separator) ? (path + StorageConstants.METADATA_FILE_NAME)
: (path + File.separator + StorageConstants.METADATA_FILE_NAME);

0 comments on commit 662f8a9

Please sign in to comment.