Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ public enum SinkOption {
BULK_FLUSH_BACKOFF_RETRIES,
BULK_FLUSH_BACKOFF_DELAY,
REST_MAX_RETRY_TIMEOUT,
REST_PATH_PREFIX
REST_PATH_PREFIX,
USERNAME,
PASSWORD
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_INDEX;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_DELIMITER;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_NULL_LITERAL;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_PASSWORD;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_TYPE_VALUE_ELASTICSEARCH;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_USERNAME;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.validateAndParseHostsString;
import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
Expand Down Expand Up @@ -143,6 +145,8 @@ public List<String> supportedProperties() {
properties.add(CONNECTOR_BULK_FLUSH_BACKOFF_DELAY);
properties.add(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT);
properties.add(CONNECTOR_CONNECTION_PATH_PREFIX);
properties.add(CONNECTOR_USERNAME);
properties.add(CONNECTOR_PASSWORD);

// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
Expand Down Expand Up @@ -307,6 +311,8 @@ private Map<SinkOption, String> getSinkOptions(DescriptorProperties descriptorPr
mapSinkOption(descriptorProperties, options, CONNECTOR_BULK_FLUSH_BACKOFF_DELAY, SinkOption.BULK_FLUSH_BACKOFF_DELAY);
mapSinkOption(descriptorProperties, options, CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, SinkOption.REST_MAX_RETRY_TIMEOUT);
mapSinkOption(descriptorProperties, options, CONNECTOR_CONNECTION_PATH_PREFIX, SinkOption.REST_PATH_PREFIX);
mapSinkOption(descriptorProperties, options, CONNECTOR_USERNAME, SinkOption.USERNAME);
mapSinkOption(descriptorProperties, options, CONNECTOR_PASSWORD, SinkOption.PASSWORD);

return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_INDEX;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_DELIMITER;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_KEY_NULL_LITERAL;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_PASSWORD;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_TYPE_VALUE_ELASTICSEARCH;
import static org.apache.flink.table.descriptors.ElasticsearchValidator.CONNECTOR_USERNAME;

/**
* Connector descriptor for the Elasticsearch search engine.
Expand Down Expand Up @@ -299,6 +301,26 @@ public Elasticsearch connectionPathPrefix(String pathPrefix) {
return this;
}

/**
* Sets connection properties to be used to connect to es http api with basic auth verification.
*
* @param userName basic auth username
*/
public Elasticsearch connectionUserName(String userName) {
internalProperties.putString(CONNECTOR_USERNAME, userName);
return this;
}

/**
* Sets connection properties to be used to connect to es http api with basic auth verification.
*
* @param password basic auth password
*/
public Elasticsearch connectionPassword(String password) {
internalProperties.putString(CONNECTOR_PASSWORD, password);
return this;
}

@Override
protected Map<String, String> toConnectorProperties() {
final DescriptorProperties properties = new DescriptorProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class ElasticsearchValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_BULK_FLUSH_BACKOFF_DELAY = "connector.bulk-flush.backoff.delay";
public static final String CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT = "connector.connection-max-retry-timeout";
public static final String CONNECTOR_CONNECTION_PATH_PREFIX = "connector.connection-path-prefix";
public static final String CONNECTOR_USERNAME = "connector.username";
public static final String CONNECTOR_PASSWORD = "connector.password";

@Override
public void validate(DescriptorProperties properties) {
Expand Down Expand Up @@ -137,6 +139,8 @@ private void validateBulkFlush(DescriptorProperties properties) {
private void validateConnectionProperties(DescriptorProperties properties) {
properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
properties.validateString(CONNECTOR_USERNAME, true, 1);
properties.validateString(CONNECTOR_PASSWORD, true, 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import org.apache.flink.types.Row;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
Expand All @@ -53,8 +57,10 @@
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_ACTIONS;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_SIZE;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.DISABLE_FLUSH_ON_CHECKPOINT;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.PASSWORD;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.REST_MAX_RETRY_TIMEOUT;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.REST_PATH_PREFIX;
import static org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption.USERNAME;

/**
* Version-specific upsert table sink for Elasticsearch 6.
Expand Down Expand Up @@ -164,7 +170,7 @@ protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
.map(Integer::valueOf)
.orElse(null),
sinkOptions.get(REST_PATH_PREFIX)));
sinkOptions.get(REST_PATH_PREFIX), sinkOptions.get(USERNAME), sinkOptions.get(PASSWORD)));

final ElasticsearchSink<Tuple2<Boolean, Row>> sink = builder.build();

Expand Down Expand Up @@ -197,10 +203,14 @@ static class DefaultRestClientFactory implements RestClientFactory {

private Integer maxRetryTimeout;
private String pathPrefix;
private String userName;
private String password;

public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix) {
public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix, @Nullable String userName, @Nullable String password) {
this.maxRetryTimeout = maxRetryTimeout;
this.pathPrefix = pathPrefix;
this.userName = userName;
this.password = password;
}

@Override
Expand All @@ -211,6 +221,12 @@ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
if (pathPrefix != null) {
restClientBuilder.setPathPrefix(pathPrefix);
}
if (userName != null && password != null) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
restClientBuilder.setHttpClientConfigCallback(
httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testBuilder() {
expectedBuilder.setBulkFlushInterval(100);
expectedBuilder.setBulkFlushMaxActions(1000);
expectedBuilder.setBulkFlushMaxSizeMb(1);
expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp"));
expectedBuilder.setRestClientFactory(new DefaultRestClientFactory(100, "/myapp", "", ""));
assertEquals(expectedBuilder, testSink.builder);
}

Expand Down