[SPARK] Fixed ClassNotFoundError when saving DataFrame with both 'es.mapping.id' and SaveMode.Overwrite

fixes elastic#837
jbaiera committed Aug 29, 2016
1 parent b60f606 commit 0ed7203
Showing 4 changed files with 32 additions and 3 deletions.
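For context, a minimal sketch of the write pattern that used to fail (hypothetical names: the index and id column mirror the new tests; `df` stands for any DataFrame with a "number" column):

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Hypothetical reproduction of elastic#837: overwriting an index while
    // routing document ids through 'es.mapping.id' raised a ClassNotFoundError,
    // because SaveMode.Overwrite performs a scan-scroll delete before writing.
    def overwriteWithId(df: DataFrame): Unit = {
      df.write
        .format("org.elasticsearch.spark.sql")
        .mode(SaveMode.Overwrite)            // delete-then-write semantics
        .option("es.mapping.id", "number")   // document ids come from the "number" column
        .save("sparksql-test/savemode_overwrite_id")
    }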
@@ -1099,7 +1099,6 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
def testEsDataFrame60DataSourceSaveModeOverwrite() {
val srcFrame = sqc.read.json(this.getClass.getResource("/small-sample.json").toURI().toString())
val index = wrapIndex("sparksql-test/savemode_overwrite")
- val table = wrapIndex("save_mode_overwrite")

srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).save(index)
val df = EsSparkSQL.esDF(sqc, index)
@@ -1109,6 +1108,19 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertEquals(3, df.count())
}

+ @Test
+ def testEsDataFrame60DataSourceSaveModeOverwriteWithID() {
+ val srcFrame = sqc.read.json(this.getClass.getResource("/small-sample.json").toURI().toString())
+ val index = wrapIndex("sparksql-test/savemode_overwrite_id")
+
+ srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(index)
+ val df = EsSparkSQL.esDF(sqc, index)
+
+ assertEquals(3, df.count())
+ srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(index)
+ assertEquals(3, df.count())
+ }
+
@Test
def testEsDataFrame60DataSourceSaveModeIgnore() {
val srcFrame = sqc.read.json(this.getClass.getResource("/small-sample.json").toURI().toString())
@@ -53,6 +53,8 @@ import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.spark.serialization.ScalaValueWriter
import javax.xml.bind.DatatypeConverter

+ import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor
+
private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {

Version.logVersion()
@@ -470,6 +472,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
// perform a scan-scroll delete
val cfgCopy = cfg.copy()
InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
+ InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")
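
A hedged reading of the one-line fix above: the settings copy used for the pre-write scan-scroll delete still carries 'es.mapping.id', so initialization would otherwise try to resolve a write-side field-extractor class that is not available on this code path, producing the ClassNotFoundError; registering the throw-away ConstantFieldExtractor on the copy makes that lookup trivial.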
@@ -1116,6 +1116,19 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertEquals(3, df.count())
}

+ @Test
+ def testEsDataFrame60DataSourceSaveModeOverwriteWithID() {
+ val srcFrame = artistsJsonAsDataFrame
+ val index = wrapIndex("sparksql-test/savemode_overwrite_id")
+
+ srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(index)
+ val df = EsSparkSQL.esDF(sqc, index)
+
+ assertEquals(3, df.count())
+ srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(index)
+ assertEquals(3, df.count())
+ }
+
@Test
def testEsDataFrame60DataSourceSaveModeIgnore() {
val srcFrame = artistsJsonAsDataFrame
@@ -8,7 +8,6 @@ import java.util.Locale
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.LinkedHashSet
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
@@ -54,9 +53,10 @@ import org.elasticsearch.hadoop.util.StringUtils
import org.elasticsearch.hadoop.util.Version
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.spark.serialization.ScalaValueWriter
-
import javax.xml.bind.DatatypeConverter

+ import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor
+
private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {

Version.logVersion()
@@ -474,6 +474,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
// perform a scan-scroll delete
val cfgCopy = cfg.copy()
InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
+ InitializationUtils.setFieldExtractorIfNotSet(cfgCopy, classOf[ConstantFieldExtractor], null) //throw away extractor
cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")
cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "1000")
cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "1mb")