
NullPointerException in maybeReadTimeOut() method can cause Eureka cluster registration inconsistency #1497

Open
laniakea1990 opened this issue Apr 20, 2023 · 2 comments


@laniakea1990

Description:

An uncommon but recurring exception has been discovered in the maybeReadTimeOut() method in ReplicationTaskProcessor.java. It can exhaust the batchingDispatcher worker threads in the PeerEurekaNode class, resulting in registration inconsistencies between nodes in the Eureka cluster.

To reproduce the issue, a 3-node Eureka cluster (node1, node2, node3) was deployed on a Kubernetes container cloud with the relevant peer-node configuration. In certain exceptional scenarios, the maybeReadTimeOut() method in ReplicationTaskProcessor.java throws a NullPointerException on the line String message = e.getMessage().toLowerCase(). The NullPointerException is raised inside the catch block of the process() method and propagates up to the BatchWorkerRunnable inner class, and each occurrence kills one working thread of the batchingDispatcher. The batchingDispatcher starts with 20 working threads (read from config.getMaxThreadsForPeerReplication()). Once all 20 working threads on node1 have died from the NullPointerException, that node can no longer replicate registered instance information or run its other synchronization tasks.
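
For illustration only (this is not Eureka code), any throwable constructed without a detail message returns null from getMessage(), which is enough to trigger the reported NullPointerException. A minimal, standalone sketch of that condition:

import java.io.IOException;

public class NullMessageDemo {
    public static void main(String[] args) {
        IOException e = new IOException();      // created without a detail message
        System.out.println(e.getMessage());     // prints "null"

        // Mirrors the failing line in maybeReadTimeOut():
        //     String message = e.getMessage().toLowerCase();
        try {
            String message = e.getMessage().toLowerCase();
            System.out.println(message);
        } catch (NullPointerException npe) {
            System.out.println("NullPointerException, as observed in production");
        }
    }
}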

Some of the essential code is listed below:

ReplicationTaskProcessor.java

 public ProcessingResult process(List<ReplicationTask> tasks) {
        ReplicationList list = createReplicationListOf(tasks);
        try {
            EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
            int statusCode = response.getStatusCode();
            if (!isSuccess(statusCode)) {
                if (statusCode == 503) {
                    logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                    return ProcessingResult.Congestion;
                } else {
                    // Unexpected error returned from the server. This should ideally never happen.
                    logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                    return ProcessingResult.PermanentError;
                }
            } else {
                handleBatchResponse(tasks, response.getEntity().getResponseList());
            }
        } catch (Throwable e) {
            if (maybeReadTimeOut(e)) {
                logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
            	//read timeout exception is more Congestion than TransientError, return Congestion for longer delay 
                return ProcessingResult.Congestion;
            } else if (isNetworkConnectException(e)) {
                logNetworkErrorSample(null, e);
                return ProcessingResult.TransientError;
            } else {
                logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
                return ProcessingResult.PermanentError;
            }
        }
        return ProcessingResult.Success;
    }

private static boolean maybeReadTimeOut(Throwable e) {
        do {
            if (IOException.class.isInstance(e)) {
                // !!! This line throws the NullPointerException in our prod env:
                // e.getMessage() can be null here.
                String message = e.getMessage().toLowerCase();
                Matcher matcher = READ_TIME_OUT_PATTERN.matcher(message);
                if (matcher.find()) {
                    return true;
                }
            }
            e = e.getCause();
        } while (e != null);
        return false;
}
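
A possible defensive change (a sketch only; it may differ from the fix in the commits referenced below) is to check the detail message for null before lower-casing it:

private static boolean maybeReadTimeOut(Throwable e) {
        do {
            if (IOException.class.isInstance(e)) {
                // getMessage() can be null, e.g. for an IOException created without a detail message.
                String message = e.getMessage();
                if (message != null && READ_TIME_OUT_PATTERN.matcher(message.toLowerCase()).find()) {
                    return true;
                }
            }
            e = e.getCause();
        } while (e != null);
        return false;
}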

TaskExecutors.java 
static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,
                                                       int workerCount,
                                                       final TaskProcessor<T> processor,
                                                       final AcceptorExecutor<ID, T> acceptorExecutor) {
        final AtomicBoolean isShutdown = new AtomicBoolean();
        final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
        registeredMonitors.put(name, metrics);
        return new TaskExecutors<>(idx -> new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown);
}
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

        BatchWorkerRunnable(String workerName,
                            AtomicBoolean isShutdown,
                            TaskExecutorMetrics metrics,
                            TaskProcessor<T> processor,
                            AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);

                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                // !!! The warning 'Discovery WorkerThread error' is printed several times in our prod env.
                logger.warn("Discovery WorkerThread error", e);
            }
        }
}
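
Because the catch (Throwable e) safeguard wraps the whole while loop, any unexpected exception escaping processor.process(tasks) ends the loop, run() returns, and that worker thread is gone for good; this is why each NullPointerException costs one of the 20 batching workers. One conceivable hardening, shown only as an illustrative sketch and not as the upstream fix, is to move the guard inside the loop so a single bad batch cannot kill the worker:

        @Override
        public void run() {
            while (!isShutdown.get()) {
                try {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);

                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                } catch (InterruptedException e) {
                    return; // preserve the original behavior: exit on interrupt
                } catch (Throwable e) {
                    // Per-iteration safeguard: log and keep the worker alive instead of exiting the loop.
                    logger.warn("Discovery WorkerThread error", e);
                }
            }
        }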

Environment:

Kubernetes container cloud
Eureka version: 1.10.7
Java version: 1.8.0_202

laniakea1990 added a commit to laniakea1990/eureka that referenced this issue Apr 20, 2023
This commit fixes issue:
NullPointerException in maybeReadTimeOut() method can cause Eureka cluster registration inconsistency Netflix#1497
@niu-dali

Cannot invoke "String.toLowerCase()" because the return value of "java.lang.Throwable.getMessage()" is null

@niu-dali

2023-04-21 22:08:00.705 WARN 10165 --- [1.117.233.35-19] c.n.eureka.util.batcher.TaskExecutors : Discovery WorkerThread error

java.lang.NullPointerException: Cannot invoke "String.toLowerCase()" because the return value of "java.lang.Throwable.getMessage()" is null
at com.netflix.eureka.cluster.ReplicationTaskProcessor.maybeReadTimeOut(ReplicationTaskProcessor.java:196)
at com.netflix.eureka.cluster.ReplicationTaskProcessor.process(ReplicationTaskProcessor.java:95)
at com.netflix.eureka.util.batcher.TaskExecutors$BatchWorkerRunnable.run(TaskExecutors.java:190)
at java.base/java.lang.Thread.run(Thread.java:833)

niu-dali added a commit to niu-dali/eureka that referenced this issue Apr 21, 2023
fix "NullPointerException throw by maybeReadTimeOut() that can cause Eureka cluster registration inconsistency"

Netflix#1497