Skip to content

Commit

Permalink
add support for Cascading + integration tests
Browse files Browse the repository at this point in the history
relates to #9
  • Loading branch information
costin committed Jan 17, 2014
1 parent a5367b2 commit 978deda
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 16 deletions.
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
import org.elasticsearch.hadoop.mr.WritableBytesWriter;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.util.FieldAlias;
Expand Down Expand Up @@ -120,6 +121,7 @@ public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordRe

InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, LogFactory.getLog(EsTap.class));
InitializationUtils.setBytesWriterIfNeeded(set, WritableBytesWriter.class, LogFactory.getLog(EsTap.class));

// NB: we need to set this property even though it is not being used - and since and URI causes problem, use only the resource/file
//conf.set("mapred.output.dir", set.getTargetUri() + "/" + set.getTargetResource());
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.mr.WritableBytesWriter;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
Expand Down Expand Up @@ -117,6 +118,7 @@ private void initClient(Properties props) {

InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, LogFactory.getLog(EsTap.class));
InitializationUtils.setValueReaderIfNotSet(settings, JdkValueReader.class, LogFactory.getLog(EsTap.class));
InitializationUtils.setBytesWriterIfNeeded(settings, WritableBytesWriter.class, LogFactory.getLog(EsTap.class));
settings.save();
client = new RestRepository(settings);
}
Expand Down
Expand Up @@ -30,21 +30,35 @@ public class QueryTestParams {

public static Collection<Object[]> params() {
return Arrays.asList(new Object[][] {
{ "" }, // empty
{ "?q=m*" }, // uri
{ "{ \"query\" : { \"query_string\" : { \"query\":\"m*\"} } }" }, // query dsl
{ "src/test/resources/org/elasticsearch/hadoop/integration/query.uri" }, // nested uri
{ "src/test/resources/org/elasticsearch/hadoop/integration/query.dsl" } // nested dsl
});
// standard
{ "", "" }, // empty
{ "", "?q=m*" }, // uri
{ "", "{ \"query\" : { \"query_string\" : { \"query\":\"m*\"} } }" }, // query dsl
{ "", "src/test/resources/org/elasticsearch/hadoop/integration/query.uri" }, // nested uri
{ "", "src/test/resources/org/elasticsearch/hadoop/integration/query.dsl" }, // nested dsl
// json
{ "json-", "" }, // empty
{ "json-", "?q=m*" }, // uri
{ "json-", "{ \"query\" : { \"query_string\" : { \"query\":\"m*\"} } }" }, // query dsl
{ "json-", "src/test/resources/org/elasticsearch/hadoop/integration/query.uri" }, // nested uri
{ "json-", "src/test/resources/org/elasticsearch/hadoop/integration/query.dsl" } // nested dsl

});
}

public static Collection<Object[]> localParams() {
return Arrays.asList(new Object[][] {
{ "" }, // empty
{ "?q=m*" }, // uri
{ "{ \"query\" : { \"query_string\" : { \"query\":\"m*\"} } }" }, // query dsl
{ "org/elasticsearch/hadoop/integration/query.uri" }, // nested uri
{ "org/elasticsearch/hadoop/integration/query.dsl" } // nested dsl
{ "", "" }, // empty
{ "", "?q=m*" }, // uri
{ "", "{ \"query\" : { \"query_string\" : { \"query\":\"m*\"} } }" }, // query dsl
{ "", "org/elasticsearch/hadoop/integration/query.uri" }, // nested uri
{ "", "org/elasticsearch/hadoop/integration/query.dsl" }, // nested dsl

{ "json-", "" }, // empty
{ "json-", "?q=m*" }, // uri
{ "json-", "{ \"query\" : { \"query_string\" : { \"query\":\"m*\"} } }" }, // query dsl
{ "json-", "org/elasticsearch/hadoop/integration/query.uri" }, // nested uri
{ "json-", "org/elasticsearch/hadoop/integration/query.dsl" } // nested dsl
});
}

Expand Down
Expand Up @@ -36,10 +36,12 @@

public class CascadingLocalSaveTest {

private static final String INPUT = "src/test/resources/artists.dat";

@Test
public void testWriteToES() throws Exception {
// local file-system source
Tap in = new FileTap(new TextDelimited(new Fields("id", "name", "url", "picture")), "src/test/resources/artists.dat");
Tap in = new FileTap(new TextDelimited(new Fields("id", "name", "url", "picture")), INPUT);
Tap out = new EsTap("cascading-local/artists", new Fields("name", "url", "picture"));

Pipe pipe = new Pipe("copy");
Expand All @@ -49,7 +51,7 @@ public void testWriteToES() throws Exception {
@Test
public void testWriteToESWithAlias() throws Exception {
// local file-system source
Tap in = new FileTap(new TextDelimited(new Fields("id", "name", "url", "picture")), "src/test/resources/artists.dat");
Tap in = new FileTap(new TextDelimited(new Fields("id", "name", "url", "picture")), INPUT);
Tap out = new EsTap("cascading-local/alias", new Fields("name", "url", "picture"));
Pipe pipe = new Pipe("copy");

Expand All @@ -68,7 +70,7 @@ public void testIndexAutoCreateDisabled() throws Exception {
properties.setProperty(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "false");

// local file-system source
Tap in = new FileTap(new TextDelimited(new Fields("id", "name", "url", "picture")), "src/test/resources/artists.dat");
Tap in = new FileTap(new TextDelimited(new Fields("id", "name", "url", "picture")), INPUT);
Tap out = new EsTap("cascading-local/non-existing", new Fields("name", "url", "picture"));

Pipe pipe = new Pipe("copy");
Expand All @@ -77,4 +79,4 @@ public void testIndexAutoCreateDisabled() throws Exception {
pipe = new Each(pipe, new Identity(new Fields("garbage", "name", "url", "picture")));
new LocalFlowConnector(properties).connect(in, out, pipe).complete();
}
}
}
Expand Up @@ -25,7 +25,7 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
@Suite.SuiteClasses({ CascadingLocalSaveTest.class, CascadingLocalSearchTest.class })
@Suite.SuiteClasses({ CascadingLocalJsonSaveTest.class, CascadingLocalSaveTest.class, CascadingLocalJsonSearchTest.class, CascadingLocalSearchTest.class })
public class CascadingLocalSuite {

@ClassRule
Expand Down

0 comments on commit 978deda

Please sign in to comment.