[SPARK] add integration tests for Relation and Insert traits
relates elastic#461
costin committed Jun 14, 2015
1 parent 9aeecee commit ea40077
Showing 4 changed files with 117 additions and 29 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -16,3 +16,5 @@ metastore_db
 /mr/src/main/resources/esh-build.properties
 /pig/tmp-pig/*
 /spark/keyvaluerdd.parquet
+/spark/sql-13/default_*
+/spark/sql-13/with_meta_*
@@ -233,7 +233,7 @@ public void refresh(Resource resource) {
     }
 
     public void delete(String indexOrType) {
-        execute(DELETE, indexOrType);
+        execute(DELETE, indexOrType, false);
    }
 
     public List<List<Map<String, Object>>> targetShards(String index) {
@@ -386,8 +386,8 @@ public boolean touch(String indexOrType) {
     }
 
     public long count(String indexAndType) {
-        Long count = (Long) get(indexAndType + "/_count", "count");
-        return (count != null ? count : -1);
+        Number count = (Number) get(indexAndType + "/_count", "count");
+        return (count != null ? count.longValue() : -1);
     }
 
     public boolean isAlias(String query) {
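The count() fix above guards against a JVM numeric-type trap: JSON parsers commonly deserialize small whole numbers as java.lang.Integer, so the old blind cast to java.lang.Long threw ClassCastException at runtime. A minimal Scala sketch of the failure mode and the fix (the parsed map is a hypothetical stand-in for the _count response, not the actual RestClient API):

// Hypothetical stand-in for a parsed _count response; JVM JSON parsers
// often produce java.lang.Integer for small whole numbers.
val parsed: Map[String, AnyRef] = Map("count" -> Integer.valueOf(42))

// Old pattern - throws ClassCastException, since Integer is not a Long:
// val bad = parsed("count").asInstanceOf[java.lang.Long]

// Fixed pattern - widen through the shared supertype java.lang.Number:
val count: Long = parsed.get("count") match {
  case Some(n: Number) => n.longValue()
  case _               => -1L
}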
3 changes: 3 additions & 0 deletions spark/sql-13/src/itest/resources/small-sample.json
@@ -0,0 +1,3 @@
+{"number":"1","name":"MALICE MIZER","url":"http://www.last.fm/music/MALICE+MIZER","picture":"http://userserve-ak.last.fm/serve/252/10808.jpg","@timestamp":"2000-10-06T19:20:25.000Z","list":["quick", "brown", "fox"]}
+{"number":"2","name":"Diary of Dreams","url":"http://www.last.fm/music/Diary+of+Dreams","picture":"http://userserve-ak.last.fm/serve/252/3052066.jpg","@timestamp":"2001-10-06T19:20:25.000Z","list":["quick", "brown", "fox"]}
+{"number":"3","name":"Carpathian Forest","url":"http://www.last.fm/music/Carpathian+Forest","picture":"http://userserve-ak.last.fm/serve/252/40222717.jpg","@timestamp":"2002-10-06T19:20:25.000Z","list":["quick", "brown", "fox"]}
@@ -18,21 +18,30 @@
*/
package org.elasticsearch.spark.integration;

import java.{ util => ju, lang => jl }

import java.{lang => jl}
import java.sql.Timestamp
import java.{util => ju}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions.propertiesAsScalaMap
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.storage.StorageLevel._
import org.elasticsearch.hadoop.cfg.ConfigurationOptions._
import org.elasticsearch.hadoop.mr.RestUtils
import org.elasticsearch.hadoop.util.StringUtils
import org.elasticsearch.hadoop.util.TestSettings
import org.elasticsearch.hadoop.util.TestUtils
import org.elasticsearch.spark._
@@ -46,23 +55,16 @@ import org.junit.Assert._
import org.junit.Assume._
import org.junit.BeforeClass
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.MethodSorters
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import org.elasticsearch.hadoop.cfg.ConfigurationOptions._
import org.junit.Test
import javax.xml.bind.DatatypeConverter
import org.apache.spark.sql.catalyst.expressions.GenericRow
import java.util.Arrays
import org.apache.spark.storage.StorageLevel._
import org.elasticsearch.hadoop.Provisioner
import org.elasticsearch.hadoop.util.StringUtils
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.ArrayBuffer

import com.esotericsoftware.kryo.io.{ Output => KryoOutput }
import com.esotericsoftware.kryo.io.{ Input => KryoInput }
import com.esotericsoftware.kryo.io.{Input => KryoInput}
import com.esotericsoftware.kryo.io.{Output => KryoOutput}

import javax.xml.bind.DatatypeConverter

object AbstractScalaEsScalaSparkSQL {
@transient val conf = new SparkConf().setAll(TestSettings.TESTING_PROPS).set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
@@ -91,23 +93,45 @@ object AbstractScalaEsScalaSparkSQL {
@Parameters
def testParams(): ju.Collection[Array[jl.Object]] = {
val list = new ju.ArrayList[Array[jl.Object]]()
list.add(Array("default_", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.FALSE))
list.add(Array("default_strict", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.TRUE))
list.add(Array("default_no_push", jl.Boolean.FALSE, jl.Boolean.FALSE, jl.Boolean.FALSE))
list.add(Array("with_meta_", jl.Boolean.TRUE, jl.Boolean.TRUE, jl.Boolean.FALSE))
list.add(Array("with_meta_strict", jl.Boolean.TRUE, jl.Boolean.TRUE, jl.Boolean.TRUE))
list.add(Array("with_meta_no_push", jl.Boolean.TRUE, jl.Boolean.FALSE, jl.Boolean.FALSE))
// no query
val noQuery = ""
list.add(Array("default_", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.FALSE, noQuery))
list.add(Array("default_strict", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.TRUE, noQuery))
list.add(Array("default_no_push", jl.Boolean.FALSE, jl.Boolean.FALSE, jl.Boolean.FALSE, noQuery))
list.add(Array("with_meta_", jl.Boolean.TRUE, jl.Boolean.TRUE, jl.Boolean.FALSE, noQuery))
list.add(Array("with_meta_strict", jl.Boolean.TRUE, jl.Boolean.TRUE, jl.Boolean.TRUE, noQuery))
list.add(Array("with_meta_no_push", jl.Boolean.TRUE, jl.Boolean.FALSE, jl.Boolean.FALSE, noQuery))

// uri query
val uriQuery = "?q=*"
list.add(Array("default_uri_query", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.FALSE, uriQuery))
list.add(Array("default_uri_query_strict", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.TRUE, uriQuery))
list.add(Array("default_uri_query_no_push", jl.Boolean.FALSE, jl.Boolean.FALSE, jl.Boolean.FALSE, uriQuery))
list.add(Array("with_meta_uri_query_", jl.Boolean.TRUE, jl.Boolean.TRUE, jl.Boolean.FALSE, uriQuery))
list.add(Array("with_meta_uri_query_strict", jl.Boolean.TRUE, jl.Boolean.TRUE, jl.Boolean.TRUE, uriQuery))
list.add(Array("with_meta_uri_query_no_push", jl.Boolean.TRUE, jl.Boolean.FALSE, jl.Boolean.FALSE, uriQuery))

// dsl query
val dslQuery = """ {"query" : { "match_all" : { } } } """
list.add(Array("default_dsl_query_", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.FALSE, dslQuery))
list.add(Array("default_strict_dsl_query", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.TRUE, dslQuery))
list.add(Array("default_no_push_dsl_query", jl.Boolean.FALSE, jl.Boolean.FALSE, jl.Boolean.FALSE, dslQuery))
list.add(Array("with_meta_dsl_query", jl.Boolean.TRUE, jl.Boolean.TRUE, jl.Boolean.FALSE, dslQuery))
list.add(Array("with_meta_strict_dsl_query", jl.Boolean.TRUE, jl.Boolean.TRUE, jl.Boolean.TRUE, dslQuery))
list.add(Array("with_meta_no_push_dsl_query", jl.Boolean.TRUE, jl.Boolean.FALSE, jl.Boolean.FALSE, dslQuery))

list
}
}
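A note on the harness: JUnit's Parameterized runner instantiates the test class once per Array[jl.Object] returned above, splatting each array into the constructor. The construction below is illustrative only (the runner does it reflectively), showing how the five elements line up with the constructor parameters:

import java.{lang => jl}

// One parameter row, as produced by testParams() above.
val row: Array[jl.Object] =
  Array("default_uri_query", jl.Boolean.FALSE, jl.Boolean.TRUE, jl.Boolean.FALSE, "?q=*")

// What the runner effectively does with it:
val suite = new AbstractScalaEsScalaSparkSQL(
  row(0).asInstanceOf[String],      // prefix
  row(1).asInstanceOf[jl.Boolean],  // readMetadata
  row(2).asInstanceOf[jl.Boolean],  // pushDown
  row(3).asInstanceOf[jl.Boolean],  // strictPushDown
  row(4).asInstanceOf[String])      // query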

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(classOf[Parameterized])
class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pushDown: jl.Boolean, strictPushDown: jl.Boolean) extends Serializable {
class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pushDown: jl.Boolean, strictPushDown: jl.Boolean, query: String = "") extends Serializable {

val sc = AbstractScalaEsScalaSparkSQL.sc
val sqc = AbstractScalaEsScalaSparkSQL.sqc
val cfg = Map(ES_READ_METADATA -> readMetadata.toString(),
val cfg = Map(ES_QUERY -> query,
ES_READ_METADATA -> readMetadata.toString(),
"es.internal.spark.sql.pushdown" -> pushDown.toString(),
"es.internal.spark.sql.pushdown.strict" -> strictPushDown.toString() )

@@ -557,12 +581,15 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus

@Test
def testEsDataFrame51WriteToExistingDataSource() {
// to keep the select static
assumeFalse(readMetadata)

val index = wrapIndex("sparksql-test/scala-basic-write")
val table = wrapIndex("table_insert")

var options = s"resource '$index '"
if (readMetadata) {
options = options + " ,read_metadata true"
options = options + " ,read_metadata 'true'"
}

val dataFrame = sqc.sql(s"CREATE TEMPORARY TABLE $table " +
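The sqc.sql(...) call above continues past the hunk boundary; pieced together from the parallel code in the next test, the DDL it issues should look roughly like this (a sketch, with per-test names):

val ddl = s"""CREATE TEMPORARY TABLE $table
             |USING org.elasticsearch.spark.sql
             |OPTIONS ($options)""".stripMargin
sqc.sql(ddl)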
@@ -597,7 +624,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertTrue(df.count > 1)
val insertRDD = sqc.sql(s"INSERT OVERWRITE TABLE $table SELECT 123456789, 'test-sql', 'http://test-sql.com', '', 12345")
df = sqc.table(table)
assertTrue(df.count == 1)
assertEquals(1, df.count)
}

@Test
@@ -624,7 +651,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
srcOptions = srcOptions + " ,read_metadata 'true'"
}

val srcFrame = sqc.sql(s"CREATE TEMPORARY TABLE $srcTable " +
val srcFrame = sqc.sql(s"CREATE TEMPORARY TABLE $srcTable " +
s"USING org.elasticsearch.spark.sql " +
s"OPTIONS ($srcOptions)");

@@ -638,6 +665,62 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertTrue(df.count > 100)
}

@Test
def testEsDataFrame60DataSourceSaveModeError() {
val srcFrame = sqc.jsonFile(this.getClass.getResource("/small-sample.json").toURI().toString())
val index = wrapIndex("sparksql-test/savemode_error")
val table = wrapIndex("save_mode_error")

srcFrame.save(index, "org.elasticsearch.spark.sql", SaveMode.ErrorIfExists)
try {
srcFrame.save(index, "org.elasticsearch.spark.sql", SaveMode.ErrorIfExists)
fail("save() should fail when the target resource already exists")
} catch {
case _: Exception => // expected - ErrorIfExists rejects an existing resource
}
}

@Test
def testEsDataFrame60DataSourceSaveModeAppend() {
val srcFrame = sqc.jsonFile(this.getClass.getResource("/small-sample.json").toURI().toString())
val index = wrapIndex("sparksql-test/savemode_append")
val table = wrapIndex("save_mode_append")

srcFrame.save(index, "org.elasticsearch.spark.sql", SaveMode.Append)
val df = EsSparkSQL.esDF(sqc, index)

assertEquals(3, df.count())
srcFrame.save(index, "org.elasticsearch.spark.sql", SaveMode.Append)
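// df is lazy - this second count() re-queries the index, which now holds both batches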
assertEquals(6, df.count())
}

@Test
def testEsDataFrame60DataSourceSaveModeOverwrite() {
val srcFrame = sqc.jsonFile(this.getClass.getResource("/small-sample.json").toURI().toString())
val index = wrapIndex("sparksql-test/savemode_overwrite")
val table = wrapIndex("save_mode_overwrite")

srcFrame.save(index, "org.elasticsearch.spark.sql", SaveMode.Overwrite)
val df = EsSparkSQL.esDF(sqc, index)

assertEquals(3, df.count())
srcFrame.save(index, "org.elasticsearch.spark.sql", SaveMode.Overwrite)
assertEquals(3, df.count())
}

@Test
def testEsDataFrame60DataSourceSaveModeIgnore() {
val srcFrame = sqc.jsonFile(this.getClass.getResource("/small-sample.json").toURI().toString())
val index = wrapIndex("sparksql-test/savemode_ignore")
val table = wrapIndex("save_mode_ignore")

srcFrame.save(index, "org.elasticsearch.spark.sql", SaveMode.Ignore)
val df = EsSparkSQL.esDF(sqc, index)

assertEquals(3, df.count())
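// a different frame is saved here; under Ignore the write is skipped, so the existing documents remain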
artistsAsDataFrame.save(index, "org.elasticsearch.spark.sql", SaveMode.Ignore)
assertEquals(3, df.count())
}
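Taken together, the four tests pin down the SaveMode contract for the ES data source. A condensed, hedged recap (df and the resource string are placeholders):

df.save("index/type", "org.elasticsearch.spark.sql", SaveMode.ErrorIfExists) // fails if data already exists
df.save("index/type", "org.elasticsearch.spark.sql", SaveMode.Append)        // always writes; documents accumulate
df.save("index/type", "org.elasticsearch.spark.sql", SaveMode.Overwrite)     // replaces whatever was there
df.save("index/type", "org.elasticsearch.spark.sql", SaveMode.Ignore)        // skips the write when data exists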

def wrapIndex(index: String) = {
prefix + index
