Keep-alive, soTimeout and connectTimeout are configured, but thread still blocks on RestClient.performRequest #103742

Closed
chen8238065 opened this issue Dec 29, 2023 · 1 comment
Labels
>bug needs:triage Requires assignment of a team area label

Comments

@chen8238065

Elasticsearch Version

7.17.0

Installed Plugins

No response

Java Version

1.8.0_231

OS Version

Linux tjtx176-77-179.58os.org 4.19.91-27.5_0.1.an7.x86_64 #1 SMP Wed Aug 16 10:17:57 CST 2023 x86_64 x86_64 x86_64 GNU/Linux

Problem Description

I configured the TCP timeouts, but sometimes they do not take effect and the calling thread stays blocked.

Here is my ElasticSearchConfig:

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.0.1</version>
</dependency>

public class ElasticSearchConfig {
    public static final Map<String, String> BEAN_MAP = ImmutableMap.of("default", "aifangreportlistESClient");

    @Value("${aifangreportlistESClient.userName:}")
    String userName;
    @Value("${aifangreportlistESClient.password:}")
    String password;

    @AutoRefresh
    @Value("${aifangreportlistESClient.timeout:400}")
    volatile Integer timeout;

    @AutoRefresh
    @Value("${aifangreportlistESClient.soTimeout:5000}")
    volatile Integer soTimeout;

    @Value("${aifangreportlistESClient.url}")
    String url;

    @Primary
    @Bean("aifangreportlistESClient")
    public ElasticsearchClient elasticsearchClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
        RestClient restClient = RestClient.builder(HttpHost.create(url))
                .setHttpClientConfigCallback(c -> c
                        .setKeepAliveStrategy(new CustomConnectionKeepAliveStrategy())
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .addInterceptorFirst(new LogHttpRequestInterceptor())
                        .addInterceptorFirst(new DynamicTimeOutRequestInterceptor(this))
                        .setDefaultIOReactorConfig(IOReactorConfig.custom()
                                .setSelectInterval(2)
                                .setSoTimeout(soTimeout)
                                .setConnectTimeout(1010)
                                .setSoKeepAlive(true)
                                .build()))
                .setRequestConfigCallback(q -> {
                    q.setConnectTimeout(1010);
                    q.setConnectionRequestTimeout(1010);
                    q.setSocketTimeout(timeout);
                    return q;
                })
                .build();
        // Create the transport layer with the Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        // Create the API client
        ElasticsearchClient client = new ElasticsearchClient(transport);
        return client;
    }


    public static ElasticsearchClient getInstance(String group) {
        String s = BEAN_MAP.get(group);
        return AjkGlobal.getApplicationContext().getBean(s, ElasticsearchClient.class);
    }


    /**
     * Custom keep-alive strategy
     */
    public static class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {

        public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();

        public CustomConnectionKeepAliveStrategy() {
            super();
        }

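        /**
         * Maximum keep-alive time (minutes).
         * Set to 3 minutes here; adjust as needed. Watch the number of TCP connections in the
         * TIME_WAIT state on the client machine; if there are too many, increase this value.
         */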
        private final long MAX_KEEP_ALIVE_MINUTES = 3;

        @Override
        public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
            long keepAliveDuration = super.getKeepAliveDuration(response, context);
            // < 0 means keep alive indefinitely
            // Replace the indefinite duration with a default one
            if (keepAliveDuration < 0) {
                return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
            }
            return keepAliveDuration;
        }
    }


    /**
     * Logs the request body
     */
    public static class LogHttpRequestInterceptor implements HttpRequestInterceptor {
        @Setter
        public static boolean debug = false;

        @Override
        public void process(org.apache.http.HttpRequest request, HttpContext context) throws java.io.IOException {
            if (log.isDebugEnabled() || debug) {
                if (request instanceof org.apache.http.HttpEntityEnclosingRequest) {
                    try {
                        HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
                        if (entity == null) {
                            return;
                        }
                        //InputStream to String
                        InputStream inputStream = entity.getContent();
                        if (inputStream == null) {
                            return;
                        }
                        String body = org.apache.commons.io.IOUtils.toString(inputStream, "UTF-8");
                        log.debug("request body: {}", body);
                    } catch (Exception e) {
                        log.error("LogHttpRequestInterceptor error", e);
                    }
                }
            }
        }
    }


    /**
     * Dynamically adjusts the timeout
     */
    public static class DynamicTimeOutRequestInterceptor implements HttpRequestInterceptor {
        @Setter
        public static boolean debug = false;

        public static volatile int curTimeout = 0;

        ElasticSearchConfig elasticSearchConfig;

        public DynamicTimeOutRequestInterceptor(ElasticSearchConfig elasticSearchConfig) {
            this.elasticSearchConfig = elasticSearchConfig;
            curTimeout = elasticSearchConfig.getTimeout();
        }

        @Override
        public void process(org.apache.http.HttpRequest request, HttpContext context) throws java.io.IOException {
            Integer errCount = RetryHandler.currentErrorCount();
            HttpClientContext clientContext = (HttpClientContext) context;
            RequestConfig requestConfig = clientContext.getRequestConfig();

            if (elasticSearchConfig.getTimeout() != curTimeout) {
                curTimeout = elasticSearchConfig.getTimeout();
                RequestConfig.Builder copy = RequestConfig.copy(requestConfig);
                copy.setSocketTimeout(curTimeout);
                clientContext.setRequestConfig(copy.build());
            }
            // Relax the timeout limit when retrying
            if (errCount > 0) {
                RequestConfig.Builder copy = RequestConfig.copy(requestConfig);
                copy.setSocketTimeout(elasticSearchConfig.getSoTimeout());
                clientContext.setRequestConfig(copy.build());
            }

        }
    }
}
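
For readers following the configuration above, the two decision rules it encodes (which socket timeout applies to a request, and which keep-alive duration is used when the server sends none) can be restated as plain functions. This is a minimal sketch, not part of the reported code: the class `TimeoutRules` and both method names are hypothetical, and the numbers are the defaults from the config (400 ms, 5000 ms, 3 minutes).

```java
import java.util.concurrent.TimeUnit;

public class TimeoutRules {

    // Hypothetical helper: a retry (errorCount > 0) gets the relaxed timeout,
    // a first attempt gets the normal one.
    public static int effectiveSocketTimeout(int normalTimeoutMs, int relaxedTimeoutMs, int errorCount) {
        return errorCount > 0 ? relaxedTimeoutMs : normalTimeoutMs;
    }

    // Hypothetical helper: a negative duration means "keep alive indefinitely",
    // which is replaced by the fallback; any other value is returned unchanged.
    public static long resolveKeepAlive(long serverKeepAliveMs, long fallbackMs) {
        return serverKeepAliveMs < 0 ? fallbackMs : serverKeepAliveMs;
    }

    public static void main(String[] args) {
        long threeMinutes = TimeUnit.MINUTES.toMillis(3);
        System.out.println(effectiveSocketTimeout(400, 5000, 0));   // 400
        System.out.println(effectiveSocketTimeout(400, 5000, 2));   // 5000
        System.out.println(resolveKeepAlive(-1, threeMinutes));     // 180000
        System.out.println(resolveKeepAlive(60_000, threeMinutes)); // 60000
    }
}
```

Note that the keep-alive rule only fills in a missing value; a server-supplied duration longer than the fallback is not shortened.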

jstack info:

"SYNCSTEAL-ASYNC-ORDER-4" #283 prio=5 os_prio=0 tid=0x00007f7db24a9800 nid=0x855 in Object.wait() [0x00007f7ced5dd000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.http.concurrent.BasicFuture.get(BasicFuture.java:82)
	- locked <0x000000073a8a33f8> (a org.apache.http.concurrent.BasicFuture)
	at org.apache.http.impl.nio.client.FutureWrapper.get(FutureWrapper.java:70)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:295)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:287)
	at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:146)
	at co.elastic.clients.elasticsearch.ElasticsearchClient.index(ElasticsearchClient.java:960)
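
The trace shows the caller parked in `BasicFuture.get()` with no deadline of its own. As a sketch only, assuming the caller is willing to run the blocking call on a worker thread, a hard upper bound can be imposed with plain `java.util.concurrent`. `BoundedWait` and `callWithDeadline` are hypothetical names, not part of the Elasticsearch client, and this does not explain why the configured timeouts did not fire.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {

    // Hypothetical helper: runs a blocking call on a worker thread and waits
    // at most timeoutMillis for its result.
    public static <T> T callWithDeadline(ExecutorService pool, Callable<T> blockingCall, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<T> future = pool.submit(blockingCall);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the worker so it does not stay parked forever.
            future.cancel(true);
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // A call that finishes in time returns its value.
            System.out.println(callWithDeadline(pool, () -> "ok", 1_000));

            // A call that overruns the deadline surfaces as a TimeoutException.
            try {
                callWithDeadline(pool, () -> {
                    Thread.sleep(5_000); // stands in for a request that never completes
                    return "late";
                }, 100);
            } catch (TimeoutException e) {
                System.out.println("timed out");
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In the reported setup, the `Callable` would wrap the `client.index(...)` call. On timeout the worker is interrupted, and `Object.wait()`, where the trace shows the thread parked, is an interruptible wait.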

Steps to Reproduce

I cannot reproduce it. When I run a unit test, it always returns a timeout exception as expected when the server takes a long time to respond.

Logs (if relevant)

No response

@chen8238065 chen8238065 added >bug needs:triage Requires assignment of a team area label labels Dec 29, 2023
@DaveCTurner
Contributor

Thanks very much for your interest in Elasticsearch.

This appears to be a user question, and we'd like to direct these kinds of things to the Elasticsearch forum. If you can stop by there, we'd appreciate it. This allows us to use GitHub for verified bug reports, feature requests, and pull requests.

There's an active community in the forum that should be able to help get an answer to your question. As such, I hope you don't mind that I close this.
