This repository has been archived by the owner on Nov 20, 2019. It is now read-only.

[CROSSDATA-576] [elasticsearch] Delegate import table to datasource (#…
darroyo-stratio committed Aug 29, 2016
1 parent 7e113b2 commit 5348885
Showing 9 changed files with 64 additions and 55 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -2,7 +2,11 @@
 
 Only listing significant user-visible, not internal code cleanups and minor bug fixes.
 
-## 1.5.0 (upcoming)
+## 1.6.0 (upcoming)
+
+* Elasticsearch: support import tables without specifying a particular resource
+
+## 1.5.0 (August 2016)
 
 * Global index: use of inverted indices for speeding-up queries
 * Downgrade to Elasticsearch 2.0.2
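With this change, IMPORT TABLES no longer requires a resource option: leaving it out imports every index and type the cluster exposes. A minimal sketch of such a statement, assuming the usual Crossdata IMPORT TABLES ... USING ... OPTIONS syntax and placeholder connection values (the underscore-escaped option names are rewritten to es.* by the connector, as the DefaultSource diff below shows):

    // Sketch: no es_resource option is given, so all indexes/types are discovered.
    xdContext.sql(
      """IMPORT TABLES
        |USING com.stratio.crossdata.connector.elasticsearch
        |OPTIONS (es_nodes 'localhost', es_port '9200')""".stripMargin)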
DefaultSource.scala
@@ -97,7 +97,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider
     * @param parameters a Map with the configurations parameters
     * @return the validated map.
     */
-  private def params(parameters: Map[String, String]) = {
+  private def params(parameters: Map[String, String], resourceValidation: Boolean = true) = {
 
     // '.' seems to be problematic when specifying the options
     val params = parameters.map { case (k, v) => (k.replace('_', '.'), v) }.map { case (k, v) =>
@@ -108,9 +108,10 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider
       else ("es." + k, v)
     }
 
-    // validate path
-    if (params.get(ConfigurationOptions.ES_RESOURCE_READ).orElse(params.get(ConfigurationOptions.ES_RESOURCE)).isEmpty)
-      throw new EsHadoopIllegalArgumentException("resource must be specified for Elasticsearch resources.")
+    if (resourceValidation) {
+      if (params.get(ConfigurationOptions.ES_RESOURCE_READ).orElse(params.get(ConfigurationOptions.ES_RESOURCE)).isEmpty)
+        throw new EsHadoopIllegalArgumentException("resource must be specified for Elasticsearch resources.")
+    }
 
     params
   }
@@ -131,7 +132,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider
       if (!options.contains(opName)) sys.error( s"""Option "$opName" is mandatory for IMPORT TABLES""")
     }
 
-    ElasticSearchConnectionUtils.listTypes(params(options))
+    ElasticSearchConnectionUtils.listTypes(params(options, resourceValidation = false))
  }
 
  override def createExternalTable(context: SQLContext,
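For clarity, a standalone sketch of what params now does. This is not the project's code: the real method is private and reads the resource keys through ConfigurationOptions.ES_RESOURCE_READ and ES_RESOURCE, whose values are assumed here to be "es.resource.read" and "es.resource":

    // Sketch: normalise option keys, then validate the resource only when asked to.
    def params(parameters: Map[String, String], resourceValidation: Boolean = true): Map[String, String] = {
      val normalised = parameters
        .map { case (k, v) => (k.replace('_', '.'), v) }                            // '_' -> '.'
        .map { case (k, v) => if (k.startsWith("es.")) (k, v) else ("es." + k, v) } // ensure "es." prefix
      if (resourceValidation &&
          normalised.get("es.resource.read").orElse(normalised.get("es.resource")).isEmpty)
        throw new IllegalArgumentException("resource must be specified for Elasticsearch resources.")
      normalised
    }

    params(Map("nodes" -> "localhost"), resourceValidation = false) // import-tables path: accepted
    // params(Map("nodes" -> "localhost"))                          // read/write path: still throws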
ElasticSearchConnectionUtils.scala
@@ -15,10 +15,9 @@
  */
 package com.stratio.crossdata.connector.elasticsearch
 
-import com.sksamuel.elastic4s.{ElasticsearchClientUri, ElasticClient}
+import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}
 import com.stratio.crossdata.connector.TableInventory.Table
 import com.stratio.crossdata.connector.elasticsearch.DefaultSource._
-import org.apache.spark.sql.types._
 import org.elasticsearch.client.IndicesAdminClient
 import org.elasticsearch.cluster.metadata.MappingMetaData
 import org.elasticsearch.common.collect.ImmutableOpenMap
@@ -98,35 +97,10 @@ object ElasticSearchConnectionUtils {
 
   }
 
-  private def getIndexDetails(indexName:String, indexData: ImmutableOpenMap[String, MappingMetaData]): Seq[Table] ={
-    indexData.keys().map(typeES => new Table(typeES.value, Some(indexName), Some(buildStructType(indexData.get(typeES.value))))).toSeq
-  }
-
-  private def convertType(typeName:String): DataType = {
-
-    typeName match {
-      case "string"=> StringType
-      case "integer" => IntegerType
-      case "date" => DateType
-      case "boolean" => BooleanType
-      case "double" => DoubleType
-      case "long" => LongType
-      case "float" => FloatType
-      case "null" => NullType
-      case _ => throw new RuntimeException (s"The type $typeName isn't supported yet in Elasticsearch connector.")
-    }
-  }
-
-  private def buildStructType(mapping: MappingMetaData): StructType ={
-
-    val esFields = mapping.sourceAsMap().get("properties").asInstanceOf[java.util.LinkedHashMap[String,java.util.LinkedHashMap[String, String]]].toMap
-
-    val fields: Seq[StructField] = esFields.map {
-      case (colName, propertyValueMap) => StructField(colName, convertType(propertyValueMap.get("type")), false)
-    }(collection.breakOut)
-
-    StructType(fields)
-  }
+  private def getIndexDetails(indexName: String, indexData: ImmutableOpenMap[String, MappingMetaData]): Seq[Table] = {
+    val schema = None // Elasticsearch 'datasource' is already able to infer the schema
+    indexData.keys().map(typeES => new Table(typeES.value, Some(indexName), schema)).toSeq
+  }
 }
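The hand-rolled mapping-to-StructType conversion is gone: getIndexDetails now returns Table entries with schema = None, and schema inference is delegated to the Elasticsearch datasource. A sketch of what that delegation amounts to, via the standard es-hadoop Spark SQL source (the index/type name is hypothetical):

    // The datasource infers the StructType from the Elasticsearch mapping itself.
    val df = sqlContext.read
      .format("org.elasticsearch.spark.sql")
      .option("es.resource", "highschool/students") // hypothetical index/type
      .load()
    df.schema // inferred by es-hadoop, no longer built by the connector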
ElasticInsertTableIT.scala
@@ -15,17 +15,19 @@
  */
 package com.stratio.crossdata.connector.elasticsearch
 
+import com.sksamuel.elastic4s.ElasticDsl._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.crossdata.ExecutionType
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 
+// TODO ignore while elastic fix ... or when native support select map/array values
 @RunWith(classOf[JUnitRunner])
 class ElasticInsertTableIT extends ElasticInsertCollection {
 
   it should "insert a row using INSERT INTO table VALUES in ElasticSearch" ignore {
     val query = s"""|INSERT INTO $Type VALUES (20, 25, 'proof description', true, 'Eve', false, '2015-01-01' ,
-      |1200.00, 1463646640046, ['proof'], (a->2), [ (x -> 1, y-> 1), (z -> 1) ], ( x->[1,2], y-> [3,4] ))""".stripMargin
+                    |1200.00, 1463646640046, ['proof'], (a->2), [ (x -> 1, y-> 1), (z -> 1) ], ( x->[1,2], y-> [3,4] ))""".stripMargin
 
     _xdContext.sql(query).collect() should be (Row(1)::Nil)
ElasticSearchConnectionUtilsIT.scala
@@ -55,9 +55,9 @@ class ElasticSearchConnectionUtilsIT extends ElasticWithSharedContext with Elast
     val types = ElasticSearchConnectionUtils.listTypes(options)
 
     //Expectations
-    types should not be (null)
+    types should not be null
     types.size should be (1)
-    types(0).schema.get.size should be (8)
+    types.head.schema shouldBe empty
 
   }
ElasticSearchImportTablesIT.scala
@@ -16,13 +16,17 @@
 package com.stratio.crossdata.connector.elasticsearch
 
 import com.sksamuel.elastic4s.ElasticDsl._
+import org.apache.spark.sql.types._
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
 class ElasticSearchImportTablesIT extends ElasticWithSharedContext {
 
   // IMPORT OPERATIONS
 
-  it should "import all tables from a keyspace" in {
+  it should "import all tables from an index" in {
     assumeEnvironmentIsUpAndRunning
     def tableCountInHighschool: Long = sql("SHOW TABLES").count
     val initialLength = tableCountInHighschool
@@ -69,7 +73,25 @@ class ElasticSearchImportTablesIT extends ElasticWithSharedContext {
 
     //Expectations
     xdContext.tableNames() should contain (s"$Index.$Type")
-    xdContext.table(s"$Index.$Type").schema should have length 8
+    val schema = xdContext.table(s"$Index.$Type").schema
+
+    schema should have length 9
+
+    schema("age").dataType shouldBe IntegerType
+    schema("description").dataType shouldBe StringType
+    schema("enrolled").dataType shouldBe BooleanType
+    schema("salary").dataType shouldBe DoubleType
+    schema("ageInMillis").dataType shouldBe LongType
+    schema("birthday").dataType shouldBe TimestampType
+    schema("team").dataType shouldBe a [StructType]
+
+    val teamSchema = schema("team").dataType.asInstanceOf[StructType]
+    teamSchema.fields should have length 2
+    teamSchema("id").dataType shouldBe IntegerType
+    teamSchema("name").dataType shouldBe StringType
+  }
 
   it should "infer schema after import One table from an Index" in {
@@ -190,6 +212,7 @@ class ElasticSearchImportTablesIT extends ElasticWithSharedContext {
     }
   }
 
+
   lazy val connectionOptions: Map[String, String] = Map(
     "es.nodes" -> s"$ElasticHost",
     "es.port" -> s"$ElasticRestPort",
ElasticSearchTypesIT.scala
@@ -33,12 +33,12 @@ class ElasticSearchTypesIT extends ElasticWithSharedContext {
 //  "name" typed StringType index NotAnalyzed,
 //  "birthday" typed DateType,
 //  "salary" typed DoubleType,
-//  "ageInMilis" typed LongType
+//  "ageInMillis" typed LongType
   "A ElasticSearchQueryProcessor " should "Return types in correct format" in {
     assumeEnvironmentIsUpAndRunning
 
     //Experimentation
-    val dataframe = sql(s"SELECT * FROM $Type where id = 2")
+    val dataframe = sql(s"SELECT id, age, description, enrolled, name, birthday, salary, ageInMillis FROM $Type where id = 2")
     val result = dataframe.collect(Native)
 
     //Expectations
@@ -48,18 +48,17 @@ class ElasticSearchTypesIT extends ElasticWithSharedContext {
     result(0).get(3).isInstanceOf[Boolean] should be (true)
     result(0).get(4).isInstanceOf[String] should be (true)
 
-    result(0).get(6).isInstanceOf[Date] should be (true)
-    result(0).get(7).isInstanceOf[Double] should be (true)
-    result(0).get(8).isInstanceOf[Long] should be (true)
+    result(0).get(5).isInstanceOf[Date] should be (true)
+    result(0).get(6).isInstanceOf[Double] should be (true)
+    result(0).get(7).isInstanceOf[Long] should be (true)
 
     result(0).getInt(0) should be (2)
     result(0).getInt(1) should be (12)
     result(0).getString(2) should be ("A 2description about the Name2")
     result(0).getBoolean(3) should be (true)
     result(0).getString(4) should be ("Name 2")
 
-    result(0).getDate(6) should be (DateTime.parse("1982-01-01T10:00:00-00:00").toDate)
-    result(0).getDouble(7) should be (2001.0)
-    result(0).getLong(8) should be (DateTime.parse("1982-01-01T10:00:00-00:00").getMillis)
+    result(0).getDate(5) should be (DateTime.parse("1982-01-01T10:00:00-00:00").toDate)
+    result(0).getDouble(6) should be (2001.0)
+    result(0).getLong(7) should be (DateTime.parse("1982-01-01T10:00:00-00:00").getMillis)
   }
 }
ElasticWithSharedContext.scala
@@ -57,7 +57,8 @@ trait ElasticWithSharedContext extends SharedXDContextWithDataTest with ElasticS
         "name" -> s"Name $a",
         "birthday" -> DateTime.parse((1980 + a) + "-01-01T10:00:00-00:00").toDate,
         "salary" -> a * 1000.5,
-        "ageInMilis" -> DateTime.parse((1980 + a) + "-01-01T10:00:00-00:00").getMillis)
+        "ageInMillis" -> DateTime.parse((1980 + a) + "-01-01T10:00:00-00:00").getMillis,
+        "team" -> Map("id" -> (a+1), "name" -> s"team_name$a") )
     }.await
     client.get.execute {
       flush index Index
@@ -78,8 +79,12 @@ trait ElasticWithSharedContext extends SharedXDContextWithDataTest with ElasticS
       elasticClient
     } toOption
 
   override def sparkRegisterTableSQL: Seq[SparkTable] = super.sparkRegisterTableSQL :+
-    str2sparkTableDesc(s"CREATE TEMPORARY TABLE $Type (id INT, age INT, description STRING, enrolled BOOLEAN, name STRING, optionalField BOOLEAN, birthday DATE, salary DOUBLE, ageInMilis LONG)")
+    str2sparkTableDesc(s"CREATE TEMPORARY TABLE $Type (id INT, age INT, description STRING, enrolled BOOLEAN, name STRING, birthday DATE, salary DOUBLE, ageInMillis LONG)") // TODO add when supported natively => team STRUCT<id: INT, name: STRING>
 
   override val runningError: String = "ElasticSearch and Spark must be up and running"
@@ -97,7 +102,8 @@ trait ElasticWithSharedContext extends SharedXDContextWithDataTest with ElasticS
       "name" typed StringType index NotAnalyzed,
       "birthday" typed DateType,
       "salary" typed DoubleType,
-      "ageInMilis" typed LongType
+      "ageInMillis" typed LongType,
+      "team" inner( "id" typed IntegerType, "name" typed StringType)
     )
 }
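The new team field is mapped as an Elasticsearch inner object, so after import it should surface in Spark as a nested struct. A hedged sketch of both sides: the JSON fragment is the 2.x object mapping the elastic4s inner DSL is expected to emit, and the dotted SELECT would run through Spark's own execution path until the connector supports structs natively, per the TODO above:

    // Expected Elasticsearch 2.x mapping fragment (sketch):
    //   "team": { "properties": { "id":   { "type": "integer" },
    //                             "name": { "type": "string"  } } }
    xdContext.sql(s"SELECT name, team.id FROM $Type").show() // dot syntax for nested fields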
ElasticsearchConnectorIT.scala
@@ -34,7 +34,7 @@ class ElasticsearchConnectorIT extends ElasticWithSharedContext {
     val result = dataframe.collect(Native)
 
     result should have length 10
-    schema.fieldNames should equal (Seq("id", "age", "description", "enrolled", "name", "optionalField", "birthday", "salary", "ageInMilis"))
+    schema.fieldNames should equal (Seq("id", "age", "description", "enrolled", "name", "birthday", "salary", "ageInMillis"))
     result.head.toSeq(4).toString should fullyMatch regex "Name [0-9]+"
   }
