diff --git a/docs/Elasticsearch-batchsink.md b/docs/Elasticsearch-batchsink.md
index bf87376..08b0d02 100644
--- a/docs/Elasticsearch-batchsink.md
+++ b/docs/Elasticsearch-batchsink.md
@@ -14,7 +14,7 @@ with a stream batch source and Elasticsearch as a sink.
Configuration
-------------
-**referenceName:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.
+**Reference Name:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.
**es.host:** The hostname and port for the Elasticsearch instance. (Macro-enabled)
@@ -27,6 +27,9 @@ exist, it will be created. (Macro-enabled)
**es.idField:** The field that will determine the id for the document; it should match a fieldname
in the Structured Record of the input. (Macro-enabled)
+**Additional Properties:** Additional properties to use with the es-hadoop client when writing the data,
+documented at [elastic.co](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html).
+(Macro-enabled)
Example
-------
@@ -44,3 +47,19 @@ in the record. Each run, the documents will be updated if they are still present
"es.idField": "id"
}
}
+
+This example connects to Elasticsearch, which is running in a remote restricted environment (e.g. elastic.co),
+and writes the data to the specified index (megacorp) and type (employee). The data is indexed using the id field
+in the record. Each run, the documents will be updated if they are still present in the source:
+
+ {
+ "name": "Elasticsearch",
+ "type": "batchsink",
+ "properties": {
+ "es.host": "https://remote.region.gcp.cloud.es.io:9243",
+ "es.index": "megacorp",
+ "es.type": "employee",
+ "es.idField": "id",
+ "additionalProperties": "es.net.http.auth.user=username\nes.net.http.auth.pass=password\nes.nodes.wan.only=true"
+ }
+ }
diff --git a/docs/Elasticsearch-batchsource.md b/docs/Elasticsearch-batchsource.md
index 9c434d0..00cf684 100644
--- a/docs/Elasticsearch-batchsource.md
+++ b/docs/Elasticsearch-batchsource.md
@@ -13,7 +13,7 @@ in an index and type from Elasticsearch and store the data in an HBase table.
Configuration
-------------
-**referenceName:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.
+**Reference Name:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.
**es.host:** The hostname and port for the Elasticsearch instance. (Macro-enabled)
@@ -26,6 +26,9 @@ see Elasticsearch for additional query examples. (Macro-enabled)
**schema:** The schema or mapping of the data in Elasticsearch.
+**Additional Properties:** Additional properties to use with the es-hadoop client when reading the data,
+documented at [elastic.co](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html).
+(Macro-enabled)
Example
-------
@@ -50,3 +53,26 @@ All data from the index will be read on each run:
{\"name\":\"age\",\"type\":\"int\"}]}"
}
}
+
+This example connects to Elasticsearch, which is running in a remote restricted environment (e.g. elastic.co),
+and reads in records in the specified index (*megacorp*) and type (*employee*) which match the query to
+(in this case) select all records. All data from the index will be read on each run:
+
+ {
+ "name": "Elasticsearch",
+ "type": "batchsource",
+ "properties": {
+ "es.host": "https://remote.region.gcp.cloud.es.io:9243",
+ "es.index": "megacorp",
+ "es.type": "employee",
+ "query": "?q=*",
+ "schema": "{
+ \"type\":\"record\",
+ \"name\":\"etlSchemaBody\",
+ \"fields\":[
+ {\"name\":\"id\",\"type\":\"long\"},
+ {\"name\":\"name\",\"type\":\"string\"},
+ {\"name\":\"age\",\"type\":\"int\"}]}",
+ "additionalProperties": "es.net.http.auth.user=username\nes.net.http.auth.pass=password\nes.nodes.wan.only=true"
+ }
+ }
diff --git a/pom.xml b/pom.xml
index 4e8161c..8e34c52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,8 +84,8 @@
2.3.0
4.11
2.3.0-SNAPSHOT
- 7.5.2
- 7.5.2
+ 7.6.2
+ 7.6.2
1.7.5
4.0.30.Final
2.7
diff --git a/src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java b/src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java
index 129a84d..f96b7ae 100644
--- a/src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java
+++ b/src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java
@@ -22,20 +22,29 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.IdUtils;
+import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.common.ReferencePluginConfig;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
/**
* Basic config class for Elasticsearch plugin.
*/
public abstract class BaseElasticsearchConfig extends ReferencePluginConfig {
+
public static final String INDEX_NAME = "es.index";
public static final String TYPE_NAME = "es.type";
public static final String HOST = "es.host";
+ public static final String ADDITIONAL_PROPERTIES = "additionalProperties";
private static final String HOST_DESCRIPTION = "The hostname and port for the Elasticsearch instance; " +
- "for example, localhost:9200.";
+ "for example, localhost:9200 or https://remote.region.gcp.cloud.es.io:9243.";
private static final String INDEX_DESCRIPTION = "The name of the index to query.";
private static final String TYPE_DESCRIPTION = "The name of the type where the data is stored.";
+ private static final String ADDITIONAL_PROPERTIES_DESCRIPTION = "Additional client properties for ES-Hadoop";
@Name(HOST)
@Description(HOST_DESCRIPTION)
@@ -52,11 +61,19 @@ public abstract class BaseElasticsearchConfig extends ReferencePluginConfig {
@Macro
private final String type;
- public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type) {
+ @Nullable
+ @Name(ADDITIONAL_PROPERTIES)
+ @Description(ADDITIONAL_PROPERTIES_DESCRIPTION)
+ @Macro
+ private final String additionalProperties;
+
+ public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type,
+ String additionalProperties) {
super(referenceName);
this.hostname = hostname;
this.index = index;
this.type = type;
+ this.additionalProperties = additionalProperties;
}
public String getHostname() {
@@ -75,6 +92,28 @@ public String getResource() {
return String.format("%s/%s", index, type);
}
+ @Nullable
+ public String getAdditionalProperties() {
+ return additionalProperties;
+ }
+
+ public Map getAdditionalPropertiesMap() {
+ Map propertiesMap = new HashMap<>();
+ if (additionalProperties == null || additionalProperties.trim().isEmpty()) {
+ return propertiesMap;
+ }
+
+ KeyValueListParser parser = new KeyValueListParser("\n", "=");
+ parser.parse(additionalProperties).forEach(kv -> {
+ if (kv.getKey().trim().isEmpty()) {
+ throw new IllegalArgumentException("Key should not be empty");
+ } else {
+ propertiesMap.put(kv.getKey().trim(), kv.getValue().trim());
+ }
+ });
+ return propertiesMap;
+ }
+
public void validate(FailureCollector collector) {
IdUtils.validateReferenceName(referenceName, collector);
@@ -93,20 +132,33 @@ public void validate(FailureCollector collector) {
if (!containsMacro(TYPE_NAME) && Strings.isNullOrEmpty(type)) {
collector.addFailure("Type must be specified.", null).withConfigProperty(TYPE_NAME);
}
+
+ if (!containsMacro(ADDITIONAL_PROPERTIES)) {
+ try {
+ getAdditionalPropertiesMap();
+ } catch (Exception e) {
+ collector.addFailure("Additional properties must be a valid KV map", null)
+ .withConfigProperty(ADDITIONAL_PROPERTIES).withStacktrace(e.getStackTrace());
+ }
+ }
}
private void validateHost(FailureCollector collector) {
String[] hostParts = hostname.split(":");
// Elasticsearch Hadoop does not support IPV6 https://github.com/elastic/elasticsearch-hadoop/issues/1105
- if (hostParts.length != 2) {
+ // Length range [2,3] allowed for https hosts
+ if ((hostParts.length < 2) || (hostParts.length > 3) || (hostParts.length == 3
+ && !(hostParts[0].equalsIgnoreCase("https") || hostParts[0].equalsIgnoreCase("http")))) {
+
collector.addFailure(
"Invalid format of hostname",
- "Hostname and port must be specified for the Elasticsearch instance, for example: 'localhost:9200'"
+ "Hostname and port must be specified for the Elasticsearch instance, " +
+ "for example: 'localhost:9200' or https://remote.region.gcp.cloud.es.io:9243"
).withConfigProperty(HOST);
} else {
- String host = hostParts[0];
- String port = hostParts[1];
+ String host = String.join(":", Arrays.asList(hostParts).subList(0, hostParts.length - 1));
+ String port = hostParts[hostParts.length - 1];
if (host.isEmpty()) {
collector.addFailure("Host should not be empty.", null)
diff --git a/src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java b/src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java
index d456485..38ba522 100644
--- a/src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java
+++ b/src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java
@@ -83,6 +83,7 @@ public void prepareRun(BatchSinkContext context) throws IOException {
conf.set("es.resource.write", config.getResource());
conf.set("es.input.json", "yes");
conf.set("es.mapping.id", config.getIdField());
+ config.getAdditionalPropertiesMap().forEach((k, v) -> conf.set(k, v));
context.addOutput(Output.of(config.referenceName, new SinkOutputFormatProvider(EsOutputFormat.class, conf)));
}
diff --git a/src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java b/src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java
index 784451a..0b3dc10 100644
--- a/src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java
+++ b/src/main/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfig.java
@@ -37,13 +37,14 @@ public class ElasticsearchSinkConfig extends BaseElasticsearchConfig {
@Macro
private final String idField;
- public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField) {
- super(referenceName, hostname, index, type);
+ public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField,
+ String additionalProperties) {
+ super(referenceName, hostname, index, type, additionalProperties);
this.idField = idField;
}
private ElasticsearchSinkConfig(Builder builder) {
- super(builder.referenceName, builder.hostname, builder.index, builder.type);
+ super(builder.referenceName, builder.hostname, builder.index, builder.type, builder.additionalProperties);
idField = builder.idField;
}
@@ -58,6 +59,7 @@ public static Builder newBuilder(ElasticsearchSinkConfig copy) {
builder.index = copy.getIndex();
builder.type = copy.getType();
builder.idField = copy.getIdField();
+ builder.additionalProperties = copy.getAdditionalProperties();
return builder;
}
@@ -82,6 +84,7 @@ public static final class Builder {
private String index;
private String type;
private String idField;
+ private String additionalProperties;
private Builder() {
}
@@ -111,6 +114,11 @@ public Builder setIdField(String idField) {
return this;
}
+ public Builder setAdditionalProperties(String additionalProperties) {
+ this.additionalProperties = additionalProperties;
+ return this;
+ }
+
public ElasticsearchSinkConfig build() {
return new ElasticsearchSinkConfig(this);
}
diff --git a/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java b/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java
index 991b5b0..fd2900c 100644
--- a/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java
+++ b/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSource.java
@@ -89,6 +89,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
conf.set("es.nodes", config.getHostname());
conf.set("es.resource.read", config.getResource());
conf.set("es.query", config.getQuery());
+ config.getAdditionalPropertiesMap().forEach((k, v) -> conf.set(k, v));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);
context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(EsInputFormat.class, conf)));
diff --git a/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java b/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java
index 23079ea..7b53738 100644
--- a/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfig.java
@@ -46,14 +46,14 @@ public class ElasticsearchSourceConfig extends BaseElasticsearchConfig {
private final String schema;
public ElasticsearchSourceConfig(String referenceName, String hostname, String index, String type, String query,
- String schema) {
- super(referenceName, hostname, index, type);
+ String schema, String additionalProperties) {
+ super(referenceName, hostname, index, type, additionalProperties);
this.schema = schema;
this.query = query;
}
private ElasticsearchSourceConfig(Builder builder) {
- super(builder.referenceName, builder.hostname, builder.index, builder.type);
+ super(builder.referenceName, builder.hostname, builder.index, builder.type, builder.additionalProperties);
query = builder.query;
schema = builder.schema;
}
@@ -70,6 +70,7 @@ public static Builder newBuilder(ElasticsearchSourceConfig copy) {
builder.type = copy.getType();
builder.query = copy.getQuery();
builder.schema = copy.getSchema();
+ builder.additionalProperties = copy.getAdditionalProperties();
return builder;
}
@@ -118,6 +119,7 @@ public static final class Builder {
private String type;
private String query;
private String schema;
+ private String additionalProperties;
private Builder() {
}
@@ -152,6 +154,11 @@ public Builder setSchema(String schema) {
return this;
}
+ public Builder setAdditionalProperties(String additionalProperties) {
+ this.additionalProperties = additionalProperties;
+ return this;
+ }
+
public ElasticsearchSourceConfig build() {
return new ElasticsearchSourceConfig(this);
}
diff --git a/src/test/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfigTest.java b/src/test/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfigTest.java
index b47627b..855d3f5 100644
--- a/src/test/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfigTest.java
+++ b/src/test/java/io/cdap/plugin/elastic/sink/ElasticsearchSinkConfigTest.java
@@ -18,6 +18,7 @@
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import io.cdap.plugin.elastic.ValidationAssertions;
+import io.cdap.plugin.elastic.source.ElasticsearchSourceConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -33,7 +34,8 @@ public class ElasticsearchSinkConfigTest {
"localhost:9200",
"test",
"test",
- "id"
+ "id",
+ null
);
@Test
@@ -56,6 +58,17 @@ public void testInvalidReferenceName() {
@Test
public void testInvalidHostname() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("abc:abc:abc:abc")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
+ }
+
+ @Test
+ public void testInvalidHostProtocol() {
ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
.setHostname("abc:abc:abc")
.build();
@@ -65,6 +78,28 @@ public void testInvalidHostname() {
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
}
+ @Test
+ public void testValidHttpHostname() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("http://abc:9200")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
+ @Test
+ public void testValidHttpsHostName() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("https://abc:9200")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
@Test
public void testEmptyHostname() {
ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
@@ -98,6 +133,28 @@ public void testEmptyPort() {
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
}
+ @Test
+ public void testEmptyPortWithHttpHost() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("http://abc:")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
+ }
+
+ @Test
+ public void testEmptyPortWithHttpsHost() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("https://abc:")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
+ }
+
@Test
public void testInvalidPort() {
ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
@@ -109,6 +166,28 @@ public void testInvalidPort() {
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
}
+ @Test
+ public void testInvalidPortWithHttpHost() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("http://abc:abc")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
+ }
+
+ @Test
+ public void testInvalidPortWithHttpsHost() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("https://abc:abc")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
+ }
+
@Test
public void testInvalidNumberInPort() {
ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
@@ -120,6 +199,28 @@ public void testInvalidNumberInPort() {
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
}
+ @Test
+ public void testInvalidNumberInPortWithHttpHost() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("http://abc:100000")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
+ }
+
+ @Test
+ public void testInvalidNumberInPortWithHttpsHost() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setHostname("https://abc:100000")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.HOST);
+ }
+
@Test
public void testInvalidIndex() {
ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
@@ -152,4 +253,50 @@ public void testInvalidIdField() {
config.validate(failureCollector);
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSinkConfig.ID_FIELD);
}
+
+ @Test
+ public void testEmptyAdditionalProperties() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setAdditionalProperties("")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
+ @Test
+ public void testInvalidAdditionalProperties() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setAdditionalProperties("es.net.http.auth.user")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector,
+ ElasticsearchSourceConfig.ADDITIONAL_PROPERTIES);
+ }
+
+ @Test
+ public void testValidAdditionalProperties() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setAdditionalProperties("es.nodes.wan.only=true")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
+ @Test
+ public void testValidAdditionalPropertiesWithEmptyKey() {
+ ElasticsearchSinkConfig config = ElasticsearchSinkConfig.newBuilder(VALID_CONFIG)
+ .setAdditionalProperties("es.nodes.wan.only=true;=false")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector,
+ ElasticsearchSourceConfig.ADDITIONAL_PROPERTIES);
+ }
}
diff --git a/src/test/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfigTest.java b/src/test/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfigTest.java
index f470f66..f965ed8 100644
--- a/src/test/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfigTest.java
+++ b/src/test/java/io/cdap/plugin/elastic/source/ElasticsearchSourceConfigTest.java
@@ -35,7 +35,8 @@ public class ElasticsearchSourceConfigTest {
"test",
"test",
"?q=*",
- Schema.recordOf("record", Schema.Field.of("id", Schema.of(Schema.Type.LONG))).toString()
+ Schema.recordOf("record", Schema.Field.of("id", Schema.of(Schema.Type.LONG))).toString(),
+ null
);
@Test
@@ -58,6 +59,17 @@ public void testInvalidReferenceName() {
@Test
public void testInvalidHostname() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("abc:abc:abc:abc")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
+ }
+
+ @Test
+ public void testInvalidHostProtocol() {
ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
.setHostname("abc:abc:abc")
.build();
@@ -67,6 +79,28 @@ public void testInvalidHostname() {
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
}
+ @Test
+ public void testValidHttpHostname() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("http://abc:9200")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
+ @Test
+ public void testValidHttpsHostName() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("https://abc:9200")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
@Test
public void testEmptyHostname() {
ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
@@ -100,6 +134,28 @@ public void testEmptyPort() {
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
}
+ @Test
+ public void testEmptyPortWithHttpHost() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("http://abc:")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
+ }
+
+ @Test
+ public void testEmptyPortWithHttpsHost() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("https://abc:")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
+ }
+
@Test
public void testInvalidPort() {
ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
@@ -111,6 +167,28 @@ public void testInvalidPort() {
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
}
+ @Test
+ public void testInvalidPortWithHttpHost() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("http://abc:abc")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
+ }
+
+ @Test
+ public void testInvalidPortWithHttpsHost() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("https://abc:abc")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
+ }
+
@Test
public void testInvalidNumberInPort() {
ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
@@ -122,6 +200,28 @@ public void testInvalidNumberInPort() {
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
}
+ @Test
+ public void testInvalidNumberInPortWithHttpHost() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("http://abc:100000")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
+ }
+
+ @Test
+ public void testInvalidNumberInPortWithHttpsHost() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setHostname("https://abc:100000")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.HOST);
+ }
+
@Test
public void testInvalidIndex() {
ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
@@ -176,4 +276,50 @@ public void testInvalidSchema() {
config.validate(failureCollector);
ValidationAssertions.assertPropertyValidationFailed(failureCollector, ElasticsearchSourceConfig.SCHEMA);
}
+
+ @Test
+ public void testEmptyAdditionalProperties() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setAdditionalProperties("")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
+ @Test
+ public void testInvalidAdditionalProperties() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setAdditionalProperties("es.net.http.auth.user")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector,
+ ElasticsearchSourceConfig.ADDITIONAL_PROPERTIES);
+ }
+
+ @Test
+ public void testValidAdditionalProperties() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setAdditionalProperties("es.nodes.wan.only=true")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
+ @Test
+ public void testValidAdditionalPropertiesWithEmptyKey() {
+ ElasticsearchSourceConfig config = ElasticsearchSourceConfig.newBuilder(VALID_CONFIG)
+ .setAdditionalProperties("es.nodes.wan.only=true;=false")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.validate(failureCollector);
+ ValidationAssertions.assertPropertyValidationFailed(failureCollector,
+ ElasticsearchSourceConfig.ADDITIONAL_PROPERTIES);
+ }
}
diff --git a/widgets/Elasticsearch-batchsink.json b/widgets/Elasticsearch-batchsink.json
index 43955f2..2d9ee03 100644
--- a/widgets/Elasticsearch-batchsink.json
+++ b/widgets/Elasticsearch-batchsink.json
@@ -31,6 +31,18 @@
"widget-type": "textbox",
"label": "ID Field",
"name": "es.idField"
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Additional Properties",
+ "name": "additionalProperties",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "delimiter": "\n",
+ "kv-delimiter": "=",
+ "key-placeholder": "Property Name",
+ "value-placeholder": "Property Value"
+ }
}
]
}
diff --git a/widgets/Elasticsearch-batchsource.json b/widgets/Elasticsearch-batchsource.json
index 32e33c1..83be866 100644
--- a/widgets/Elasticsearch-batchsource.json
+++ b/widgets/Elasticsearch-batchsource.json
@@ -31,6 +31,18 @@
"widget-type": "textbox",
"label": "Query",
"name": "query"
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Additional Properties",
+ "name": "additionalProperties",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "delimiter": "\n",
+ "kv-delimiter": "=",
+ "key-placeholder": "Property Name",
+ "value-placeholder": "Property Value"
+ }
}
]
}