diff --git a/build.gradle b/build.gradle index c47dac3ae..ad1e665f5 100644 --- a/build.gradle +++ b/build.gradle @@ -61,6 +61,8 @@ dependencies { testRuntime "dk.brics.automaton:automaton:1.11-8" // specify a version of antlr that works with both hive and pig (works only during compilation) testRuntime "org.antlr:antlr-runtime:$antlrVersion" + + testCompile("org.elasticsearch:elasticsearch:$elasticsearchVersion") } configurations.all { diff --git a/gradle.properties b/gradle.properties index 95914b214..f1fa67156 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,6 +21,8 @@ hamcrestVersion = 1.3 # pig requires 3.4 / Hive 3.0.1 - the two are incompatible antlrVersion = 3.4 thriftVersion = 0.5.0 +# elasticsearch used for testing only +elasticsearchVersion = 0.20.6 # -------------------- # Project wide version diff --git a/src/main/java/org/elasticsearch/hadoop/cascading/ESHadoopScheme.java b/src/main/java/org/elasticsearch/hadoop/cascading/ESHadoopScheme.java index e060f3300..8b2c47658 100644 --- a/src/main/java/org/elasticsearch/hadoop/cascading/ESHadoopScheme.java +++ b/src/main/java/org/elasticsearch/hadoop/cascading/ESHadoopScheme.java @@ -122,6 +122,7 @@ public void sourceConfInit(FlowProcess flowProcess, Tap flowProcess, Tap tap, JobConf conf) { initTargetUri(conf); conf.setOutputFormat(ESOutputFormat.class); + conf.set("mapred.output.dir", "/tmp/dummy"); } private void initTargetUri(JobConf conf) { diff --git a/src/main/java/org/elasticsearch/hadoop/cascading/ESLocalScheme.java b/src/main/java/org/elasticsearch/hadoop/cascading/ESLocalScheme.java index 4e833612a..11e5e7837 100644 --- a/src/main/java/org/elasticsearch/hadoop/cascading/ESLocalScheme.java +++ b/src/main/java/org/elasticsearch/hadoop/cascading/ESLocalScheme.java @@ -24,8 +24,8 @@ import java.util.Properties; import java.util.Set; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; +import org.elasticsearch.hadoop.cfg.ConfigurationOptions; +import org.elasticsearch.hadoop.cfg.Settings; import org.elasticsearch.hadoop.cfg.SettingsManager; import org.elasticsearch.hadoop.rest.BufferedRestClient; import org.elasticsearch.hadoop.rest.QueryResult; diff --git a/src/test/java/org/elasticsearch/hadoop/EmbeddedElasticsearchServer.java b/src/test/java/org/elasticsearch/hadoop/EmbeddedElasticsearchServer.java new file mode 100644 index 000000000..ff4adadd4 --- /dev/null +++ b/src/test/java/org/elasticsearch/hadoop/EmbeddedElasticsearchServer.java @@ -0,0 +1,77 @@ +/* + * Copyright 2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elasticsearch.hadoop; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.io.FileUtils; +import org.elasticsearch.node.Node; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +public class EmbeddedElasticsearchServer { + private static final String DEFAULT_DATA_DIRECTORY = "build/elasticsearch-data"; + + private final Node node; + private final String dataDirectory; + + public EmbeddedElasticsearchServer() { + this(DEFAULT_DATA_DIRECTORY, false, false); + } + + public EmbeddedElasticsearchServer(String dataDirectory, boolean isLocal, boolean isClientOnly) { + this.dataDirectory = dataDirectory; + + ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder().put("path.data", + dataDirectory); + + node = nodeBuilder().local(isLocal).client(isClientOnly).settings(elasticsearchSettings.build()).node(); + } + + public Client getClient() { + return node.client(); + } + + public void shutdown() { + node.close(); + deleteDataDirectory(); + } + + private void deleteDataDirectory() { + try { + FileUtils.deleteDirectory(new File(dataDirectory)); + } catch (IOException e) { + throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e); + } + } + + public void refresIndex(String indexName) { + new RefreshRequestBuilder(getClient().admin().indices()).setIndices(indexName).execute().actionGet(); + } + + public long countIndex(String indexName, String typeName) { + return getClient().prepareCount(indexName).setTypes(typeName).execute().actionGet().count(); + } + + public Iterator searchIndex(String indexName, String typeName) { + return getClient().prepareSearch(indexName).setTypes(typeName).execute().actionGet().getHits().iterator(); + } +} diff --git a/src/test/java/org/elasticsearch/hadoop/cascading/CascadingHadoopTest.java b/src/test/java/org/elasticsearch/hadoop/cascading/CascadingHadoopTest.java index 527096e66..eaccd6ef3 100644 --- a/src/test/java/org/elasticsearch/hadoop/cascading/CascadingHadoopTest.java +++ b/src/test/java/org/elasticsearch/hadoop/cascading/CascadingHadoopTest.java @@ -15,15 +15,15 @@ */ package org.elasticsearch.hadoop.cascading; -import java.util.Properties; - -import org.elasticsearch.hadoop.cfg.ConfigurationOptions; +import org.elasticsearch.hadoop.EmbeddedElasticsearchServer; import org.elasticsearch.hadoop.util.TestUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; -import cascading.flow.FlowDef; import cascading.flow.hadoop.HadoopFlowConnector; import cascading.operation.Identity; +import cascading.operation.filter.FilterNull; import cascading.pipe.Each; import cascading.pipe.Pipe; import cascading.scheme.hadoop.TextDelimited; @@ -33,24 +33,43 @@ public class CascadingHadoopTest { + private static EmbeddedElasticsearchServer esServer; + { TestUtils.hackHadoopStagingOnWin(); } + @BeforeClass + public static void beforeClass() { + esServer = new EmbeddedElasticsearchServer(); + } + + @AfterClass + public static void afterClass() { + esServer.shutdown(); + } @Test - public void testWriteToES() throws Exception { + public void testWriteToESAdnReadFromES() throws Exception { + testWriteToES(); + + testReadFromES(); + } + + private void testWriteToES() throws Exception { // local file-system source Tap in = new Lfs(new TextDelimited(new Fields("id", "name", "url", "picture")), "src/test/resources/artists.dat"); Tap out = new ESTap("billboard/artists", new Fields("name", "url", "picture")); Pipe pipe = new Pipe("copy"); + // filter null properties + pipe = new Each(pipe, new Fields("id", "name", "url", "picture"), new FilterNull()); + // rename "id" -> "garbage" pipe = new Each(pipe, new Identity(new Fields("garbage", "name", "url", "picture"))); new HadoopFlowConnector().connect(in, out, pipe).complete(); } - @Test public void testReadFromES() throws Exception { Tap in = new ESTap("http://localhost:9200/billboard/artists/_search?q=me*"); Pipe copy = new Pipe("copy"); diff --git a/src/test/java/org/elasticsearch/hadoop/cascading/CascadingLocalTest.java b/src/test/java/org/elasticsearch/hadoop/cascading/CascadingLocalTest.java index 5c5005d6a..e198af750 100644 --- a/src/test/java/org/elasticsearch/hadoop/cascading/CascadingLocalTest.java +++ b/src/test/java/org/elasticsearch/hadoop/cascading/CascadingLocalTest.java @@ -15,6 +15,9 @@ */ package org.elasticsearch.hadoop.cascading; +import org.elasticsearch.hadoop.EmbeddedElasticsearchServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import cascading.flow.local.LocalFlowConnector; @@ -30,8 +33,28 @@ public class CascadingLocalTest { + private static EmbeddedElasticsearchServer esServer; + + + @BeforeClass + public static void beforeClass() { + esServer = new EmbeddedElasticsearchServer(); + } + + @AfterClass + public static void afterClass() { + esServer.shutdown(); + } + @Test - public void testWriteToES() throws Exception { + public void testWriteToESAdnReadFromES() throws Exception { + + testWriteToES(); + + testReadFromES(); + } + + private void testWriteToES() throws Exception { // local file-system source Tap in = new FileTap(new TextDelimited(new Fields("id", "name", "url", "picture")), "src/test/resources/artists.dat"); Tap out = new ESTap("top/artists", new Fields("name", "url", "picture")); @@ -43,7 +66,6 @@ public void testWriteToES() throws Exception { new LocalFlowConnector().connect(in, out, pipe).complete(); } - @Test public void testReadFromES() throws Exception { Tap in = new ESTap("top/artists/_search?q=me*"); Pipe copy = new Pipe("copy");