From 6f6c09f0c8232afd68a9c6c26d48e4ac7bb226ce Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Wed, 21 Jun 2017 10:14:08 +0200 Subject: [PATCH] [BEAM-2488] Elasticsearch IO should read also in replica shards --- sdks/java/io/elasticsearch/pom.xml | 8 ++++++++ .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 11 +---------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml index 03632cea0007..c8e308c3ceac 100644 --- a/sdks/java/io/elasticsearch/pom.xml +++ b/sdks/java/io/elasticsearch/pom.xml @@ -137,6 +137,14 @@ test + + + net.java.dev.jna + jna + 4.1.0 + test + + org.apache.beam beam-runners-direct-java diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index e3965dc6a0c0..fa67fe194f78 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -455,16 +455,7 @@ public List> split( while (shards.hasNext()) { Map.Entry shardJson = shards.next(); String shardId = shardJson.getKey(); - JsonNode value = (JsonNode) shardJson.getValue(); - boolean isPrimaryShard = - value - .path(0) - .path("routing") - .path("primary") - .asBoolean(); - if (isPrimaryShard) { - sources.add(new BoundedElasticsearchSource(spec, shardId)); - } + sources.add(new BoundedElasticsearchSource(spec, shardId)); } checkArgument(!sources.isEmpty(), "No primary shard found"); return sources;