diff --git a/.jenkins.yml b/.jenkins.yml
index 16c9df734..a468fc3c0 100644
--- a/.jenkins.yml
+++ b/.jenkins.yml
@@ -6,7 +6,7 @@ CROSSBUILDAT:
ITSERVICES:
- ZOOKEEPER:
- image: stratio/zookeeper:3.4.6
+ image: jplock/zookeeper:3.5.1-alpha
- MONGODB:
image: stratio/mongo:3.0.4
sleep: 10
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c7041013b..681cc46a7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ Only listing significant user-visible, not internal code cleanups and minor bug
* HTTP/HTTPS Communication layer with transparent driver.
* Bug Fixes:
* Cancellation message is now delivered to the client.
+* Updated curator and stratio-commons-utils dependencies for compatibility with Zookeeper 3.5.x.
## 1.6.0 (September 2016)
diff --git a/common/src/main/scala/com/stratio/crossdata/common/result/results.scala b/common/src/main/scala/com/stratio/crossdata/common/result/results.scala
index 3add3c406..df2d395b8 100644
--- a/common/src/main/scala/com/stratio/crossdata/common/result/results.scala
+++ b/common/src/main/scala/com/stratio/crossdata/common/result/results.scala
@@ -96,9 +96,12 @@ case class ErrorSQLResult(message: String, cause: Option[Throwable] = None) exte
cause.map(throwable => new RuntimeException(message, throwable)).getOrElse(new RuntimeException(message))
}
+object StreamedSuccessfulSQLResult {
+ implicit def schema2streamed(schema: StructType): StreamedSuccessfulSQLResult = StreamedSchema(schema)
+ implicit def row2streamed(row: Row)(implicit providedSchema: StructType): StreamedSuccessfulSQLResult =
+ StreamedRow(row, Some(providedSchema))
+}
-
-
-
-
-
+sealed trait StreamedSuccessfulSQLResult
+case class StreamedSchema(schema: StructType) extends StreamedSuccessfulSQLResult
+case class StreamedRow(row: Row, providedSchema: Option[StructType] = None) extends StreamedSuccessfulSQLResult
\ No newline at end of file
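
The companion object above lets call sites treat a StructType or a Row as a StreamedSuccessfulSQLResult without explicit wrapping, as long as a schema is implicitly in scope. A minimal sketch of how the two conversions compose (the object name ConversionSketch is illustrative only):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
    import com.stratio.crossdata.common.result._
    import com.stratio.crossdata.common.result.StreamedSuccessfulSQLResult._

    object ConversionSketch extends App {
      // The implicit schema feeds row2streamed's second parameter list.
      implicit val schema: StructType = StructType(StructField("id", IntegerType) :: Nil)

      val first: StreamedSuccessfulSQLResult = schema   // via schema2streamed
      val rest: StreamedSuccessfulSQLResult = Row(42)   // via row2streamed

      assert(first == StreamedSchema(schema))
      assert(rest == StreamedRow(Row(42), Some(schema)))
    }

This is the mechanism CrossdataHttpServer (below) relies on when it maps plain result rows into the streamed response.
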
diff --git a/common/src/main/scala/com/stratio/crossdata/common/serializers/CrossdataSerializer.scala b/common/src/main/scala/com/stratio/crossdata/common/serializers/CrossdataSerializer.scala
index 548715d6f..7c6152e4e 100644
--- a/common/src/main/scala/com/stratio/crossdata/common/serializers/CrossdataSerializer.scala
+++ b/common/src/main/scala/com/stratio/crossdata/common/serializers/CrossdataSerializer.scala
@@ -26,7 +26,7 @@ trait CrossdataCommonSerializer {
implicit val json4sJacksonFormats: Formats =
DefaultFormats + SQLResultSerializer + UUIDSerializer +
- StructTypeSerializer + FiniteDurationSerializer + CommandSerializer +
+ StructTypeSerializer + FiniteDurationSerializer + CommandSerializer + StreamedSuccessfulSQLResultSerializer +
AkkaMemberStatusSerializer + AkkaClusterMemberSerializer + new SortedSetSerializer[Member]()
}
diff --git a/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala b/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala
index cb8bd7f55..2e8f6cfcc 100644
--- a/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala
+++ b/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala
@@ -60,10 +60,10 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] {
case (ArrayType(ty, _), JArray(arr)) =>
mutable.WrappedArray make arr.map(extractField(ty, _)).toArray
/* Maps will be serialized as sub-objects so keys are constrained to be strings */
- case (MapType(StringType, vt, _), JObject(fields)) =>
- val (keys, values) = fields.unzip
- val unserValues = values map (jval => extractField(vt, jval))
- ArrayBasedMapDataNotDeprecated(keys.toArray, unserValues.toArray)
+ case (MapType(kt, vt, _), JObject(JField("map", JObject(JField("keys", JArray(mapKeys)) :: JField("values", JArray(mapValues)) :: _) ) :: _)) =>
+ val unserKeys = mapKeys map (jval => extractField(kt, jval))
+ val unserValues = mapValues map (jval => extractField(vt, jval))
+ ArrayBasedMapDataNotDeprecated(unserKeys.toArray, unserValues.toArray)
case (st: StructType, JObject(JField("values",JArray(values))::_)) =>
deserializeWithSchema(st, values, true)
}
@@ -106,14 +106,15 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] {
case v: ArrayDataNotDeprecated => JArray(v.array.toList.map(v => Extraction.decompose(v)))
case v: mutable.WrappedArray[_] => JArray(v.toList.map(v => Extraction.decompose(v)))
}
- case (MapType(StringType, vt, _), v: MapDataNotDeprecated) =>
+ case (MapType(kt, vt, _), v: MapDataNotDeprecated) =>
/* Maps will be serialized as sub-objects so keys are constrained to be strings */
- val serKeys = v.keyArray().array.map(v => v.toString)
+ val serKeys = v.keyArray().array.map(v => serializeField(kt -> v))
val serValues = v.valueArray.array.map(v => serializeField(vt -> v))
-          JObject(
-            (v.keyArray.array zip serValues) map {
-              case (k: String, v) => JField(k, v)
-            } toList
-          )
+          JObject(JField("map",
+            JObject(
+              JField("keys", JArray(serKeys.toList)),
+              JField("values", JArray(serValues.toList))
+            )
+          ))
case (st: StructType, v: Row) => serializeWithSchema(st, v, true)
}
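
With this change, map fields are no longer rendered as plain JSON objects, which forced keys to be strings; keys and values now travel as parallel arrays under a "map" wrapper, so keys of any supported type round-trip. A small json4s sketch of reading that shape back, using a hypothetical payload (numeric keys stand in for whatever the key serializer emits):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    object MapWireFormatSketch extends App {
      // Hypothetical payload in the new encoding: parallel "keys"/"values" arrays.
      val json = parse("""{"map": {"keys": [1448874000000, 1448877600000], "values": [25, 12]}}""")

      json match {
        case JObject(JField("map", JObject(JField("keys", JArray(keys)) ::
                                           JField("values", JArray(values)) :: _)) :: _) =>
          println(keys zip values) // pairs of key/value JSON nodes, as extractField consumes them
      }
    }
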
diff --git a/common/src/main/scala/com/stratio/crossdata/common/serializers/StreamedSuccessfulSQLResultSerializer.scala b/common/src/main/scala/com/stratio/crossdata/common/serializers/StreamedSuccessfulSQLResultSerializer.scala
new file mode 100644
index 000000000..157daedbd
--- /dev/null
+++ b/common/src/main/scala/com/stratio/crossdata/common/serializers/StreamedSuccessfulSQLResultSerializer.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2015 Stratio (http://stratio.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.stratio.crossdata.common.serializers
+
+import com.stratio.crossdata.common.result.{StreamedRow, StreamedSchema, StreamedSuccessfulSQLResult}
+import org.json4s.JsonAST.{JField, JObject}
+import org.json4s.{CustomSerializer, Extraction, Formats}
+import StreamedSuccessfulSQLResultSerializerHelper._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+private[serializers] object StreamedSuccessfulSQLResultSerializerHelper {
+ val SchemaLabel = "streamedSchema"
+ val RowLabel = "streamedRow"
+}
+
+object StreamedSuccessfulSQLResultSerializer extends CustomSerializer[StreamedSuccessfulSQLResult](
+ formats => (
+ {
+ case JObject(JField(SchemaLabel, jSchema)::Nil) =>
+ implicit val _: Formats = formats
+ StreamedSchema(jSchema.extract[StructType])
+ },
+ {
+ case StreamedSchema(schema) => JObject(JField(SchemaLabel, Extraction.decompose(schema)(formats)))
+ case StreamedRow(row, Some(providedSchema)) =>
+ JObject(JField(RowLabel, Extraction.decompose(row)(formats + RowSerializer(providedSchema))))
+ }
+ )
+)
+
+class StreamedRowSerializer(schema: StructType) extends CustomSerializer[StreamedSuccessfulSQLResult](
+ formats => (
+ {
+ case JObject(JField(RowLabel, jRow)::Nil) =>
+ implicit val _: Formats = formats + new RowSerializer(schema)
+ StreamedRow(jRow.extract[Row])
+ },
+ PartialFunction.empty
+ )
+)
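
A minimal round-trip sketch for the schema frame, assuming the crossdata-common artifact is on the classpath: the shared formats (which now include StreamedSuccessfulSQLResultSerializer) write a StreamedSchema and read it back, while row frames additionally need the schema-aware StreamedRowSerializer built above:

    import org.apache.spark.sql.types.{LongType, StructField, StructType}
    import org.json4s.jackson.Serialization.{read, write}
    import com.stratio.crossdata.common.result.{StreamedSchema, StreamedSuccessfulSQLResult}
    import com.stratio.crossdata.common.serializers.CrossdataCommonSerializer

    object StreamedSchemaRoundTrip extends App with CrossdataCommonSerializer {
      val schema = StructType(StructField("id", LongType) :: Nil)

      // Serializes to {"streamedSchema": ...} and back again.
      val wire = write(StreamedSchema(schema): StreamedSuccessfulSQLResult)
      val back = read[StreamedSuccessfulSQLResult](wire)
      assert(back == StreamedSchema(schema))
    }
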
diff --git a/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala b/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala
index 4d3b6ede4..a7666e5dd 100644
--- a/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala
+++ b/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala
@@ -19,15 +19,16 @@ import com.stratio.crossdata.common.serializers.XDSerializationTest.TestCase
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._
-import org.json4s.jackson.JsonMethods._
-import org.json4s.Extraction
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
+import scala.collection.mutable.WrappedArray
+
@RunWith(classOf[JUnitRunner])
class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSerializer {
- val schema = StructType(List(
+ lazy val schema = StructType(List(
StructField("int",IntegerType,true),
StructField("bigint",LongType,true),
StructField("long",LongType,true),
@@ -47,19 +48,20 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
StructField("arraystring",ArrayType(StringType,true),true),
StructField("mapstringint",MapType(StringType,IntegerType,true),true),
StructField("mapstringstring",MapType(StringType,StringType,true),true),
+ StructField("maptimestampinteger",MapType(TimestampType,IntegerType,true),true),
StructField("struct",StructType(StructField("field1",IntegerType,true)::StructField("field2",IntegerType,true) ::Nil), true),
StructField("arraystruct",ArrayType(StructType(StructField("field1",IntegerType,true)::StructField("field2", IntegerType,true)::Nil),true),true),
StructField("structofstruct",StructType(StructField("field1",TimestampType,true)::StructField("field2", IntegerType, true)::StructField("struct1",StructType(StructField("structField1",StringType,true)::StructField("structField2",IntegerType,true)::Nil),true)::Nil),true)
))
- val values: Array[Any] = Array(
+ lazy val values: Array[Any] = Array(
2147483647,
9223372036854775807L,
9223372036854775807L,
"string",
true,
- 3.3,
- 3.3F,
+ 3.0,
+ 3.0F,
Decimal(12),
Decimal(22),
Decimal(32.0),
@@ -68,16 +70,16 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0"),
12.toShort,
"abcde".getBytes,
- new GenericArrayData(Array(4, 42)),
- new GenericArrayData(Array("hello", "world")),
+ WrappedArray make Array(4, 42),
+ WrappedArray make Array("hello", "world"),
ArrayBasedMapData(Map("b" -> 2)),
ArrayBasedMapData(Map("a" -> "A", "b" -> "B")),
- new GenericRowWithSchema(Array(99,98), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil)),
- new GenericArrayData(
- Array(
+    ArrayBasedMapData(Map(java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0") -> 25, java.sql.Timestamp.valueOf("2015-11-30 11:00:00.0") -> 12)),
+ new GenericRowWithSchema(Array(99,98), StructType(StructField("field1", IntegerType)
+ ::StructField("field2", IntegerType)::Nil)),
+ WrappedArray make Array(
new GenericRowWithSchema(Array(1,2), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil)),
new GenericRowWithSchema(Array(3,4), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil))
- )
),
new GenericRowWithSchema(
Array(
@@ -98,16 +100,16 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
)
)
- val rowWithNoSchema = Row.fromSeq(values)
- val rowWithSchema = new GenericRowWithSchema(values, schema)
+ lazy val rowWithNoSchema = Row.fromSeq(values)
+ lazy val rowWithSchema = new GenericRowWithSchema(values, schema)
+
+ implicit val formats = json4sJacksonFormats + new RowSerializer(schema)
- implicit val formats = json4sJacksonFormats
-
override def testCases: Seq[TestCase] = Seq(
TestCase("marshall & unmarshall a row with no schema", rowWithNoSchema),
TestCase("marshall & unmarshall a row with schema", rowWithSchema)
)
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/sql/crossdata/XDContextIT.scala b/core/src/test/scala/org/apache/spark/sql/crossdata/XDContextIT.scala
index 32f5d8156..e9fe385ab 100644
--- a/core/src/test/scala/org/apache/spark/sql/crossdata/XDContextIT.scala
+++ b/core/src/test/scala/org/apache/spark/sql/crossdata/XDContextIT.scala
@@ -183,8 +183,6 @@ class XDContextIT extends SharedXDContextTest {
}
-
-
it should "succesfully parse a CROSS JOIN" in {
val crossJoin = "SELECT * FROM table1 CROSS JOIN table2"
@@ -194,7 +192,6 @@ class XDContextIT extends SharedXDContextTest {
}
-
// it must "execute jar app previously uploaded" in {
// val file = File(s"TestAddApp.jar")
// xdContext.addJar("TestAddApp.jar")
diff --git a/driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala b/driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala
index 0607372a7..81819218e 100644
--- a/driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala
+++ b/driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala
@@ -15,35 +15,39 @@
*/
package com.stratio.crossdata.driver
-import java.io.{FileInputStream, InputStream}
-import java.security.{KeyStore, SecureRandom}
+import java.security.SecureRandom
import java.util.UUID
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLException, TrustManagerFactory}
+import akka.NotUsed
import akka.actor.ActorRef
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.http.scaladsl.{Http, HttpExt, HttpsConnectionContext}
import akka.http.scaladsl.marshalling.{Marshal, Marshaller}
-import akka.http.scaladsl.model.{HttpMethod, HttpRequest, RequestEntity, ResponseEntity}
-import akka.stream.{ActorMaterializer, StreamTcpException, TLSClientAuth}
+import akka.http.scaladsl.model._
+import akka.stream.{ActorMaterializer, TLSClientAuth}
import com.stratio.crossdata.common.result._
import com.stratio.crossdata.common.security.{KeyStoreUtils, Session}
import com.stratio.crossdata.driver.config.DriverConf
import com.stratio.crossdata.driver.session.{Authentication, SessionManager}
import org.slf4j.{Logger, LoggerFactory}
import akka.http.scaladsl.model.HttpMethods._
-import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
+import akka.http.scaladsl.unmarshalling._
+import akka.stream.scaladsl.{Keep, Sink, Source}
+import akka.util.ByteString
import com.stratio.crossdata.common._
-import com.stratio.crossdata.common.serializers.CrossdataCommonSerializer
+import com.stratio.crossdata.common.serializers.{CrossdataCommonSerializer, StreamedRowSerializer}
import com.stratio.crossdata.driver.actor.HttpSessionBeaconActor
import com.stratio.crossdata.driver.exceptions.TLSInvalidAuthException
+import org.apache.spark.sql.Row
import org.json4s.jackson
+import scala.collection.generic.SeqFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Try}
class HttpDriver private[driver](driverConf: DriverConf,
@@ -145,22 +149,61 @@ class HttpDriver private[driver](driverConf: DriverConf,
val sqlCommand = new SQLCommand(query, retrieveColNames = driverConf.getFlattenTables)
- val response = simpleRequest(
- securitizeCommand(sqlCommand),
- s"query/${sqlCommand.requestId}",
- {
- case SQLReply(_, result: SQLResult) =>
- result
- } : PartialFunction[SQLReply, SQLResult]
- )
+    // Performs the request to the server
+ val response = Marshal(securitizeCommand(sqlCommand)).to[RequestEntity] flatMap { requestEntity =>
+ val request = HttpRequest(POST, s"$protocol://$serverHttp/query/${sqlCommand.requestId}", entity = requestEntity)
+ http.singleRequest(request) flatMap { httpResponse =>
+
+        if(httpResponse.status == StatusCodes.OK) { // OK responses are served through streaming
+
+          val bytesSource = httpResponse.entity.dataBytes // The stream of bytes of the response data...
+          val framesSource = bytesSource.filterNot(bs => bs.isEmpty || bs == ByteString("\n")) // ...with empty frames and bare newlines removed...
+          val rawSchemaAndRawRowsSource = framesSource.prefixAndTail[ByteString](1) // ...and split into the first frame and the remaining frames.
+
+          // prefixAndTail(1) emits a single element: the first frame (the serialized schema) paired with a stream of the remaining frames (the serialized rows).
+          val sink = Sink.head[(Seq[ByteString], Source[ByteString, NotUsed])] // Its single element gets extracted by a future...
+
+          for { /* ...which, once completed, provides a ByteString with the serialized schema
+                    and the stream of remaining frames:
+                    the bulk of serialized rows. */
+ (Seq(rawSchema), rawRows) <- rawSchemaAndRawRowsSource.toMat(sink)(Keep.right).run
+ StreamedSchema(schema) <- Unmarshal(HttpEntity(ContentTypes.`application/json`, rawSchema)).to[StreamedSuccessfulSQLResult]
+
+            // Once the schema has been deserialized, it can be used to deserialize each row during the unmarshalling phase
+ rrows <- {
+ implicit val json4sJacksonFormats = this.json4sJacksonFormats + new StreamedRowSerializer(schema)
+
+ val um: Unmarshaller[ResponseEntity, StreamedSuccessfulSQLResult] = json4sUnmarshaller
+
+ rawRows.mapAsync(1) { bs => /* TODO: Study the implications of increasing the level of parallelism in
+ * the unmarshalling phase. */
+ val entity = HttpEntity(ContentTypes.`application/json`, bs)
+ um(entity)
+ }
+ }.runFold(List.empty[Row]) { case (acc: List[Row], StreamedRow(row, None)) => row::acc }
+
+ } yield SuccessfulSQLResult(rrows.reverse toArray, schema) /* TODO: Performance could be increased if
+ `SuccessfulSQLResult`#resultSet were of type `Seq[Row]`*/
+
+ } else {
+
+ Unmarshal(httpResponse.entity).to[SQLReply] map {
+ case SQLReply(_, result: SQLResult) => result
+ }
+
+ }
+
+ }
+ }
new SQLResponse(sqlCommand.requestId, response) {
override def cancelCommand(): Future[QueryCancelledReply] = {
val command = CancelQueryExecution(sqlCommand.queryId)
simpleRequest(
securitizeCommand(command),
- s"query/${command.requestId}",
- { case reply: QueryCancelledReply => reply }: PartialFunction[SQLReply, QueryCancelledReply]
+ s"query/${command.requestId}", {
+ case reply: QueryCancelledReply => reply
+ }: PartialFunction[SQLReply, QueryCancelledReply]
)
}
}
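
Stripped of the Crossdata types, the consumption pattern above is a plain Akka Streams head/tail split: prefixAndTail(1) separates the first frame (the schema) from the stream of remaining frames (the rows). A self-contained sketch of just that pattern, with string frames standing in for the serialized payloads:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import akka.util.ByteString

    import scala.concurrent.ExecutionContext.Implicits.global

    object HeadTailSketch extends App {
      implicit val system = ActorSystem("head-tail-sketch")
      implicit val materializer = ActorMaterializer()

      // Stand-ins for the newline-delimited frames of the HTTP response body.
      val frames = Source(List("schema-frame", "row-1", "row-2").map(ByteString(_)))

      val headAndTail = frames.prefixAndTail(1).toMat(Sink.head)(Keep.right).run()

      for {
        (Seq(schemaFrame), rowFrames) <- headAndTail
        rows <- rowFrames.runFold(List.empty[String])((acc, bs) => bs.utf8String :: acc)
      } {
        println(s"schema: ${schemaFrame.utf8String}, rows: ${rows.reverse}")
        system.terminate()
      }
    }
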
diff --git a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSourceESSpec.scala b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSourceESSpec.scala
index 70a20f19b..66e9614ba 100644
--- a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSourceESSpec.scala
+++ b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSourceESSpec.scala
@@ -65,7 +65,7 @@ class DefaultSourceESSpec extends BaseXDTest with MockitoSugar {
val item: Table = mock[Table]
when(item.database).thenReturn(Some("index"))
when(item.tableName).thenReturn("type")
- val userOpts: Map[String, String] = Map(ES_HOST -> "localhost")
+ val userOpts: Map[String, String] = Map(ES_NODES -> "localhost")
//Experimentation
val result:Map[String, String] = defaultDatasource.generateConnectorOpts(item, userOpts)
@@ -73,7 +73,7 @@ class DefaultSourceESSpec extends BaseXDTest with MockitoSugar {
//Expectations
result should not be null
result.get(ES_RESOURCE).get should be ("index/type")
- result.get(ES_HOST).get should be ("localhost")
+ result.get(ES_NODES).get should be ("localhost")
}
}
diff --git a/pom.xml b/pom.xml
index 71798058d..5a7448e59 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,9 +123,9 @@
10.10.1.1
3.2.10
1.10.19
- 0.5.0
+ 0.7.0
18.0
- 2.11.0
+ 3.2.0
2.11
2.11.8
diff --git a/server/src/main/scala/com/stratio/crossdata/server/CrossdataHttpServer.scala b/server/src/main/scala/com/stratio/crossdata/server/CrossdataHttpServer.scala
index e7cfd3f3b..ef797c3db 100644
--- a/server/src/main/scala/com/stratio/crossdata/server/CrossdataHttpServer.scala
+++ b/server/src/main/scala/com/stratio/crossdata/server/CrossdataHttpServer.scala
@@ -19,27 +19,33 @@ import java.io.File
import java.util.UUID
import java.util.concurrent.TimeUnit
+import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.ask
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, SendToAll}
+import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.Multipart.BodyPart
import akka.http.scaladsl.server.Directive
import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.unmarshalling._
import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.FileIO
-import akka.util.Timeout
+import akka.stream.scaladsl.{FileIO, Flow, Source}
+import akka.util.{ByteString, Timeout}
import com.stratio.crossdata.common.security.Session
import com.stratio.crossdata.common.util.akka.keepalive.LiveMan.HeartBeat
import com.stratio.crossdata.common._
+import com.stratio.crossdata.common.result.{ErrorSQLResult, StreamedSchema, StreamedSuccessfulSQLResult, SuccessfulSQLResult}
import com.stratio.crossdata.server.actors.ResourceManagerActor
import com.stratio.crossdata.server.config.ServerConfig
import com.stratio.crossdata.util.HdfsUtils
import com.typesafe.config.{Config, ConfigException}
import org.apache.log4j.Logger
+import org.apache.spark.sql.Row
import org.apache.spark.sql.crossdata.XDContext
import org.apache.spark.sql.crossdata.serializers.CrossdataSerializer
+import org.apache.spark.sql.types.StructType
import org.json4s.jackson
import scala.concurrent.Future
@@ -131,9 +137,28 @@ class CrossdataHttpServer(config: Config, serverActor: ActorRef, implicit val sy
case Success(reply: ServerReply) =>
reply match {
case qcr: QueryCancelledReply => complete(qcr)
+ case SQLReply(_, SuccessfulSQLResult(resultSet, schema)) =>
+
+ implicit val jsonStreamingSupport = EntityStreamingSupport.json()
+ .withFramingRenderer(
+ Flow[ByteString].intersperse(ByteString("\n"))
+ )
+
+ implicit val _: StructType = schema
+
+ val responseStream: Source[StreamedSuccessfulSQLResult, NotUsed] =
+ Source.fromIterator(() => resultSet.toIterator).map(
+ row => row: StreamedSuccessfulSQLResult
+ ) prepend Source.single(schema)
+
+ complete(responseStream)
+
case _ => complete(reply)
+
}
- case other => complete(StatusCodes.ServerError, s"Internal XD server error: $other")
+ case other =>
+ val httpErrorReply = SQLReply(rq.cmd.requestId, ErrorSQLResult(s"Internal XD server error: $other"))
+ complete(StatusCodes.ServerError -> httpErrorReply)
}
}
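
On the server side, the complementary framing is produced by Akka HTTP entity streaming: each marshalled element becomes one chunk, and intersperse adds the newline separators the driver strips off. A reduced sketch of that route shape, with a stand-in Frame type and marshaller where the real code marshals StreamedSuccessfulSQLResult through json4s:

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.common.EntityStreamingSupport
    import akka.http.scaladsl.marshalling.Marshaller
    import akka.http.scaladsl.model.ContentTypes
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Source}
    import akka.util.ByteString

    object StreamingRouteSketch extends App {
      implicit val system = ActorSystem("streaming-route-sketch")
      implicit val materializer = ActorMaterializer()

      // Stand-in for StreamedSuccessfulSQLResult: pre-rendered JSON frames.
      final case class Frame(json: String)
      implicit val frameMarshaller: Marshaller[Frame, ByteString] =
        Marshaller.withFixedContentType(ContentTypes.`application/json`)(f => ByteString(f.json))

      // Newline-delimited JSON, as configured in CrossdataHttpServer above.
      implicit val jsonStreaming = EntityStreamingSupport.json()
        .withFramingRenderer(Flow[ByteString].intersperse(ByteString("\n")))

      val route = path("query") {
        get {
          val frames = Source.single(Frame("""{"streamedSchema":"..."}""")) concat
            Source(List(Frame("""{"streamedRow":"..."}""")))
          complete(frames) // first line: the schema frame; following lines: row frames
        }
      }

      Http().bindAndHandle(route, "localhost", 8080)
    }
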
diff --git a/testsAT/src/test/java/com/stratio/tests/ATMicroCassandraXDTest.java b/testsAT/src/test/java/com/stratio/tests/ATMicroCassandraXDTest.java
index de1c2522a..a25866724 100644
--- a/testsAT/src/test/java/com/stratio/tests/ATMicroCassandraXDTest.java
+++ b/testsAT/src/test/java/com/stratio/tests/ATMicroCassandraXDTest.java
@@ -53,11 +53,13 @@ public void setUp() {
logger.info("Connecting to Cassandra Cluster");
cassandra.connect();
logger.info("Checking if the catalog exists");
- if (cassandra.existsKeyspace(catalog, false)) {
+        if (cassandra.existsKeyspace(catalog, false) || cassandra.existsKeyspace("databasetest", false)) {
             logger.info("The catalog exists");
-            cassandra.executeQuery("DROP KEYSPACE \"KUYA\"");
+            cassandra.executeQuery("DROP KEYSPACE IF EXISTS \"KUYA\"");
+            cassandra.executeQuery("DROP KEYSPACE IF EXISTS databasetest");
             logger.info("The catalog has been dropped");
}
+
cassandra.loadTestData(catalog, "/scripts/CassandraMicroScript.cql");
List tables = cassandra.getTables(catalog);
String connector = "Cassandra-Micro";
diff --git a/testsAT/src/test/resources/features/MicroStrategy/MicroStrategy.feature b/testsAT/src/test/resources/features/MicroStrategy/MicroStrategy.feature
index ca98d0a44..d7d9f8358 100644
--- a/testsAT/src/test/resources/features/MicroStrategy/MicroStrategy.feature
+++ b/testsAT/src/test/resources/features/MicroStrategy/MicroStrategy.feature
@@ -24,8 +24,50 @@ Feature: [CROSSDATA-677] Microstrategy - Tests Queries
When I execute 'select a11.DES_PRODMDV DES_PRODMDV,a11.COD_SEMAMES COD_SEMAMES,a11.DES_DTMM DES_DTMM,a11.DES_CBCMM DES_CBCMM,a11.DES_ZOMM DES_ZOMM,a11.DES_OFIMM DES_OFIMM,a11.DES_NIVMM DES_NIVMM,a11.COD_DATIMP COD_BANCSB,max(a11.COD_OFIMM) COD_OFICI,max(a11.DES_OFIPA) DES_OFIPA,max(a11.COD_BANCSBM) COD_BANCSB0,a11.DES_UNIMMDV DES_UNIMMDV,a11.DES_EVENMDV DES_EVENMDV,a11.COD_CONTIGO COD_CONTIGO,max(a12.DES_CONTIGO) DES_CONTIGO,max(a12.COD_ORDCONTI) COD_ORDCONTI,sum(a11.IMP_BASINCRE) WJXBFS1,sum(a11.IMP_BASTRANS) WJXBFS2,sum(a11.IMP_REINCRE) WJXBFS3,sum(a11.IMP_RETRANSA) WJXBFS4,sum(a11.QNU_TOPRODUC) WJXBFS5 from KUYA.TKUYAKPR a11 join KUYA.TKUYAKGO a12 on (a11.COD_CONTIGO = a12.COD_CONTIGO) where (a11.COD_SEMAMES in ('8.20a26JUN') and ((a11.COD_BANCSBM = '0182' and (a11.DES_OFIPA like '%0013%' or a11.COD_OFIMM = '0013')) or (a11.COD_BANCSBM = '0182' and (a11.DES_OFIPA like '%6051%' or a11.COD_OFIMM = '6051')))) group by a11.DES_PRODMDV,a11.COD_SEMAMES,a11.DES_DTMM,a11.DES_CBCMM,a11.DES_ZOMM,a11.DES_OFIMM,a11.DES_NIVMM,a11.COD_DATIMP,a11.DES_UNIMMDV,a11.DES_EVENMDV,a11.COD_CONTIGO'
Then an exception 'IS NOT' thrown
+ Scenario: [CROSSDATA-728] MICRO SELECT COUNT(DISTINCT * )
+ When I execute 'SELECT count(distinct ident) FROM databasetest.tabletest'
+ Then an exception 'IS NOT' thrown
+ Then The result has to have '1' rows
+ |_c0-long|
+ | 10 |
+ Scenario: [CROSSDATA-728] MICRO SELECT COUNT(DISTINCT * ) with alias
+ When I execute 'SELECT count(distinct ident) as count_distinct FROM databasetest.tabletest'
+ Then an exception 'IS NOT' thrown
+ Then The result has to have '1' rows
+ |count_distinct-long|
+ | 10 |
+  Scenario: [CROSSDATA-728] MICRO SELECT COUNT(DISTINCT * ) with alias and filter
+ When I execute 'SELECT count(distinct ident) as count_distinct FROM databasetest.tabletest WHERE ident > 4'
+ Then an exception 'IS NOT' thrown
+ Then The result has to have '1' rows
+ |count_distinct-long|
+ | 5 |
-
+ Scenario: [CROSSDATA-717] MICRO SELECT + CROSS JOIN
+ When I execute 'SELECT tabletest.ident ,tab1.name FROM databasetest.tabletest CROSS JOIN databasetest.tab1'
+ Then an exception 'IS NOT' thrown
+ Then The result has to have '20' rows
+ |ident-integer| name-string|
+ | 5 |name_5 |
+ | 5 |name_5 |
+ | 1 |name_1 |
+ | 1 |name_1 |
+ | 8 |name_8 |
+ | 8 |name_8 |
+ | 0 |name_0 |
+ | 0 |name_0 |
+ | 2 |name_2 |
+ | 2 |name_2 |
+ | 4 |name_4 |
+ | 4 |name_4 |
+ | 7 |name_7 |
+ | 7 |name_7 |
+ | 6 |name_6 |
+ | 6 |name_6 |
+ | 9 |name_9 |
+ | 9 |name_9 |
+ | 3 |name_3 |
+ | 3 |name_3 |
diff --git a/testsAT/src/test/resources/scripts/CassandraMicroScript.cql b/testsAT/src/test/resources/scripts/CassandraMicroScript.cql
index 95478b9c5..e1535a651 100644
--- a/testsAT/src/test/resources/scripts/CassandraMicroScript.cql
+++ b/testsAT/src/test/resources/scripts/CassandraMicroScript.cql
@@ -13,4 +13,19 @@ insert into "KUYA"."TKUYAKCL"("COD_PERSOCL","COD_OFIPALCL","DES_SUBSEGLO","COD_B
insert into "KUYA"."TKUYAKCL"("COD_PERSOCL","COD_OFIPALCL","DES_SUBSEGLO","COD_BANCSBM","IMP_TRANINI","IMP_TRANFIN","IMP_VENTCRUZ","IMP_OBJPROP","COD_SEMAMES","COD_IDFISCAK","DES_NOCLIE","DES_BANMM","DES_EVENMDV","DES_UNIMMDV","DES_NIVMM","DES_DTMM","DES_ZOMM","DES_CBCMM","DES_OFIMM","COD_DATIMP","COD_TPVCL","COD_TARJECL","COD_RECIBCL","COD_EMIRECIB","COD_COMEX","COD_INGRESCL","COD_NETCL","COD_MOVIL","COD_MAILCL","COD_SEGUROS","COD_CNIVELCL","DES_OFIPA","AUD_TIM","COD_CONTIGO") VALUES ('eee','0016','eee','6051',5,5,5,5,'8.20a26JUN','eee','eee','eee','eee','eee','eee','eee','eee','eee','eee','eee',5,5,5,5,5,5,5,5,5,5,5,'060510','13-09-16 15:00',5);
CREATE TABLE "KUYA"."TKUYAKEV"("COD_SEMAMES" varchar PRIMARY KEY,"DES_UNIREF" varchar,"DES_EVENMDV" varchar,"IMP_RETRANSA" int,"IMP_REINCRE" int,"DES_BANMM" varchar,"DES_UNIMMDV"varchar,"DES_NIVMM" VARCHAR,"DES_DTMM" varchar,"DES_ZOMM" varchar,"DES_CBCMM" varchar,"DES_OFIMM" varchar,"COD_DATIMP"varchar,"IMP_BASTRANS" int,"IMP_BASINCRE" int,"IMP_BATRAINI" int,"IMP_CUMPLIMI" int,"QNU_BASEVMDV" int,"DES_OFIPA" varchar,"DES_SEGCLM" varchar,"COD_BANCSBM" varchar,"COD_OFIMM" varchar,"DES_EVENCORT" varchar,"AUD_TIM" TIMESTAMP,"COD_CONTIGO" int);
CREATE TABLE "KUYA"."TKUYAKSE"("COD_SEMAMES" VARCHAR PRIMARY KEY, "DES_UNIREF" varchar, "DES_EVENMDV" varchar, "QNU_RENUMMDV" int, "IMP_RECUMPLI" int, "IMP_RETRANSA" int, "IMP_RETRAINI" int, "IMP_REINCRE" int, "QNU_BASEVMDV" int, "IMP_BASTRANS" int, "IMP_BATRAINI" int, "IMP_OBJPROP" int, "IMP_BASINCRE" int, "DES_BANMM" varchar, "IMP_CUMPLIMI" int, "DES_UNIMMDV" varchar, "DES_NIVMM" varchar, "DES_DTMM" varchar, "DES_ZOMM" varchar, "DES_CBCMM" varchar, "DES_OFIMM" varchar, "COD_DATIMP" varchar, "QNU_RAKEVMDV" int, "QNU_RAKNICON" int, "QNU_RAKTRANS" int, "QNU_INCRETO" int, "DES_OFIPA" varchar, "QNU_RANKMAX" int, "DES_SEGCLM" varchar, "COD_BANCSBM" varchar, "COD_OFIMM" varchar, "DES_EVENCORT" varchar, "AUD_TIM" TIMESTAMP, "COD_CONTIGO" int);
-CREATE TABLE "KUYA"."TKUYAKPR" ("COD_SEMAMES" varchar primary key, "DES_UNIREF" varchar, "DES_EVENMDV" varchar, "IMP_RETRANSA" int, "IMP_REINCRE" int, "DES_BANMM" varchar, "DES_PRODMDV" varchar, "DES_UNIMMDV" varchar, "DES_NIVMM" VARCHAR, "DES_DTMM" varchar, "DES_ZOMM" varchar, "DES_CBCMM" varchar, "DES_OFIMM" varchar, "COD_DATIMP" varchar, "IMP_BASTRANS" int, "IMP_BASINCRE" int, "QNU_TOPRODUC" int, "DES_OFIPA" varchar, "COD_BANCSBM" varchar, "COD_OFIMM" varchar, "DES_EVENCORT" varchar, "AUD_TIM" TIMESTAMP, "COD_CONTIGO" int);
\ No newline at end of file
+CREATE TABLE "KUYA"."TKUYAKPR" ("COD_SEMAMES" varchar primary key, "DES_UNIREF" varchar, "DES_EVENMDV" varchar, "IMP_RETRANSA" int, "IMP_REINCRE" int, "DES_BANMM" varchar, "DES_PRODMDV" varchar, "DES_UNIMMDV" varchar, "DES_NIVMM" VARCHAR, "DES_DTMM" varchar, "DES_ZOMM" varchar, "DES_CBCMM" varchar, "DES_OFIMM" varchar, "COD_DATIMP" varchar, "IMP_BASTRANS" int, "IMP_BASINCRE" int, "QNU_TOPRODUC" int, "DES_OFIPA" varchar, "COD_BANCSBM" varchar, "COD_OFIMM" varchar, "DES_EVENCORT" varchar, "AUD_TIM" TIMESTAMP, "COD_CONTIGO" int);
+CREATE KEYSPACE databasetest WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
+CREATE TABLE databasetest.tabletest(ident INT PRIMARY KEY, name TEXT, money DOUBLE, new BOOLEAN, date Timestamp);
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (0, 'name_0', 10.2 ,true,'1999-11-30');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (1, 'name_1', 11.2 ,true,'2001-01-01');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (2, 'name_2', 12.2 ,true,'2002-02-02');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (3, 'name_3', 13.2 ,true,'2003-03-03');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (4, 'name_4', 14.2 ,true,'2004-04-04');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (5, 'name_5', 15.2 ,true,'2005-05-05');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (6, 'name_6', 16.2 ,true,'2006-06-06');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (7, 'name_7', 17.2 ,true,'2007-07-07');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (8, 'name_8', 18.2 ,true,'2008-08-08');
+INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (9, 'name_9', 19.2 ,true,'2009-09-09');
+CREATE TABLE databasetest.tab1(name TEXT PRIMARY KEY);
+INSERT INTO databasetest.tab1(name) VALUES ('name_0');
+INSERT INTO databasetest.tab1(name) VALUES ('name_1');
\ No newline at end of file