Skip to content
Permalink
Browse files
[NO ISSUE][REP] Add API to perform non-delta recovery for a replica
- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Add an option to perform non-delta recovery for a replica.

Change-Id: Ib1837e8f1aefdd9e085ccfd62f1c6e6d4eb969e8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13223
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Sep 14, 2021
1 parent bc8aaf6 commit 8f278c042d92135f737cd7b26bd56a3479e11106
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 10 deletions.
@@ -79,17 +79,17 @@ public synchronized void notifyFailure(Exception failure) {
}

public synchronized void sync() {
sync(true);
sync(true, true);
}

public synchronized void sync(boolean register) {
public synchronized void sync(boolean register, boolean deltaRecovery) {
if (status == IN_SYNC || status == CATCHING_UP) {
return;
}
setStatus(CATCHING_UP);
appCtx.getThreadExecutor().execute(() -> {
try {
new ReplicaSynchronizer(appCtx, this).sync(register);
new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery);
setStatus(IN_SYNC);
} catch (Exception e) {
LOGGER.error(() -> "Failed to sync replica " + this, e);
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.replication.messaging;

import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Replica task that removes a single storage partition from the node performing it.
 * <p>
 * The only state carried on the wire is the partition id, written as one {@code int}
 * by {@link #serialize(OutputStream)} and read back by {@link #create(DataInput)}.
 */
public class DeletePartitionTask implements IReplicaTask {

    private static final Logger LOGGER = LogManager.getLogger();
    private final int partitionId;

    /**
     * @param partitionId id of the storage partition this task deletes
     */
    public DeletePartitionTask(int partitionId) {
        this.partitionId = partitionId;
    }

    /**
     * Deletes the partition through the local resource repository and then sends an ack on the
     * worker's channel.
     *
     * @param appCtx application context supplying the local resource repository
     * @param worker replication worker whose channel and reusable buffer are used for the ack
     * @throws ReplicationException wrapping any exception raised while deleting or acknowledging
     */
    @Override
    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
        try {
            // the cast stays inside the try so that a repository of another type is also
            // reported as a ReplicationException
            final PersistentLocalResourceRepository repository =
                    (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
            LOGGER.warn("deleting storage partition {}", partitionId);
            repository.deletePartition(partitionId);
            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
        } catch (Exception e) {
            throw new ReplicationException(e);
        }
    }

    /**
     * @return {@link ReplicationProtocol.ReplicationRequestType#DELETE_PARTITION}
     */
    @Override
    public ReplicationProtocol.ReplicationRequestType getMessageType() {
        return ReplicationProtocol.ReplicationRequestType.DELETE_PARTITION;
    }

    /**
     * Writes the partition id to {@code out} as a single {@code int}.
     *
     * @param out stream receiving the serialized task
     * @throws HyracksDataException if writing to {@code out} fails
     */
    @Override
    public void serialize(OutputStream out) throws HyracksDataException {
        try {
            new DataOutputStream(out).writeInt(partitionId);
        } catch (IOException e) {
            throw HyracksDataException.create(e);
        }
    }

    /**
     * Reads a task previously written by {@link #serialize(OutputStream)}.
     *
     * @param input source positioned at the serialized partition id
     * @return a task for the partition id read from {@code input}
     * @throws IOException if the id cannot be read
     */
    public static DeletePartitionTask create(DataInput input) throws IOException {
        final int serializedPartitionId = input.readInt();
        return new DeletePartitionTask(serializedPartitionId);
    }
}
@@ -33,7 +33,7 @@
public class PartitionResourcesListResponse implements IReplicationMessage {

private final int partition;
private Map<String, Long> partitionReplicatedResources;
private final Map<String, Long> partitionReplicatedResources;
private final List<String> files;
private final boolean owner;

@@ -61,7 +61,8 @@ public enum ReplicationRequestType {
LSM_COMPONENT_MASK,
MARK_COMPONENT_VALID,
DROP_INDEX,
REPLICATE_LOGS
REPLICATE_LOGS,
DELETE_PARTITION
}

private static final Map<Integer, ReplicationRequestType> TYPES = new HashMap<>();
@@ -177,6 +178,8 @@ public static IReplicationMessage readMessage(ReplicationRequestType type, ISock
return MarkComponentValidTask.create(dis);
case REPLICATE_LOGS:
return ReplicateLogsTask.create(dis);
case DELETE_PARTITION:
return DeletePartitionTask.create(dis);
default:
throw new IllegalStateException("Unrecognized replication message");
}
@@ -35,6 +35,7 @@
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.DeletePartitionTask;
import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
@@ -55,14 +56,19 @@ public class ReplicaFilesSynchronizer {
private static final Logger LOGGER = LogManager.getLogger();
private final PartitionReplica replica;
private final INcApplicationContext appCtx;
private final boolean deltaRecovery;

public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica, boolean deltaRecovery) {
this.appCtx = appCtx;
this.replica = replica;
this.deltaRecovery = deltaRecovery;
}

public void sync() throws IOException {
final int partition = replica.getIdentifier().getPartition();
if (!deltaRecovery) {
deletePartitionFromReplica(partition);
}
PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOwner());
@@ -82,6 +88,12 @@ public void sync() throws IOException {
deleteReplicaExtraFiles(replicaFiles, masterFiles);
}

/**
 * Sends a {@link DeletePartitionTask} for the given partition to the replica and then waits
 * for the replica's ack.
 *
 * @param partitionId id of the partition the replica is asked to delete
 * @throws IOException if sending the task or waiting for the ack fails
 */
private void deletePartitionFromReplica(int partitionId) throws IOException {
    ReplicationProtocol.sendTo(replica, new DeletePartitionTask(partitionId));
    ReplicationProtocol.waitForAck(replica);
}

private void deleteReplicaExtraFiles(Set<String> replicaFiles, Set<String> masterFiles) {
final List<String> replicaInvalidFiles =
replicaFiles.stream().filter(file -> !masterFiles.contains(file)).collect(Collectors.toList());
@@ -43,13 +43,13 @@ public ReplicaSynchronizer(INcApplicationContext appCtx, PartitionReplica replic
this.replica = replica;
}

public void sync(boolean register) throws IOException {
public void sync(boolean register, boolean deltaRecovery) throws IOException {
synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
try {
// suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
checkpointManager.suspend();
syncFiles();
syncFiles(deltaRecovery);
checkpointReplicaIndexes();
if (register) {
appCtx.getReplicationManager().register(replica);
@@ -60,8 +60,8 @@ public void sync(boolean register) throws IOException {
}
}

private void syncFiles() throws IOException {
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
private void syncFiles(boolean deltaRecovery) throws IOException {
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery);
// flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
@@ -695,4 +695,16 @@ public Path getPartitionRoot(int partition) throws HyracksDataException {
FileReference resolve = ioManager.resolve(path.toString());
return resolve.getFile().toPath();
}

/**
 * Deletes the on-disk directory of the storage partition with the given id.
 * <p>
 * The on-disk partitions are scanned in order and the scan stops at the first one whose
 * partition number equals {@code partitionId}. If none matches, nothing is deleted.
 * Deletion remains best-effort: a failure is logged rather than thrown, so existing callers
 * see no new exception.
 *
 * @param partitionId id of the partition to delete
 */
public void deletePartition(int partitionId) {
    List<File> onDiskPartitions = getOnDiskPartitions();
    for (File onDiskPartition : onDiskPartitions) {
        int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
        if (partitionNum == partitionId) {
            LOGGER.warn("deleting partition {}", partitionNum);
            // deleteQuietly never throws; its boolean result is the only signal that the
            // partition could not be removed, so surface it instead of discarding it
            if (!FileUtils.deleteQuietly(onDiskPartition)) {
                LOGGER.error("failed to delete partition {} at {}", partitionNum, onDiskPartition);
            }
            return;
        }
    }
}
}

0 comments on commit 8f278c0

Please sign in to comment.