@@ -34,4 +34,8 @@ public interface InternalConfigurationOptions extends ConfigurationOptions {
String INTERNAL_ES_VERSION = "es.internal.es.version";
// used for isolating connection pools of multiple spark streaming jobs in the same app.
String INTERNAL_TRANSPORT_POOLING_KEY = "es.internal.transport.pooling.key";

// don't fetch _source field during scroll queries
String INTERNAL_ES_EXCLUDE_SOURCE = "es.internal.exclude.source";
String INTERNAL_ES_EXCLUDE_SOURCE_DEFAULT = "false";
}
mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java (4 changes: 4 additions & 0 deletions)
@@ -152,6 +152,10 @@ public String getScrollFields() {
return getProperty(INTERNAL_ES_TARGET_FIELDS);
}

public boolean getExcludeSource() {
return Booleans.parseBoolean(getProperty(INTERNAL_ES_EXCLUDE_SOURCE, INTERNAL_ES_EXCLUDE_SOURCE_DEFAULT));
}

public String getSerializerValueWriterClassName() {
return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
}
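
A minimal sketch of how the new internal flag is meant to flow, assuming Settings#setProperty behaves as it does for the other internal options (illustrative only, not part of the diff; TestSettings is the test helper used in QueryTest below):

// Toggle the internal exclude-source flag and read it back through the new getter;
// it falls back to INTERNAL_ES_EXCLUDE_SOURCE_DEFAULT ("false") when unset.
Settings settings = new TestSettings();
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_EXCLUDE_SOURCE, "true");
boolean excludeSource = settings.getExcludeSource();   // true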
@@ -447,7 +447,8 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio
.fields(SettingsUtils.determineSourceFields(settings))
.filters(QueryUtils.parseFilters(settings))
.shard(Integer.toString(partition.getShardId()))
.local(true);
.local(true)
.excludeSource(settings.getExcludeSource());
if (partition.getSlice() != null && partition.getSlice().max > 1) {
requestBuilder.slice(partition.getSlice().id, partition.getSlice().max);
}
@@ -66,6 +66,7 @@ private static class Slice {
private String routing;
private Slice slice;
private boolean local = false;
private boolean excludeSource = false;

public SearchRequestBuilder(EsMajorVersion version, boolean includeVersion) {
this.version = version;
@@ -122,6 +123,7 @@ public SearchRequestBuilder shard(String shard) {
}

public SearchRequestBuilder fields(String fieldsCSV) {
Assert.isFalse(this.excludeSource, "Fields can't be requested because _source section is excluded");
this.fields = fieldsCSV;
return this;
}
@@ -151,6 +153,14 @@ public SearchRequestBuilder local(boolean value) {
return this;
}

public SearchRequestBuilder excludeSource(boolean value) {
if (value) {
Assert.hasNoText(this.fields, String.format("_source section can't be excluded if fields [%s] are requested", this.fields));
}
this.excludeSource = value;
return this;
}

private String assemble() {
if (limit > 0) {
if (size > limit) {
@@ -184,6 +194,8 @@ private String assemble() {
// override fields
if (StringUtils.hasText(fields)) {
uriParams.put("_source", HttpEncodingTools.concatenateAndUriEncode(StringUtils.tokenize(fields), StringUtils.DEFAULT_DELIMITER));
} else if (excludeSource) {
uriParams.put("_source", "false");
}

// set shard preference
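Taken together, the SearchRequestBuilder changes mean a source-excluding scroll puts _source=false on the search URI instead of a field projection, and the two new assertions keep fields(...) and excludeSource(true) mutually exclusive. A hedged sketch of the call pattern, using only the constructor and methods shown in this diff and in QueryTest below:

SearchRequestBuilder request = new SearchRequestBuilder(EsMajorVersion.V_5_X, true);
String uri = request.indices("foo").types("bar")
        .excludeSource(true)   // legal only while no fields have been requested
        .toString();
// uri now contains "_source=false"; calling fields("a,b") before or after
// excludeSource(true) throws EsHadoopIllegalArgumentException via the new asserts.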
@@ -475,6 +475,10 @@ private Object[] readHitAsMap() {
}
else {
if (readMetadata) {
if (parsingCallback != null) {
parsingCallback.excludeSource();
}

data = reader.createMap();
reader.addToMap(data, reader.wrapString(metadataField), metadata);
}
@@ -34,6 +34,8 @@ public interface ValueParsingCallback {

void endSource();

void excludeSource();

void beginTrailMetadata();

void endTrailMetadata();
mr/src/main/java/org/elasticsearch/hadoop/util/Assert.java (20 changes: 20 additions & 0 deletions)
@@ -35,6 +35,16 @@ public static void hasText(CharSequence sequence) {
hasText(sequence, "[Assertion failed] - this CharSequence argument must have text; it must not be null, empty, or blank");
}

public static void hasNoText(CharSequence sequence, String message) {
if (StringUtils.hasText(sequence)) {
throw new EsHadoopIllegalArgumentException(message);
}
}

public static void hasNoText(CharSequence sequence) {
hasNoText(sequence, "[Assertion failed] - this CharSequence argument must be empty");
}

public static void notNull(Object object, String message) {
if (object == null) {
throw new EsHadoopIllegalArgumentException(message);
@@ -54,4 +64,14 @@ public static void isTrue(Boolean object, String message) {
public static void isTrue(Boolean object) {
isTrue(object, "[Assertion failed] - this argument must be true");
}

public static void isFalse(Boolean object, String message) {
if (!Boolean.FALSE.equals(object)) {
throw new EsHadoopIllegalArgumentException(message);
}
}

public static void isFalse(Boolean object) {
isFalse(object, "[Assertion failed] - this argument must be false");
}
}
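
The two new helpers mirror the existing hasText/isTrue pair and throw EsHadoopIllegalArgumentException on violation; a quick illustration of the contract (the messages are illustrative):

Assert.isFalse(false);                                  // passes
Assert.hasNoText(null);                                 // passes: null, empty and blank all count as "no text"
Assert.hasNoText("a,b", "fields already requested");    // throws EsHadoopIllegalArgumentException
Assert.isFalse(true, "_source already excluded");       // throws EsHadoopIllegalArgumentException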
mr/src/test/java/org/elasticsearch/hadoop/rest/QueryTest.java (30 changes: 26 additions & 4 deletions)
@@ -18,26 +18,48 @@
*/
package org.elasticsearch.hadoop.rest;

import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

public class QueryTest {

private Settings cfg = new TestSettings();
private TestSettings cfg;
private SearchRequestBuilder builder;

@Before
public void setup() {
cfg = new TestSettings();
builder = new SearchRequestBuilder(EsMajorVersion.V_5_X, true);
}

@Test
public void testSimpleQuery() {
cfg.setResourceRead("foo/bar");
assertTrue(new SearchRequestBuilder(EsMajorVersion.V_5_X, true).indices("foo").types("bar").toString().contains("foo/bar"));
assertTrue(builder.indices("foo").types("bar").toString().contains("foo/bar"));
}

@Test
public void testExcludeSourceTrue() {
assertTrue(builder.excludeSource(true).toString().contains("_source=false"));
}

@Test
public void testExcludeSourceFalse() {
assertFalse(builder.fields("a,b").excludeSource(false).toString().contains("_source=false"));
}

@Test(expected=EsHadoopIllegalArgumentException.class)
public void testExcludeSourceAndGetFields() {
builder.fields("a,b").excludeSource(true);
}

@Test(expected=EsHadoopIllegalArgumentException.class)
public void testGetFieldsAndExcludeSource() {
builder.excludeSource(true).fields("a,b");
}
}
@@ -148,6 +148,23 @@ public void testEsdataFrame2Read() throws Exception {
assertEquals(10, nameRDD.count());
}

@Test
public void testEsDataFrameReadMetadata() throws Exception {
DataFrame artists = artistsAsDataFrame();
String target = "sparksql-test/scala-dataframe-read-metadata";
JavaEsSparkSQL.saveToEs(artists, target);

DataFrame dataframe = sqc.read().format("es").option("es.read.metadata", "true").load(target).where("id = 1");

// Since the _metadata field isn't part of _source,
// we check that it can be fetched in any position.
assertEquals("sparksql-test", dataframe.selectExpr("_metadata['_index']").takeAsList(1).get(0).get(0));
assertEquals("sparksql-test", dataframe.selectExpr("_metadata['_index']", "name").takeAsList(1).get(0).get(0));
assertEquals("MALICE MIZER", dataframe.selectExpr("_metadata['_index']", "name").takeAsList(1).get(0).get(1));
assertEquals("MALICE MIZER", dataframe.selectExpr("name", "_metadata['_index']").takeAsList(1).get(0).get(0));
assertEquals("sparksql-test", dataframe.selectExpr("name", "_metadata['_index']").takeAsList(1).get(0).get(1));
}

private DataFrame artistsAsDataFrame() {
String input = TestUtils.sampleArtistsDat();
JavaRDD<String> data = sc.textFile(input);
@@ -154,9 +154,18 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
}
}

paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS ->
StringUtils.concatenate(filteredColumns.asInstanceOf[Array[Object]], StringUtils.DEFAULT_DELIMITER))
// Set the fields to scroll over (_metadata is excluded because it isn't part of _source)
val sourceCSV = StringUtils.concatenate(filteredColumns.asInstanceOf[Array[Object]], StringUtils.DEFAULT_DELIMITER)
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS -> sourceCSV)

// Keep the order of the fields requested by the user (we don't exclude _metadata here)
val requiredCSV = StringUtils.concatenate(requiredColumns.asInstanceOf[Array[Object]], StringUtils.DEFAULT_DELIMITER)
paramWithScan += (Utils.DATA_SOURCE_REQUIRED_COLUMNS -> requiredCSV)

// If the only field requested by the user is metadata, we don't want to fetch the whole document source
if (requiredCSV == cfg.getReadMetadataField()) {
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_EXCLUDE_SOURCE -> "true")
}

if (filters != null && filters.size > 0) {
if (Utils.isPushDown(cfg)) {
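The net effect on the Spark SQL side: when the pushed-down projection contains only the metadata column, the relation sets es.internal.exclude.source to true and the scroll runs with _source=false. An illustrative Java snippet modelled on the tests in this PR (sqc is the test's SQLContext; the target index is the one written by the DataFrame test above):

DataFrame onlyMetadata = sqc.read().format("es")
        .option("es.read.metadata", "true")
        .load("sparksql-test/scala-dataframe-read-metadata")
        .selectExpr("_metadata['_index']");
// Only _metadata is requested, so no document _source is fetched;
// the index name still resolves from the hit metadata.
assertEquals("sparksql-test", onlyMetadata.takeAsList(1).get(0).get(0));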
@@ -133,6 +133,8 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu

def endSource(): Unit = {}

def excludeSource(): Unit = { rootLevel = true; sparkRowField = Utils.ROOT_LEVEL_NAME }

def beginTrailMetadata(): Unit = {}

def endTrailMetadata(): Unit = {}
@@ -271,18 +271,13 @@ private[sql] object SchemaUtils {
def detectRowInfo(settings: Settings, struct: StructType): (Properties, Properties) = {
// tuple - 1 = columns (in simple names) for each row, 2 - what fields (in absolute names) are arrays
val rowInfo = (new Properties, new Properties)

doDetectInfo(rowInfo, ROOT_LEVEL_NAME, struct)
val csv = SettingsUtils.determineSourceFields(settings)
// if a projection is applied (filtering or projection) use that instead
if (StringUtils.hasText(csv)) {
if (settings.getReadMetadata) {
rowInfo._1.setProperty(ROOT_LEVEL_NAME, csv + StringUtils.DEFAULT_DELIMITER + settings.getReadMetadataField)
}
else {
rowInfo._1.setProperty(ROOT_LEVEL_NAME, csv)
}

val requiredFields = settings.getProperty(Utils.DATA_SOURCE_REQUIRED_COLUMNS)
if (StringUtils.hasText(requiredFields)) {
rowInfo._1.setProperty(ROOT_LEVEL_NAME, requiredFields)
}

rowInfo
}

@@ -41,6 +41,10 @@ static FieldType extractType(Field field) {

static final String DATA_SOURCE_PUSH_DOWN = "es.internal.spark.sql.pushdown";
static final String DATA_SOURCE_PUSH_DOWN_STRICT = "es.internal.spark.sql.pushdown.strict";

// columns selected by Spark SQL query
static final String DATA_SOURCE_REQUIRED_COLUMNS = "es.internal.spark.sql.required.columns";

// double filtering (run Spark filters) or not
static final String DATA_SOURCE_KEEP_HANDLED_FILTERS = "es.internal.spark.sql.pushdown.keep.handled.filters";

@@ -152,6 +152,21 @@ public void testEsDataset2Read() throws Exception {
assertEquals(10, nameRDD.count());
}

@Test
public void testEsDatasetReadMetadata() throws Exception {
String target = "sparksql-test/scala-basic-write";

Dataset<Row> dataset = sqc.read().format("es").option("es.read.metadata", "true").load(target).where("id = 1");

// Since the _metadata field isn't part of _source,
// we check that it can be fetched in any position.
assertEquals("sparksql-test", dataset.selectExpr("_metadata['_index']").takeAsList(1).get(0).get(0));
assertEquals("sparksql-test", dataset.selectExpr("_metadata['_index']", "name").takeAsList(1).get(0).get(0));
assertEquals("MALICE MIZER", dataset.selectExpr("_metadata['_index']", "name").takeAsList(1).get(0).get(1));
assertEquals("MALICE MIZER", dataset.selectExpr("name", "_metadata['_index']").takeAsList(1).get(0).get(0));
assertEquals("sparksql-test", dataset.selectExpr("name", "_metadata['_index']").takeAsList(1).get(0).get(1));
}

private Dataset<Row> artistsAsDataset() throws Exception {
// don't use the sc.textFile as it pulls in the Hadoop madness (2.x vs 1.x)
Path path = Paths.get(TestUtils.sampleArtistsDatUri());
@@ -156,9 +156,18 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
}
}

paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS ->
StringUtils.concatenate(filteredColumns.asInstanceOf[Array[Object]], StringUtils.DEFAULT_DELIMITER))
// Set the fields to scroll over (_metadata is excluded because it isn't part of _source)
val sourceCSV = StringUtils.concatenate(filteredColumns.asInstanceOf[Array[Object]], StringUtils.DEFAULT_DELIMITER)
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS -> sourceCSV)

// Keep the order of the fields requested by the user (we don't exclude _metadata here)
val requiredCSV = StringUtils.concatenate(requiredColumns.asInstanceOf[Array[Object]], StringUtils.DEFAULT_DELIMITER)
paramWithScan += (Utils.DATA_SOURCE_REQUIRED_COLUMNS -> requiredCSV)

// If the only field requested by the user is metadata, we don't want to fetch the whole document source
if (requiredCSV == cfg.getReadMetadataField()) {
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_EXCLUDE_SOURCE -> "true")
}

if (filters != null && filters.size > 0) {
if (Utils.isPushDown(cfg)) {
@@ -133,6 +133,8 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu

def endSource(): Unit = {}

def excludeSource(): Unit = { rootLevel = true; sparkRowField = Utils.ROOT_LEVEL_NAME }

def beginTrailMetadata(): Unit = {}

def endTrailMetadata(): Unit = {}
@@ -27,7 +27,6 @@ import java.util.Properties
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.BinaryType
import org.apache.spark.sql.types.BooleanType
@@ -267,18 +266,13 @@ private[sql] object SchemaUtils {
def detectRowInfo(settings: Settings, struct: StructType): (Properties, Properties) = {
// tuple - 1 = columns (in simple names) for each row, 2 - what fields (in absolute names) are arrays
val rowInfo = (new Properties, new Properties)

doDetectInfo(rowInfo, ROOT_LEVEL_NAME, struct)
val csv = SettingsUtils.determineSourceFields(settings)
// if a projection is applied (filtering or projection) use that instead
if (StringUtils.hasText(csv)) {
if (settings.getReadMetadata) {
rowInfo._1.setProperty(ROOT_LEVEL_NAME, csv + StringUtils.DEFAULT_DELIMITER + settings.getReadMetadataField)
}
else {
rowInfo._1.setProperty(ROOT_LEVEL_NAME, csv)
}

val requiredFields = settings.getProperty(Utils.DATA_SOURCE_REQUIRED_COLUMNS)
if (StringUtils.hasText(requiredFields)) {
rowInfo._1.setProperty(ROOT_LEVEL_NAME, requiredFields)
}

rowInfo
}

@@ -44,6 +44,9 @@ static FieldType extractType(Field field) {
// double filtering (run Spark filters) or not
static final String DATA_SOURCE_KEEP_HANDLED_FILTERS = "es.internal.spark.sql.pushdown.keep.handled.filters";

// columns selected by Spark SQL query
static final String DATA_SOURCE_REQUIRED_COLUMNS = "es.internal.spark.sql.required.columns";
Review thread on this line:

Member: Could we change this to a different location? Like InternalConfigurationOptions?

Contributor Author: Are you sure about this? It looks like all "es.internal.spark." options are located here. I feel like the set of columns selected by spark sql fits into the "es.internal.spark." group.

Member: Yeah, this comment was my mistake originally. It's fine in this location.


static boolean isPushDown(Settings cfg) {
return Booleans.parseBoolean(cfg.getProperty(DATA_SOURCE_PUSH_DOWN), true);
}