Skip to content

Commit

Permalink
Allow index wild cards
Browse files Browse the repository at this point in the history
improve index validation
relates #233
  • Loading branch information
costin committed Aug 13, 2014
1 parent 02d247f commit 04f3c99
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 15 deletions.
Expand Up @@ -71,6 +71,14 @@ public void testBasicSearch() throws Exception {
new Job(conf).waitForCompletion(true);
}

@Test
public void testBasicWildSearch() throws Exception {
Configuration conf = createConf();
conf.set(ConfigurationOptions.ES_RESOURCE, indexPrefix + "mrnew*/save");

new Job(conf).waitForCompletion(true);
}

@Test
public void testSearchWithId() throws Exception {
Configuration conf = createConf();
Expand Down
Expand Up @@ -78,6 +78,15 @@ public void testBasicSearch() throws Exception {
JobClient.runJob(conf);
}


@Test
public void testBasicSearchWithWildCard() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, indexPrefix + "mrold*/save");

JobClient.runJob(conf);
}

@Test
public void testSearchWithId() throws Exception {
JobConf conf = createJobConf();
Expand Down Expand Up @@ -150,7 +159,6 @@ public void testUpsertOnlyParamScriptWithArrayOnArrayField() throws Exception {
String target = "mroldapi/createwitharrayupsert/1";
Assert.assertTrue(RestUtils.exists(target));
String result = RestUtils.get(target);
System.out.println(result);
assertThat(result, not(containsString("ArrayWritable@")));
}

Expand Down
9 changes: 8 additions & 1 deletion mr/src/itest/java/org/elasticsearch/hadoop/mr/RestUtils.java
Expand Up @@ -132,4 +132,11 @@ public static boolean touch(String indexAndType) throws IOException {
rc.close();
return result;
}
}

public static boolean flush(String index) throws IOException {
ExtendedRestClient rc = new ExtendedRestClient();
boolean result = rc.touch(index);
rc.close();
return result;
}
}
9 changes: 4 additions & 5 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/Resource.java
Expand Up @@ -86,15 +86,14 @@ String bulk() {
return bulk;
}

// https://github.com/elasticsearch/elasticsearch/issues/2726
String targetShards() {
return indexAndType + "/_search_shards";
}

String mapping() {
return indexAndType + "/_mapping";
}

String aliases() {
return index + "/_aliases";
}

String indexAndType() {
return indexAndType;
}
Expand Down
12 changes: 9 additions & 3 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
Expand Up @@ -233,11 +233,13 @@ public void deleteIndex(String index) {
execute(DELETE, index);
}

public List<List<Map<String, Object>>> targetShards(Resource resource) {
public List<List<Map<String, Object>>> targetShards(String index) {
List<List<Map<String, Object>>> shardsJson = null;

// https://github.com/elasticsearch/elasticsearch/issues/2726
String target = index + "/_search_shards";
if (indexReadMissingAsEmpty) {
Response res = execute(GET, resource.targetShards(), false);
Response res = execute(GET, target, false);
if (res.status() == HttpStatus.NOT_FOUND) {
shardsJson = Collections.emptyList();
}
Expand All @@ -246,7 +248,7 @@ public List<List<Map<String, Object>>> targetShards(Resource resource) {
}
}
else {
shardsJson = get(resource.targetShards(), "shards");
shardsJson = get(target, "shards");
}

return shardsJson;
Expand Down Expand Up @@ -364,6 +366,10 @@ public String esVersion() {
return version.get("number");
}

public Map<String, Object> aliases(String index) {
return get(index, null);
}

public boolean health(String index, HEALTH health, TimeValue timeout) {
StringBuilder sb = new StringBuilder("/_cluster/health/");
sb.append(index);
Expand Down
16 changes: 11 additions & 5 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java
Expand Up @@ -32,8 +32,8 @@
import org.elasticsearch.hadoop.rest.stats.Stats;
import org.elasticsearch.hadoop.rest.stats.StatsAware;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
import org.elasticsearch.hadoop.serialization.dto.Node;
import org.elasticsearch.hadoop.serialization.dto.Shard;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
Expand Down Expand Up @@ -218,8 +218,9 @@ public RestClient getRestClient() {
public Map<Shard, Node> getReadTargetShards() {
Map<String, Node> nodes = client.getNodes();

List<List<Map<String, Object>>> info = client.targetShards(resourceR);
Map<Shard, Node> shards = new LinkedHashMap<Shard, Node>(info.size());
Map<Shard, Node> shards = new LinkedHashMap<Shard, Node>();

List<List<Map<String, Object>>> info = client.targetShards(resourceR.index());

for (List<Map<String, Object>> shardGroup : info) {
// find the first started shard in each group (round-robin)
Expand All @@ -239,7 +240,7 @@ public Map<Shard, Node> getReadTargetShards() {
public Map<Shard, Node> getWriteTargetPrimaryShards() {
Map<String, Node> nodes = client.getNodes();

List<List<Map<String, Object>>> info = client.targetShards(resourceW);
List<List<Map<String, Object>>> info = client.targetShards(resourceW.index());
Map<Shard, Node> shards = new LinkedHashMap<Shard, Node>(info.size());

for (List<Map<String, Object>> shardGroup : info) {
Expand Down Expand Up @@ -279,7 +280,12 @@ public boolean indexExists(boolean read) {
// could be a _all or a pattern which is valid for read
// try again by asking the mapping - could be expensive
if (!exists && read) {
exists = !client.getMapping(res.mapping()).isEmpty();
try {
// make sure the mapping is null since the index might exist but the type might be missing
exists = !client.getMapping(res.mapping()).isEmpty();
} catch (EsHadoopInvalidRequest ex) {
exists = false;
}
}
return exists;
}
Expand Down

0 comments on commit 04f3c99

Please sign in to comment.