Fix es.output.json for Cascading. (#898)
Added tests to verify functionality.
fixes #885
jbaiera committed May 8, 2017
1 parent 8bdbc70 commit 6b2654e
Showing 7 changed files with 263 additions and 24 deletions.
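For context, a hedged sketch of the user-facing behavior this commit fixes: with es.output.json set to true, a Cascading flow reading through EsTap should receive each document as a single raw JSON string rather than a map of parsed fields. The index name, sink path, and class name below are illustrative only, not taken from the commit.

import java.util.Properties;

import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;

public class JsonReadSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Ask the connector for each hit's raw JSON instead of a parsed field map.
        props.put(ConfigurationOptions.ES_OUTPUT_JSON, "true");

        Tap in = new EsTap("my-index/my-type");             // hypothetical index/type
        Tap out = new Hfs(new TextLine(), "/tmp/json-out"); // hypothetical sink
        Pipe pipe = new Pipe("copy");

        new HadoopFlowConnector(props).connect(in, out, pipe).complete();
    }
}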
AbstractCascadingHadoopJsonReadTest.java (new file)
@@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.hadoop.integration.cascading;

import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import org.elasticsearch.hadoop.HdpBootstrap;
import org.elasticsearch.hadoop.QueryTestParams;
import org.elasticsearch.hadoop.Stream;
import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.RestUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.util.Collection;
import java.util.Properties;

@RunWith(Parameterized.class)
public class AbstractCascadingHadoopJsonReadTest {

@Parameters
public static Collection<Object[]> queries() {
return QueryTestParams.params();
}

private final String indexPrefix = "json-";
private final String query;
private final boolean readMetadata;

public AbstractCascadingHadoopJsonReadTest(String query, boolean readMetadata) {
this.query = query;
this.readMetadata = readMetadata;
}

@Before
public void before() throws Exception {
RestUtils.refresh(indexPrefix + "cascading-hadoop");
}

@Test
public void testReadFromES() throws Exception {
Tap in = new EsTap(indexPrefix + "cascading-hadoop/artists");
Pipe pipe = new Pipe("copy");

Tap out = new HadoopPrintStreamTap(Stream.NULL);
build(cfg(), in, out, pipe);
}

private void build(Properties cfg, Tap in, Tap out, Pipe pipe) {
StatsUtils.proxy(new HadoopFlowConnector(cfg).connect(in, out, pipe)).complete();
}

private Properties cfg() {
Properties props = HdpBootstrap.asProperties(QueryTestParams.provisionQueries(CascadingHadoopSuite.configuration));
props.put(ConfigurationOptions.ES_QUERY, query);
props.put(ConfigurationOptions.ES_READ_METADATA, readMetadata);
props.put(ConfigurationOptions.ES_OUTPUT_JSON, "true");
return props;
}
}
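Note the split in verification strategy between the two new tests: the Hadoop variant above only exercises the read path end to end, discarding its output through a print-stream tap over Stream.NULL, while the local variant below captures the flow's output in memory and asserts on the raw JSON records.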
AbstractCascadingLocalJsonReadTest.java (new file)
@@ -0,0 +1,104 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.hadoop.integration.cascading;

import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import org.elasticsearch.hadoop.QueryTestParams;
import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.RestUtils;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertThat;

@RunWith(Parameterized.class)
public class AbstractCascadingLocalJsonReadTest {

@Parameters
public static Collection<Object[]> queries() {
return QueryTestParams.localParams();
}

private final String indexPrefix = "json-";
private final String query;
private final boolean readMetadata;

public AbstractCascadingLocalJsonReadTest(String query, boolean readMetadata) {
this.query = query;
this.readMetadata = readMetadata;
}

@Before
public void before() throws Exception {
RestUtils.refresh(indexPrefix + "cascading-local");
}

@Test
public void testReadFromES() throws Exception {
Tap in = new EsTap(indexPrefix + "cascading-local/artists");
Pipe pipe = new Pipe("copy");
ByteArrayOutputStream os = new ByteArrayOutputStream();
Tap out = new OutputStreamTap(new TextLine(), os);
build(cfg(), in, out, pipe);

BufferedReader r = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(os.toByteArray())));

List<String> records = new ArrayList<>();
for (String line = r.readLine(); line != null; line = r.readLine()) {
records.add(line);
}

String doc1 = "{\"number\":\"917\",\"name\":\"Iron Maiden\",\"url\":\"http://www.last.fm/music/Iron+Maiden\",\"picture\":\"http://userserve-ak.last.fm/serve/252/22493569.jpg\",\"@timestamp\":\"2870-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}";
String doc2 = "{\"number\":\"979\",\"name\":\"Smash Mouth\",\"url\":\"http://www.last.fm/music/Smash+Mouth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/82063.jpg\",\"@timestamp\":\"2931-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}";
String doc3 = "{\"number\":\"190\",\"name\":\"Muse\",\"url\":\"http://www.last.fm/music/Muse\",\"picture\":\"http://userserve-ak.last.fm/serve/252/416514.jpg\",\"@timestamp\":\"2176-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]}";

assertThat(records, hasItems(doc1, doc2, doc3));
}

private void build(Properties cfg, Tap in, Tap out, Pipe pipe) {
StatsUtils.proxy(new LocalFlowConnector(cfg).connect(in, out, pipe)).complete();
}

private Properties cfg() {
Properties props = new TestSettings().getProperties();
props.put(ConfigurationOptions.ES_QUERY, query);
props.put(ConfigurationOptions.ES_READ_METADATA, readMetadata);
props.put(ConfigurationOptions.ES_OUTPUT_JSON, "true");
return props;
}
}
CascadingHadoopSuite.java
@@ -42,7 +42,7 @@
import cascading.util.Update;

@RunWith(Suite.class)
- @Suite.SuiteClasses({ AbstractCascadingHadoopSaveTest.class, AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopSearchTest.class, AbstractCascadingHadoopJsonSearchTest.class })
+ @Suite.SuiteClasses({ AbstractCascadingHadoopSaveTest.class, AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopSearchTest.class, AbstractCascadingHadoopJsonSearchTest.class, AbstractCascadingHadoopJsonReadTest.class })
//@Suite.SuiteClasses({ AbstractCascadingHadoopJsonSaveTest.class, AbstractCascadingHadoopJsonSearchTest.class })
public class CascadingHadoopSuite {

CascadingLocalSuite.java
@@ -25,8 +25,8 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
- @Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalSaveTest.class, AbstractCascadingLocalJsonSearchTest.class, AbstractCascadingLocalSearchTest.class })
- //@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalJsonSearchTest.class })
+ @Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalSaveTest.class, AbstractCascadingLocalJsonSearchTest.class, AbstractCascadingLocalSearchTest.class, AbstractCascadingLocalJsonReadTest.class })
+ //@Suite.SuiteClasses({ AbstractCascadingLocalJsonSaveTest.class, AbstractCascadingLocalJsonReadTest.class })
public class CascadingLocalSuite {

@ClassRule
CascadingValueWriter.java
@@ -35,6 +35,9 @@
*/
public class CascadingValueWriter extends FilteringValueWriter<SinkCall<Object[], ?>> {

+ final static int SINK_CTX_SIZE = 1;
+ final static int SINK_CTX_ALIASES = 0;

private final JdkValueWriter jdkWriter;
private final WritableValueWriter writableWriter;

@@ -52,7 +55,7 @@ public CascadingValueWriter(boolean writeUnknownTypes) {
public Result write(SinkCall<Object[], ?> sinkCall, Generator generator) {
Tuple tuple = CascadingUtils.coerceToString(sinkCall);
// consider names (in case of aliases these are already applied)
- List<String> names = (List<String>) sinkCall.getContext()[0];
+ List<String> names = (List<String>) sinkCall.getContext()[SINK_CTX_ALIASES];

generator.writeBeginObject();
for (int i = 0; i < tuple.size(); i++) {
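The refactor in this file and in EsHadoopScheme below follows a single pattern: give every slot of the opaque Object[] context a named constant, and share those names between the code that fills the array and the code that reads it (the scheme statically imports SINK_CTX_SIZE and SINK_CTX_ALIASES declared above). A minimal, self-contained illustration with hypothetical names, not code from this commit:

// Hypothetical sketch of the named-slot pattern.
class ContextSlots {
    static final int CTX_SIZE = 2;
    static final int CTX_ALIASES = 0;
    static final int CTX_OUTPUT_JSON = 1;

    public static void main(String[] args) {
        Object[] context = new Object[CTX_SIZE];
        context[CTX_ALIASES] = "field-aliases";            // prepare side fills slots by name
        context[CTX_OUTPUT_JSON] = Boolean.TRUE;
        boolean json = (Boolean) context[CTX_OUTPUT_JSON]; // consume side reads by the same names
        System.out.println(json);
    }
}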
EsHadoopScheme.java
@@ -19,10 +19,7 @@
package org.elasticsearch.hadoop.cascading;

import java.io.IOException;
- import java.util.Collection;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
+ import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,6 +48,9 @@
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

+ import static org.elasticsearch.hadoop.cascading.CascadingValueWriter.SINK_CTX_ALIASES;
+ import static org.elasticsearch.hadoop.cascading.CascadingValueWriter.SINK_CTX_SIZE;

/**
* Cascading Scheme handling
*/
@@ -59,6 +59,12 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>

private static final long serialVersionUID = 4304172465362298925L;

+ private static final int SRC_CTX_SIZE = 4;
+ private static final int SRC_CTX_KEY = 0;
+ private static final int SRC_CTX_VALUE = 1;
+ private static final int SRC_CTX_ALIASES = 2;
+ private static final int SRC_CTX_OUTPUT_JSON = 3;

private final String index;
private final String query;
private final String nodes;
@@ -83,12 +89,13 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>
public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
super.sourcePrepare(flowProcess, sourceCall);

- Object[] context = new Object[3];
- context[0] = sourceCall.getInput().createKey();
- context[1] = sourceCall.getInput().createValue();
+ Object[] context = new Object[SRC_CTX_SIZE];
+ context[SRC_CTX_KEY] = sourceCall.getInput().createKey();
+ context[SRC_CTX_VALUE] = sourceCall.getInput().createValue();
// as the tuple _might_ vary (some objects might be missing), we use a map rather than a collection
Settings settings = loadSettings(flowProcess.getConfigCopy(), true);
- context[2] = CascadingUtils.alias(settings);
+ context[SRC_CTX_ALIASES] = CascadingUtils.alias(settings);
+ context[SRC_CTX_OUTPUT_JSON] = settings.getOutputAsJson();
sourceCall.setContext(context);
}

@@ -103,10 +110,10 @@ public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
super.sinkPrepare(flowProcess, sinkCall);

- Object[] context = new Object[1];
+ Object[] context = new Object[SINK_CTX_SIZE];
// the tuple is fixed, so we can just use a collection/index
Settings settings = loadSettings(flowProcess.getConfigCopy(), false);
- context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
+ context[SINK_CTX_ALIASES] = CascadingUtils.fieldToAlias(settings, getSinkFields());
sinkCall.setContext(context);
}

Expand Down Expand Up @@ -162,13 +169,23 @@ private Settings loadSettings(Object source, boolean read) {
public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
Object[] context = sourceCall.getContext();

- if (!sourceCall.getInput().next(context[0], context[1])) {
+ if (!sourceCall.getInput().next(context[SRC_CTX_KEY], context[SRC_CTX_VALUE])) {
return false;
}

+ boolean isJSON = (Boolean) context[SRC_CTX_OUTPUT_JSON];

TupleEntry entry = sourceCall.getIncomingEntry();
- Map data = (Map) context[1];
- FieldAlias alias = (FieldAlias) context[2];

+ Map data;
+ if (isJSON) {
+     data = new HashMap(1);
+     data.put(new Text("data"), context[SRC_CTX_VALUE]);
+ } else {
+     data = (Map) context[SRC_CTX_VALUE];
+ }

+ FieldAlias alias = (FieldAlias) context[SRC_CTX_ALIASES];

if (entry.getFields().isDefined()) {
// lookup using writables
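The heart of the fix sits in source() above: when es.output.json is enabled, the record value is no longer assumed to be a map of parsed fields; instead the raw JSON is wrapped in a one-entry map under the key "data", so the existing map-based field resolution hands the whole document to the tuple as a single value. A small, self-contained sketch of that wrapping with hypothetical names (only the "data" key comes from the diff):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;

class JsonWrapSketch {
    // Mirrors the idea of the JSON branch in EsHadoopScheme.source(): serve the raw
    // document through the same Map-shaped contract used by the parsed-field mode.
    @SuppressWarnings("unchecked")
    static Map<Text, Object> asSourceMap(boolean outputAsJson, Object value) {
        if (!outputAsJson) {
            return (Map<Text, Object>) value;  // parsed mode: value already is a field map
        }
        Map<Text, Object> data = new HashMap<Text, Object>(1);
        data.put(new Text("data"), value);     // JSON mode: whole document under one key
        return data;
    }
}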
