diff --git a/src/main/java/org/elasticsearch/hadoop/cascading/CascadingUtils.java b/src/main/java/org/elasticsearch/hadoop/cascading/CascadingUtils.java index 4a7ec33da..2ce098154 100644 --- a/src/main/java/org/elasticsearch/hadoop/cascading/CascadingUtils.java +++ b/src/main/java/org/elasticsearch/hadoop/cascading/CascadingUtils.java @@ -127,7 +127,7 @@ static Properties extractOriginalProperties(Properties copy) { return ReflectionUtils.getField(field, copy); } - static Settings init(Settings settings, String nodes, int port, String resource, String query) { + static Settings init(Settings settings, String nodes, int port, String resource, String query, boolean read) { if (StringUtils.hasText(nodes)) { settings.setHosts(nodes); } @@ -141,7 +141,12 @@ static Settings init(Settings settings, String nodes, int port, String resource, } if (StringUtils.hasText(resource)) { - settings.setResource(resource); + if (read) { + settings.setResourceRead(resource); + } + else { + settings.setResourceWrite(resource); + } } return settings; diff --git a/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java b/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java index 4fcad1f56..e56a7a651 100644 --- a/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java +++ b/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java @@ -89,7 +89,7 @@ public void sourcePrepare(FlowProcess flowProcess, SourceCall flowProcess, SinkCall flowProcess, SinkCall flowProcess, Tap tap, JobConf conf) { - - initTargetUri(conf); conf.setInputFormat(EsInputFormat.class); - Settings set = loadSettings(conf); + Settings set = loadSettings(conf, true); + Collection fields = CascadingUtils.fieldToAlias(set, getSourceFields()); // load only the necessary fields conf.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(fields, ",")); @@ -137,10 +136,10 @@ public void sourceConfInit(FlowProcess flowProcess, Tap flowProcess, Tap tap, JobConf conf) { - initTargetUri(conf); + conf.setOutputFormat(EsOutputFormat.class); // define an output dir to prevent Cascading from setting up a TempHfs and overriding the OutputFormat - Settings set = loadSettings(conf); + Settings set = loadSettings(conf, false); InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, LogFactory.getLog(EsTap.class)); InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, LogFactory.getLog(EsTap.class)); @@ -148,7 +147,7 @@ public void sinkConfInit(FlowProcess flowProcess, Tap flowProcess, Tap flowProcess, SinkCall flowProcess, Tap tap, Properties conf) { - initClient(conf); + initClient(conf, true); } @Override public void sinkConfInit(FlowProcess flowProcess, Tap tap, Properties conf) { - initClient(conf); + initClient(conf, false); InitializationUtils.checkIndexExistence(SettingsManager.loadFrom(conf).merge(props), client); } - private void initClient(Properties props) { + private void initClient(Properties props, boolean read) { if (client == null) { Settings settings = SettingsManager.loadFrom(props).merge(this.props); - CascadingUtils.init(settings, host, port, resource, query); + CascadingUtils.init(settings, host, port, resource, query, read); Log log = LogFactory.getLog(EsTap.class); InitializationUtils.setValueWriterIfNotSet(settings, CascadingValueWriter.class, log); diff --git a/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java b/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java index 793b49d33..023e5e256 100644 --- a/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java +++ b/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java @@ -208,11 +208,6 @@ public Settings setPort(int port) { return this; } - public Settings setResource(String index) { - setProperty(ES_RESOURCE, index); - return this; - } - public Settings setResourceRead(String index) { setProperty(ES_RESOURCE_READ, index); return this; diff --git a/src/main/java/org/elasticsearch/hadoop/hive/EsStorageHandler.java b/src/main/java/org/elasticsearch/hadoop/hive/EsStorageHandler.java index 500e94d67..1f2d6b439 100644 --- a/src/main/java/org/elasticsearch/hadoop/hive/EsStorageHandler.java +++ b/src/main/java/org/elasticsearch/hadoop/hive/EsStorageHandler.java @@ -76,15 +76,15 @@ public HiveMetaHook getMetaHook() { @Override public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { - init(tableDesc); + init(tableDesc, true); } @Override public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { - init(tableDesc); + init(tableDesc, false); } - private void init(TableDesc tableDesc) { + private void init(TableDesc tableDesc, boolean read) { Configuration cfg = getConf(); Settings settings = SettingsManager.loadFrom(cfg).merge(tableDesc.getProperties()); @@ -92,9 +92,21 @@ private void init(TableDesc tableDesc) { // NB: the value writer is not needed by Hive but it's set for consistency and debugging purposes InitializationUtils.checkIdForOperation(settings); - InitializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log); - InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log); - InitializationUtils.setBytesConverterIfNeeded(settings, HiveBytesConverter.class, log); + if (read) { + InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log); + settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(HiveUtils.columnToAlias(settings), ",")); + // set read resource + settings.setResourceRead(settings.getResourceRead()); + } + else { + InitializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log); + InitializationUtils.setBytesConverterIfNeeded(settings, HiveBytesConverter.class, log); + // replace the default committer when using the old API + HadoopCfgUtils.setOutputCommitterClass(cfg, EsOutputFormat.ESOutputCommitter.class.getName()); + // set write resource + settings.setResourceWrite(settings.getResourceWrite()); + } + InitializationUtils.setFieldExtractorIfNotSet(settings, HiveFieldExtractor.class, log); try { InitializationUtils.discoverEsVersion(settings, log); @@ -103,12 +115,6 @@ private void init(TableDesc tableDesc) { } - settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, - StringUtils.concatenate(HiveUtils.columnToAlias(settings), ",")); - - // replace the default committer when using the old API - HadoopCfgUtils.setOutputCommitterClass(cfg, EsOutputFormat.ESOutputCommitter.class.getName()); - Assert.hasText(tableDesc.getProperties().getProperty(TABLE_LOCATION), String.format( "no table location [%s] declared by Hive resulting in abnormal execution;", TABLE_LOCATION)); } diff --git a/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java b/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java index 4a0f4fc1b..c15703d03 100644 --- a/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java +++ b/src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java @@ -417,22 +417,22 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit String savedSettings = settings.save(); RestRepository client = new RestRepository(settings); - boolean indexExists = client.indexExists(); + boolean indexExists = client.indexExists(true); Map targetShards = null; if (!indexExists) { if (settings.getIndexReadMissingAsEmpty()) { - log.info(String.format("Index [%s] missing - treating it as empty", settings.getResource())); + log.info(String.format("Index [%s] missing - treating it as empty", settings.getResourceRead())); targetShards = Collections.emptyMap(); } else { client.close(); throw new EsHadoopIllegalArgumentException( - String.format("Index [%s] missing and settings [%s] is set to false", settings.getResource(), ConfigurationOptions.ES_FIELD_READ_EMPTY_AS_NULL)); + String.format("Index [%s] missing and settings [%s] is set to false", settings.getResourceRead(), ConfigurationOptions.ES_FIELD_READ_EMPTY_AS_NULL)); } } else { - targetShards = client.getTargetShards(); + targetShards = client.getReadTargetShards(); if (log.isTraceEnabled()) { log.trace("Creating splits for shards " + targetShards); } @@ -443,7 +443,7 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit Field mapping = client.getMapping(); //TODO: implement this more efficiently savedMapping = IOUtils.serializeToBase64(mapping); - log.info(String.format("Discovered mapping {%s} for [%s]", mapping, settings.getResource())); + log.info(String.format("Discovered mapping {%s} for [%s]", mapping, settings.getResourceRead())); } client.close(); diff --git a/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java b/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java index 012b72065..8b2f37efd 100644 --- a/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java +++ b/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java @@ -192,7 +192,7 @@ protected void init() throws IOException { } } - Map targetShards = client.getTargetPrimaryShards(); + Map targetShards = client.getWriteTargetPrimaryShards(); client.close(); List orderedShards = new ArrayList(targetShards.keySet()); diff --git a/src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java b/src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java index 02e55b407..35a5f24f4 100644 --- a/src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java +++ b/src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java @@ -143,11 +143,14 @@ public void checkSchema(ResourceSchema s) throws IOException { @Override public void setStoreLocation(String location, Job job) throws IOException { - init(location, job); + init(location, job, false); } - private void init(String location, Job job) { - Settings settings = SettingsManager.loadFrom(job.getConfiguration()).merge(properties).setResource(location); + private void init(String location, Job job, boolean read) { + Settings settings = SettingsManager.loadFrom(job.getConfiguration()).merge(properties); + + settings = (read ? settings.setResourceRead(location) : settings.setResourceWrite(location)); + boolean changed = false; InitializationUtils.checkIdForOperation(settings); @@ -222,7 +225,7 @@ public void storeSchema(ResourceSchema schema, String location, Job job) throws // @SuppressWarnings("unchecked") public void setLocation(String location, Job job) throws IOException { - init(location, job); + init(location, job, true); Configuration cfg = job.getConfiguration(); diff --git a/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java b/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java index 8201a4d5d..72e839923 100644 --- a/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java +++ b/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java @@ -80,6 +80,15 @@ public static boolean discoverNodesIfNeeded(Settings settings, Log log) throws I } public static String discoverEsVersion(Settings settings, Log log) throws IOException { + String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION); + if (StringUtils.hasText(version)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Elasticsearch version [%s] already present in configuration; skipping discovery", version)); + } + + return version; + } + RestClient bootstrap = new RestClient(settings); // first get ES version try { @@ -101,10 +110,10 @@ public static void checkIndexExistence(Settings settings, RestRepository client) client = new RestRepository(settings); } try { - if (!client.indexExists()) { + if (!client.indexExists(false)) { client.close(); throw new EsHadoopIllegalArgumentException(String.format("Target index [%s] does not exist and auto-creation is disabled [setting '%s' is '%s']", - settings.getResource(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate())); + settings.getResourceWrite(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate())); } } catch (IOException ex) { throw new EsHadoopIllegalStateException("Cannot check index existance", ex); @@ -139,13 +148,12 @@ public static void saveSchemaIfNeeded(Object conf, ValueWriter schemaWrit if (settings.getIndexAutoCreate()) { RestRepository client = new RestRepository(settings); - if (!client.indexExists()) { + if (!client.indexExists(false)) { if (schemaWriter == null) { log.warn(String.format("No mapping found [%s] and no schema found; letting Elasticsearch perform auto-mapping...", settings.getResourceWrite())); } else { - log.info(String.format("No mapping found [%s], creating one based on given schema", - settings.getResource())); + log.info(String.format("No mapping found [%s], creating one based on given schema", settings.getResourceWrite())); ContentBuilder builder = ContentBuilder.generate(schemaWriter).value(schema).flush(); BytesArray content = ((FastByteArrayOutputStream) builder.content()).bytes(); builder.close(); diff --git a/src/main/java/org/elasticsearch/hadoop/rest/QueryBuilder.java b/src/main/java/org/elasticsearch/hadoop/rest/QueryBuilder.java index 24c131047..fe036ab24 100644 --- a/src/main/java/org/elasticsearch/hadoop/rest/QueryBuilder.java +++ b/src/main/java/org/elasticsearch/hadoop/rest/QueryBuilder.java @@ -54,7 +54,7 @@ public class QueryBuilder { private String fields; QueryBuilder(Settings settings) { - this.resource = new Resource(settings); + this.resource = new Resource(settings, true); IS_ES_10 = SettingsUtils.isEs10(settings); String query = settings.getQuery(); if (!StringUtils.hasText(query)) { diff --git a/src/main/java/org/elasticsearch/hadoop/rest/Resource.java b/src/main/java/org/elasticsearch/hadoop/rest/Resource.java index fbc5d2e5d..d5cd3ddc6 100644 --- a/src/main/java/org/elasticsearch/hadoop/rest/Resource.java +++ b/src/main/java/org/elasticsearch/hadoop/rest/Resource.java @@ -64,7 +64,7 @@ public Resource(Settings settings, boolean read) { Assert.isTrue(index >= 0 && index < resource.length() - 1, errorMessage); resource = resource.substring(0, index); - settings.setResource(resource); + settings.setProperty(ConfigurationOptions.ES_RESOURCE, resource); settings.setQuery(query); } } diff --git a/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java b/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java index 9b7229414..e3390359e 100644 --- a/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java +++ b/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java @@ -39,6 +39,7 @@ import org.elasticsearch.hadoop.util.Assert; import org.elasticsearch.hadoop.util.BytesArray; import org.elasticsearch.hadoop.util.BytesRef; +import org.elasticsearch.hadoop.util.StringUtils; import org.elasticsearch.hadoop.util.TrackingBytesArray; import org.elasticsearch.hadoop.util.unit.TimeValue; @@ -61,15 +62,26 @@ public class RestRepository implements Closeable, StatsAware { private boolean writeInitialized = false; private RestClient client; - private Resource resource; + private Resource resourceR; + private Resource resourceW; private Command command; private final Settings settings; private final Stats stats = new Stats(); public RestRepository(Settings settings) { this.settings = settings; + + if (StringUtils.hasText(settings.getResourceRead())) { + this.resourceR = new Resource(settings, true); + } + + if (StringUtils.hasText(settings.getResourceWrite())) { + this.resourceW = new Resource(settings, false); + } + + Assert.isTrue(resourceR != null || resourceW != null, "Invalid configuration - No read or write resource specified"); + this.client = new RestClient(settings); - this.resource = new Resource(settings); } /** postpone writing initialization since we can do only reading so there's no need to allocate buffers */ @@ -148,7 +160,7 @@ private void sendBatch() throws IOException { log.debug(String.format("Sending batch of [%d] bytes/[%s] entries", data.length(), dataEntries)); } - client.bulk(resource, data); + client.bulk(resourceW, data); data.reset(); dataEntries = 0; executedBulkWrite = true; @@ -165,10 +177,10 @@ public void close() { } if (requiresRefreshAfterBulk && executedBulkWrite) { // refresh batch - client.refresh(resource); + client.refresh(resourceW); if (log.isDebugEnabled()) { - log.debug(String.format("Refreshing index [%s]", resource)); + log.debug(String.format("Refreshing index [%s]", resourceW)); } } } catch (IOException ex) { @@ -182,10 +194,10 @@ public RestClient getRestClient() { return client; } - public Map getTargetShards() throws IOException { + public Map getReadTargetShards() throws IOException { Map nodes = client.getNodes(); - List>> info = client.targetShards(resource); + List>> info = client.targetShards(resourceR); Map shards = new LinkedHashMap(info.size()); for (List> shardGroup : info) { @@ -203,10 +215,10 @@ public Map getTargetShards() throws IOException { return shards; } - public Map getTargetPrimaryShards() throws IOException { + public Map getWriteTargetPrimaryShards() throws IOException { Map nodes = client.getNodes(); - List>> info = client.targetShards(resource); + List>> info = client.targetShards(resourceW); Map shards = new LinkedHashMap(info.size()); for (List> shardGroup : info) { @@ -225,7 +237,7 @@ public Map getTargetPrimaryShards() throws IOException { } public Field getMapping() throws IOException { - return Field.parseField((Map) client.getMapping(resource.mapping())); + return Field.parseField((Map) client.getMapping(resourceR.mapping())); } public List scroll(String scrollId, ScrollReader reader) throws IOException { @@ -239,20 +251,21 @@ public List scroll(String scrollId, ScrollReader reader) throws IOExce } } - public boolean indexExists() throws IOException { - return client.exists(resource.indexAndType()); + public boolean indexExists(boolean read) throws IOException { + Resource res = (read ? resourceR : resourceW); + return client.exists(res.indexAndType()); } public void putMapping(BytesArray mapping) throws IOException { - client.putMapping(resource.index(), resource.mapping(), mapping.bytes()); + client.putMapping(resourceW.index(), resourceW.mapping(), mapping.bytes()); } public boolean touch() throws IOException { - return client.touch(resource.index()); + return client.touch(resourceW.index()); } public boolean waitForYellow() throws IOException { - return client.health(resource.index(), RestClient.HEALTH.YELLOW, TimeValue.timeValueSeconds(10)); + return client.health(resourceW.index(), RestClient.HEALTH.YELLOW, TimeValue.timeValueSeconds(10)); } @Override diff --git a/src/test/java/org/elasticsearch/hadoop/integration/EsEmbeddedServer.java b/src/test/java/org/elasticsearch/hadoop/integration/EsEmbeddedServer.java index 824ab9049..d8641e210 100644 --- a/src/test/java/org/elasticsearch/hadoop/integration/EsEmbeddedServer.java +++ b/src/test/java/org/elasticsearch/hadoop/integration/EsEmbeddedServer.java @@ -36,6 +36,7 @@ public EsEmbeddedServer(String clusterName, String dataPath, String httpRange, S props.setProperty("transport.tcp.port", transportRange); props.setProperty("es.index.store.type", "memory"); props.setProperty("gateway.type", "none"); + props.setProperty("discovery.zen.ping.multicast.enabled", "false"); Settings settings = ImmutableSettings.settingsBuilder().put(props).build(); node = NodeBuilder.nodeBuilder().local(false).client(false).settings(settings).clusterName(clusterName).build(); diff --git a/src/test/java/org/elasticsearch/hadoop/integration/hive/HiveSearchTest.java b/src/test/java/org/elasticsearch/hadoop/integration/hive/HiveSearchTest.java index d497f3c69..da93a2e12 100644 --- a/src/test/java/org/elasticsearch/hadoop/integration/hive/HiveSearchTest.java +++ b/src/test/java/org/elasticsearch/hadoop/integration/hive/HiveSearchTest.java @@ -257,6 +257,40 @@ public void testParentChild() throws Exception { assertContains(result, "last.fm/serve/252/2181591.jpg"); } + + @Test + public void testReadWriteSameJob() throws Exception { + + String write = + "CREATE EXTERNAL TABLE rwwrite" + testInstance +" (" + + "id BIGINT, " + + "name STRING, " + + "links STRUCT) " + + tableProps("hive/rwwrite"); + + + String read = + "CREATE EXTERNAL TABLE rwread" + testInstance +" (" + + "id BIGINT, " + + "name STRING, " + + "links STRUCT) " + + tableProps("hive/artists"); + + String selectInsert = "INSERT OVERWRITE TABLE rwwrite" + testInstance + " SELECT * FROM rwread" + testInstance; + String select = "SELECT * FROM rwwrite" + testInstance; + + System.out.println(server.execute(read)); + System.out.println(server.execute(write)); + + System.out.println(server.execute(selectInsert)); + List result = server.execute(select); + assertTrue("Hive returned null", containsNoNull(result)); + assertTrue(result.size() > 1); + assertContains(result, "Marilyn"); + assertContains(result, "last.fm/music/MALICE"); + assertContains(result, "last.fm/serve/252/2181591.jpg"); + } + private static boolean containsNoNull(List str) { for (String string : str) { if (string.contains("NULL")) { diff --git a/src/test/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java b/src/test/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java index 2f4512dd6..8c4364c79 100644 --- a/src/test/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java +++ b/src/test/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java @@ -38,7 +38,7 @@ @RunWith(Suite.class) @Suite.SuiteClasses({ HiveSaveTest.class, HiveSaveJsonTest.class, HiveSearchTest.class, HiveSearchJsonTest.class }) -//@Suite.SuiteClasses({ HiveSearchJsonTest.class }) +//@Suite.SuiteClasses({ HiveSaveTest.class, HiveSearchTest.class }) public class HiveSuite { static HiveInstance server; diff --git a/src/test/java/org/elasticsearch/hadoop/integration/rest/RestQueryTest.java b/src/test/java/org/elasticsearch/hadoop/integration/rest/RestQueryTest.java index f54535ad4..02b3f0350 100644 --- a/src/test/java/org/elasticsearch/hadoop/integration/rest/RestQueryTest.java +++ b/src/test/java/org/elasticsearch/hadoop/integration/rest/RestQueryTest.java @@ -64,7 +64,7 @@ public void stop() throws Exception { @Test public void testShardInfo() throws Exception { - Map shards = client.getTargetShards(); + Map shards = client.getReadTargetShards(); System.out.println(shards); assertNotNull(shards); } @@ -89,7 +89,7 @@ public void testQueryBuilder() throws Exception { @Test public void testQueryShards() throws Exception { - Map targetShards = client.getTargetShards(); + Map targetShards = client.getReadTargetShards(); Field mapping = client.getMapping(); ScrollReader reader = new ScrollReader(new JdkValueReader(), mapping); diff --git a/src/test/java/org/elasticsearch/hadoop/integration/rest/RestSaveTest.java b/src/test/java/org/elasticsearch/hadoop/integration/rest/RestSaveTest.java index a8575ebaf..5de462b89 100644 --- a/src/test/java/org/elasticsearch/hadoop/integration/rest/RestSaveTest.java +++ b/src/test/java/org/elasticsearch/hadoop/integration/rest/RestSaveTest.java @@ -64,7 +64,7 @@ public void testEmptyBulkWrite() throws Exception { testSettings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName()); RestRepository restRepo = new RestRepository(testSettings); RestClient client = restRepo.getRestClient(); - client.bulk(new Resource(testSettings), new TrackingBytesArray(new BytesArray("{}"))); + client.bulk(new Resource(testSettings, false), new TrackingBytesArray(new BytesArray("{}"))); restRepo.waitForYellow(); restRepo.close(); client.close(); diff --git a/src/test/java/org/elasticsearch/hadoop/rest/ResourceTest.java b/src/test/java/org/elasticsearch/hadoop/rest/ResourceTest.java index 613f7605e..c6940b456 100644 --- a/src/test/java/org/elasticsearch/hadoop/rest/ResourceTest.java +++ b/src/test/java/org/elasticsearch/hadoop/rest/ResourceTest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.hadoop.rest; +import org.elasticsearch.hadoop.cfg.ConfigurationOptions; import org.elasticsearch.hadoop.cfg.Settings; import org.elasticsearch.hadoop.util.TestSettings; import org.junit.Test; @@ -52,8 +53,8 @@ public void testQueryUriWithParams() throws Exception { private Resource createResource(String target) { Settings s = new TestSettings(); - s.setResource(target); - return new Resource(s); + s.setProperty(ConfigurationOptions.ES_RESOURCE, target); + return new Resource(s, true); } }