From bd4dcdfe09b732f2f566bf8813d35637d98bbdec Mon Sep 17 00:00:00 2001 From: Shubham Patil Date: Tue, 30 Jun 2020 06:16:38 +0530 Subject: [PATCH] add support for accessing cloud/restricted environments --- docs/Elasticsearch-batchsink.md | 21 ++- docs/Elasticsearch-batchsource.md | 28 +++- pom.xml | 4 +- .../elastic/BaseElasticsearchConfig.java | 64 +++++++- .../elastic/sink/BatchElasticsearchSink.java | 1 + .../elastic/sink/ElasticsearchSinkConfig.java | 14 +- .../elastic/source/ElasticsearchSource.java | 1 + .../source/ElasticsearchSourceConfig.java | 13 +- .../sink/ElasticsearchSinkConfigTest.java | 149 +++++++++++++++++- .../source/ElasticsearchSourceConfigTest.java | 148 ++++++++++++++++- widgets/Elasticsearch-batchsink.json | 12 ++ widgets/Elasticsearch-batchsource.json | 12 ++ 12 files changed, 449 insertions(+), 18 deletions(-) diff --git a/docs/Elasticsearch-batchsink.md b/docs/Elasticsearch-batchsink.md index bf87376..08b0d02 100644 --- a/docs/Elasticsearch-batchsink.md +++ b/docs/Elasticsearch-batchsink.md @@ -14,7 +14,7 @@ with a stream batch source and Elasticsearch as a sink. Configuration ------------- -**referenceName:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc. +**Reference Name:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc. **es.host:** The hostname and port for the Elasticsearch instance. (Macro-enabled) @@ -27,6 +27,9 @@ exist, it will be created. (Macro-enabled) **es.idField:** The field that will determine the id for the document; it should match a fieldname in the Structured Record of the input. (Macro-enabled) +**Additional Properties:** Additional properties to use with the es-hadoop client when writing the data, +documented at [elastic.co](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html). +(Macro-enabled) Example ------- @@ -44,3 +47,19 @@ in the record. Each run, the documents will be updated if they are still present "es.idField": "id" } } + +This example connects to Elasticsearch, which is running in a remote restricted environment (e.g. elastic.co), +and writes the data to the specified index (megacorp) and type (employee). The data is indexed using the id field +in the record. Each run, the documents will be updated if they are still present in the source: + + { + "name": "Elasticsearch", + "type": "batchsink", + "properties": { + "es.host": "https://remote.region.gcp.cloud.es.io:9243", + "es.index": "megacorp", + "es.type": "employee", + "es.idField": "id", + "additionalProperties": "es.net.http.auth.user=username\nes.net.http.auth.pass=password\nes.nodes.wan.only=true" + } + } diff --git a/docs/Elasticsearch-batchsource.md b/docs/Elasticsearch-batchsource.md index 9c434d0..00cf684 100644 --- a/docs/Elasticsearch-batchsource.md +++ b/docs/Elasticsearch-batchsource.md @@ -13,7 +13,7 @@ in an index and type from Elasticsearch and store the data in an HBase table. Configuration ------------- -**referenceName:** This will be used to uniquely identify this source for lineage, annotating metadata, etc. +**Reference Name:** This will be used to uniquely identify this source for lineage, annotating metadata, etc. **es.host:** The hostname and port for the Elasticsearch instance. (Macro-enabled) @@ -26,6 +26,9 @@ see Elasticsearch for additional query examples. (Macro-enabled) **schema:** The schema or mapping of the data in Elasticsearch. +**Additional Properties:** Additional properties to use with the es-hadoop client when reading the data, +documented at [elastic.co](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html). +(Macro-enabled) Example ------- @@ -50,3 +53,26 @@ All data from the index will be read on each run: {\"name\":\"age\",\"type\":\"int\"}]}" } } + +This example connects to Elasticsearch, which is running in a remote restricted environment (e.g. elastic.co), +and reads in records in the specified index (*megacorp*) and type (*employee*) which match the query to +(in this case) select all records. All data from the index will be read on each run: + + { + "name": "Elasticsearch", + "type": "batchsource", + "properties": { + "es.host": "https://remote.region.gcp.cloud.es.io:9243", + "es.index": "megacorp", + "es.type": "employee", + "query": "?q=*", + "schema": "{ + \"type\":\"record\", + \"name\":\"etlSchemaBody\", + \"fields\":[ + {\"name\":\"id\",\"type\":\"long\"}, + {\"name\":\"name\",\"type\":\"string\"}, + {\"name\":\"age\",\"type\":\"int\"}]}", + "additionalProperties": "es.net.http.auth.user=username\nes.net.http.auth.pass=password\nes.nodes.wan.only=true" + } + } diff --git a/pom.xml b/pom.xml index 4e8161c..8e34c52 100644 --- a/pom.xml +++ b/pom.xml @@ -84,8 +84,8 @@ 2.3.0 4.11 2.3.0-SNAPSHOT - 7.5.2 - 7.5.2 + 7.6.2 + 7.6.2 1.7.5 4.0.30.Final 2.7 diff --git a/src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java b/src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java index 129a84d..f96b7ae 100644 --- a/src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java +++ b/src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java @@ -22,20 +22,29 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.common.IdUtils; +import io.cdap.plugin.common.KeyValueListParser; import io.cdap.plugin.common.ReferencePluginConfig; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; + /** * Basic config class for Elasticsearch plugin. */ public abstract class BaseElasticsearchConfig extends ReferencePluginConfig { + public static final String INDEX_NAME = "es.index"; public static final String TYPE_NAME = "es.type"; public static final String HOST = "es.host"; + public static final String ADDITIONAL_PROPERTIES = "additionalProperties"; private static final String HOST_DESCRIPTION = "The hostname and port for the Elasticsearch instance; " + - "for example, localhost:9200."; + "for example, localhost:9200 or https://remote.region.gcp.cloud.es.io:9243."; private static final String INDEX_DESCRIPTION = "The name of the index to query."; private static final String TYPE_DESCRIPTION = "The name of the type where the data is stored."; + private static final String ADDITIONAL_PROPERTIES_DESCRIPTION = "Additional client properties for ES-Hadoop"; @Name(HOST) @Description(HOST_DESCRIPTION) @@ -52,11 +61,19 @@ public abstract class BaseElasticsearchConfig extends ReferencePluginConfig { @Macro private final String type; - public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type) { + @Nullable + @Name(ADDITIONAL_PROPERTIES) + @Description(ADDITIONAL_PROPERTIES_DESCRIPTION) + @Macro + private final String additionalProperties; + + public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type, + String additionalProperties) { super(referenceName); this.hostname = hostname; this.index = index; this.type = type; + this.additionalProperties = additionalProperties; } public String getHostname() { @@ -75,6 +92,28 @@ public String getResource() { return String.format("%s/%s", index, type); } + @Nullable + public String getAdditionalProperties() { + return additionalProperties; + } + + public Map getAdditionalPropertiesMap() { + Map propertiesMap = new HashMap<>(); + if (additionalProperties == null || additionalProperties.trim().isEmpty()) { + return propertiesMap; + } + + KeyValueListParser parser = new KeyValueListParser("\n", "="); + parser.parse(additionalProperties).forEach(kv -> { + if (kv.getKey().trim().isEmpty()) { + throw new IllegalArgumentException("Key should not be empty"); + } else { + propertiesMap.put(kv.getKey().trim(), kv.getValue().trim()); + } + }); + return propertiesMap; + } + public void validate(FailureCollector collector) { IdUtils.validateReferenceName(referenceName, collector); @@ -93,20 +132,33 @@ public void validate(FailureCollector collector) { if (!containsMacro(TYPE_NAME) && Strings.isNullOrEmpty(type)) { collector.addFailure("Type must be specified.", null).withConfigProperty(TYPE_NAME); } + + if (!containsMacro(ADDITIONAL_PROPERTIES)) { + try { + getAdditionalPropertiesMap(); + } catch (Exception e) { + collector.addFailure("Additional properties must be a valid KV map", null) + .withConfigProperty(ADDITIONAL_PROPERTIES).withStacktrace(e.getStackTrace()); + } + } } private void validateHost(FailureCollector collector) { String[] hostParts = hostname.split(":"); // Elasticsearch Hadoop does not support IPV6 https://github.com/elastic/elasticsearch-hadoop/issues/1105 - if (hostParts.length != 2) { + // Length range [2,3] allowed for https hosts + if ((hostParts.length < 2) || (hostParts.length > 3) || (hostParts.length == 3 + && !(hostParts[0].equalsIgnoreCase("https") || hostParts[0].equalsIgnoreCase("http")))) { + collector.addFailure( "Invalid format of hostname", - "Hostname and port must be specified for the Elasticsearch instance, for example: 'localhost:9200'" + "Hostname and port must be specified for the Elasticsearch instance, " + + "for example: 'localhost:9200' or https://remote.region.gcp.cloud.es.io:9243" ).withConfigProperty(HOST); } else { - String host = hostParts[0]; - String port = hostParts[1]; + String host = String.join(":", Arrays.asList(hostParts).subList(0, hostParts.length - 1)); + String port = hostParts[hostParts.length - 1]; if (host.isEmpty()) { collector.addFailure("Host should not be empty.", null) diff --git a/src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java b/src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java index d456485..38ba522 100644 --- a/src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java +++ b/src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java @@ -83,6 +83,7 @@ public void prepareRun(BatchSinkContext context) throws IOException { conf.set("es.resource.write", config.getResource()); conf.set("es.input.json", "yes"); conf.set("es.mapping.id", config.getIdField()); + config.getAdditionalPropertiesMap().forEach((k, v) -> conf.set(k, v)); context.addOutput(Output.of(config.referenceName, new SinkOutputFormatProvider(EsOutputFormat.class, conf))); } diff --git a/src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java b/src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java index 784451a..0b3dc10 100644 --- a/src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java +++ b/src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java @@ -37,13 +37,14 @@ public class ElasticsearchSinkConfig extends BaseElasticsearchConfig { @Macro private final String idField; - public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField) { - super(referenceName, hostname, index, type); + public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField, + String additionalProperties) { + super(referenceName, hostname, index, type, additionalProperties); this.idField = idField; } private ElasticsearchSinkConfig(Builder builder) { - super(builder.referenceName, builder.hostname, builder.index, builder.type); + super(builder.referenceName, builder.hostname, builder.index, builder.type, builder.additionalProperties); idField = builder.idField; } @@ -58,6 +59,7 @@ public static Builder newBuilder(ElasticsearchSinkConfig copy) { builder.index = copy.getIndex(); builder.type = copy.getType(); builder.idField = copy.getIdField(); + builder.additionalProperties = copy.getAdditionalProperties(); return builder; } @@ -82,6 +84,7 @@ public static final class Builder { private String index; private String type; private String idField; + private String additionalProperties; private Builder() { } @@ -111,6 +114,11 @@ public Builder setIdField(String idField) { return this; } + public Builder setAdditionalProperties(String additionalProperties) { + this.additionalProperties = additionalProperties; + return this; + } + public ElasticsearchSinkConfig build() { return new ElasticsearchSinkConfig(this); } diff --git a/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java b/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java index 991b5b0..fd2900c 100644 --- a/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java +++ b/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java @@ -89,6 +89,7 @@ public void prepareRun(BatchSourceContext context) throws Exception { conf.set("es.nodes", config.getHostname()); conf.set("es.resource.read", config.getResource()); conf.set("es.query", config.getQuery()); + config.getAdditionalPropertiesMap().forEach((k, v) -> conf.set(k, v)); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MapWritable.class); context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(EsInputFormat.class, conf))); diff --git a/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java b/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java index 23079ea..7b53738 100644 --- a/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java +++ b/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java @@ -46,14 +46,14 @@ public class ElasticsearchSourceConfig extends BaseElasticsearchConfig { private final String schema; public ElasticsearchSourceConfig(String referenceName, String hostname, String index, String type, String query, - String schema) { - super(referenceName, hostname, index, type); + String schema, String additionalProperties) { + super(referenceName, hostname, index, type, additionalProperties); this.schema = schema; this.query = query; } private ElasticsearchSourceConfig(Builder builder) { - super(builder.referenceName, builder.hostname, builder.index, builder.type); + super(builder.referenceName, builder.hostname, builder.index, builder.type, builder.additionalProperties); query = builder.query; schema = builder.schema; } @@ -70,6 +70,7 @@ public static Builder newBuilder(ElasticsearchSourceConfig copy) { builder.type = copy.getType(); builder.query = copy.getQuery(); builder.schema = copy.getSchema(); + builder.additionalProperties = copy.getAdditionalProperties(); return builder; } @@ -118,6 +119,7 @@ public static final class Builder { private String type; private String query; private String schema; + private String additionalProperties; private Builder() { } @@ -152,6 +154,11 @@ public Builder setSchema(String schema) { return this; } + public Builder setAdditionalProperties(String additionalProperties) { + this.additionalProperties = additionalProperties; + return this; + } + public ElasticsearchSourceConfig build() { return new ElasticsearchSourceConfig(this); } diff --git a/src/test/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfigTest.java b/src/test/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfigTest.java index b47627b..855d3f5 100644 --- a/src/test/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfigTest.java +++ b/src/test/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfigTest.java @@ -18,6 +18,7 @@ import io.cdap.cdap.etl.mock.validation.MockFailureCollector; import io.cdap.plugin.elastic.ValidationAssertions; +import io.cdap.plugin.elastic.source.ElasticsearchSourceConfig; import org.junit.Assert; import org.junit.Test; @@ -33,7 +34,8 @@ public class ElasticsearchSinkConfigTest { "localhost:9200", "test", "test", - "id" + "id", + null ); @Test @@ -56,6 +58,17 @@ public void testInvalidReferenceName() { @Test public void testInvalidHostname() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("abc:abc:abc:abc") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); + } + + @Test + public void testInvalidHostProtocol() { ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) .setHostname("abc:abc:abc") .build(); @@ -65,6 +78,28 @@ public void testInvalidHostname() { ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); } + @Test + public void testValidHttpHostname() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("http://abc:9200") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + Assert.assertTrue(failureCollector.getValidationFailures().isEmpty()); + } + + @Test + public void testValidHttpsHostName() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("https://abc:9200") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + Assert.assertTrue(failureCollector.getValidationFailures().isEmpty()); + } + @Test public void testEmptyHostname() { ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) @@ -98,6 +133,28 @@ public void testEmptyPort() { ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); } + @Test + public void testEmptyPortWithHttpHost() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("http://abc:") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); + } + + @Test + public void testEmptyPortWithHttpsHost() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("https://abc:") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); + } + @Test public void testInvalidPort() { ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) @@ -109,6 +166,28 @@ public void testInvalidPort() { ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); } + @Test + public void testInvalidPortWithHttpHost() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("http://abc:abc") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); + } + + @Test + public void testInvalidPortWithHttpsHost() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("https://abc:abc") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); + } + @Test public void testInvalidNumberInPort() { ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) @@ -120,6 +199,28 @@ public void testInvalidNumberInPort() { ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); } + @Test + public void testInvalidNumberInPortWithHttpHost() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("http://abc:100000") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); + } + + @Test + public void testInvalidNumberInPortWithHttpsHost() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setHostname("https://abc:100000") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST); + } + @Test public void testInvalidIndex() { ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) @@ -152,4 +253,50 @@ public void testInvalidIdField() { config.validate(failureCollector); ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.ID_FIELD); } + + @Test + public void testEmptyAdditionalProperties() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setAdditionalProperties("") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + Assert.assertTrue(failureCollector.getValidationFailures().isEmpty()); + } + + @Test + public void testInvalidAdditionalProperties() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setAdditionalProperties("es.net.http.auth.user") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, + ElasticsearchSourceConfig.ADDITIONAL_PROPERTIES); + } + + @Test + public void testValidAdditionalProperties() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setAdditionalProperties("es.nodes.wan.only=true") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + Assert.assertTrue(failureCollector.getValidationFailures().isEmpty()); + } + + @Test + public void testValidAdditionalPropertiesWithEmptyKey() { + ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG) + .setAdditionalProperties("es.nodes.wan.only=true;=false") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, + ElasticsearchSourceConfig.ADDITIONAL_PROPERTIES); + } } diff --git a/src/test/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfigTest.java b/src/test/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfigTest.java index f470f66..f965ed8 100644 --- a/src/test/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfigTest.java +++ b/src/test/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfigTest.java @@ -35,7 +35,8 @@ public class ElasticsearchSourceConfigTest { "test", "test", "?q=*", - Schema.recordOf("record", Schema.Field.of("id", Schema.of(Schema.Type.LONG))).toString() + Schema.recordOf("record", Schema.Field.of("id", Schema.of(Schema.Type.LONG))).toString(), + null ); @Test @@ -58,6 +59,17 @@ public void testInvalidReferenceName() { @Test public void testInvalidHostname() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("abc:abc:abc:abc") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); + } + + @Test + public void testInvalidHostProtocol() { ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) .setHostname("abc:abc:abc") .build(); @@ -67,6 +79,28 @@ public void testInvalidHostname() { ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); } + @Test + public void testValidHttpHostname() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("http://abc:9200") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + Assert.assertTrue(failureCollector.getValidationFailures().isEmpty()); + } + + @Test + public void testValidHttpsHostName() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("https://abc:9200") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + Assert.assertTrue(failureCollector.getValidationFailures().isEmpty()); + } + @Test public void testEmptyHostname() { ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) @@ -100,6 +134,28 @@ public void testEmptyPort() { ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); } + @Test + public void testEmptyPortWithHttpHost() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("http://abc:") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); + } + + @Test + public void testEmptyPortWithHttpsHost() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("https://abc:") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); + } + @Test public void testInvalidPort() { ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) @@ -111,6 +167,28 @@ public void testInvalidPort() { ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); } + @Test + public void testInvalidPortWithHttpHost() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("http://abc:abc") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); + } + + @Test + public void testInvalidPortWithHttpsHost() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("https://abc:abc") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); + } + @Test public void testInvalidNumberInPort() { ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) @@ -122,6 +200,28 @@ public void testInvalidNumberInPort() { ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); } + @Test + public void testInvalidNumberInPortWithHttpHost() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("http://abc:100000") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); + } + + @Test + public void testInvalidNumberInPortWithHttpsHost() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setHostname("https://abc:100000") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST); + } + @Test public void testInvalidIndex() { ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) @@ -176,4 +276,50 @@ public void testInvalidSchema() { config.validate(failureCollector); ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.SCHEMA); } + + @Test + public void testEmptyAdditionalProperties() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setAdditionalProperties("") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + Assert.assertTrue(failureCollector.getValidationFailures().isEmpty()); + } + + @Test + public void testInvalidAdditionalProperties() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setAdditionalProperties("es.net.http.auth.user") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, + ElasticsearchSourceConfig.ADDITIONAL_PROPERTIES); + } + + @Test + public void testValidAdditionalProperties() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setAdditionalProperties("es.nodes.wan.only=true") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + Assert.assertTrue(failureCollector.getValidationFailures().isEmpty()); + } + + @Test + public void testValidAdditionalPropertiesWithEmptyKey() { + ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG) + .setAdditionalProperties("es.nodes.wan.only=true;=false") + .build(); + + MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE); + config.validate(failureCollector); + ValidationAssertions.assertPropertyValidationFailed(failureCollector, + ElasticsearchSourceConfig.ADDITIONAL_PROPERTIES); + } } diff --git a/widgets/Elasticsearch-batchsink.json b/widgets/Elasticsearch-batchsink.json index 43955f2..2d9ee03 100644 --- a/widgets/Elasticsearch-batchsink.json +++ b/widgets/Elasticsearch-batchsink.json @@ -31,6 +31,18 @@ "widget-type": "textbox", "label": "ID Field", "name": "es.idField" + }, + { + "widget-type": "keyvalue", + "label": "Additional Properties", + "name": "additionalProperties", + "widget-attributes": { + "showDelimiter": "false", + "delimiter": "\n", + "kv-delimiter": "=", + "key-placeholder": "Property Name", + "value-placeholder": "Property Value" + } } ] } diff --git a/widgets/Elasticsearch-batchsource.json b/widgets/Elasticsearch-batchsource.json index 32e33c1..83be866 100644 --- a/widgets/Elasticsearch-batchsource.json +++ b/widgets/Elasticsearch-batchsource.json @@ -31,6 +31,18 @@ "widget-type": "textbox", "label": "Query", "name": "query" + }, + { + "widget-type": "keyvalue", + "label": "Additional Properties", + "name": "additionalProperties", + "widget-attributes": { + "showDelimiter": "false", + "delimiter": "\n", + "kv-delimiter": "=", + "key-placeholder": "Property Name", + "value-placeholder": "Property Value" + } } ] }