Skip to content
Permalink
Browse files
[NO ISSUE][OTH] Rename partition owner to origin
- user model changes: no
- storage format changes: no
- interface changes: yes

Change-Id: I97838a546ae9bdc1453f397d6809bd08e493f8f8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15263
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
  • Loading branch information
mhubail committed Feb 14, 2022
1 parent 9e33273 commit ef3da66b3fa444e3f44e6c7c2b6d9e309e885492
Showing 8 changed files with 18 additions and 18 deletions.
@@ -57,14 +57,14 @@ public class ReplicaManager implements IReplicaManager {
* current replicas
*/
private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
private final Set<Integer> nodeOwnedPartitions = new HashSet<>();
private final Set<Integer> nodeOriginatedPartitions = new HashSet<>();

public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
this.appCtx = appCtx;
for (Integer partition : partitions) {
this.partitions.put(partition, new Object());
}
setNodeOwnedPartitions(appCtx);
setNodeOriginatedPartitions(appCtx);
}

@Override
@@ -163,8 +163,8 @@ public synchronized List<IPartitionReplica> getReplicas() {
}

@Override
public boolean isPartitionOwner(int partition) {
return nodeOwnedPartitions.contains(partition);
public boolean isPartitionOrigin(int partition) {
return nodeOriginatedPartitions.contains(partition);
}

public void closePartitionResources(int partition) throws HyracksDataException {
@@ -185,9 +185,9 @@ private boolean isSelf(ReplicaIdentifier id) {
return id.getNodeId().equals(nodeId);
}

private void setNodeOwnedPartitions(INcApplicationContext appCtx) {
private void setNodeOriginatedPartitions(INcApplicationContext appCtx) {
Set<Integer> nodePartitions =
appCtx.getMetadataProperties().getNodePartitions(appCtx.getServiceContext().getNodeId());
nodeOwnedPartitions.addAll(nodePartitions);
nodeOriginatedPartitions.addAll(nodePartitions);
}
}
@@ -104,10 +104,10 @@ public interface IReplicaManager {
List<IPartitionReplica> getReplicas();

/**
* Returns true if {@code partition} is owned by this node, otherwise false.
* Returns true if {@code partition} is originated by this node, otherwise false.
*
* @param partition
* @return true if the partition is owned by this node, otherwise false.
* @return true if the partition is originated by this node, otherwise false.
*/
boolean isPartitionOwner(int partition);
boolean isPartitionOrigin(int partition);
}
@@ -90,7 +90,7 @@ public static PartitionResourcesListResponse create(DataInput input) throws IOEx
return new PartitionResourcesListResponse(partition, partitionReplicatedResources, resources, owner);
}

public boolean isOwner() {
public boolean isOrigin() {
return owner;
}

@@ -59,7 +59,7 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) thr
localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition,
partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOwner(partition));
partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOrigin(partition));
ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
}

@@ -55,7 +55,7 @@ public void replicate(String file, boolean metadata) {
final IIOManager ioManager = appCtx.getIoManager();
final ISocketChannel channel = replica.getChannel();
final FileReference filePath = ioManager.resolve(file);
String masterNode = appCtx.getReplicaManager().isPartitionOwner(replica.getIdentifier().getPartition())
String masterNode = appCtx.getReplicaManager().isPartitionOrigin(replica.getIdentifier().getPartition())
? appCtx.getServiceContext().getNodeId() : null;
ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata, masterNode);
LOGGER.debug("attempting to replicate {} to replica {}", task, replica);
@@ -94,7 +94,7 @@ private void replicateComponent(PartitionReplica replica) throws IOException {
final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, replica);
job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate);
// send mark component valid
String masterNode = appCtx.getReplicaManager().isPartitionOwner(replica.getIdentifier().getPartition())
String masterNode = appCtx.getReplicaManager().isPartitionOrigin(replica.getIdentifier().getPartition())
? appCtx.getServiceContext().getNodeId() : null;
MarkComponentValidTask markValidTask = new MarkComponentValidTask(indexFile, getReplicatedComponentLsn(),
getReplicatedComponentId(), masterNode);
@@ -72,7 +72,7 @@ public void sync() throws IOException {
}
PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOwner());
replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOrigin());
// clean up files for invalid resources (deleted or recreated while the replica was down)
Set<String> deletedReplicaFiles =
cleanupReplicaInvalidResources(replicaResourceResponse, resourceReferenceLongMap);
@@ -155,7 +155,7 @@ private Set<String> cleanupReplicaInvalidResources(PartitionResourcesListRespons
if (!validReplicaResources.containsKey(replicaRes)) {
LOGGER.debug("replica invalid file {} to be deleted", replicaRes.getFileRelativePath());
invalidFiles.add(replicaResPath);
} else if (replicaResourceResponse.isOwner() && !replicaRes.isMetadataResource()) {
} else if (replicaResourceResponse.isOrigin() && !replicaRes.isMetadataResource()) {
// find files where the owner generated and failed before replicating
Long masterValidSeq = validReplicaResources.get(replicaRes);
IndexComponentFileReference componentFileReference =
@@ -184,7 +184,7 @@ private PartitionResourcesListResponse getReplicaFiles(int partition) throws IOE
}

private Map<ResourceReference, Long> getValidReplicaResources(Map<String, Long> partitionReplicatedResources,
boolean owner) throws HyracksDataException {
boolean origin) throws HyracksDataException {
Map<ResourceReference, Long> resource2ValidSeqMap = new HashMap<>();
for (Map.Entry<String, Long> resourceEntry : partitionReplicatedResources.entrySet()) {
ResourceReference rr = ResourceReference.of(resourceEntry.getKey());
@@ -196,7 +196,7 @@ private Map<ResourceReference, Long> getValidReplicaResources(Map<String, Long>
LOGGER.info("replica has resource {} but with different resource id; ours {}, theirs {}", rr,
localResource.getId(), resourceEntry.getValue());
} else {
long resourceMasterValidSeq = owner ? getResourceMasterValidSeq(rr) : Integer.MAX_VALUE;
long resourceMasterValidSeq = origin ? getResourceMasterValidSeq(rr) : Integer.MAX_VALUE;
resource2ValidSeqMap.put(rr, resourceMasterValidSeq);
}
}
@@ -73,7 +73,7 @@ private void syncFiles(boolean deltaRecovery) throws IOException {
private void checkpointReplicaIndexes() throws IOException {
final int partition = replica.getIdentifier().getPartition();
String masterNode =
appCtx.getReplicaManager().isPartitionOwner(partition) ? appCtx.getServiceContext().getNodeId() : null;
appCtx.getReplicaManager().isPartitionOrigin(partition) ? appCtx.getServiceContext().getNodeId() : null;
CheckpointPartitionIndexesTask task =
new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition), masterNode);
ReplicationProtocol.sendTo(replica, task);

0 comments on commit ef3da66

Please sign in to comment.