From a11aa555ba2c6bd271da365bd111ba4956048b87 Mon Sep 17 00:00:00 2001 From: Philip Chan Date: Tue, 14 Nov 2017 11:21:21 -0800 Subject: [PATCH] add regex hack to parse doc id --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 023eb13c0cf6..730f45d6e8c4 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -41,6 +41,8 @@ import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import org.apache.beam.sdk.annotations.Experimental; @@ -835,10 +837,24 @@ public void startBundle(StartBundleContext context) throws Exception { @ProcessElement public void processElement(ProcessContext context) throws Exception { String document = context.element(); - batch.add(String.format("{ \"index\" : {} }%n%s%n", document)); + String _id; + + // TODO(Philip): Massive hack to parse incoming ES doc ID to use as the ID in bulk request. + Pattern pattern = Pattern.compile("\"id\":(\\d+),"); + Matcher matcher = pattern.matcher(document); + if (matcher.find()) { + _id = matcher.group(1); + String s = String.format("{ \"index\" : {\"_id\":\"%s\"} }%n%s%n", _id, document); + System.out.print(s); + batch.add(s); + } + else { + System.out.print("_id not found.%n"); + batch.add(String.format("{ \"index\" : {} }%n%s%n", document)); + } currentBatchSizeBytes += document.getBytes(StandardCharsets.UTF_8).length; if (batch.size() >= spec.getMaxBatchSize() - || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) { + || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) { flushBatch(); } }