Skip to content

Commit

Permalink
Fix es.output.json for Cascading.
Browse files Browse the repository at this point in the history
Added tests to verify functionality.
fixes elastic#885
  • Loading branch information
jbaiera committed Nov 17, 2016
1 parent c196480 commit 15dfa29
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 11 deletions.
@@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.hadoop.integration.cascading;

import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.AssertionLevel;
import cascading.operation.aggregator.Count;
import cascading.operation.assertion.AssertSizeLessThan;
import cascading.operation.filter.FilterNotNull;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import com.google.common.collect.Lists;
import org.elasticsearch.hadoop.HdpBootstrap;
import org.elasticsearch.hadoop.QueryTestParams;
import org.elasticsearch.hadoop.Stream;
import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.RestUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Collection;
import java.util.Properties;

@RunWith(Parameterized.class)
public class AbstractCascadingHadoopJsonReadTest {

@Parameters
public static Collection<Object[]> queries() {
return QueryTestParams.params();
}

private final String indexPrefix = "json-";
private final String query;
private final boolean readMetadata;

public AbstractCascadingHadoopJsonReadTest(String query, boolean readMetadata) {
this.query = query;
this.readMetadata = readMetadata;
}

@Before
public void before() throws Exception {
RestUtils.refresh(indexPrefix + "cascading-hadoop");
}

@Test
public void testReadFromES() throws Exception {
Tap in = new EsTap(indexPrefix + "cascading-hadoop/artists");
Pipe pipe = new Pipe("copy");

Tap out = new HadoopPrintStreamTap(Stream.NULL);
build(cfg(), in, out, pipe);
}

private void build(Properties cfg, Tap in, Tap out, Pipe pipe) {
StatsUtils.proxy(new HadoopFlowConnector(cfg).connect(in, out, pipe)).complete();
}

private Properties cfg() {
Properties props = HdpBootstrap.asProperties(QueryTestParams.provisionQueries(CascadingHadoopSuite.configuration));
props.put(ConfigurationOptions.ES_QUERY, query);
props.put(ConfigurationOptions.ES_READ_METADATA, readMetadata);
props.put(ConfigurationOptions.ES_OUTPUT_JSON, "true");
return props;
}
}
@@ -0,0 +1,104 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.hadoop.integration.cascading;

import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import org.elasticsearch.hadoop.QueryTestParams;
import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.RestUtils;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertThat;

@RunWith(Parameterized.class)
public class AbstractCascadingLocalJsonReadTest {

@Parameters
public static Collection<Object[]> queries() {
return QueryTestParams.localParams();
}

private final String indexPrefix = "json-";
private final String query;
private final boolean readMetadata;

public AbstractCascadingLocalJsonReadTest(String query, boolean readMetadata) {
this.query = query;
this.readMetadata = readMetadata;
}

@Before
public void before() throws Exception {
RestUtils.refresh(indexPrefix + "cascading-local");
}

@Test
public void testReadFromES() throws Exception {
Tap in = new EsTap(indexPrefix + "cascading-local/artists");
Pipe pipe = new Pipe("copy");
ByteArrayOutputStream os = new ByteArrayOutputStream();
Tap out = new OutputStreamTap(new TextLine(), os);
build(cfg(), in, out, pipe);

BufferedReader r = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(os.toByteArray())));

List<String> records = new ArrayList<>();
for (String line = r.readLine(); line != null; line = r.readLine()) {
records.add(line);
}

String doc1 = "{\"number\":\"917\",\"name\":\"Iron Maiden\",\"url\":\"http://www.last.fm/music/Iron+Maiden\",\"picture\":\"http://userserve-ak.last.fm/serve/252/22493569.jpg\",\"@timestamp\":\"2870-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}";
String doc2 = "{\"number\":\"979\",\"name\":\"Smash Mouth\",\"url\":\"http://www.last.fm/music/Smash+Mouth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/82063.jpg\",\"@timestamp\":\"2931-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}";
String doc3 = "{\"number\":\"190\",\"name\":\"Muse\",\"url\":\"http://www.last.fm/music/Muse\",\"picture\":\"http://userserve-ak.last.fm/serve/252/416514.jpg\",\"@timestamp\":\"2176-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}";

assertThat(records, hasItems(doc1, doc2, doc3));
}

private void build(Properties cfg, Tap in, Tap out, Pipe pipe) {
StatsUtils.proxy(new LocalFlowConnector(cfg).connect(in, out, pipe)).complete();
}

private Properties cfg() {
Properties props = new TestSettings().getProperties();
props.put(ConfigurationOptions.ES_QUERY, query);
props.put(ConfigurationOptions.ES_READ_METADATA, readMetadata);
props.put(ConfigurationOptions.ES_OUTPUT_JSON, "true");
return props;
}
}
Expand Up @@ -42,7 +42,7 @@
import cascading.util.Update;

@RunWith(Suite.class)
@Suite.SuiteClasses({ AbstractCascadingHadoopSaveTest.class, AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopSearchTest.class, AbstractCascadingHadoopJsonSearchTest.class })
@Suite.SuiteClasses({ AbstractCascadingHadoopSaveTest.class, AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopSearchTest.class, AbstractCascadingHadoopJsonSearchTest.class, AbstractCascadingHadoopJsonReadTest.class })
//@Suite.SuiteClasses({ AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopJsonSearchTest.class })
public class CascadingHadoopSuite {

Expand Down
Expand Up @@ -25,8 +25,8 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalSaveTest.class, AbstractCascadingLocalJsonSearchTest.class, AbstractCascadingLocalSearchTest.class })
//@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalJsonSearchTest.class })
@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalSaveTest.class, AbstractCascadingLocalJsonSearchTest.class, AbstractCascadingLocalSearchTest.class, AbstractCascadingLocalJsonReadTest.class })
//@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalJsonReadTest.class })
public class CascadingLocalSuite {

@ClassRule
Expand Down
Expand Up @@ -19,10 +19,7 @@
package org.elasticsearch.hadoop.cascading;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -83,12 +80,13 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Obje
public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
super.sourcePrepare(flowProcess, sourceCall);

Object[] context = new Object[3];
Object[] context = new Object[4];
context[0] = sourceCall.getInput().createKey();
context[1] = sourceCall.getInput().createValue();
// as the tuple _might_ vary (some objects might be missing), we use a map rather then a collection
Settings settings = loadSettings(flowProcess.getConfigCopy(), true);
context[2] = CascadingUtils.alias(settings);
context[3] = settings.getOutputAsJson();
sourceCall.setContext(context);
}

Expand Down Expand Up @@ -166,8 +164,18 @@ public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], Rec
return false;
}

boolean isJSON = (Boolean) context[3];

TupleEntry entry = sourceCall.getIncomingEntry();
Map data = (Map) context[1];

Map data;
if (isJSON) {
data = new HashMap(1);
data.put(new Text("data"), context[1]);
} else {
data = (Map) context[1];
}

FieldAlias alias = (FieldAlias) context[2];

if (entry.getFields().isDefined()) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.hadoop.cascading;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -74,9 +75,10 @@ class EsLocalScheme extends Scheme<Properties, ScrollQuery, Object, Object[], Ob
public void sourcePrepare(FlowProcess<Properties> flowProcess, SourceCall<Object[], ScrollQuery> sourceCall) throws IOException {
super.sourcePrepare(flowProcess, sourceCall);

Object[] context = new Object[1];
Object[] context = new Object[2];
Settings settings = HadoopSettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[0] = CascadingUtils.alias(settings);
context[1] = settings.getOutputAsJson();
sourceCall.setContext(context);
}

Expand Down Expand Up @@ -150,8 +152,18 @@ public boolean source(FlowProcess<Properties> flowProcess, SourceCall<Object[],
return false;
}

boolean isJSON = (Boolean) sourceCall.getContext()[1];

TupleEntry entry = sourceCall.getIncomingEntry();
Map<String, ?> data = (Map<String, ?>) query.next()[1];

Map<String, Object> data;
if (isJSON) {
data = new HashMap<String, Object>(1);
data.put("data", query.next()[1]);
} else {
data = (Map<String, Object>) query.next()[1];
}

FieldAlias alias = (FieldAlias) sourceCall.getContext()[0];

if (entry.getFields().isDefined()) {
Expand Down

0 comments on commit 15dfa29

Please sign in to comment.