diff --git a/CHANGELOG.md b/CHANGELOG.md
index a95f26ebc..c088ae8dc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,11 @@
 
 Only listing significant user-visible changes, not internal code cleanups and minor bug fixes.
 
-## 1.5.0 (upcoming)
+## 1.6.0 (upcoming)
+
+* Elasticsearch: support importing tables without specifying a particular resource
+
+## 1.5.0 (August 2016)
 
 * Global index: use of inverted indices for speeding-up queries
 * Downgrade to Elasticsearch 2.0.2
diff --git a/elasticsearch/src/main/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSource.scala b/elasticsearch/src/main/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSource.scala
index 46766887d..90b176081 100644
--- a/elasticsearch/src/main/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSource.scala
+++ b/elasticsearch/src/main/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSource.scala
@@ -97,7 +97,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider
    * @param parameters a Map with the configuration parameters
    * @return the validated map.
    */
-  private def params(parameters: Map[String, String]) = {
+  private def params(parameters: Map[String, String], resourceValidation: Boolean = true) = {
     // '.' seems to be problematic when specifying the options
     val params = parameters.map { case (k, v) => (k.replace('_', '.'), v) }.map { case (k, v) =>
@@ -108,9 +108,10 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider
       else ("es." + k, v)
     }
 
-    // validate path
-    if (params.get(ConfigurationOptions.ES_RESOURCE_READ).orElse(params.get(ConfigurationOptions.ES_RESOURCE)).isEmpty)
-      throw new EsHadoopIllegalArgumentException("resource must be specified for Elasticsearch resources.")
+    if (resourceValidation) {
+      if (params.get(ConfigurationOptions.ES_RESOURCE_READ).orElse(params.get(ConfigurationOptions.ES_RESOURCE)).isEmpty)
+        throw new EsHadoopIllegalArgumentException("resource must be specified for Elasticsearch resources.")
+    }
 
     params
   }
@@ -131,7 +132,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider
       if (!options.contains(opName)) sys.error(
         s"""Option "$opName" is mandatory for IMPORT TABLES""")
     }
-    ElasticSearchConnectionUtils.listTypes(params(options))
+    ElasticSearchConnectionUtils.listTypes(params(options, resourceValidation = false))
   }
 
   override def createExternalTable(context: SQLContext,
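For readers skimming the patch: a minimal, self-contained sketch of the opt-out validation the hunks above introduce. This is a simplified stand-in, not the connector's exact option handling (the real `params` also leaves keys already prefixed with `es.` untouched and consults `ES_RESOURCE_READ` before `ES_RESOURCE`):

```scala
// Sketch of the pattern above: resource validation becomes opt-out so that
// IMPORT TABLES, which discovers resources itself, can skip the check.
object ParamsSketch {
  def params(parameters: Map[String, String], resourceValidation: Boolean = true): Map[String, String] = {
    // Users pass option keys with '_' because '.' is problematic; normalize to 'es.*'.
    val normalized = parameters.map { case (k, v) =>
      if (k.startsWith("es.")) (k, v) else ("es." + k.replace('_', '.'), v)
    }
    // Only enforce the mandatory resource when resolving a concrete table.
    if (resourceValidation && !normalized.contains("es.resource"))
      throw new IllegalArgumentException("resource must be specified for Elasticsearch resources.")
    normalized
  }
}
```

It is called as `params(options)` when a concrete table is resolved, and as `params(options, resourceValidation = false)` from table discovery, matching the `listTypes` call in the hunk above.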
diff --git a/elasticsearch/src/main/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchConnectionUtils.scala b/elasticsearch/src/main/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchConnectionUtils.scala
index 409728b32..c3cbf0038 100644
--- a/elasticsearch/src/main/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchConnectionUtils.scala
+++ b/elasticsearch/src/main/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchConnectionUtils.scala
@@ -15,10 +15,9 @@
  */
 package com.stratio.crossdata.connector.elasticsearch
 
-import com.sksamuel.elastic4s.{ElasticsearchClientUri, ElasticClient}
+import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}
 import com.stratio.crossdata.connector.TableInventory.Table
 import com.stratio.crossdata.connector.elasticsearch.DefaultSource._
-import org.apache.spark.sql.types._
 import org.elasticsearch.client.IndicesAdminClient
 import org.elasticsearch.cluster.metadata.MappingMetaData
 import org.elasticsearch.common.collect.ImmutableOpenMap
@@ -98,35 +97,10 @@ object ElasticSearchConnectionUtils {
 
   }
 
-
-  private def getIndexDetails(indexName:String, indexData: ImmutableOpenMap[String, MappingMetaData]): Seq[Table] ={
-    indexData.keys().map(typeES => new Table(typeES.value, Some(indexName), Some(buildStructType(indexData.get(typeES.value))))).toSeq
-  }
-
-  private def convertType(typeName:String): DataType = {
-
-    typeName match {
-      case "string"=> StringType
-      case "integer" => IntegerType
-      case "date" => DateType
-      case "boolean" => BooleanType
-      case "double" => DoubleType
-      case "long" => LongType
-      case "float" => FloatType
-      case "null" => NullType
-      case _ => throw new RuntimeException (s"The type $typeName isn't supported yet in Elasticsearch connector.")
-    }
-
+  private def getIndexDetails(indexName: String, indexData: ImmutableOpenMap[String, MappingMetaData]): Seq[Table] = {
+    val schema = None // Elasticsearch 'datasource' is already able to infer the schema
+    indexData.keys().map(typeES => new Table(typeES.value, Some(indexName), schema)).toSeq
   }
 
-  private def buildStructType(mapping: MappingMetaData): StructType ={
-
-    val esFields = mapping.sourceAsMap().get("properties").asInstanceOf[java.util.LinkedHashMap[String,java.util.LinkedHashMap[String, String]]].toMap
-    val fields: Seq[StructField] = esFields.map {
-      case (colName, propertyValueMap) => StructField(colName, convertType(propertyValueMap.get("type")), false)
-    }(collection.breakOut)
-
-    StructType(fields)
-  }
 }
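The removed `convertType`/`buildStructType` pair duplicated the mapping-to-Spark type translation that the elasticsearch-hadoop datasource already performs; discovery now returns schema-less tables and defers inference to the datasource. A hedged sketch of the resulting contract (host/port values are placeholders; a real connection also needs the native-transport settings used in the tests):

```scala
import com.stratio.crossdata.connector.elasticsearch.ElasticSearchConnectionUtils

// Discovery now yields one schema-less Table per Elasticsearch type; Spark's
// ES datasource infers each schema when the table is first registered/queried.
val options = Map(
  "es.nodes" -> "localhost",   // placeholder
  "es.port"  -> "9200")        // placeholder
val tables = ElasticSearchConnectionUtils.listTypes(options)

// Matches the updated expectation in ElasticSearchConnectionUtilsIT below:
assert(tables.forall(_.schema.isEmpty))
```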
diff --git a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticInsertTableIT.scala b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticInsertTableIT.scala
index 00f639bb5..f8d32b6f7 100644
--- a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticInsertTableIT.scala
+++ b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticInsertTableIT.scala
@@ -15,17 +15,19 @@
  */
 package com.stratio.crossdata.connector.elasticsearch
 
-import com.sksamuel.elastic4s.ElasticDsl._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.crossdata.ExecutionType
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
 
 // TODO ignore while elastic fix ... or when native support select map/array values
+@RunWith(classOf[JUnitRunner])
 class ElasticInsertTableIT extends ElasticInsertCollection {
 
   it should "insert a row using INSERT INTO table VALUES in ElasticSearch" ignore {
     val query = s"""|INSERT INTO $Type VALUES (20, 25, 'proof description', true, 'Eve', false, '2015-01-01' ,
-                    |1200.00, 1463646640046, ['proof'], (a->2), [ (x -> 1, y-> 1), (z -> 1) ], ( x->[1,2], y-> [3,4] ))""".stripMargin
+                    |1200.00, 1463646640046, ['proof'], (a->2), [ (x -> 1, y-> 1), (z -> 1) ], ( x->[1,2], y-> [3,4] ))""".stripMargin
 
     _xdContext.sql(query).collect() should be (Row(1)::Nil)
diff --git a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchConnectionUtilsIT.scala b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchConnectionUtilsIT.scala
index afcffc185..054faaf76 100644
--- a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchConnectionUtilsIT.scala
+++ b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchConnectionUtilsIT.scala
@@ -55,9 +55,9 @@ class ElasticSearchConnectionUtilsIT extends ElasticWithSharedContext with Elast
     val types = ElasticSearchConnectionUtils.listTypes(options)
 
     //Expectations
-    types should not be (null)
+    types should not be null
     types.size should be (1)
-    types(0).schema.get.size should be (8)
+    types.head.schema shouldBe empty
   }
diff --git a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchImportTablesIT.scala b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchImportTablesIT.scala
index f0b650dd5..5ad623992 100644
--- a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchImportTablesIT.scala
+++ b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchImportTablesIT.scala
@@ -16,13 +16,17 @@
 package com.stratio.crossdata.connector.elasticsearch
 
 import com.sksamuel.elastic4s.ElasticDsl._
+import org.apache.spark.sql.types._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
 
+@RunWith(classOf[JUnitRunner])
 class ElasticSearchImportTablesIT extends ElasticWithSharedContext {
 
   // IMPORT OPERATIONS
 
-  it should "import all tables from a keyspace" in {
+  it should "import all tables from an index" in {
     assumeEnvironmentIsUpAndRunning
 
     def tableCountInHighschool: Long = sql("SHOW TABLES").count
     val initialLength = tableCountInHighschool
@@ -69,7 +73,25 @@ class ElasticSearchImportTablesIT extends ElasticWithSharedContext {
 
     //Expectations
     xdContext.tableNames() should contain (s"$Index.$Type")
-    xdContext.table(s"$Index.$Type").schema should have length 8
+    val schema = xdContext.table(s"$Index.$Type").schema
+
+    schema should have length 9
+
+    schema("age").dataType shouldBe IntegerType
+    schema("description").dataType shouldBe StringType
+    schema("enrolled").dataType shouldBe BooleanType
+    schema("salary").dataType shouldBe DoubleType
+    schema("ageInMillis").dataType shouldBe LongType
+    schema("birthday").dataType shouldBe TimestampType
+    schema("team").dataType shouldBe a [StructType]
+
+    val teamSchema = schema("team").dataType.asInstanceOf[StructType]
+    teamSchema.fields should have length 2
+    teamSchema("id").dataType shouldBe IntegerType
+    teamSchema("name").dataType shouldBe StringType
+
+
   }
 
   it should "infer schema after import One table from an Index" in {
@@ -190,6 +212,7 @@ class ElasticSearchImportTablesIT extends ElasticWithSharedContext {
     }
   }
 
+
   lazy val connectionOptions: Map[String, String] = Map(
     "es.nodes" -> s"$ElasticHost",
     "es.port" -> s"$ElasticRestPort",
diff --git a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchTypesIT.scala b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchTypesIT.scala
index be77ef1e9..ca3337f72 100644
--- a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchTypesIT.scala
+++ b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticSearchTypesIT.scala
@@ -33,12 +33,12 @@ class ElasticSearchTypesIT extends ElasticWithSharedContext {
 //  "name" typed StringType index NotAnalyzed,
 //  "birthday" typed DateType,
 //  "salary" typed DoubleType,
-//  "ageInMilis" typed LongType
+//  "ageInMillis" typed LongType
 
   "A ElasticSearchQueryProcessor " should "Return types in correct format" in {
     assumeEnvironmentIsUpAndRunning
     //Experimentation
-    val dataframe = sql(s"SELECT * FROM $Type where id = 2")
+    val dataframe = sql(s"SELECT id, age, description, enrolled, name, birthday, salary, ageInMillis FROM $Type where id = 2")
     val result = dataframe.collect(Native)
 
     //Expectations
@@ -48,18 +48,17 @@ class ElasticSearchTypesIT extends ElasticWithSharedContext {
     result(0).get(3).isInstanceOf[Boolean] should be (true)
     result(0).get(4).isInstanceOf[String] should be (true)
 
-    result(0).get(6).isInstanceOf[Date] should be (true)
-    result(0).get(7).isInstanceOf[Double] should be (true)
-    result(0).get(8).isInstanceOf[Long] should be (true)
+    result(0).get(5).isInstanceOf[Date] should be (true)
+    result(0).get(6).isInstanceOf[Double] should be (true)
+    result(0).get(7).isInstanceOf[Long] should be (true)
 
     result(0).getInt(0) should be (2)
     result(0).getInt(1) should be (12)
     result(0).getString(2) should be ("A 2description about the Name2")
     result(0).getBoolean(3) should be (true)
     result(0).getString(4) should be ("Name 2")
-
-    result(0).getDate(6) should be (DateTime.parse("1982-01-01T10:00:00-00:00").toDate)
-    result(0).getDouble(7) should be (2001.0)
-    result(0).getLong(8) should be (DateTime.parse("1982-01-01T10:00:00-00:00").getMillis)
+    result(0).getDate(5) should be (DateTime.parse("1982-01-01T10:00:00-00:00").toDate)
+    result(0).getDouble(6) should be (2001.0)
+    result(0).getLong(7) should be (DateTime.parse("1982-01-01T10:00:00-00:00").getMillis)
   }
 }
"team" inner( "id" typed IntegerType, "name" typed StringType) override def sparkRegisterTableSQL: Seq[SparkTable] = super.sparkRegisterTableSQL :+ - str2sparkTableDesc(s"CREATE TEMPORARY TABLE $Type (id INT, age INT, description STRING, enrolled BOOLEAN, name STRING, optionalField BOOLEAN, birthday DATE, salary DOUBLE, ageInMilis LONG)") + str2sparkTableDesc(s"CREATE TEMPORARY TABLE $Type (id INT, age INT, description STRING, enrolled BOOLEAN, name STRING, birthday DATE, salary DOUBLE, ageInMillis LONG)") // TODO add when supported natively => team STRUCT override val runningError: String = "ElasticSearch and Spark must be up and running" @@ -97,7 +102,8 @@ trait ElasticWithSharedContext extends SharedXDContextWithDataTest with ElasticS "name" typed StringType index NotAnalyzed, "birthday" typed DateType, "salary" typed DoubleType, - "ageInMilis" typed LongType + "ageInMillis" typed LongType, + "team" inner( "id" typed IntegerType, "name" typed StringType) ) } diff --git a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticsearchConnectorIT.scala b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticsearchConnectorIT.scala index 8edc8a770..2d903d052 100644 --- a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticsearchConnectorIT.scala +++ b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/ElasticsearchConnectorIT.scala @@ -34,7 +34,7 @@ class ElasticsearchConnectorIT extends ElasticWithSharedContext { val result = dataframe.collect(Native) result should have length 10 - schema.fieldNames should equal (Seq("id", "age", "description", "enrolled", "name", "optionalField", "birthday", "salary", "ageInMilis")) + schema.fieldNames should equal (Seq("id", "age", "description", "enrolled", "name", "birthday", "salary", "ageInMillis")) result.head.toSeq(4).toString should fullyMatch regex "Name [0-9]+" }