From 6b2654e5cec3e5416915e0177738d8c38f8da7b4 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 7 Dec 2016 21:57:33 -0500 Subject: [PATCH] Fix `es.output.json` for Cascading. (#898) Added tests to verify functionality. fixes #885 --- .../AbstractCascadingHadoopJsonReadTest.java | 97 ++++++++++++++++ .../AbstractCascadingLocalJsonReadTest.java | 104 ++++++++++++++++++ .../cascading/CascadingHadoopSuite.java | 2 +- .../cascading/CascadingLocalSuite.java | 4 +- .../cascading/CascadingValueWriter.java | 5 +- .../hadoop/cascading/EsHadoopScheme.java | 43 +++++--- .../hadoop/cascading/EsLocalScheme.java | 32 ++++-- 7 files changed, 263 insertions(+), 24 deletions(-) create mode 100644 cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingHadoopJsonReadTest.java create mode 100644 cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingLocalJsonReadTest.java diff --git a/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingHadoopJsonReadTest.java b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingHadoopJsonReadTest.java new file mode 100644 index 000000000..e74d6948d --- /dev/null +++ b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingHadoopJsonReadTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.hadoop.integration.cascading; + +import cascading.flow.hadoop.HadoopFlowConnector; +import cascading.operation.AssertionLevel; +import cascading.operation.aggregator.Count; +import cascading.operation.assertion.AssertSizeLessThan; +import cascading.operation.filter.FilterNotNull; +import cascading.pipe.Each; +import cascading.pipe.Every; +import cascading.pipe.GroupBy; +import cascading.pipe.Pipe; +import cascading.scheme.local.TextLine; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import com.google.common.collect.Lists; +import org.elasticsearch.hadoop.HdpBootstrap; +import org.elasticsearch.hadoop.QueryTestParams; +import org.elasticsearch.hadoop.Stream; +import org.elasticsearch.hadoop.cascading.EsTap; +import org.elasticsearch.hadoop.cfg.ConfigurationOptions; +import org.elasticsearch.hadoop.mr.RestUtils; +import org.elasticsearch.hadoop.util.StringUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Collection; +import java.util.Properties; + +@RunWith(Parameterized.class) +public class AbstractCascadingHadoopJsonReadTest { + + @Parameters + public static Collection queries() { + return QueryTestParams.params(); + } + + private final String indexPrefix = "json-"; + private final String query; + private final boolean 
readMetadata; + + public AbstractCascadingHadoopJsonReadTest(String query, boolean readMetadata) { + this.query = query; + this.readMetadata = readMetadata; + } + + @Before + public void before() throws Exception { + RestUtils.refresh(indexPrefix + "cascading-hadoop"); + } + + @Test + public void testReadFromES() throws Exception { + Tap in = new EsTap(indexPrefix + "cascading-hadoop/artists"); + Pipe pipe = new Pipe("copy"); + + Tap out = new HadoopPrintStreamTap(Stream.NULL); + build(cfg(), in, out, pipe); + } + + private void build(Properties cfg, Tap in, Tap out, Pipe pipe) { + StatsUtils.proxy(new HadoopFlowConnector(cfg).connect(in, out, pipe)).complete(); + } + + private Properties cfg() { + Properties props = HdpBootstrap.asProperties(QueryTestParams.provisionQueries(CascadingHadoopSuite.configuration)); + props.put(ConfigurationOptions.ES_QUERY, query); + props.put(ConfigurationOptions.ES_READ_METADATA, readMetadata); + props.put(ConfigurationOptions.ES_OUTPUT_JSON, "true"); + return props; + } +} \ No newline at end of file diff --git a/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingLocalJsonReadTest.java b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingLocalJsonReadTest.java new file mode 100644 index 000000000..4037dc63c --- /dev/null +++ b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingLocalJsonReadTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.hadoop.integration.cascading; + +import cascading.flow.local.LocalFlowConnector; +import cascading.pipe.Pipe; +import cascading.scheme.local.TextLine; +import cascading.tap.Tap; +import org.elasticsearch.hadoop.QueryTestParams; +import org.elasticsearch.hadoop.cascading.EsTap; +import org.elasticsearch.hadoop.cfg.ConfigurationOptions; +import org.elasticsearch.hadoop.mr.RestUtils; +import org.elasticsearch.hadoop.util.TestSettings; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import static org.hamcrest.Matchers.hasItems; +import static org.junit.Assert.assertThat; + +@RunWith(Parameterized.class) +public class AbstractCascadingLocalJsonReadTest { + + @Parameters + public static Collection queries() { + return QueryTestParams.localParams(); + } + + private final String indexPrefix = "json-"; + private final String query; + private final boolean readMetadata; + + public AbstractCascadingLocalJsonReadTest(String query, boolean readMetadata) { + this.query = query; + this.readMetadata = readMetadata; + } + + @Before + public void before() throws Exception { + RestUtils.refresh(indexPrefix + "cascading-local"); + } + + @Test + 
public void testReadFromES() throws Exception { + Tap in = new EsTap(indexPrefix + "cascading-local/artists"); + Pipe pipe = new Pipe("copy"); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + Tap out = new OutputStreamTap(new TextLine(), os); + build(cfg(), in, out, pipe); + + BufferedReader r = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(os.toByteArray()))); + + List records = new ArrayList<>(); + for (String line = r.readLine(); line != null; line = r.readLine()) { + records.add(line); + } + + String doc1 = "{\"number\":\"917\",\"name\":\"Iron Maiden\",\"url\":\"http://www.last.fm/music/Iron+Maiden\",\"picture\":\"http://userserve-ak.last.fm/serve/252/22493569.jpg\",\"@timestamp\":\"2870-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}"; + String doc2 = "{\"number\":\"979\",\"name\":\"Smash Mouth\",\"url\":\"http://www.last.fm/music/Smash+Mouth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/82063.jpg\",\"@timestamp\":\"2931-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}"; + String doc3 = "{\"number\":\"190\",\"name\":\"Muse\",\"url\":\"http://www.last.fm/music/Muse\",\"picture\":\"http://userserve-ak.last.fm/serve/252/416514.jpg\",\"@timestamp\":\"2176-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}"; + + assertThat(records, hasItems(doc1, doc2, doc3)); + } + + private void build(Properties cfg, Tap in, Tap out, Pipe pipe) { + StatsUtils.proxy(new LocalFlowConnector(cfg).connect(in, out, pipe)).complete(); + } + + private Properties cfg() { + Properties props = new TestSettings().getProperties(); + props.put(ConfigurationOptions.ES_QUERY, query); + props.put(ConfigurationOptions.ES_READ_METADATA, readMetadata); + props.put(ConfigurationOptions.ES_OUTPUT_JSON, "true"); + return props; + } +} \ No newline at end of file diff --git a/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/CascadingHadoopSuite.java 
b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/CascadingHadoopSuite.java index b8c891593..c36b50df0 100644 --- a/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/CascadingHadoopSuite.java +++ b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/CascadingHadoopSuite.java @@ -42,7 +42,7 @@ import cascading.util.Update; @RunWith(Suite.class) -@Suite.SuiteClasses({ AbstractCascadingHadoopSaveTest.class, AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopSearchTest.class, AbstractCascadingHadoopJsonSearchTest.class }) +@Suite.SuiteClasses({ AbstractCascadingHadoopSaveTest.class, AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopSearchTest.class, AbstractCascadingHadoopJsonSearchTest.class, AbstractCascadingHadoopJsonReadTest.class }) //@Suite.SuiteClasses({ AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopJsonSearchTest.class }) public class CascadingHadoopSuite { diff --git a/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/CascadingLocalSuite.java b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/CascadingLocalSuite.java index 906dc1880..fe16990b0 100644 --- a/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/CascadingLocalSuite.java +++ b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/CascadingLocalSuite.java @@ -25,8 +25,8 @@ import org.junit.runners.Suite; @RunWith(Suite.class) -@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalSaveTest.class, AbstractCascadingLocalJsonSearchTest.class, AbstractCascadingLocalSearchTest.class }) -//@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalJsonSearchTest.class }) +@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalSaveTest.class, AbstractCascadingLocalJsonSearchTest.class, 
AbstractCascadingLocalSearchTest.class, AbstractCascadingLocalJsonReadTest.class }) +//@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalJsonReadTest.class }) public class CascadingLocalSuite { @ClassRule diff --git a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/CascadingValueWriter.java b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/CascadingValueWriter.java index 73a30fdc5..15c33650b 100644 --- a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/CascadingValueWriter.java +++ b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/CascadingValueWriter.java @@ -35,6 +35,9 @@ */ public class CascadingValueWriter extends FilteringValueWriter> { + final static int SINK_CTX_SIZE = 1; + final static int SINK_CTX_ALIASES = 0; + private final JdkValueWriter jdkWriter; private final WritableValueWriter writableWriter; @@ -52,7 +55,7 @@ public CascadingValueWriter(boolean writeUnknownTypes) { public Result write(SinkCall sinkCall, Generator generator) { Tuple tuple = CascadingUtils.coerceToString(sinkCall); // consider names (in case of aliases these are already applied) - List names = (List) sinkCall.getContext()[0]; + List names = (List) sinkCall.getContext()[SINK_CTX_ALIASES]; generator.writeBeginObject(); for (int i = 0; i < tuple.size(); i++) { diff --git a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java index e7ecc4efe..70267df3d 100644 --- a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java +++ b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java @@ -19,10 +19,7 @@ package org.elasticsearch.hadoop.cascading; import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; @@ -51,6 +48,9 @@ import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; +import static org.elasticsearch.hadoop.cascading.CascadingValueWriter.SINK_CTX_ALIASES; +import static org.elasticsearch.hadoop.cascading.CascadingValueWriter.SINK_CTX_SIZE; + /** * Cascading Scheme handling */ @@ -59,6 +59,12 @@ class EsHadoopScheme extends Scheme flowProcess, SourceCall sourceCall) throws IOException { super.sourcePrepare(flowProcess, sourceCall); - Object[] context = new Object[3]; - context[0] = sourceCall.getInput().createKey(); - context[1] = sourceCall.getInput().createValue(); + Object[] context = new Object[SRC_CTX_SIZE]; + context[SRC_CTX_KEY] = sourceCall.getInput().createKey(); + context[SRC_CTX_VALUE] = sourceCall.getInput().createValue(); // as the tuple _might_ vary (some objects might be missing), we use a map rather then a collection Settings settings = loadSettings(flowProcess.getConfigCopy(), true); - context[2] = CascadingUtils.alias(settings); + context[SRC_CTX_ALIASES] = CascadingUtils.alias(settings); + context[SRC_CTX_OUTPUT_JSON] = settings.getOutputAsJson(); sourceCall.setContext(context); } @@ -103,10 +110,10 @@ public void sourceCleanup(FlowProcess flowProcess, SourceCall flowProcess, SinkCall sinkCall) throws IOException { super.sinkPrepare(flowProcess, sinkCall); - Object[] context = new Object[1]; + Object[] context = new Object[SINK_CTX_SIZE]; // the tuple is fixed, so we can just use a collection/index Settings settings = loadSettings(flowProcess.getConfigCopy(), false); - context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields()); + context[SINK_CTX_ALIASES] = CascadingUtils.fieldToAlias(settings, getSinkFields()); sinkCall.setContext(context); } @@ -162,13 +169,23 @@ private Settings loadSettings(Object source, boolean read) { public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException { Object[] context = sourceCall.getContext(); - if 
(!sourceCall.getInput().next(context[0], context[1])) { + if (!sourceCall.getInput().next(context[SRC_CTX_KEY], context[SRC_CTX_VALUE])) { return false; } + boolean isJSON = (Boolean) context[SRC_CTX_OUTPUT_JSON]; + TupleEntry entry = sourceCall.getIncomingEntry(); - Map data = (Map) context[1]; - FieldAlias alias = (FieldAlias) context[2]; + + Map data; + if (isJSON) { + data = new HashMap(1); + data.put(new Text("data"), context[SRC_CTX_VALUE]); + } else { + data = (Map) context[SRC_CTX_VALUE]; + } + + FieldAlias alias = (FieldAlias) context[SRC_CTX_ALIASES]; if (entry.getFields().isDefined()) { // lookup using writables diff --git a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsLocalScheme.java b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsLocalScheme.java index 0d2228b19..7372cff33 100644 --- a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsLocalScheme.java +++ b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsLocalScheme.java @@ -19,6 +19,7 @@ package org.elasticsearch.hadoop.cascading; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -32,7 +33,6 @@ import org.elasticsearch.hadoop.rest.ScrollQuery; import org.elasticsearch.hadoop.rest.stats.Stats; import org.elasticsearch.hadoop.util.FieldAlias; -import org.elasticsearch.hadoop.util.SettingsUtils; import org.elasticsearch.hadoop.util.StringUtils; import cascading.flow.FlowProcess; @@ -44,6 +44,9 @@ import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; +import static org.elasticsearch.hadoop.cascading.CascadingValueWriter.SINK_CTX_ALIASES; +import static org.elasticsearch.hadoop.cascading.CascadingValueWriter.SINK_CTX_SIZE; + /** * Cascading Scheme handling */ @@ -51,6 +54,10 @@ class EsLocalScheme extends Scheme flowProcess, SourceCall sourceCall) throws IOException { super.sourcePrepare(flowProcess, sourceCall); - Object[] context = new Object[1]; + Object[] context = 
new Object[SRC_CTX_SIZE]; Settings settings = HadoopSettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props); - context[0] = CascadingUtils.alias(settings); + context[SRC_CTX_ALIASES] = CascadingUtils.alias(settings); + context[SRC_CTX_OUTPUT_JSON] = settings.getOutputAsJson(); sourceCall.setContext(context); } @@ -115,9 +123,9 @@ private void report(Stats stats, FlowProcess flowProcess) { public void sinkPrepare(FlowProcess flowProcess, SinkCall sinkCall) throws IOException { super.sinkPrepare(flowProcess, sinkCall); - Object[] context = new Object[1]; + Object[] context = new Object[SINK_CTX_SIZE]; Settings settings = HadoopSettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props); - context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields()); + context[SINK_CTX_ALIASES] = CascadingUtils.fieldToAlias(settings, getSinkFields()); sinkCall.setContext(context); } @@ -150,9 +158,19 @@ public boolean source(FlowProcess flowProcess, SourceCall data = (Map) query.next()[1]; - FieldAlias alias = (FieldAlias) sourceCall.getContext()[0]; + + Map data; + if (isJSON) { + data = new HashMap(1); + data.put("data", query.next()[1]); + } else { + data = (Map) query.next()[1]; + } + + FieldAlias alias = (FieldAlias) sourceCall.getContext()[SRC_CTX_ALIASES]; if (entry.getFields().isDefined()) { // lookup using writables