Skip to content
Permalink
Browse files
[NO ISSUE][REP] Add replica sync progress
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add replica sync progress based on the replica missing
  files.
- Add replica last progress timestamp that can be used
  to determine replica progress inactivity.

Change-Id: Iab2cd7e745c4150e2d0aef3af864ec0f66dd96e7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13063
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Sep 2, 2021
1 parent 087eaf5 commit a1de795e82273b128b706794988fb7dd09a0267d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 1 deletion.
@@ -51,4 +51,18 @@ enum PartitionReplicaStatus {
* @param failure
*/
void notifyFailure(Exception failure);

/**
* Gets the current sync progress
*
* @return the current sync progress
*/
double getSyncProgress();

/**
* Gets the last progress time of this replica based on System.nanoTime
*
* @return the last progress time
*/
long getLastProgressTime();
}
@@ -52,6 +52,8 @@ public class PartitionReplica implements IPartitionReplica {
private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
private final INcApplicationContext appCtx;
private final ReplicaIdentifier id;
private double syncProgress = -1;
private long lastProgressTime = -1;
private ByteBuffer reusbaleBuf;
private PartitionReplicaStatus status = DISCONNECTED;
private ISocketChannel sc;
@@ -133,6 +135,16 @@ public synchronized ByteBuffer getReusableBuffer() {
return reusbaleBuf;
}

public synchronized void setSyncProgress(double syncProgress) {
this.syncProgress = syncProgress;
lastProgressTime = System.nanoTime();
}

@Override
public synchronized double getSyncProgress() {
return syncProgress;
}

private JsonNode asJson() {
ObjectNode json = OBJECT_MAPPER.createObjectNode();
json.put("id", id.toString());
@@ -152,6 +164,19 @@ public boolean equals(Object o) {
return id.equals(that.id);
}

@Override
public synchronized long getLastProgressTime() {
switch (status) {
case IN_SYNC:
return System.nanoTime();
case CATCHING_UP:
return lastProgressTime;
case DISCONNECTED:
return -1;
}
return -1;
}

@Override
public int hashCode() {
return id.hashCode();
@@ -172,6 +197,17 @@ private synchronized void setStatus(PartitionReplicaStatus status) {
}
LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status);
this.status = status;
switch (status) {
case IN_SYNC:
syncProgress = 1;
break;
case CATCHING_UP:
lastProgressTime = System.nanoTime();
break;
case DISCONNECTED:
syncProgress = -1;
break;
}
}

private void sendGoodBye() {
@@ -105,7 +105,12 @@ private void replicateMissingFiles(List<String> files) {
final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
// sort files to ensure index metadata files starting with "." are replicated first
files.sort(String::compareTo);
files.forEach(sync::replicate);
int missingFilesCount = files.size();
for (int i = 0; i < missingFilesCount; i++) {
String file = files.get(i);
sync.replicate(file);
replica.setSyncProgress((i + 1d) / missingFilesCount);
}
}

private void deleteInvalidFiles(List<String> files) {

0 comments on commit a1de795

Please sign in to comment.