Skip to content
Permalink
Browse files
[NO ISSUE][REP] Persist master last valid seq on index checkpoint
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- When a partition owner replicates a component to a replica,
  the replica records the last component sequence it received from
  the master in its index checkpoint. This ensures that a component
  that was generated on the master but not replicated before the
  master failed will not be used when the master is re-synced
  (recovered) from a promoted replica.

Change-Id: I102947712daa07c83b32103b3c58fad46de2dc6d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12966
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Aug 25, 2021
1 parent 81578f9 commit bf22dbcb42fbb252dd2506b41ade61af1b3fa63c
Show file tree
Hide file tree
Showing 17 changed files with 308 additions and 58 deletions.
@@ -56,7 +56,7 @@ public IndexCheckpointManager(Path indexPath) {
}

@Override
public synchronized void init(long validComponentSequence, long lsn, long validComponentId)
public synchronized void init(long validComponentSequence, long lsn, long validComponentId, String masterNodeId)
throws HyracksDataException {
List<IndexCheckpoint> checkpoints;
try {
@@ -68,34 +68,42 @@ public synchronized void init(long validComponentSequence, long lsn, long validC
LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
delete();
}
IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn, validComponentId);
IndexCheckpoint firstCheckpoint =
IndexCheckpoint.first(validComponentSequence, lsn, validComponentId, masterNodeId);
persist(firstCheckpoint);
}

/**
* Marks a replicated component as flushed. The local LSN is looked up in the latest
* checkpoint's master-node flush map using {@code masterLsn} as the key, and the call is
* then forwarded to {@code flushed(...)} with that local LSN.
*
* @param componentSequence sequence of the replicated component, forwarded to {@code flushed}
* @param masterLsn LSN on the master; used as the key into the master-node flush map
* @param componentId component id, forwarded to {@code flushed}
* @param masterNodeId id of the master node, forwarded to {@code flushed} (four-argument form)
* @throws HyracksDataException declared by this method and by the calls it makes
* @throws IllegalStateException if the flush map has no entry for {@code masterLsn}
*/
@Override
public synchronized void replicated(long componentSequence, long masterLsn, long componentId)
public synchronized void replicated(long componentSequence, long masterLsn, long componentId, String masterNodeId)
throws HyracksDataException {
// boxed Long so that a missing mapping shows up as null
final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
if (localLsn == null) {
throw new IllegalStateException("Component flushed before lsn mapping was received");
throw new IllegalStateException("Component replicated before lsn mapping was received");
}
flushed(componentSequence, localLsn, componentId);
flushed(componentSequence, localLsn, componentId, masterNodeId);
}

/**
* Builds the next checkpoint from the latest one using the given LSN, component sequence and
* component id, persists it, and then calls {@code deleteHistory} with the new checkpoint's id
* and {@code HISTORY_CHECKPOINTS}.
*
* @param componentSequence component sequence passed to {@code IndexCheckpoint.next}
* @param lsn LSN passed to {@code IndexCheckpoint.next} as the low watermark
* @param componentId component id passed to {@code IndexCheckpoint.next}
* @param masterNodeId master node id passed to {@code IndexCheckpoint.next} (four-argument form)
* @throws HyracksDataException declared by this method and by the calls it makes
*/
@Override
public synchronized void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException {
public synchronized void flushed(long componentSequence, long lsn, long componentId, String masterNodeId)
throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentSequence, componentId);
IndexCheckpoint nextCheckpoint =
IndexCheckpoint.next(latest, lsn, componentSequence, componentId, masterNodeId);
persist(nextCheckpoint);
// NOTE(review): presumably trims older checkpoint files, keeping HISTORY_CHECKPOINTS of them -- confirm
deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
}

/**
 * Three-argument overload of {@code flushed}: forwards to the four-argument variant,
 * supplying {@code null} as the master node id.
 *
 * @param componentSequence component sequence, forwarded unchanged
 * @param lsn LSN, forwarded unchanged
 * @param componentId component id, forwarded unchanged
 * @throws HyracksDataException propagated from the four-argument variant
 */
@Override
public synchronized void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException {
    // no master node id is supplied on this path
    final String noMasterNodeId = null;
    flushed(componentSequence, lsn, componentId, noMasterNodeId);
}

/**
* Records the mapping from {@code masterLsn} to {@code localLsn} in the latest checkpoint's
* master-node flush map, then persists a new checkpoint that carries over the latest low
* watermark, valid component sequence and last component id.
*
* @param masterLsn LSN on the master; used as the map key
* @param localLsn LSN stored as the value for {@code masterLsn}
* @throws HyracksDataException declared by this method and by the calls it makes
*/
@Override
public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
// the variant with the trailing null supplies no master node id
final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
latest.getValidComponentSequence(), latest.getLastComponentId());
latest.getValidComponentSequence(), latest.getLastComponentId(), null);
persist(next);
// wake up every thread waiting on this manager's monitor
notifyAll();
}
@@ -155,8 +163,8 @@ public synchronized IndexCheckpoint getLatest() throws HyracksDataException {
/**
* Persists a new checkpoint that keeps the latest checkpoint's low watermark and valid
* component sequence but replaces the last component id with {@code componentId}.
*
* @param componentId value stored as the new checkpoint's last component id
* @throws HyracksDataException declared by this method and by the calls it makes
*/
@Override
public synchronized void setLastComponentId(long componentId) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
// the variant with the trailing null supplies no master node id
final IndexCheckpoint next =
IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentSequence(), componentId);
final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
latest.getValidComponentSequence(), componentId, null);
persist(next);
}

@@ -165,7 +173,7 @@ public synchronized void advanceValidComponentSequence(long componentSequence) t
final IndexCheckpoint latest = getLatest();
if (componentSequence > latest.getValidComponentSequence()) {
final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence,
latest.getLastComponentId());
latest.getLastComponentId(), null);
persist(next);
}
}
@@ -19,6 +19,7 @@
package org.apache.asterix.app.nc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,6 +31,7 @@

import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
@@ -58,10 +60,12 @@ public class ReplicaManager implements IReplicaManager {
*/
private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
private final Object replicaSyncLock = new Object();
private final Set<Integer> nodeOwnedPartitions = new HashSet<>();

/**
 * Creates a replica manager for the given application context.
 *
 * @param appCtx the node controller application context; kept in a field and also used to
 *            compute the partitions owned by this node
 * @param partitions partitions copied into this manager's partition set
 */
public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
    this.appCtx = appCtx;
    // copy the caller's partitions rather than keeping a reference to the passed set
    this.partitions.addAll(partitions);
    // populate nodeOwnedPartitions from the cluster metadata for this node
    setNodeOwnedPartitions(appCtx);
}

@Override
@@ -154,6 +158,11 @@ public synchronized List<IPartitionReplica> getReplicas() {
return new ArrayList<>(replicas.values());
}

/**
 * Tells whether {@code partition} is among the partitions recorded as owned by this node.
 *
 * @param partition the partition id to look up
 * @return {@code true} if {@code nodeOwnedPartitions} contains the partition
 */
@Override
public boolean isPartitionOwner(int partition) {
    final boolean ownedByThisNode = nodeOwnedPartitions.contains(partition);
    return ownedByThisNode;
}

public void closePartitionResources(int partition) throws HyracksDataException {
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
//TODO(mhubail) we can flush only datasets of the requested partition
@@ -171,4 +180,13 @@ private boolean isSelf(ReplicaIdentifier id) {
String nodeId = appCtx.getServiceContext().getNodeId();
return id.getNodeId().equals(nodeId);
}

/**
 * Fills {@code nodeOwnedPartitions} with the ids of the cluster partitions that the metadata
 * properties list under this node's id. Does nothing when there is no entry for this node.
 *
 * @param appCtx application context providing the metadata properties and this node's id
 */
private void setNodeOwnedPartitions(INcApplicationContext appCtx) {
    final ClusterPartition[] ownedPartitions =
            appCtx.getMetadataProperties().getNodePartitions().get(appCtx.getServiceContext().getNodeId());
    if (ownedPartitions == null) {
        // no partitions are registered under this node's id
        return;
    }
    for (ClusterPartition ownedPartition : ownedPartitions) {
        nodeOwnedPartitions.add(ownedPartition.getPartitionId());
    }
}
}
@@ -28,9 +28,23 @@ public interface IIndexCheckpointManager {
* @param validComponentSequence
* @param lsn
* @param validComponentId
* @param masterNodeId
* @throws HyracksDataException
*/
void init(long validComponentSequence, long lsn, long validComponentId) throws HyracksDataException;
void init(long validComponentSequence, long lsn, long validComponentId, String masterNodeId)
throws HyracksDataException;

/**
* Called when a new LSM disk component is flushed due to a replicated component.
* When called, the index checkpoint is updated with the latest valid {@code componentSequence}
* and low watermark {@code lsn}.
*
* @param componentSequence the latest valid component sequence
* @param lsn the low watermark LSN
* @param componentId the id of the flushed component
* @param masterNodeId the id of the master node the component was replicated from
*            (NOTE(review): implementations appear to accept {@code null} -- confirm)
* @throws HyracksDataException
*/
void flushed(long componentSequence, long lsn, long componentId, String masterNodeId) throws HyracksDataException;

/**
* Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
@@ -50,9 +64,11 @@ public interface IIndexCheckpointManager {
* @param componentSequence
* @param masterLsn
* @param componentId
* @param masterNodeId
* @throws HyracksDataException
*/
void replicated(long componentSequence, long masterLsn, long componentId) throws HyracksDataException;
void replicated(long componentSequence, long masterLsn, long componentId, String masterNodeId)
throws HyracksDataException;

/**
* Called when a flush log is received and replicated from master. The mapping between
@@ -100,4 +100,12 @@ public interface IReplicaManager {
* @return the list of replicas
*/
List<IPartitionReplica> getReplicas();

/**
* Returns true if {@code partition} is owned by this node, otherwise false.
*
* @param partition the id of the partition to check
* @return true if the partition is owned by this node, otherwise false.
*/
boolean isPartitionOwner(int partition);
}
@@ -44,20 +44,25 @@ public class IndexCheckpoint {
private long lowWatermark;
private long lastComponentId;
private Map<Long, Long> masterNodeFlushMap;
private String masterNodeId;
private long masterValidSeq;

/**
* Creates the initial checkpoint of an index. The checkpoint gets {@code INITIAL_CHECKPOINT_ID}
* as its id and a fresh master-node flush map that contains only the
* {@code HAS_NULL_MISSING_VALUES_FIX} marker entry.
*
* @param lastComponentSequence stored as the valid component sequence and, in the form that
*            takes a master node id, also as the master valid sequence
* @param lowWatermark stored as the checkpoint's low watermark
* @param validComponentId stored as the checkpoint's last component id
* @param masterNodeId stored as the checkpoint's master node id (four-argument form)
* @return the newly created checkpoint
*/
public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId) {
public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId,
String masterNodeId) {
IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
firstCheckpoint.lowWatermark = lowWatermark;
firstCheckpoint.validComponentSequence = lastComponentSequence;
firstCheckpoint.lastComponentId = validComponentId;
firstCheckpoint.masterNodeFlushMap = new HashMap<>();
// marker entry: the constant is used as both key and value
firstCheckpoint.masterNodeFlushMap.put(HAS_NULL_MISSING_VALUES_FIX, HAS_NULL_MISSING_VALUES_FIX);
firstCheckpoint.masterNodeId = masterNodeId;
// set unconditionally, whether or not masterNodeId is null
firstCheckpoint.masterValidSeq = lastComponentSequence;
return firstCheckpoint;
}

public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
long lastComponentId) {
long lastComponentId, String masterNodeId) {
if (lowWatermark < latest.getLowWatermark()) {
if (LOGGER.isErrorEnabled()) {
LOGGER.error("low watermark {} less than the latest checkpoint low watermark {}", lowWatermark, latest);
@@ -70,6 +75,13 @@ public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, lo
next.lastComponentId = lastComponentId;
next.validComponentSequence = validComponentSequence;
next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
if (masterNodeId != null) {
next.masterNodeId = masterNodeId;
next.masterValidSeq = validComponentSequence;
} else {
next.masterNodeId = latest.getMasterNodeId();
next.masterValidSeq = latest.getMasterValidSeq();
}
// remove any lsn from the map that won't be used anymore
next.masterNodeFlushMap.values().removeIf(lsn -> lsn < lowWatermark && lsn != HAS_NULL_MISSING_VALUES_FIX);
return next;
@@ -111,6 +123,14 @@ public String asJson() throws HyracksDataException {
}
}

/**
 * @return the master node id stored in this checkpoint
 */
public String getMasterNodeId() {
    return this.masterNodeId;
}

/**
 * @return the master valid component sequence stored in this checkpoint
 */
public long getMasterValidSeq() {
    return this.masterValidSeq;
}

public static IndexCheckpoint fromJson(String json) throws HyracksDataException {
try {
return OBJECT_MAPPER.readValue(json, IndexCheckpoint.class);
@@ -147,6 +147,10 @@ public ResourceReference getDatasetReference() {
return ResourceReference.ofIndex(relativePath.getParent().resolve(dataset).toFile().getPath());
}

/**
 * Tells whether this reference's name equals {@code StorageConstants.METADATA_FILE_NAME}.
 *
 * @return {@code true} if the name matches the metadata file name
 */
public boolean isMetadataResource() {
    final String resourceName = getName();
    return resourceName.equals(StorageConstants.METADATA_FILE_NAME);
}

/**
 * Resolves this reference's {@code name} against its {@code relativePath}.
 *
 * @return the path obtained from {@code relativePath.resolve(name)}
 */
public Path getFileRelativePath() {
    final Path fileRelativePath = relativePath.resolve(name);
    return fileRelativePath;
}
@@ -37,6 +37,7 @@ public class StorageConstants {
* begin with ".". Otherwise {@link AbstractLSMIndexFileManager} will try to
* use them as index files.
*/
public static final String INDEX_NON_DATA_FILES_PREFIX = ".";
public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_";
public static final String METADATA_FILE_NAME = ".metadata";
public static final String MASK_FILE_PREFIX = ".mask_";
@@ -48,10 +48,12 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask {

private final int partition;
private final long maxComponentId;
private final String masterNodeId;

/**
* Creates a task that checkpoints the indexes of a partition.
*
* @param partition the partition this task applies to
* @param maxComponentId the component id passed to the index checkpoint manager when the task runs
* @param masterNodeId the master node id passed to the index checkpoint manager (three-argument
*            form); may be {@code null}, in which case serialization writes only an absence flag
*/
public CheckpointPartitionIndexesTask(int partition, long maxComponentId) {
public CheckpointPartitionIndexesTask(int partition, long maxComponentId, String masterNodeId) {
this.partition = partition;
this.maxComponentId = maxComponentId;
this.masterNodeId = masterNodeId;
}

@Override
@@ -66,7 +68,6 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) thr
for (LocalResource ls : partitionResources) {
DatasetResourceReference ref = DatasetResourceReference.of(ls);
final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
indexCheckpointManager.delete();
// Get most recent sequence of existing files to avoid deletion
Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
@@ -79,7 +80,11 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) thr
maxComponentSequence =
Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
}
indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId);
if (indexCheckpointManager.getCheckpointCount() > 0) {
indexCheckpointManager.flushed(maxComponentSequence, currentLSN, maxComponentId, masterNodeId);
} else {
indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId, masterNodeId);
}
}
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
}
@@ -95,6 +100,11 @@ public void serialize(OutputStream out) throws HyracksDataException {
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(partition);
dos.writeLong(maxComponentId);
boolean hasMaster = masterNodeId != null;
dos.writeBoolean(hasMaster);
if (hasMaster) {
dos.writeUTF(masterNodeId);
}
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -104,7 +114,9 @@ public static CheckpointPartitionIndexesTask create(DataInput input) throws Hyra
try {
int partition = input.readInt();
long maxComponentId = input.readLong();
return new CheckpointPartitionIndexesTask(partition, maxComponentId);
final boolean hasMaster = input.readBoolean();
final String masterNodeId = hasMaster ? input.readUTF() : null;
return new CheckpointPartitionIndexesTask(partition, maxComponentId, masterNodeId);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -46,11 +46,13 @@ public class MarkComponentValidTask implements IReplicaTask {
private final long masterLsn;
private final long lastComponentId;
private final String file;
private final String masterNodeId;

/**
* Creates a task that marks a replicated component file as valid.
*
* @param file the component file; its name is used to derive the component sequence
* @param masterLsn the master LSN passed to the index checkpoint manager's {@code replicated} call
* @param lastComponentId the component id passed to the {@code replicated} call
* @param masterNodeId the master node id passed to the {@code replicated} call (four-argument
*            form); may be {@code null}, in which case serialization writes only an absence flag
*/
public MarkComponentValidTask(String file, long masterLsn, long lastComponentId) {
public MarkComponentValidTask(String file, long masterLsn, long lastComponentId, String masterNodeId) {
this.file = file;
this.lastComponentId = lastComponentId;
this.masterLsn = masterLsn;
this.masterNodeId = masterNodeId;
}

@Override
@@ -95,7 +97,7 @@ private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
indexCheckpointManager.replicated(componentSequence, masterLsn, lastComponentId);
indexCheckpointManager.replicated(componentSequence, masterLsn, lastComponentId, masterNodeId);
}
}

@@ -111,6 +113,11 @@ public void serialize(OutputStream out) throws HyracksDataException {
dos.writeUTF(file);
dos.writeLong(masterLsn);
dos.writeLong(lastComponentId);
boolean hasMaster = masterNodeId != null;
dos.writeBoolean(hasMaster);
if (hasMaster) {
dos.writeUTF(masterNodeId);
}
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -120,6 +127,8 @@ public static MarkComponentValidTask create(DataInput input) throws IOException
final String indexFile = input.readUTF();
final long lsn = input.readLong();
final long lastComponentId = input.readLong();
return new MarkComponentValidTask(indexFile, lsn, lastComponentId);
final boolean hasMaster = input.readBoolean();
final String masterNodeId = hasMaster ? input.readUTF() : null;
return new MarkComponentValidTask(indexFile, lsn, lastComponentId, masterNodeId);
}
}

0 comments on commit bf22dbc

Please sign in to comment.