Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion docs/Elasticsearch-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ with a stream batch source and Elasticsearch as a sink.

Configuration
-------------
**referenceName:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.
**Reference Name:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.

**es.host:** The hostname and port for the Elasticsearch instance. (Macro-enabled)

Expand All @@ -27,6 +27,9 @@ exist, it will be created. (Macro-enabled)
**es.idField:** The field that will determine the id for the document; it should match a fieldname
in the Structured Record of the input. (Macro-enabled)

**Additional Properties:** Additional properties to use with the es-hadoop client when writing the data,
documented at [elastic.co](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html).
(Macro-enabled)

Example
-------
Expand All @@ -44,3 +47,19 @@ in the record. Each run, the documents will be updated if they are still present
"es.idField": "id"
}
}

This example connects to Elasticsearch, which is running in a remote restricted environment (e.g. elastic.co),
and writes the data to the specified index (megacorp) and type (employee). The data is indexed using the id field
in the record. Each run, the documents will be updated if they are still present in the source:

{
"name": "Elasticsearch",
"type": "batchsink",
"properties": {
"es.host": "https://remote.region.gcp.cloud.es.io:9243",
"es.index": "megacorp",
"es.type": "employee",
"es.idField": "id",
"additionalProperties": "es.net.http.auth.user=username\nes.net.http.auth.pass=password\nes.nodes.wan.only=true"
}
}
28 changes: 27 additions & 1 deletion docs/Elasticsearch-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ in an index and type from Elasticsearch and store the data in an HBase table.

Configuration
-------------
**referenceName:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.
**Reference Name:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.

**es.host:** The hostname and port for the Elasticsearch instance. (Macro-enabled)

Expand All @@ -26,6 +26,9 @@ see Elasticsearch for additional query examples. (Macro-enabled)

**schema:** The schema or mapping of the data in Elasticsearch.

**Additional Properties:** Additional properties to use with the es-hadoop client when reading the data,
documented at [elastic.co](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html).
(Macro-enabled)

Example
-------
Expand All @@ -50,3 +53,26 @@ All data from the index will be read on each run:
{\"name\":\"age\",\"type\":\"int\"}]}"
}
}

This example connects to Elasticsearch, which is running in a remote, restricted environment (e.g. elastic.co),
and reads records from the specified index (*megacorp*) and type (*employee*) that match the query —
in this case, a query that selects all records. All data from the index will be read on each run:

{
"name": "Elasticsearch",
"type": "batchsource",
"properties": {
"es.host": "https://remote.region.gcp.cloud.es.io:9243",
"es.index": "megacorp",
"es.type": "employee",
"query": "?q=*",
"schema": "{
\"type\":\"record\",
\"name\":\"etlSchemaBody\",
\"fields\":[
{\"name\":\"id\",\"type\":\"long\"},
{\"name\":\"name\",\"type\":\"string\"},
{\"name\":\"age\",\"type\":\"int\"}]}",
"additionalProperties": "es.net.http.auth.user=username\nes.net.http.auth.pass=password\nes.nodes.wan.only=true"
}
}
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@
<hadoop.version>2.3.0</hadoop.version>
<junit.version>4.11</junit.version>
<hydrator.version>2.3.0-SNAPSHOT</hydrator.version>
<es.version>7.5.2</es.version>
<es-hadoop.version>7.5.2</es-hadoop.version>
<es.version>7.6.2</es.version>
<es-hadoop.version>7.6.2</es-hadoop.version>
<slf4j.version>1.7.5</slf4j.version>
<netty.version>4.0.30.Final</netty.version>
<log4j.version>2.7</log4j.version>
Expand Down
64 changes: 58 additions & 6 deletions src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,29 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.IdUtils;
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.common.ReferencePluginConfig;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

/**
* Basic config class for Elasticsearch plugin.
*/
public abstract class BaseElasticsearchConfig extends ReferencePluginConfig {

public static final String INDEX_NAME = "es.index";
public static final String TYPE_NAME = "es.type";
public static final String HOST = "es.host";
public static final String ADDITIONAL_PROPERTIES = "additionalProperties";

private static final String HOST_DESCRIPTION = "The hostname and port for the Elasticsearch instance; " +
"for example, localhost:9200.";
"for example, localhost:9200 or https://remote.region.gcp.cloud.es.io:9243.";
private static final String INDEX_DESCRIPTION = "The name of the index to query.";
private static final String TYPE_DESCRIPTION = "The name of the type where the data is stored.";
private static final String ADDITIONAL_PROPERTIES_DESCRIPTION = "Additional client properties for ES-Hadoop";

@Name(HOST)
@Description(HOST_DESCRIPTION)
Expand All @@ -52,11 +61,19 @@ public abstract class BaseElasticsearchConfig extends ReferencePluginConfig {
@Macro
private final String type;

public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type) {
@Nullable
@Name(ADDITIONAL_PROPERTIES)
@Description(ADDITIONAL_PROPERTIES_DESCRIPTION)
@Macro
private final String additionalProperties;

public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type,
String additionalProperties) {
super(referenceName);
this.hostname = hostname;
this.index = index;
this.type = type;
this.additionalProperties = additionalProperties;
}

public String getHostname() {
Expand All @@ -75,6 +92,28 @@ public String getResource() {
return String.format("%s/%s", index, type);
}

/**
 * Returns the raw additional es-hadoop client properties exactly as configured:
 * newline-separated {@code key=value} pairs (see the "\n" / "=" delimiters used by
 * {@link #getAdditionalPropertiesMap()}), or {@code null} when the property was not set.
 */
@Nullable
public String getAdditionalProperties() {
return additionalProperties;
}

/**
 * Parses the configured additional-properties string into a map.
 *
 * <p>The raw value is expected to contain newline-separated {@code key=value} pairs.
 * Keys and values are trimmed of surrounding whitespace. A null or blank configuration
 * yields an empty map.
 *
 * @return a mutable map of the parsed additional es-hadoop client properties
 * @throws IllegalArgumentException if any parsed key is empty after trimming
 */
public Map<String, String> getAdditionalPropertiesMap() {
Map<String, String> parsed = new HashMap<>();
boolean blank = additionalProperties == null || additionalProperties.trim().isEmpty();
if (!blank) {
// One property per line; each line is split on the first '=' by the parser.
new KeyValueListParser("\n", "=").parse(additionalProperties).forEach(entry -> {
String key = entry.getKey().trim();
if (key.isEmpty()) {
throw new IllegalArgumentException("Key should not be empty");
}
parsed.put(key, entry.getValue().trim());
});
}
return parsed;
}

public void validate(FailureCollector collector) {
IdUtils.validateReferenceName(referenceName, collector);

Expand All @@ -93,20 +132,33 @@ public void validate(FailureCollector collector) {
if (!containsMacro(TYPE_NAME) && Strings.isNullOrEmpty(type)) {
collector.addFailure("Type must be specified.", null).withConfigProperty(TYPE_NAME);
}

if (!containsMacro(ADDITIONAL_PROPERTIES)) {
try {
getAdditionalPropertiesMap();
} catch (Exception e) {
collector.addFailure("Additional properties must be a valid KV map", null)
.withConfigProperty(ADDITIONAL_PROPERTIES).withStacktrace(e.getStackTrace());
}
}
}

private void validateHost(FailureCollector collector) {
String[] hostParts = hostname.split(":");

// Elasticsearch Hadoop does not support IPV6 https://github.com/elastic/elasticsearch-hadoop/issues/1105
if (hostParts.length != 2) {
// Length range [2,3] allowed for https hosts
if ((hostParts.length < 2) || (hostParts.length > 3) || (hostParts.length == 3
&& !(hostParts[0].equalsIgnoreCase("https") || hostParts[0].equalsIgnoreCase("http")))) {

collector.addFailure(
"Invalid format of hostname",
"Hostname and port must be specified for the Elasticsearch instance, for example: 'localhost:9200'"
"Hostname and port must be specified for the Elasticsearch instance, " +
"for example: 'localhost:9200' or https://remote.region.gcp.cloud.es.io:9243"
).withConfigProperty(HOST);
} else {
String host = hostParts[0];
String port = hostParts[1];
String host = String.join(":", Arrays.asList(hostParts).subList(0, hostParts.length - 1));
String port = hostParts[hostParts.length - 1];

if (host.isEmpty()) {
collector.addFailure("Host should not be empty.", null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void prepareRun(BatchSinkContext context) throws IOException {
conf.set("es.resource.write", config.getResource());
conf.set("es.input.json", "yes");
conf.set("es.mapping.id", config.getIdField());
config.getAdditionalPropertiesMap().forEach((k, v) -> conf.set(k, v));

context.addOutput(Output.of(config.referenceName, new SinkOutputFormatProvider(EsOutputFormat.class, conf)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ public class ElasticsearchSinkConfig extends BaseElasticsearchConfig {
@Macro
private final String idField;

public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField) {
super(referenceName, hostname, index, type);
public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField,
String additionalProperties) {
super(referenceName, hostname, index, type, additionalProperties);
this.idField = idField;
}

private ElasticsearchSinkConfig(Builder builder) {
super(builder.referenceName, builder.hostname, builder.index, builder.type);
super(builder.referenceName, builder.hostname, builder.index, builder.type, builder.additionalProperties);
idField = builder.idField;
}

Expand All @@ -58,6 +59,7 @@ public static Builder newBuilder(ElasticsearchSinkConfig copy) {
builder.index = copy.getIndex();
builder.type = copy.getType();
builder.idField = copy.getIdField();
builder.additionalProperties = copy.getAdditionalProperties();
return builder;
}

Expand All @@ -82,6 +84,7 @@ public static final class Builder {
private String index;
private String type;
private String idField;
private String additionalProperties;

private Builder() {
}
Expand Down Expand Up @@ -111,6 +114,11 @@ public Builder setIdField(String idField) {
return this;
}

/**
 * Sets the raw additional es-hadoop client properties string
 * (newline-separated {@code key=value} pairs) for this sink config.
 *
 * @return this builder, for call chaining
 */
public Builder setAdditionalProperties(String additionalProperties) {
this.additionalProperties = additionalProperties;
return this;
}

public ElasticsearchSinkConfig build() {
return new ElasticsearchSinkConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
conf.set("es.nodes", config.getHostname());
conf.set("es.resource.read", config.getResource());
conf.set("es.query", config.getQuery());
config.getAdditionalPropertiesMap().forEach((k, v) -> conf.set(k, v));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);
context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(EsInputFormat.class, conf)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public class ElasticsearchSourceConfig extends BaseElasticsearchConfig {
private final String schema;

public ElasticsearchSourceConfig(String referenceName, String hostname, String index, String type, String query,
String schema) {
super(referenceName, hostname, index, type);
String schema, String additionalProperties) {
super(referenceName, hostname, index, type, additionalProperties);
this.schema = schema;
this.query = query;
}

private ElasticsearchSourceConfig(Builder builder) {
super(builder.referenceName, builder.hostname, builder.index, builder.type);
super(builder.referenceName, builder.hostname, builder.index, builder.type, builder.additionalProperties);
query = builder.query;
schema = builder.schema;
}
Expand All @@ -70,6 +70,7 @@ public static Builder newBuilder(ElasticsearchSourceConfig copy) {
builder.type = copy.getType();
builder.query = copy.getQuery();
builder.schema = copy.getSchema();
builder.additionalProperties = copy.getAdditionalProperties();
return builder;
}

Expand Down Expand Up @@ -118,6 +119,7 @@ public static final class Builder {
private String type;
private String query;
private String schema;
private String additionalProperties;

private Builder() {
}
Expand Down Expand Up @@ -152,6 +154,11 @@ public Builder setSchema(String schema) {
return this;
}

/**
 * Sets the raw additional es-hadoop client properties string
 * (newline-separated {@code key=value} pairs) for this source config.
 *
 * @return this builder, for call chaining
 */
public Builder setAdditionalProperties(String additionalProperties) {
this.additionalProperties = additionalProperties;
return this;
}

public ElasticsearchSourceConfig build() {
return new ElasticsearchSourceConfig(this);
}
Expand Down
Loading