Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-6770][Sort] Optimize EsSink params #6821

Merged
merged 2 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,14 @@ private boolean initEsclient() {
new UsernamePasswordCredentials(userName, password));
builder.setHttpClientConfigCallback((httpAsyncClientBuilder) -> {
httpAsyncClientBuilder.disableAuthCaching();
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(120 * 1000).build();
RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(context.getConnectionRequestTimeout())
.setMaxRedirects(context.getMaxRedirects())
.setSocketTimeout(context.getSocketTimeout())
.setConnectTimeout(120 * 1000).build();
return httpAsyncClientBuilder.setDefaultCredentialsProvider(provider)
.setMaxConnTotal(context.getMaxConnect())
.setMaxConnPerRoute(context.getMaxConnect())
.setMaxConnPerRoute(context.getMaxConnectPerRoute())
.setDefaultRequestConfig(requestConfig);
});
esClient = EsSinkFactory.createRestHighLevelClient(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,28 @@ public class EsSinkContext extends SinkContext {
public static final String KEY_BULK_SIZE_MB = "bulkSizeMb";
public static final String KEY_FLUSH_INTERVAL = "flushInterval";
public static final String KEY_CONCURRENT_REQUESTS = "concurrentRequests";
public static final String KEY_MAX_CONNECT = "maxConnect";
public static final String KEY_MAX_CONNECT_TOTAL = "maxConnect";
public static final String KEY_MAX_CONNECT_PER_ROUTE = "maxConnectPerRoute";
public static final String KEY_CONNECTION_REQUEST_TIMEOUT = "connectionRequestTimeout";
public static final String KEY_SOCKET_TIMEOUT = "socketTimeout";
public static final String KEY_MAX_REDIRECTS = "maxRedirects";
public static final String KEY_LOG_MAX_LENGTH = "logMaxLength";
public static final String KEY_KEYWORD_MAX_LENGTH = "keywordMaxLength";
public static final String KEY_HTTP_HOSTS = "httpHosts";
public static final String KEY_EVENT_INDEXREQUEST_HANDLER = "indexRequestHandler";
public static final String KEY_IS_USE_INDEX_ID = "isUseIndexId";

public static final int DEFAULT_BULK_ACTION = 4000;
public static final int DEFAULT_BULK_SIZE_MB = 10;
public static final int DEFAULT_FLUSH_INTERVAL = 60;
public static final int DEFAULT_CONCURRENT_REQUESTS = 5;
public static final int DEFAULT_MAX_CONNECT = 10;
public static final int DEFAULT_KEYWORD_MAX_LENGTH = 31 * 1024;
public static final int DEFAULT_BULK_ACTION = 10000;
public static final int DEFAULT_BULK_SIZE_MB = 20;
public static final int DEFAULT_FLUSH_INTERVAL = 2000;
public static final int DEFAULT_CONCURRENT_REQUESTS = 20;
public static final int DEFAULT_MAX_CONNECT_TOTAL = 1000;
public static final int DEFAULT_MAX_CONNECT_PER_ROUTE = 1000;
public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 0;
public static final int DEFAULT_SOCKET_TIMEOUT = 0;
public static final int DEFAULT_MAX_REDIRECTS = 0;
public static final int DEFAULT_LOG_MAX_LENGTH = 32 * 1024;
public static final int DEFAULT_KEYWORD_MAX_LENGTH = 8 * 1024;
public static final boolean DEFAULT_IS_USE_INDEX_ID = false;

private Context sinkContext;
Expand All @@ -87,7 +97,12 @@ public class EsSinkContext extends SinkContext {
private int bulkSizeMb = DEFAULT_BULK_SIZE_MB;
private int flushInterval = DEFAULT_FLUSH_INTERVAL;
private int concurrentRequests = DEFAULT_CONCURRENT_REQUESTS;
private int maxConnect = DEFAULT_MAX_CONNECT;
private int maxConnect = DEFAULT_MAX_CONNECT_TOTAL;
private int maxConnectPerRoute = DEFAULT_MAX_CONNECT_PER_ROUTE;
private int connectionRequestTimeout = DEFAULT_CONNECTION_REQUEST_TIMEOUT;
private int socketTimeout = DEFAULT_SOCKET_TIMEOUT;
private int maxRedirects = DEFAULT_MAX_REDIRECTS;
private int logMaxLength = DEFAULT_LOG_MAX_LENGTH;
private int keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
private boolean isUseIndexId = DEFAULT_IS_USE_INDEX_ID;
// http host
Expand Down Expand Up @@ -148,7 +163,13 @@ public void reload() {
this.bulkSizeMb = sinkContext.getInteger(KEY_BULK_SIZE_MB, DEFAULT_BULK_SIZE_MB);
this.flushInterval = sinkContext.getInteger(KEY_FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL);
this.concurrentRequests = sinkContext.getInteger(KEY_CONCURRENT_REQUESTS, DEFAULT_CONCURRENT_REQUESTS);
this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT, DEFAULT_MAX_CONNECT);
this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL, DEFAULT_MAX_CONNECT_TOTAL);
this.maxConnectPerRoute = sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, DEFAULT_MAX_CONNECT_PER_ROUTE);
this.connectionRequestTimeout =
sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, DEFAULT_CONNECTION_REQUEST_TIMEOUT);
this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS);
this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, DEFAULT_LOG_MAX_LENGTH);
this.keywordMaxLength = sinkContext.getInteger(KEY_KEYWORD_MAX_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH);
this.isUseIndexId = sinkContext.getBoolean(KEY_IS_USE_INDEX_ID, DEFAULT_IS_USE_INDEX_ID);
// http host
Expand Down Expand Up @@ -425,6 +446,41 @@ public int getMaxConnect() {
return maxConnect;
}

/**
* get MaxConnectPerRoute
*/
public int getMaxConnectPerRoute() {
return maxConnectPerRoute;
}

/**
* get ConnectionRequestTimeout
*/
public int getConnectionRequestTimeout() {
return connectionRequestTimeout;
}

/**
* get SocketTimeout
*/
public int getSocketTimeout() {
return socketTimeout;
}

/**
* get MaxRedirects
*/
public int getMaxRedirects() {
return maxRedirects;
}

/**
* get LogMaxLength
*/
public int getLogMaxLength() {
return logMaxLength;
}

/**
* set maxConnect
*
Expand Down