Skip to content

Commit

Permalink
Add Pig integration tests for date formatting
Browse files Browse the repository at this point in the history
Add Hive integration tests for date formatting
Improve yet again Hive compatibility on Windows
Beef up integration tests for timestamp formatting
Add timestamp field to sample data set
Add integration tests for formatting to MR old & new
Add integration tests to Cascading suite
Expand Cascading integration suite with JSON data

Fix elastic#187
  • Loading branch information
costin committed Apr 16, 2014
1 parent 4c0a4a4 commit 00b2e1f
Show file tree
Hide file tree
Showing 41 changed files with 2,789 additions and 2,050 deletions.
@@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.integration.cascading;

import java.util.Properties;

import org.elasticsearch.hadoop.HdpBootstrap;
import org.elasticsearch.hadoop.QueryTestParams;
import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.util.TestUtils;
import org.junit.Test;

import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class AbstractCascadingHadoopJsonSaveTest {

private static final String INPUT = TestUtils.sampleArtistsJson(CascadingHadoopSuite.configuration);

@Test
public void testWriteToES() throws Exception {
// local file-system source
Tap in = sourceTap();
Tap out = new EsTap("json-cascading-hadoop/artists");

Pipe pipe = new Pipe("copy");
build(cfg(), in, out, pipe);
}

@Test(expected = Exception.class)
public void testIndexAutoCreateDisabled() throws Exception {
Properties cfg = cfg();
cfg.setProperty(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "false");

// local file-system source
Tap in = sourceTap();
Tap out = new EsTap("json-cascading-hadoop/non-existing", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(cfg, in, out, pipe);
}

@Test
public void testIndexPattern() throws Exception {
Tap in = sourceTap();
Tap out = new EsTap("json-cascading-hadoop/pattern-{number}", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(cfg(), in, out, pipe);
}

@Test
public void testIndexPatternWithFormat() throws Exception {
Tap in = sourceTap();
Tap out = new EsTap("json-cascading-hadoop/pattern-format-{@timestamp:YYYY-MM-dd}", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(cfg(), in, out, pipe);
}

private void build(Properties cfg, Tap in, Tap out, Pipe pipe) {
StatsUtils.proxy(new HadoopFlowConnector(cfg).connect(in, out, pipe)).complete();
}

private Properties cfg() {
Properties props = HdpBootstrap.asProperties(QueryTestParams.provisionQueries(CascadingHadoopSuite.configuration));
props.put(ConfigurationOptions.ES_INPUT_JSON, "true");
return props;
}

private Tap sourceTap() {
return new Hfs(new TextDelimited(new Fields("line")), INPUT);
}
}
@@ -0,0 +1,133 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.integration.cascading;

import java.io.OutputStream;
import java.util.Collection;
import java.util.Properties;

import org.elasticsearch.hadoop.HdpBootstrap;
import org.elasticsearch.hadoop.QueryTestParams;
import org.elasticsearch.hadoop.Stream;
import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.RestUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.AssertionLevel;
import cascading.operation.aggregator.Count;
import cascading.operation.assertion.AssertSizeLessThan;
import cascading.operation.filter.FilterNotNull;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import cascading.tuple.Fields;

@RunWith(Parameterized.class)
public class AbstractCascadingHadoopJsonSearchTest {

@Parameters
public static Collection<Object[]> queries() {
return QueryTestParams.params();
}

private final String indexPrefix = "json-";
private final String query;

public AbstractCascadingHadoopJsonSearchTest(String query) {
this.query = query;
}

private OutputStream OUT = Stream.NULL.stream();

@Before
public void before() throws Exception {
RestUtils.refresh(indexPrefix + "cascading-hadoop");
}


@Test
public void testReadFromES() throws Exception {
Tap in = new EsTap(indexPrefix + "cascading-hadoop/artists");
Pipe pipe = new Pipe("copy");
pipe = new Each(pipe, new FilterNotNull());
pipe = new Each(pipe, AssertionLevel.STRICT, new AssertSizeLessThan(5));
// can't select when using unknown
//pipe = new Each(pipe, new Fields("name"), AssertionLevel.STRICT, new AssertNotNull());
pipe = new GroupBy(pipe);
pipe = new Every(pipe, new Count());

// print out
Tap out = new HadoopPrintStreamTap(Stream.NULL);
build(cfg(), in, out, pipe);
}

@Test
public void testNestedField() throws Exception {
String data = "{ \"data\" : { \"map\" : { \"key\" : [ 10, 20 ] } } }";
RestUtils.putData(indexPrefix + "cascading-hadoop/nestedmap", StringUtils.toUTF(data));

RestUtils.refresh(indexPrefix + "cascading-hadoop");

Properties cfg = cfg();
cfg.setProperty("es.mapping.names", "nested:data.map.key");

Tap in = new EsTap(indexPrefix + "cascading-hadoop/nestedmap", new Fields("nested"));
Pipe pipe = new Pipe("copy");
pipe = new Each(pipe, new FilterNotNull());
pipe = new Each(pipe, AssertionLevel.STRICT, new AssertSizeLessThan(2));

// print out
Tap out = new HadoopPrintStreamTap(Stream.NULL);
build(cfg, in, out, pipe);
}

@Test
public void testDynamicPattern() throws Exception {
Assert.assertTrue(RestUtils.exists(indexPrefix + "cascading-hadoop/pattern-1"));
Assert.assertTrue(RestUtils.exists(indexPrefix + "cascading-hadoop/pattern-500"));
Assert.assertTrue(RestUtils.exists(indexPrefix + "cascading-hadoop/pattern-990"));
}

@Test
public void testDynamicPatternWithFormat() throws Exception {
Assert.assertTrue(RestUtils.exists(indexPrefix + "cascading-hadoop/pattern-format-2001-10-06"));
Assert.assertTrue(RestUtils.exists(indexPrefix + "cascading-hadoop/pattern-format-2501-10-06"));
Assert.assertTrue(RestUtils.exists(indexPrefix + "cascading-hadoop/pattern-format-2890-10-06"));
}

private void build(Properties cfg, Tap in, Tap out, Pipe pipe) {
StatsUtils.proxy(new HadoopFlowConnector(cfg).connect(in, out, pipe)).complete();
}

private Properties cfg() {
Properties props = HdpBootstrap.asProperties(QueryTestParams.provisionQueries(CascadingHadoopSuite.configuration));
props.put(ConfigurationOptions.ES_QUERY, query);
return props;
}
}
Expand Up @@ -41,8 +41,7 @@ public class AbstractCascadingHadoopSaveTest {

@Test
public void testWriteToES() throws Exception {
// local file-system source
Tap in = new Hfs(new TextDelimited(new Fields("id", "name", "url", "picture")), INPUT);
Tap in = sourceTap();
Tap out = new EsTap("cascading-hadoop/artists", new Fields("name", "url", "picture"));
Pipe pipe = new Pipe("copy");

Expand All @@ -52,13 +51,12 @@ public void testWriteToES() throws Exception {

@Test
public void testWriteToESWithAlias() throws Exception {
// local file-system source
Tap in = new Hfs(new TextDelimited(new Fields("id", "name", "url", "picture")), INPUT);
Tap in = sourceTap();
Tap out = new EsTap("cascading-hadoop/alias", "", new Fields("name", "url", "picture"));
Pipe pipe = new Pipe("copy");

// rename "id" -> "garbage"
pipe = new Each(pipe, new Identity(new Fields("garbage", "name", "url", "picture")));
pipe = new Each(pipe, new Identity(new Fields("garbage", "name", "url", "picture", "ts")));

Properties props = HdpBootstrap.asProperties(CascadingHadoopSuite.configuration);
props.setProperty("es.mapping.names", "url:address");
Expand All @@ -69,10 +67,23 @@ public void testWriteToESWithAlias() throws Exception {
public void testIndexPattern() throws Exception {
Properties props = HdpBootstrap.asProperties(CascadingHadoopSuite.configuration);

// local file-system source
Tap in = new Hfs(new TextDelimited(new Fields("id", "name", "url", "picture")), INPUT);
Tap in = sourceTap();
Tap out = new EsTap("cascading-hadoop/pattern-{id}", new Fields("id", "name", "url", "picture"));
Pipe pipe = new Pipe("copy");
StatsUtils.proxy(new HadoopFlowConnector(props).connect(in, out, pipe)).complete();
}

@Test
public void testIndexPatternWithFormat() throws Exception {
Properties props = HdpBootstrap.asProperties(CascadingHadoopSuite.configuration);

Tap in = sourceTap();
Tap out = new EsTap("cascading-hadoop/pattern-format-{ts:YYYY-MM-dd}", new Fields("id", "name", "url", "picture", "ts"));
Pipe pipe = new Pipe("copy");
StatsUtils.proxy(new HadoopFlowConnector(props).connect(in, out, pipe)).complete();
}

private Tap sourceTap() {
return new Hfs(new TextDelimited(new Fields("id", "name", "url", "picture", "ts")), INPUT);
}
}
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.hadoop.mr.RestUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -58,6 +59,11 @@ public AbstractCascadingHadoopSearchTest(String query) {
this.query = query;
}

@Before
public void before() throws Exception {
RestUtils.refresh("cascading-hadoop");
}

@Test
public void testReadFromES() throws Exception {
Tap in = new EsTap("cascading-hadoop/artists", query);
Expand Down Expand Up @@ -136,6 +142,13 @@ public void testDynamicPattern() throws Exception {
Assert.assertTrue(RestUtils.exists("cascading-hadoop/pattern-990"));
}

@Test
public void testDynamicPatternFormat() throws Exception {
Assert.assertTrue(RestUtils.exists("cascading-hadoop/pattern-format-2001-10-06"));
Assert.assertTrue(RestUtils.exists("cascading-hadoop/pattern-format-2500-10-06"));
Assert.assertTrue(RestUtils.exists("cascading-hadoop/pattern-format-2990-10-06"));
}

private Properties cfg() {
Properties props = HdpBootstrap.asProperties(QueryTestParams.provisionQueries(CascadingHadoopSuite.configuration));
//props.put(ConfigurationOptions.ES_QUERY, query);
Expand Down
Expand Up @@ -40,8 +40,7 @@ public void testWriteToES() throws Exception {
Properties props = new TestSettings().getProperties();
props.setProperty(ConfigurationOptions.ES_INPUT_JSON, "true");

// local file-system source
Tap in = new FileTap(new TextLine(new Fields("line")), TestUtils.sampleArtistsJson());
Tap in = sourceTap();
Tap out = new EsTap("json-cascading-local/artists");

Pipe pipe = new Pipe("copy");
Expand All @@ -54,8 +53,7 @@ public void testIndexAutoCreateDisabled() throws Exception {
properties.setProperty(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "false");
properties.setProperty(ConfigurationOptions.ES_INPUT_JSON, "true");

// local file-system source
Tap in = new FileTap(new TextLine(new Fields("line")), TestUtils.sampleArtistsJson());
Tap in = sourceTap();
Tap out = new EsTap("json-cascading-local/non-existing", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
Expand All @@ -66,14 +64,28 @@ public void testIndexPattern() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.setProperty(ConfigurationOptions.ES_INPUT_JSON, "yes");

// local file-system source
Tap in = new FileTap(new TextLine(new Fields("line")), TestUtils.sampleArtistsJson());
Tap out = new EsTap("json-cascading/pattern-{number}", new Fields("line"));
Tap in = sourceTap();
Tap out = new EsTap("json-cascading-local/pattern-{number}", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}

@Test
public void testIndexPatternWithFormat() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.setProperty(ConfigurationOptions.ES_INPUT_JSON, "yes");

Tap in = sourceTap();
Tap out = new EsTap("json-cascading-local/pattern-format-{@timestamp:YYYY-MM-dd}", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}

private void build(Properties props, Tap in, Tap out, Pipe pipe) {
StatsUtils.proxy(new LocalFlowConnector(props).connect(in, out, pipe)).complete();
}

private Tap sourceTap() {
return new FileTap(new TextLine(new Fields("line")), TestUtils.sampleArtistsJson());
}
}

0 comments on commit 00b2e1f

Please sign in to comment.