Skip to content
This repository has been archived by the owner on Nov 20, 2019. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/stratio/crossdata into me…
Browse files Browse the repository at this point in the history
…rgeMasterIntoBranch-1.7
  • Loading branch information
mafernandez-stratio committed Oct 31, 2016
2 parents c1aea63 + 9f2e3a1 commit 944603c
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .jenkins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CROSSBUILDAT:

ITSERVICES:
- ZOOKEEPER:
image: stratio/zookeeper:3.4.6
image: jplock/zookeeper:3.5.1-alpha
- MONGODB:
image: stratio/mongo:3.0.4
sleep: 10
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Only listing significant user-visible, not internal code cleanups and minor bug
* HTTP/HTTPS Communication layer with transparent driver.
* Bug Fixes:
* Cancellation message is now delivered to the client.
* Updated dependencies with curator and stratio-commons-utils compatible with Zookeeper 3.5.x

## 1.6.0 (September 2016)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ case class ErrorSQLResult(message: String, cause: Option[Throwable] = None) exte
cause.map(throwable => new RuntimeException(message, throwable)).getOrElse(new RuntimeException(message))
}

object StreamedSuccessfulSQLResult {
implicit def schema2streamed(schema: StructType): StreamedSuccessfulSQLResult = StreamedSchema(schema)
implicit def row2streamed(row: Row)(implicit providedSchema: StructType): StreamedSuccessfulSQLResult =
StreamedRow(row, Some(providedSchema))
}






sealed trait StreamedSuccessfulSQLResult
case class StreamedSchema(schema: StructType) extends StreamedSuccessfulSQLResult
case class StreamedRow(row: Row, providedSchema: Option[StructType] = None) extends StreamedSuccessfulSQLResult
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ trait CrossdataCommonSerializer {

implicit val json4sJacksonFormats: Formats =
DefaultFormats + SQLResultSerializer + UUIDSerializer +
StructTypeSerializer + FiniteDurationSerializer + CommandSerializer +
StructTypeSerializer + FiniteDurationSerializer + CommandSerializer + StreamedSuccessfulSQLResultSerializer +
AkkaMemberStatusSerializer + AkkaClusterMemberSerializer + new SortedSetSerializer[Member]()

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] {
case (ArrayType(ty, _), JArray(arr)) =>
mutable.WrappedArray make arr.map(extractField(ty, _)).toArray
/* Maps will be serialized as sub-objects so keys are constrained to be strings */
case (MapType(StringType, vt, _), JObject(fields)) =>
val (keys, values) = fields.unzip
val unserValues = values map (jval => extractField(vt, jval))
ArrayBasedMapDataNotDeprecated(keys.toArray, unserValues.toArray)
case (MapType(kt, vt, _), JObject(JField("map", JObject(JField("keys", JArray(mapKeys)) :: JField("values", JArray(mapValues)) :: _) ) :: _)) =>
val unserKeys = mapKeys map (jval => extractField(kt, jval))
val unserValues = mapValues map (jval => extractField(vt, jval))
ArrayBasedMapDataNotDeprecated(unserKeys.toArray, unserValues.toArray)
case (st: StructType, JObject(JField("values",JArray(values))::_)) =>
deserializeWithSchema(st, values, true)
}
Expand Down Expand Up @@ -106,14 +106,15 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] {
case v: ArrayDataNotDeprecated => JArray(v.array.toList.map(v => Extraction.decompose(v)))
case v: mutable.WrappedArray[_] => JArray(v.toList.map(v => Extraction.decompose(v)))
}
case (MapType(StringType, vt, _), v: MapDataNotDeprecated) =>
case (MapType(kt, vt, _), v: MapDataNotDeprecated) =>
/* Maps will be serialized as sub-objects so keys are constrained to be strings */
val serKeys = v.keyArray().array.map(v => v.toString)
val serKeys = v.keyArray().array.map(v => serializeField(kt -> v))
val serValues = v.valueArray.array.map(v => serializeField(vt -> v))
JObject(
(v.keyArray.array zip serValues) map {
case (k: String, v) => JField(k, v)
} toList
JField("map",
JObject(
JField("keys", JArray(serKeys.toList)),
JField("values", JArray(serValues.toList))
)
)
case (st: StructType, v: Row) => serializeWithSchema(st, v, true)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.crossdata.common.serializers

import com.stratio.crossdata.common.result.{StreamedRow, StreamedSchema, StreamedSuccessfulSQLResult}
import org.json4s.JsonAST.{JField, JObject}
import org.json4s.{CustomSerializer, Extraction, Formats}
import StreamedSuccessfulSQLResultSerializerHelper._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
private[serializers] object StreamedSuccessfulSQLResultSerializerHelper {
val SchemaLabel = "streamedSchema"
val RowLabel = "streamedRow"
}

object StreamedSuccessfulSQLResultSerializer extends CustomSerializer[StreamedSuccessfulSQLResult](
formats => (
{
case JObject(JField(SchemaLabel, jSchema)::Nil) =>
implicit val _: Formats = formats
StreamedSchema(jSchema.extract[StructType])
},
{
case StreamedSchema(schema) => JObject(JField(SchemaLabel, Extraction.decompose(schema)(formats)))
case StreamedRow(row, Some(providedSchema)) =>
JObject(JField(RowLabel, Extraction.decompose(row)(formats + RowSerializer(providedSchema))))
}
)
)

class StreamedRowSerializer(schema: StructType) extends CustomSerializer[StreamedSuccessfulSQLResult](
formats => (
{
case JObject(JField(RowLabel, jRow)::Nil) =>
implicit val _: Formats = formats + new RowSerializer(schema)
StreamedRow(jRow.extract[Row])
},
PartialFunction.empty
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import com.stratio.crossdata.common.serializers.XDSerializationTest.TestCase
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._
import org.json4s.jackson.JsonMethods._
import org.json4s.Extraction
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

import scala.collection.mutable.WrappedArray

@RunWith(classOf[JUnitRunner])
class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSerializer {

val schema = StructType(List(
lazy val schema = StructType(List(
StructField("int",IntegerType,true),
StructField("bigint",LongType,true),
StructField("long",LongType,true),
Expand All @@ -47,19 +48,20 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
StructField("arraystring",ArrayType(StringType,true),true),
StructField("mapstringint",MapType(StringType,IntegerType,true),true),
StructField("mapstringstring",MapType(StringType,StringType,true),true),
StructField("maptimestampinteger",MapType(TimestampType,IntegerType,true),true),
StructField("struct",StructType(StructField("field1",IntegerType,true)::StructField("field2",IntegerType,true) ::Nil), true),
StructField("arraystruct",ArrayType(StructType(StructField("field1",IntegerType,true)::StructField("field2", IntegerType,true)::Nil),true),true),
StructField("structofstruct",StructType(StructField("field1",TimestampType,true)::StructField("field2", IntegerType, true)::StructField("struct1",StructType(StructField("structField1",StringType,true)::StructField("structField2",IntegerType,true)::Nil),true)::Nil),true)
))

val values: Array[Any] = Array(
lazy val values: Array[Any] = Array(
2147483647,
9223372036854775807L,
9223372036854775807L,
"string",
true,
3.3,
3.3F,
3.0,
3.0F,
Decimal(12),
Decimal(22),
Decimal(32.0),
Expand All @@ -68,16 +70,16 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0"),
12.toShort,
"abcde".getBytes,
new GenericArrayData(Array(4, 42)),
new GenericArrayData(Array("hello", "world")),
WrappedArray make Array(4, 42),
WrappedArray make Array("hello", "world"),
ArrayBasedMapData(Map("b" -> 2)),
ArrayBasedMapData(Map("a" -> "A", "b" -> "B")),
new GenericRowWithSchema(Array(99,98), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil)),
new GenericArrayData(
Array(
ArrayBasedMapData(Map(java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0") -> 25, java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0") -> 12)),
new GenericRowWithSchema(Array(99,98), StructType(StructField("field1", IntegerType)
::StructField("field2", IntegerType)::Nil)),
WrappedArray make Array(
new GenericRowWithSchema(Array(1,2), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil)),
new GenericRowWithSchema(Array(3,4), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil))
)
),
new GenericRowWithSchema(
Array(
Expand All @@ -98,16 +100,16 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
)
)

val rowWithNoSchema = Row.fromSeq(values)
val rowWithSchema = new GenericRowWithSchema(values, schema)
lazy val rowWithNoSchema = Row.fromSeq(values)
lazy val rowWithSchema = new GenericRowWithSchema(values, schema)


implicit val formats = json4sJacksonFormats + new RowSerializer(schema)

implicit val formats = json4sJacksonFormats


override def testCases: Seq[TestCase] = Seq(
TestCase("marshall & unmarshall a row with no schema", rowWithNoSchema),
TestCase("marshall & unmarshall a row with schema", rowWithSchema)
)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ class XDContextIT extends SharedXDContextTest {

}



it should "succesfully parse a CROSS JOIN" in {

val crossJoin = "SELECT * FROM table1 CROSS JOIN table2"
Expand All @@ -194,7 +192,6 @@ class XDContextIT extends SharedXDContextTest {

}


// it must "execute jar app previously uploaded" in {
// val file = File(s"TestAddApp.jar")
// xdContext.addJar("TestAddApp.jar")
Expand Down
77 changes: 60 additions & 17 deletions driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,39 @@
*/
package com.stratio.crossdata.driver

import java.io.{FileInputStream, InputStream}
import java.security.{KeyStore, SecureRandom}
import java.security.SecureRandom
import java.util.UUID
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLException, TrustManagerFactory}

import akka.NotUsed
import akka.actor.ActorRef
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.http.scaladsl.{Http, HttpExt, HttpsConnectionContext}
import akka.http.scaladsl.marshalling.{Marshal, Marshaller}
import akka.http.scaladsl.model.{HttpMethod, HttpRequest, RequestEntity, ResponseEntity}
import akka.stream.{ActorMaterializer, StreamTcpException, TLSClientAuth}
import akka.http.scaladsl.model._
import akka.stream.{ActorMaterializer, TLSClientAuth}
import com.stratio.crossdata.common.result._
import com.stratio.crossdata.common.security.{KeyStoreUtils, Session}
import com.stratio.crossdata.driver.config.DriverConf
import com.stratio.crossdata.driver.session.{Authentication, SessionManager}
import org.slf4j.{Logger, LoggerFactory}
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.http.scaladsl.unmarshalling.{Unmarshaller, _}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import com.stratio.crossdata.common._
import com.stratio.crossdata.common.serializers.CrossdataCommonSerializer
import com.stratio.crossdata.common.serializers.{CrossdataCommonSerializer, StreamedRowSerializer}
import com.stratio.crossdata.driver.actor.HttpSessionBeaconActor
import com.stratio.crossdata.driver.exceptions.TLSInvalidAuthException
import org.apache.spark.sql.Row
import org.json4s.jackson

import scala.collection.generic.SeqFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Try}


class HttpDriver private[driver](driverConf: DriverConf,
Expand Down Expand Up @@ -145,22 +149,61 @@ class HttpDriver private[driver](driverConf: DriverConf,

val sqlCommand = new SQLCommand(query, retrieveColNames = driverConf.getFlattenTables)

val response = simpleRequest(
securitizeCommand(sqlCommand),
s"query/${sqlCommand.requestId}",
{
case SQLReply(_, result: SQLResult) =>
result
} : PartialFunction[SQLReply, SQLResult]
)
// Performs the request to server
val response = Marshal(securitizeCommand(sqlCommand)).to[RequestEntity] flatMap { requestEntity =>
val request = HttpRequest(POST, s"$protocol://$serverHttp/query/${sqlCommand.requestId}", entity = requestEntity)
http.singleRequest(request) flatMap { httpResponse =>

if(httpResponse.status == StatusCodes.OK) { // OK Responses will be served through streaming

val bytesSource = httpResponse.entity.dataBytes // This is the stream of bytes of the answer data...
val framesSource = bytesSource.filterNot(bs => bs.isEmpty || bs == ByteString("\n")) //...empty lines get removed...
val rawSchemaAndRawRowsSource = framesSource.prefixAndTail[ByteString](1) //remaining get transformed to ByteStrings.

// From the raw lines stream, a new stream providing the first one and a stream of the remaining ones is created
val sink = Sink.head[(Seq[ByteString], Source[ByteString, NotUsed])] //Its single elements get extracted by future...

for { /*.. which, once completed,
provides a ByteString with the serialized schema and the stream of remaining lines:
The bulk of serialized rows.*/
(Seq(rawSchema), rawRows) <- rawSchemaAndRawRowsSource.toMat(sink)(Keep.right).run
StreamedSchema(schema) <- Unmarshal(HttpEntity(ContentTypes.`application/json`, rawSchema)).to[StreamedSuccessfulSQLResult]

// Having de-serialized the schema, it can be used to deserialize each row at the un-marshalling phase
rrows <- {
implicit val json4sJacksonFormats = this.json4sJacksonFormats + new StreamedRowSerializer(schema)

val um: Unmarshaller[ResponseEntity, StreamedSuccessfulSQLResult] = json4sUnmarshaller

rawRows.mapAsync(1) { bs => /* TODO: Study the implications of increasing the level of parallelism in
* the unmarshalling phase. */
val entity = HttpEntity(ContentTypes.`application/json`, bs)
um(entity)
}
}.runFold(List.empty[Row]) { case (acc: List[Row], StreamedRow(row, None)) => row::acc }

} yield SuccessfulSQLResult(rrows.reverse toArray, schema) /* TODO: Performance could be increased if
`SuccessfulSQLResult`#resultSet were of type `Seq[Row]`*/

} else {

Unmarshal(httpResponse.entity).to[SQLReply] map {
case SQLReply(_, result: SQLResult) => result
}

}

}
}

new SQLResponse(sqlCommand.requestId, response) {
override def cancelCommand(): Future[QueryCancelledReply] = {
val command = CancelQueryExecution(sqlCommand.queryId)
simpleRequest(
securitizeCommand(command),
s"query/${command.requestId}",
{ case reply: QueryCancelledReply => reply }: PartialFunction[SQLReply, QueryCancelledReply]
s"query/${command.requestId}", {
case reply: QueryCancelledReply => reply
}: PartialFunction[SQLReply, QueryCancelledReply]
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ class DefaultSourceESSpec extends BaseXDTest with MockitoSugar {
val item: Table = mock[Table]
when(item.database).thenReturn(Some("index"))
when(item.tableName).thenReturn("type")
val userOpts: Map[String, String] = Map(ES_HOST -> "localhost")
val userOpts: Map[String, String] = Map(ES_NODES -> "localhost")

//Experimentation
val result:Map[String, String] = defaultDatasource.generateConnectorOpts(item, userOpts)

//Expectations
result should not be null
result.get(ES_RESOURCE).get should be ("index/type")
result.get(ES_HOST).get should be ("localhost")
result.get(ES_NODES).get should be ("localhost")

}
}
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@
<derby.version>10.10.1.1</derby.version>
<jackson4s.version>3.2.10</jackson4s.version>
<mockito.version>1.10.19</mockito.version>
<common.utils.version>0.5.0</common.utils.version>
<common.utils.version>0.7.0</common.utils.version>
<guava.version>18.0</guava.version>
<curator.version>2.11.0</curator.version>
<curator.version>3.2.0</curator.version>
<!-- Scala version and cross build properties -->
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
Expand Down

0 comments on commit 944603c

Please sign in to comment.