2 changes: 2 additions & 0 deletions build.gradle
@@ -61,6 +61,8 @@ dependencies {
testRuntime "dk.brics.automaton:automaton:1.11-8"
// specify a version of antlr that works with both hive and pig (works only during compilation)
testRuntime "org.antlr:antlr-runtime:$antlrVersion"

testCompile("org.elasticsearch:elasticsearch:$elasticsearchVersion")
}

configurations.all {
2 changes: 2 additions & 0 deletions gradle.properties
@@ -21,6 +21,8 @@ hamcrestVersion = 1.3
# Pig requires 3.4 / Hive requires 3.0.1 - the two are incompatible
antlrVersion = 3.4
thriftVersion = 0.5.0
# elasticsearch used for testing only
elasticsearchVersion = 0.20.6

# --------------------
# Project wide version
@@ -122,6 +122,7 @@ public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, Record
    public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        initTargetUri(conf);
        conf.setOutputFormat(ESOutputFormat.class);
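        // assumption: Hadoop's job setup insists on an output directory even though
        // ESOutputFormat writes to Elasticsearch, so point mapred.output.dir at a throwaway path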
conf.set("mapred.output.dir", "/tmp/dummy");
}

private void initTargetUri(JobConf conf) {
@@ -24,8 +24,8 @@
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.rest.BufferedRestClient;
import org.elasticsearch.hadoop.rest.QueryResult;
@@ -0,0 +1,77 @@
/*
 * Copyright 2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.elasticsearch.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.SearchHit;

import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

public class EmbeddedElasticsearchServer {
    private static final String DEFAULT_DATA_DIRECTORY = "build/elasticsearch-data";

    private final Node node;
    private final String dataDirectory;

    public EmbeddedElasticsearchServer() {
        this(DEFAULT_DATA_DIRECTORY, false, false);
    }

    public EmbeddedElasticsearchServer(String dataDirectory, boolean isLocal, boolean isClientOnly) {
        this.dataDirectory = dataDirectory;

        ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder().put("path.data",
                dataDirectory);

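        // assumption worth noting: local(true) confines the node to JVM-local transport/discovery,
        // while client(true) would start a data-less client-only node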
        node = nodeBuilder().local(isLocal).client(isClientOnly).settings(elasticsearchSettings.build()).node();
    }

    public Client getClient() {
        return node.client();
    }

    public void shutdown() {
        node.close();
        deleteDataDirectory();
    }

    private void deleteDataDirectory() {
        try {
            FileUtils.deleteDirectory(new File(dataDirectory));
        } catch (IOException e) {
            throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
        }
    }

    public void refreshIndex(String indexName) {
        // force a refresh so that freshly indexed documents become visible to search immediately
        new RefreshRequestBuilder(getClient().admin().indices()).setIndices(indexName).execute().actionGet();
    }

    public long countIndex(String indexName, String typeName) {
        return getClient().prepareCount(indexName).setTypes(typeName).execute().actionGet().count();
    }

    public Iterator<SearchHit> searchIndex(String indexName, String typeName) {
        return getClient().prepareSearch(indexName).setTypes(typeName).execute().actionGet().getHits().iterator();
    }
}
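
For orientation, a minimal sketch of how a test could drive this helper (assumptions: the Elasticsearch 0.20.x client API; the index/type names and the JSON document are invented for illustration):

EmbeddedElasticsearchServer server = new EmbeddedElasticsearchServer();
// index one document into a hypothetical "billboard/artists" index/type
server.getClient().prepareIndex("billboard", "artists")
        .setSource("{\"name\":\"Melanie\"}")
        .execute().actionGet();
server.refreshIndex("billboard");                       // make the document visible to search
long count = server.countIndex("billboard", "artists"); // expected: 1
server.shutdown();                                      // closes the node and deletes build/elasticsearch-data
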
@@ -15,15 +15,15 @@
*/
package org.elasticsearch.hadoop.cascading;

import java.util.Properties;

import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.EmbeddedElasticsearchServer;
import org.elasticsearch.hadoop.util.TestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.Identity;
import cascading.operation.filter.FilterNull;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
@@ -33,24 +33,43 @@

public class CascadingHadoopTest {

    private static EmbeddedElasticsearchServer esServer;

    {
        TestUtils.hackHadoopStagingOnWin();
    }

    @BeforeClass
    public static void beforeClass() {
        esServer = new EmbeddedElasticsearchServer();
    }

    @AfterClass
    public static void afterClass() {
        esServer.shutdown();
    }

    @Test
    public void testWriteToES() throws Exception {
    public void testWriteToESAndReadFromES() throws Exception {
        testWriteToES();

        testReadFromES();
    }

    private void testWriteToES() throws Exception {
        // local file-system source
        Tap in = new Lfs(new TextDelimited(new Fields("id", "name", "url", "picture")), "src/test/resources/artists.dat");
        Tap out = new ESTap("billboard/artists", new Fields("name", "url", "picture"));
        Pipe pipe = new Pipe("copy");

        // filter null properties
        pipe = new Each(pipe, new Fields("id", "name", "url", "picture"), new FilterNull());

        // rename "id" -> "garbage"
        pipe = new Each(pipe, new Identity(new Fields("garbage", "name", "url", "picture")));
        new HadoopFlowConnector().connect(in, out, pipe).complete();
    }

    @Test
    public void testReadFromES() throws Exception {
        Tap in = new ESTap("http://localhost:9200/billboard/artists/_search?q=me*");
        Pipe copy = new Pipe("copy");
@@ -15,6 +15,9 @@
*/
package org.elasticsearch.hadoop.cascading;

import org.elasticsearch.hadoop.EmbeddedElasticsearchServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import cascading.flow.local.LocalFlowConnector;
@@ -30,8 +33,28 @@

public class CascadingLocalTest {

    private static EmbeddedElasticsearchServer esServer;

    @BeforeClass
    public static void beforeClass() {
        esServer = new EmbeddedElasticsearchServer();
    }

    @AfterClass
    public static void afterClass() {
        esServer.shutdown();
    }

    @Test
    public void testWriteToES() throws Exception {
    public void testWriteToESAndReadFromES() throws Exception {
        testWriteToES();

        testReadFromES();
    }

    private void testWriteToES() throws Exception {
        // local file-system source
        Tap in = new FileTap(new TextDelimited(new Fields("id", "name", "url", "picture")), "src/test/resources/artists.dat");
        Tap out = new ESTap("top/artists", new Fields("name", "url", "picture"));
@@ -43,7 +66,6 @@ public void testWriteToES() throws Exception {
        new LocalFlowConnector().connect(in, out, pipe).complete();
    }

    @Test
    public void testReadFromES() throws Exception {
        Tap in = new ESTap("top/artists/_search?q=me*");
        Pipe copy = new Pipe("copy");