Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ void testDeleteOutOfSyncReplicaLogAfterCommit() throws Exception {
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
// produce many records to trigger remote log copy.
long expectedRemoteLogEndOffset = 0L;
for (int i = 0; i < 3; i++) {
assertProduceLogResponse(
leaderGateWay
Expand All @@ -77,7 +78,8 @@ void testDeleteOutOfSyncReplicaLogAfterCommit() throws Exception {
tableId, 0, -1, genMemoryLogRecordsByObject(DATA1)))
.get(),
0,
i * 10L);
expectedRemoteLogEndOffset);
expectedRemoteLogEndOffset += DATA1.size();
}

// stop replicas to mock followers are out of sync
Expand Down Expand Up @@ -106,7 +108,7 @@ void testDeleteOutOfSyncReplicaLogAfterCommit() throws Exception {
.set(
ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION,
Duration.ofMillis(1)));
FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(tb);
FLUSS_CLUSTER_EXTENSION.waitUntilRemoteLogEndOffset(tb, expectedRemoteLogEndOffset);

// check only has two remote log segments for the stopped replicas
for (int stopFollower : stopFollowers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,20 @@ public void waitUntilSomeLogSegmentsCopyToRemote(TableBucket tableBucket) {
});
}

/** Wait until the remote log end offset reaches the expected offset. */
public void waitUntilRemoteLogEndOffset(TableBucket tableBucket, long expectedEndOffset) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(2),
() -> {
Optional<RemoteLogManifestHandle> remoteLogManifestHandle;
remoteLogManifestHandle = zkClient.getRemoteLogManifestHandle(tableBucket);
assertThat(remoteLogManifestHandle).isPresent();
assertThat(remoteLogManifestHandle.get().getRemoteLogEndOffset())
.isGreaterThanOrEqualTo(expectedEndOffset);
});
}

public void triggerAndWaitSnapshot(TablePath tablePath) throws Exception {
Optional<TableRegistration> table = zooKeeperClient.getTable(tablePath);
//noinspection SimplifyOptionalCallChains (Java 8 compatibility)
Expand Down