Skip to content

Commit

Permalink
Improve escaping of projected fields
Browse files Browse the repository at this point in the history
Fix elastic#264

(cherry picked from commit c759218fee9c02e20b7ecc17bbec21f5aa8020a6)
  • Loading branch information
costin committed Sep 9, 2014
1 parent 9df0ab0 commit 60fff2d
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 27 deletions.
Expand Up @@ -126,7 +126,7 @@ public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, Record

Collection<String> fields = CascadingUtils.fieldToAlias(set, getSourceFields());
// load only the necessary fields
conf.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(fields, ","));
conf.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenateAndUriEncode(fields, ","));

if (log.isTraceEnabled()) {
log.trace("Initialized (source) configuration " + HadoopCfgUtils.asProperties(conf));
Expand Down
Expand Up @@ -83,7 +83,7 @@ public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, Scrol
MappingUtils.validateMapping(fields, mapping, validation, log);
}

input = QueryBuilder.query(settings).fields(StringUtils.concatenate(fields, ",")).build(client, new ScrollReader(new JdkValueReader(), mapping));
input = QueryBuilder.query(settings).fields(StringUtils.concatenateAndUriEncode(fields, ",")).build(client, new ScrollReader(new JdkValueReader(), mapping));
}
return new TupleEntrySchemeIterator<Properties, ScrollQuery>(flowProcess, getScheme(), input, getIdentifier());
}
Expand Down
Expand Up @@ -54,8 +54,9 @@ public void testQuery() throws Exception {
String create = "CREATE EXTERNAL TABLE cars2 ("
+ "color STRING,"
+ "price BIGINT,"
+ "sold TIMESTAMP) "
+ HiveSuite.tableProps("cars/transactions", null, (String[]) null);
+ "sold TIMESTAMP, "
+ "alias STRING) "
+ HiveSuite.tableProps("cars/transactions", null, "'es.mapping.names'='alias:&c'");

String query = "SELECT * from cars2";
String count = "SELECT count(1) from cars2";
Expand All @@ -64,6 +65,7 @@ public void testQuery() throws Exception {
server.execute(create);
List<String> result = server.execute(query);
assertEquals(6, result.size());
assertTrue(result.get(0).contains("foobar"));
result = server.execute(count);
assertEquals("6", result.get(0));
}
Expand Down
Expand Up @@ -37,9 +37,9 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
@Suite.SuiteClasses({ AbstractHiveSaveTest.class, AbstractHiveSaveJsonTest.class, AbstractHiveSearchTest.class, AbstractHiveSearchJsonTest.class, AbstractHiveExtraTests.class, AbstractHiveExtraTests.class })
//@Suite.SuiteClasses({ AbstractHiveSaveTest.class, AbstractHiveSaveJsonTest.class, AbstractHiveSearchTest.class, AbstractHiveSearchJsonTest.class, AbstractHiveExtraTests.class, AbstractHiveExtraTests.class })
//@Suite.SuiteClasses({ AbstractHiveSaveJsonTest.class, AbstractHiveSearchJsonTest.class })
//@Suite.SuiteClasses({ AbstractHiveExtraTests.class })
@Suite.SuiteClasses({ AbstractHiveExtraTests.class })
public class HiveSuite {

static HiveInstance server;
Expand Down
12 changes: 6 additions & 6 deletions hive/src/itest/resources/cars-bulk.txt
@@ -1,12 +1,12 @@
{ "index": {}}
{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" }
{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18", "&c" : "foobar" }
{ "index": {}}
{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02" }
{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02", "&c" : "foobar" }
{ "index": {}}
{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19" }
{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19", "&c" : "foobar" }
{ "index": {}}
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05", "&c" : "foobar" }
{ "index": {}}
{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" }
{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01", "&c" : "foobar" }
{ "index": {}}
{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12" }
{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12", "&c" : "foobar" }
Expand Up @@ -103,7 +103,7 @@ public FileSplit[] getSplits(JobConf job, int numSplits) throws IOException {
Log log = LogFactory.getLog(getClass());
// move on to initialization
InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(HiveUtils.columnToAlias(settings), ","));
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenateAndUriEncode(HiveUtils.columnToAlias(settings), ","));
// set read resource
settings.setResourceRead(settings.getResourceRead());
HiveUtils.init(settings, log);
Expand Down
Expand Up @@ -141,7 +141,10 @@ public QueryBuilder fields(String fieldsCSV) {
}

private String assemble() {
StringBuilder sb = new StringBuilder(resource.indexAndType());
StringBuilder sb = new StringBuilder();
sb.append(StringUtils.encodePath(resource.index()));
sb.append("/");
sb.append(StringUtils.encodePath(resource.type()));
sb.append("/_search?");

// override infrastructure params
Expand All @@ -153,7 +156,7 @@ private String assemble() {
if (StringUtils.hasText(fields)) {
if (IS_ES_10) {
uriQuery.put("_source", fields);
uriQuery.remove("escapedFields");
uriQuery.remove("fields");
}
else {
uriQuery.put("fields", fields);
Expand Down Expand Up @@ -195,7 +198,7 @@ private String assemble() {
}

public ScrollQuery build(RestRepository client, ScrollReader reader) {
String scrollUri = StringUtils.escapeUri(assemble());
String scrollUri = assemble();
return client.scan(scrollUri, bodyQuery, reader);
}

Expand Down
Expand Up @@ -379,7 +379,7 @@ public void close() {

private static String escapeUri(String uri) {
// escape the uri right away
String escaped = StringUtils.escapeUri(uri);
String escaped = StringUtils.encodeUri(uri);
return escaped.contains("://") ? escaped : "http://" + escaped;
}

Expand Down
Expand Up @@ -45,7 +45,7 @@ public class MappingUtils {

public static void validateMapping(String fields, Field mapping, FieldPresenceValidation validation, Log log) {
if (StringUtils.hasText(fields)) {
validateMapping(StringUtils.tokenize(fields), mapping, validation, log);
validateMapping(StringUtils.tokenizeAndUriDecode(fields, ","), mapping, validation, log);
}
}

Expand All @@ -60,7 +60,7 @@ public static void validateMapping(Collection<String> fields, Field mapping, Fie
return;
}

String message = String.format("Field(s) [%s] not found in the Elasticsearch mapping specified; did you mean [%s]?",
String message = String.format("Field(s) [%s] not found in the Elasticsearch mapping specified; did you mean [%s]?",
removeDoubleBrackets(results[0]), removeDoubleBrackets(results[1]));
if (validation == FieldPresenceValidation.WARN) {
log.warn(message);
Expand Down
57 changes: 54 additions & 3 deletions mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
Expand Up @@ -18,6 +18,9 @@
*/
package org.elasticsearch.hadoop.util;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -64,6 +67,17 @@ public static List<String> tokenize(String string, String delimiters) {
return tokenize(string, delimiters, true, true);
}

public static List<String> tokenizeAndUriDecode(String string, String delimiters) {
List<String> tokenize = tokenize(string, delimiters, true, true);
List<String> decoded = new ArrayList<String>(tokenize.size());

for (String token : tokenize) {
decoded.add(StringUtils.decodeQuery(token));
}

return decoded;
}

public static List<String> tokenize(String string, String delimiters, boolean trimTokens, boolean ignoreEmptyTokens) {
if (string == null) {
return Collections.emptyList();
Expand Down Expand Up @@ -100,6 +114,17 @@ public static String concatenate(Collection<?> list, String delimiter) {
return sb.toString();
}

public static String concatenateAndUriEncode(Collection<?> list, String delimiter) {
Collection<String> escaped = new ArrayList<String>();

if (list != null) {
for (Object object : list) {
escaped.add(encodeQuery(object.toString()));
}
}
return concatenate(escaped, delimiter);
}

public static String concatenate(Object[] array, String delimiter) {
if (array == null || array.length == 0) {
return EMPTY;
Expand Down Expand Up @@ -155,7 +180,8 @@ public static int levenshteinDistance(CharSequence one, CharSequence another, in
// if one string is empty, the edit distance is necessarily the length of the other
if (n == 0) {
return m <= threshold ? m : -1;
} else if (m == 0) {
}
else if (m == 0) {
return n <= threshold ? n : -1;
}

Expand Down Expand Up @@ -206,7 +232,8 @@ public static int levenshteinDistance(CharSequence one, CharSequence another, in
if (one.charAt(i - 1) == t_j) {
// diagonally left and up
d[i] = p[i - 1];
} else {
}
else {
// 1 + minimum of cell to the left, to the top, diagonally left and up
d[i] = 1 + Math.min(Math.min(d[i - 1], p[i]), p[i - 1]);
}
Expand Down Expand Up @@ -260,11 +287,35 @@ public static String sanitizeResource(String resource) {
return res;
}

public static String escapeUri(String uri) {
public static String encodeUri(String uri) {
try {
return URIUtil.encodePathQuery(uri);
} catch (URIException ex) {
throw new EsHadoopIllegalArgumentException("Cannot escape uri" + uri);
}
}

public static String encodePath(String path) {
try {
return URIUtil.encodePath(path, "UTF-8");
} catch (URIException ex) {
throw new EsHadoopIllegalArgumentException("Cannot encode path" + path, ex);
}
}

public static String encodeQuery(String query) {
try {
return URLEncoder.encode(query, "UTF-8");
} catch (UnsupportedEncodingException ex) {
throw new EsHadoopIllegalArgumentException("Cannot encode path" + query, ex);
}
}

public static String decodeQuery(String query) {
try {
return URLDecoder.decode(query, "UTF-8");
} catch (UnsupportedEncodingException ex) {
throw new EsHadoopIllegalArgumentException("Cannot encode path" + query, ex);
}
}
}
44 changes: 44 additions & 0 deletions mr/src/test/java/org/elasticsearch/hadoop/rest/EscapeTest.java
@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.rest;

import java.util.Arrays;

import org.elasticsearch.hadoop.util.StringUtils;
import org.junit.Test;

import static org.junit.Assert.*;

import static org.hamcrest.CoreMatchers.*;

public class EscapeTest {

@Test
public void testSingleAmpersandEscape() {
String uri = StringUtils.encodeQuery("&c");
assertThat(uri, is("%26c"));
}

@Test
public void testMultiAmpersandEscapeSimple() {
String uri = StringUtils.concatenateAndUriEncode(Arrays.asList("&a", "$b", "#c", "!d", "/e", ":f"), ",");
assertThat(uri, is("%26a,%24b,%23c,%21d,%2Fe,%3Af"));
}

}
Expand Up @@ -73,8 +73,8 @@ private Resource createResource(String target) {

@Test
public void testURiEscaping() throws Exception {
assertEquals("http://localhost:9200/index/type%7Cfoo?q=foo%7Cbar:bar%7Cfoo", StringUtils.escapeUri("http://localhost:9200/index/type|foo?q=foo|bar:bar|foo"));
assertEquals("foo%7Cbar", StringUtils.escapeUri("foo|bar"));
System.out.println(StringUtils.escapeUri("foo|bar,abc,xyz|rpt"));
assertEquals("http://localhost:9200/index/type%7Cfoo?q=foo%7Cbar:bar%7Cfoo", StringUtils.encodeUri("http://localhost:9200/index/type|foo?q=foo|bar:bar|foo"));
assertEquals("foo%7Cbar", StringUtils.encodeUri("foo|bar"));
System.out.println(StringUtils.encodeUri("foo|bar,abc,xyz|rpt"));
}
}
4 changes: 2 additions & 2 deletions pig/src/main/java/org/elasticsearch/hadoop/pig/PigUtils.java
Expand Up @@ -102,7 +102,7 @@ static String asProjection(Schema schema, Properties props) {
List<String> fields = new ArrayList<String>();
addField(schema, fields, alias(new PropertiesSettings(props)), null);

return StringUtils.concatenate(fields.toArray(new String[fields.size()]), ",");
return StringUtils.concatenate(fields, ",");
}

private static void addField(Schema schema, List<String> fields, FieldAlias fa, String currentNode) {
Expand Down Expand Up @@ -139,7 +139,7 @@ static String asProjection(RequiredFieldList list, Properties props) {
addField(field, fields, alias, "");
}

return StringUtils.concatenate(fields.toArray(new String[fields.size()]), ",");
return StringUtils.concatenateAndUriEncode(fields, ",");
}

private static void addField(RequiredField field, List<String> fields, FieldAlias fa, String currentNode) {
Expand Down

0 comments on commit 60fff2d

Please sign in to comment.