diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java index defc804bb2ebc..010a051fe7af1 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java @@ -328,7 +328,9 @@ public enum SinkOption { BULK_FLUSH_BACKOFF_RETRIES, BULK_FLUSH_BACKOFF_DELAY, REST_MAX_RETRY_TIMEOUT, - REST_PATH_PREFIX + REST_PATH_PREFIX, + USERNAME, + PASSWORD } /** diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java index a20cb330740af..89d050ed9fec8 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java @@ -84,7 +84,9 @@ import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_INDEX; import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_DELIMITER; import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_NULL_LITERAL; +import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_PASSWORD; import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_TYPE_VALUE_ELASTICSEARCH; +import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_USERNAME; import static org.apache.flink.table.descriptors.ElasticsearchValidator.validateAndParseHostsString; import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE; @@ -143,6 +145,8 @@ public List supportedProperties() { properties.add(CONNECTOR_BULK_FLUSH_BACKOFF_DELAY); properties.add(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT); properties.add(CONNECTOR_CONNECTION_PATH_PREFIX); + properties.add(CONNECTOR_USERNAME); + properties.add(CONNECTOR_PASSWORD); // schema properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE); @@ -307,6 +311,8 @@ private Map getSinkOptions(DescriptorProperties descriptorPr mapSinkOption(descriptorProperties, options, CONNECTOR_BULK_FLUSH_BACKOFF_DELAY, SinkOption.BULK_FLUSH_BACKOFF_DELAY); mapSinkOption(descriptorProperties, options, CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, SinkOption.REST_MAX_RETRY_TIMEOUT); mapSinkOption(descriptorProperties, options, CONNECTOR_CONNECTION_PATH_PREFIX, SinkOption.REST_PATH_PREFIX); + mapSinkOption(descriptorProperties, options, CONNECTOR_USERNAME, SinkOption.USERNAME); + mapSinkOption(descriptorProperties, options, CONNECTOR_PASSWORD, SinkOption.PASSWORD); return options; } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java index ccbd4c10b6d91..32612bd366184 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java @@ -46,7 +46,9 @@ import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_INDEX; import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_DELIMITER; import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_NULL_LITERAL; +import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_PASSWORD; import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_TYPE_VALUE_ELASTICSEARCH; +import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_USERNAME; /** * Connector descriptor for the Elasticsearch search engine. @@ -299,6 +301,26 @@ public Elasticsearch connectionPathPrefix(String pathPrefix) { return this; } + /** + * Sets connection properties to be used to connect to es http api with basic auth verification. + * + * @param userName basic auth username + */ + public Elasticsearch connectionUserName(String userName) { + internalProperties.putString(CONNECTOR_USERNAME, userName); + return this; + } + + /** + * Sets connection properties to be used to connect to es http api with basic auth verification. + * + * @param password basic auth password + */ + public Elasticsearch connectionPassword(String password) { + internalProperties.putString(CONNECTOR_PASSWORD, password); + return this; + } + @Override protected Map toConnectorProperties() { final DescriptorProperties properties = new DescriptorProperties(); diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java index b8f233182f0db..b9345e84e1979 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java @@ -69,6 +69,8 @@ public class ElasticsearchValidator extends ConnectorDescriptorValidator { public static final String CONNECTOR_BULK_FLUSH_BACKOFF_DELAY = "connector.bulk-flush.backoff.delay"; public static final String CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT = "connector.connection-max-retry-timeout"; public static final String CONNECTOR_CONNECTION_PATH_PREFIX = "connector.connection-path-prefix"; + public static final String CONNECTOR_USERNAME = "connector.username"; + public static final String CONNECTOR_PASSWORD = "connector.password"; @Override public void validate(DescriptorProperties properties) { @@ -137,6 +139,8 @@ private void validateBulkFlush(DescriptorProperties properties) { private void validateConnectionProperties(DescriptorProperties properties) { properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1); properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true); + properties.validateString(CONNECTOR_USERNAME, true, 1); + properties.validateString(CONNECTOR_PASSWORD, true, 1); } /** diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java index 4149d14c23ab3..e5fa1cdbb0df0 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java @@ -31,6 +31,10 @@ import org.apache.flink.types.Row; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -53,8 +57,10 @@ import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_ACTIONS; import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_SIZE; import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.DISABLE_FLUSH_ON_CHECKPOINT; +import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.PASSWORD; import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.REST_MAX_RETRY_TIMEOUT; import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.REST_PATH_PREFIX; +import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.USERNAME; /** * Version-specific upsert table sink for Elasticsearch 6. @@ -164,7 +170,7 @@ protected SinkFunction> createSinkFunction( Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT)) .map(Integer::valueOf) .orElse(null), - sinkOptions.get(REST_PATH_PREFIX))); + sinkOptions.get(REST_PATH_PREFIX), sinkOptions.get(USERNAME), sinkOptions.get(PASSWORD))); final ElasticsearchSink> sink = builder.build(); @@ -197,10 +203,14 @@ static class DefaultRestClientFactory implements RestClientFactory { private Integer maxRetryTimeout; private String pathPrefix; + private String userName; + private String password; - public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix) { + public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix, @Nullable String userName, @Nullable String password) { this.maxRetryTimeout = maxRetryTimeout; this.pathPrefix = pathPrefix; + this.userName = userName; + this.password = password; } @Override @@ -211,6 +221,12 @@ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { if (pathPrefix != null) { restClientBuilder.setPathPrefix(pathPrefix); } + if (userName != null && password != null) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password)); + restClientBuilder.setHttpClientConfigCallback( + httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } } @Override diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java index a5b5ef18029b9..a950e384872ce 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java @@ -103,7 +103,7 @@ public void testBuilder() { expectedBuilder.setBulkFlushInterval(100); expectedBuilder.setBulkFlushMaxActions(1000); expectedBuilder.setBulkFlushMaxSizeMb(1); - expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp")); + expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp", "", "")); assertEquals(expectedBuilder, testSink.builder); }