Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BP-1.10][FLINK-14316] Properly manage rpcConnection in JobManagerLeaderListener under leader change #11604

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -228,20 +231,27 @@ public void reconnect(final JobID jobId) {
/**
* Leader listener which tries to establish a connection to a newly detected job leader.
*/
@ThreadSafe
private final class JobManagerLeaderListener implements LeaderRetrievalListener {

private final Object lock = new Object();

/** Job id identifying the job to look for a leader. */
private final JobID jobId;

/** Rpc connection to the job leader. */
private volatile RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
@GuardedBy("lock")
@Nullable
private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;

/** Leader id of the current job leader. */
@GuardedBy("lock")
@Nullable
private JobMasterId currentJobMasterId;

/** State of the listener. */
private volatile boolean stopped;

/** Leader id of the current job leader. */
private volatile JobMasterId currentJobMasterId;

private JobManagerLeaderListener(JobID jobId) {
this.jobId = Preconditions.checkNotNull(jobId);

Expand All @@ -250,91 +260,96 @@ private JobManagerLeaderListener(JobID jobId) {
currentJobMasterId = null;
}

private JobMasterId getCurrentJobMasterId() {
synchronized (lock) {
return currentJobMasterId;
}
}

public void stop() {
stopped = true;
synchronized (lock) {
if (!stopped) {
stopped = true;

if (rpcConnection != null) {
rpcConnection.close();
closeRpcConnection();
}
}
}

public void reconnect() {
if (stopped) {
LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
} else {
final RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> currentRpcConnection = rpcConnection;

if (currentRpcConnection != null) {
if (currentRpcConnection.isConnected()) {

if (currentRpcConnection.tryReconnect()) {
// double check for concurrent stop operation
if (stopped) {
currentRpcConnection.close();
}
} else {
LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress());
}
synchronized (lock) {
if (stopped) {
LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
} else {
if (rpcConnection != null) {
Preconditions.checkState(
rpcConnection.tryReconnect(),
"Illegal concurrent modification of the JobManagerLeaderListener rpc connection.");
} else {
LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress());
LOG.debug("Cannot reconnect to an unknown JobMaster.");
}
} else {
LOG.debug("Cannot reconnect to an unknown JobMaster.");
}
}
}

@Override
public void notifyLeaderAddress(final @Nullable String leaderAddress, final @Nullable UUID leaderId) {
if (stopped) {
LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
} else {
final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);
Optional<JobMasterId> jobManagerLostLeadership = Optional.empty();

LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
jobId, leaderAddress, jobMasterId);

if (leaderAddress == null || leaderAddress.isEmpty()) {
// the leader lost leadership but there is no other leader yet.
if (rpcConnection != null) {
rpcConnection.close();
}

jobLeaderListener.jobManagerLostLeadership(jobId, currentJobMasterId);

currentJobMasterId = jobMasterId;
synchronized (lock) {
if (stopped) {
LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
} else {
currentJobMasterId = jobMasterId;
final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);

if (rpcConnection != null) {
// check if we are already trying to connect to this leader
if (!Objects.equals(jobMasterId, rpcConnection.getTargetLeaderId())) {
rpcConnection.close();

rpcConnection = new JobManagerRegisteredRpcConnection(
LOG,
leaderAddress,
jobMasterId,
rpcService.getExecutor());
}
} else {
rpcConnection = new JobManagerRegisteredRpcConnection(
LOG,
leaderAddress,
jobMasterId,
rpcService.getExecutor());
}
LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
jobId, leaderAddress, jobMasterId);

// double check for a concurrent stop operation
if (stopped) {
rpcConnection.close();
if (leaderAddress == null || leaderAddress.isEmpty()) {
// the leader lost leadership but there is no other leader yet.
jobManagerLostLeadership = Optional.ofNullable(currentJobMasterId);
closeRpcConnection();
} else {
LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId);
rpcConnection.start();
// check whether we are already connecting to this leader
if (Objects.equals(jobMasterId, currentJobMasterId)) {
LOG.debug("Trying already connecting to leader of job {}. Ignoring duplicate leader information.", jobId);
} else {
closeRpcConnection();
openRpcConnectionTo(leaderAddress, jobMasterId);
}
}
}
}

// send callbacks outside of the lock scope
jobManagerLostLeadership.ifPresent(oldJobMasterId -> jobLeaderListener.jobManagerLostLeadership(jobId, oldJobMasterId));
}

@GuardedBy("lock")
private void openRpcConnectionTo(String leaderAddress, JobMasterId jobMasterId) {
Preconditions.checkState(
currentJobMasterId == null && rpcConnection == null,
"Cannot open a new rpc connection if the previous connection has not been closed.");

currentJobMasterId = jobMasterId;
rpcConnection = new JobManagerRegisteredRpcConnection(
LOG,
leaderAddress,
jobMasterId,
rpcService.getExecutor());

LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, jobMasterId.toUUID());
rpcConnection.start();
}

@GuardedBy("lock")
private void closeRpcConnection() {
if (rpcConnection != null) {
rpcConnection.close();
rpcConnection = null;
currentJobMasterId = null;
}
}

@Override
Expand Down Expand Up @@ -378,7 +393,7 @@ protected RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSu
@Override
protected void onRegistrationSuccess(JMTMRegistrationSuccess success) {
// filter out old registration attempts
if (Objects.equals(getTargetLeaderId(), currentJobMasterId)) {
if (Objects.equals(getTargetLeaderId(), getCurrentJobMasterId())) {
log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), jobId);

jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), success);
Expand All @@ -390,7 +405,7 @@ protected void onRegistrationSuccess(JMTMRegistrationSuccess success) {
@Override
protected void onRegistrationFailure(Throwable failure) {
// filter out old registration attempts
if (Objects.equals(getTargetLeaderId(), currentJobMasterId)) {
if (Objects.equals(getTargetLeaderId(), getCurrentJobMasterId())) {
log.info("Failed to register at job manager {} for job {}.", getTargetAddress(), jobId);
jobLeaderListener.handleError(failure);
} else {
Expand Down