diff --git a/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
index cc7a45accb9fcb..c26059886f38f2 100644
--- a/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
+++ b/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
@@ -1099,7 +1099,6 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
   def testEsDataFrame60DataSourceSaveModeOverwrite() {
     val srcFrame = sqc.read.json(this.getClass.getResource("/small-sample.json").toURI().toString())
     val index = wrapIndex("sparksql-test/savemode_overwrite")
-    val table = wrapIndex("save_mode_overwrite")
 
     srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).save(index)
     val df = EsSparkSQL.esDF(sqc, index)
@@ -1109,6 +1108,19 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(3, df.count())
   }
 
+  @Test
+  def testEsDataFrame60DataSourceSaveModeOverwriteWithID() {
+    val srcFrame = sqc.read.json(this.getClass.getResource("/small-sample.json").toURI().toString())
+    val index = wrapIndex("sparksql-test/savemode_overwrite_id")
+
+    srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(index)
+    val df = EsSparkSQL.esDF(sqc, index)
+
+    assertEquals(3, df.count())
+    srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(index)
+    assertEquals(3, df.count())
+  }
+
   @Test
   def testEsDataFrame60DataSourceSaveModeIgnore() {
     val srcFrame = sqc.read.json(this.getClass.getResource("/small-sample.json").toURI().toString())
diff --git a/spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala b/spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala
index 85c177e189e5b1..ec28c4f3e4dbee 100644
--- a/spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala
+++ b/spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala
@@ -53,6 +53,8 @@ import org.elasticsearch.spark.cfg.SparkSettingsManager
 import org.elasticsearch.spark.serialization.ScalaValueWriter
 import javax.xml.bind.DatatypeConverter
 
+import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor
+
 private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
 
   Version.logVersion()
@@ -470,6 +472,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
       // perform a scan-scroll delete
       val cfgCopy = cfg.copy()
       InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
+      InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
       cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
       cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
       cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")
diff --git a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
index 8ad719b214d1bc..c9dfdcb47dcabc 100644
--- a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
+++ b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
@@ -1116,6 +1116,19 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(3, df.count())
   }
 
+  @Test
+  def testEsDataFrame60DataSourceSaveModeOverwriteWithID() {
+    val srcFrame = artistsJsonAsDataFrame
+    val index = wrapIndex("sparksql-test/savemode_overwrite_id")
+
+    srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(index)
+    val df = EsSparkSQL.esDF(sqc, index)
+
+    assertEquals(3, df.count())
+    srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(index)
+    assertEquals(3, df.count())
+  }
+
   @Test
   def testEsDataFrame60DataSourceSaveModeIgnore() {
     val srcFrame = artistsJsonAsDataFrame
diff --git a/spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala b/spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala
index 4ba0b7fcd429ae..a16d129ac8cb2b 100644
--- a/spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala
+++ b/spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala
@@ -8,7 +8,6 @@ import java.util.Locale
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.collection.mutable.LinkedHashMap
 import scala.collection.mutable.LinkedHashSet
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
@@ -54,9 +53,10 @@ import org.elasticsearch.hadoop.util.StringUtils
 import org.elasticsearch.hadoop.util.Version
 import org.elasticsearch.spark.cfg.SparkSettingsManager
 import org.elasticsearch.spark.serialization.ScalaValueWriter
-
 import javax.xml.bind.DatatypeConverter
 
+import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor
+
 private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
 
   Version.logVersion()
@@ -474,6 +474,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
       // perform a scan-scroll delete
       val cfgCopy = cfg.copy()
       InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
+      InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
       cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
       cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
       cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")
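For reference, a minimal sketch of the user-facing scenario the new OverwriteWithID tests exercise. This is illustrative standalone usage, not part of the patch: it assumes a running Elasticsearch instance, an existing SQLContext `sqc`, and a DataFrame `srcFrame` with a `number` column (as in the tests above).

    import org.apache.spark.sql.SaveMode

    // First Overwrite creates the index; document ids are taken from the "number" field.
    srcFrame.write
      .format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Overwrite)
      .option("es.mapping.id", "number")
      .save("sparksql-test/savemode_overwrite_id")

    // The second Overwrite is the interesting case: it must first empty the existing
    // index via a scan-scroll delete. That delete runs on a copy of the write
    // configuration, which still carries es.mapping.id, so it previously picked up the
    // id field extractor. The setFieldExtractorIfNotSet(..., classOf[ConstantFieldExtractor], ...)
    // call added in both DefaultSource variants installs a throw-away extractor for
    // that phase, and the doubled write in the tests verifies the round trip.
    srcFrame.write
      .format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Overwrite)
      .option("es.mapping.id", "number")
      .save("sparksql-test/savemode_overwrite_id")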