Skip to content

Commit

Permalink
PrimaryNode: add configurable timeout to waitForAllRemotesToClose
Browse files Browse the repository at this point in the history
Fixes #11674
  • Loading branch information
stevenschlansker committed Oct 18, 2022
1 parent 2ed16c7 commit 4280bb7
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 3 deletions.
2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
Expand Up @@ -44,6 +44,8 @@ New Features
* LUCENE-10626 Hunspell: add tools to aid dictionary editing:
analysis introspection, stem expansion and stem/flag suggestion (Peter Gromov)

* GITHUB#11822: Configure replicator PrimaryNode replica shutdown timeout. (Steven Schlansker)

Improvements
---------------------

Expand Down
Expand Up @@ -61,6 +61,7 @@ public abstract class PrimaryNode extends Node {
private CopyState copyState;

protected final long primaryGen;
// Milliseconds to wait during shutdown for remote replicas to close: -1 (the default) waits
// forever, 0 does not wait at all.
private int remoteCloseTimeoutMs = -1;

/**
* Contains merged segments that have been copied to all running replicas (as of when that merge
Expand Down Expand Up @@ -196,6 +197,24 @@ public synchronized long getLastCommitVersion() {
throw new AssertionError("missing VERSION_KEY");
}

/**
 * Returns how long shutdown waits for remote replicas to close.
 *
 * @return the configured wait, in milliseconds
 */
public int getRemoteCloseTimeoutMs() {
  return this.remoteCloseTimeoutMs;
}

/**
 * Set the number of milliseconds to wait during shutdown for remote replicas to close. {@code -1}
 * (the default) means forever, and {@code 0} means don't wait at all.
 *
 * @param remoteCloseTimeoutMs the timeout in milliseconds; must be {@code >= -1}
 * @throws IllegalArgumentException if {@code remoteCloseTimeoutMs} is less than {@code -1}
 */
public void setRemoteCloseTimeoutMs(int remoteCloseTimeoutMs) {
  if (remoteCloseTimeoutMs < -1) {
    // Include the rejected value in the message so the caller can see what was passed.
    throw new IllegalArgumentException("bad timeout " + remoteCloseTimeoutMs);
  }
  this.remoteCloseTimeoutMs = remoteCloseTimeoutMs;
}

@Override
public void commit() throws IOException {
Map<String, String> commitData = new HashMap<>();
Expand Down Expand Up @@ -318,9 +337,13 @@ private synchronized boolean setCurrentInfos(Set<String> completedMergeFiles) th
}

private synchronized void waitForAllRemotesToClose() throws IOException {

// Wait for replicas to finish or crash:
while (true) {
if (remoteCloseTimeoutMs == 0) {
return;
}
long waitStartNs = System.nanoTime();
// Wait for replicas to finish or crash or timeout:
while (remoteCloseTimeoutMs < 0
|| (System.nanoTime() - waitStartNs) / 1_000_000 < remoteCloseTimeoutMs) {
int count = copyingCount.get();
if (count == 0) {
return;
Expand Down
Expand Up @@ -189,6 +189,22 @@ public void newNRTPoint(long version, long primaryGen, int primaryTCPPort) throw
}
}

/**
 * Sends {@code CMD_LEAK_COPY_STATE} to this node's TCP port over a fresh connection, to simulate
 * a replica holding a copy state open forever by just leaking it.
 */
public void leakCopyState() throws IOException {
  try (Connection conn = new Connection(tcpPort)) {
    conn.out.writeByte(SimplePrimaryNode.CMD_LEAK_COPY_STATE);
    conn.flush();
  }
}

/**
 * Sends {@code CMD_SET_CLOSE_WAIT_MS} followed by the timeout value to this node's TCP port over
 * a fresh connection.
 *
 * @param timeoutMs the timeout in milliseconds, written to the connection as an int
 */
public void setRemoteCloseTimeoutMs(int timeoutMs) throws IOException {
  try (Connection conn = new Connection(tcpPort)) {
    // Command byte first, then its single int argument.
    conn.out.writeByte(SimplePrimaryNode.CMD_SET_CLOSE_WAIT_MS);
    conn.out.writeInt(timeoutMs);
    conn.flush();
  }
}

public void addOrUpdateDocument(Connection c, Document doc, boolean isUpdate) throws IOException {
if (isPrimary == false) {
throw new IllegalStateException("only primary can index");
Expand Down
Expand Up @@ -650,6 +650,10 @@ private void handleDeleteDocument(DataInput in, DataOutput out) throws IOExcepti
// merges:
static final byte CMD_NEW_REPLICA = 20;

// Leak a CopyState to simulate failure
static final byte CMD_LEAK_COPY_STATE = 24;
// Set the remote close timeout; the command byte is followed by the timeout in milliseconds,
// sent as an int
static final byte CMD_SET_CLOSE_WAIT_MS = 25;

/** Handles incoming request to the naive TCP server wrapping this node */
void handleOneConnection(
Random random,
Expand Down Expand Up @@ -821,6 +825,15 @@ void handleOneConnection(
}
break;

case CMD_LEAK_COPY_STATE:
message("leaking a CopyState");
getCopyState();
continue outer;

case CMD_SET_CLOSE_WAIT_MS:
setRemoteCloseTimeoutMs(in.readInt());
continue outer;

default:
throw new IllegalArgumentException("unrecognized cmd=" + cmd + " via socket=" + socket);
}
Expand Down
Expand Up @@ -805,6 +805,70 @@ public void testCrashReplica() throws Exception {
primary.close();
}

/** Closes a primary that has a leaked CopyState, with the remote close timeout set to 0 ms. */
@Nightly
public void testPrimaryCloseWhileCopyingNoWait() throws Exception {
  checkPrimaryCloseWithLeakedCopyState(0);
}

/** Closes a primary that has a leaked CopyState, with the remote close timeout set to 1000 ms. */
@Nightly
public void testPrimaryCloseWhileCopyingShortWait() throws Exception {
  checkPrimaryCloseWithLeakedCopyState(1000);
}

/**
 * Shared scenario for the close-while-copying tests: starts a primary and one replica, indexes a
 * single document, waits for the replica to catch up, then sets the given remote close timeout,
 * leaks a CopyState on the primary, and closes both nodes.
 *
 * @param remoteCloseTimeoutMs the timeout, in milliseconds, sent to the primary before closing
 */
private void checkPrimaryCloseWithLeakedCopyState(int remoteCloseTimeoutMs) throws Exception {
  Path path1 = createTempDir("A");
  NodeProcess primary = startNode(-1, 0, path1, -1, true);

  Path path2 = createTempDir("B");
  NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);

  assertWriteLockHeld(path2);

  sendReplicasToPrimary(primary, replica);

  // try-with-resources closes the docs source once indexing is done, instead of leaking it.
  try (LineFileDocs docs = new LineFileDocs(random());
      Connection c = new Connection(primary.tcpPort)) {
    c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
    Document doc = docs.nextDoc();
    primary.addOrUpdateDocument(c, doc, false);
  }

  // Refresh primary, which also pushes to replica:
  long primaryVersion1 = primary.flush(0);
  assertTrue(primaryVersion1 > 0);

  // Wait for replica to sync up:
  waitForVersionAndHits(replica, primaryVersion1, 1);

  // Configure the timeout first, then leak the CopyState, so close() runs with the leak present.
  primary.setRemoteCloseTimeoutMs(remoteCloseTimeoutMs);
  primary.leakCopyState();
  primary.close();
  replica.close();
}

@Nightly
public void testFullClusterCrash() throws Exception {

Expand Down

0 comments on commit 4280bb7

Please sign in to comment.