From 2b08f52c305d70ea940542f5e1b9ee5d423eea03 Mon Sep 17 00:00:00 2001 From: Unai Sarasola Date: Wed, 26 Oct 2016 12:09:42 +0200 Subject: [PATCH 1/7] Removed entries that are not used (#746) --- dist/src/main/resources/server/server-application.conf | 2 -- 1 file changed, 2 deletions(-) diff --git a/dist/src/main/resources/server/server-application.conf b/dist/src/main/resources/server/server-application.conf index 34e1c6345..fd2edfbea 100755 --- a/dist/src/main/resources/server/server-application.conf +++ b/dist/src/main/resources/server/server-application.conf @@ -128,8 +128,6 @@ crossdata-server.config.spark.metrics.conf="/etc/sds/crossdata/server/xdmetrics. # HTTP Server config #TLS for Akka-http (Https with client authentication) #crossdata-server.akka-http.ssl.enable = false -#crossdata-server.akka-http.ssl.host = "crossdata.com" -#crossdata-server.akka-http.ssl.port = 443 #crossdata-server.akka-http.ssl.truststore = "/home/crossdata/truststore.jks" #crossdata-server.akka-http.ssl.truststore-password = "stratio" #crossdata-server.akka-http.ssl.keystore = "/home/crossdata/keystore.jks" From 07dcbff641ff3bf7e33a14fc9126d189dad50c79 Mon Sep 17 00:00:00 2001 From: Unai Sarasola Date: Wed, 26 Oct 2016 13:30:20 +0200 Subject: [PATCH 2/7] [CROSSDATA-729] Tests for lack of support for subqueries in filter (#750) (#753) --- .../spark/sql/crossdata/XDContextIT.scala | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/sql/crossdata/XDContextIT.scala b/core/src/test/scala/org/apache/spark/sql/crossdata/XDContextIT.scala index 353b2d297..e3debbdb9 100644 --- a/core/src/test/scala/org/apache/spark/sql/crossdata/XDContextIT.scala +++ b/core/src/test/scala/org/apache/spark/sql/crossdata/XDContextIT.scala @@ -112,6 +112,77 @@ class XDContextIT extends SharedXDContextTest { } + it must "plan a query with a filter and an alias for the table" in { + + val t1: DataFrame = xdContext.createDataFrame( + 
xdContext.sparkContext.parallelize((1 to 5).map(i => Row(s"val_$i", i))), + StructType(Array(StructField("id", StringType), StructField("value", IntegerType)))) + t1.registerTempTable("t1") + + val t2: DataFrame = xdContext.createDataFrame( + xdContext.sparkContext.parallelize((4 to 8).map(i => Row(s"val_$i", i, i*2))), + StructType(Array(StructField("ident", StringType), StructField("num", IntegerType), StructField("magic", IntegerType)))) + t2.registerTempTable("t2") + + val dataFrame = xdContext.sql("SELECT st.num FROM t2 st WHERE st.num <= 10") + + dataFrame.show + + dataFrame.collect should have length 5 + + } + + it must "plan a query with a in clause" in { + + val t1: DataFrame = xdContext.createDataFrame( + xdContext.sparkContext.parallelize((1 to 5).map(i => Row(s"val_$i", i))), + StructType(Array(StructField("id", StringType), StructField("value", IntegerType)))) + t1.registerTempTable("t1") + + val t2: DataFrame = xdContext.createDataFrame( + xdContext.sparkContext.parallelize((4 to 8).map(i => Row(s"val_$i", i, i*2))), + StructType(Array(StructField("ident", StringType), StructField("num", IntegerType), StructField("magic", IntegerType)))) + t2.registerTempTable("t2") + + val dataFrame = xdContext.sql("SELECT * FROM t1 WHERE (t1.value IN (1, 3, 5, 7, 9)) GROUP BY value, id") + + dataFrame.show + + dataFrame.collect should have length 3 + + } + + it must "plan a query with same alias in different scopes" in { + + val t1: DataFrame = xdContext.createDataFrame( + xdContext.sparkContext.parallelize((1 to 5).map(i => Row(s"val_$i", i))), + StructType(Array(StructField("id", StringType), StructField("value", IntegerType)))) + t1.registerTempTable("t1") + + val t2: DataFrame = xdContext.createDataFrame( + xdContext.sparkContext.parallelize((4 to 8).map(i => Row(s"val_$i", i, i/2))), + StructType(Array(StructField("ident", StringType), StructField("num", IntegerType), StructField("magic", IntegerType)))) + t2.registerTempTable("t2") + + val dataFrame = 
xdContext.sql("SELECT * FROM (SELECT * FROM t1 ft WHERE ft.value = 2) ft WHERE ft.id = 'val_2'") + + dataFrame.show + + dataFrame.collect should have length 1 + + } + + it must "fail when a subquery is used in a filter" in { + + val t1: DataFrame = xdContext.createDataFrame( + xdContext.sparkContext.parallelize((1 to 5).map(i => Row(s"val_$i", i))), + StructType(Array(StructField("id", StringType), StructField("value", IntegerType)))) + t1.registerTempTable("t1") + + an[Exception] should be thrownBy xdContext.sql("SELECT * FROM t1 WHERE t1.value = (SELECT first(value) FROM t1)") + + } + it should "succesfully parse a CROSS JOIN" in { val crossJoin = "SELECT * FROM table1 CROSS JOIN table2" @@ -121,6 +192,8 @@ class XDContextIT extends SharedXDContextTest { } + + // it must "execute jar app previously uploaded" in { // val file = File(s"TestAddApp.jar") // xdContext.addJar("TestAddApp.jar") From 246abfb3bf7d48f6b1cb6b90e8545edec186630c Mon Sep 17 00:00:00 2001 From: Unai Sarasola Date: Thu, 27 Oct 2016 17:48:41 +0200 Subject: [PATCH 3/7] Updated commons and curator (#759) * Updated commons and curator * Changelog * Updated the jenkins.yml with Zookeeper 3.5.1 * Updated jenkins with Zookeeper --- .jenkins.yml | 2 +- CHANGELOG.md | 1 + pom.xml | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.jenkins.yml b/.jenkins.yml index 16c9df734..a468fc3c0 100644 --- a/.jenkins.yml +++ b/.jenkins.yml @@ -6,7 +6,7 @@ CROSSBUILDAT: ITSERVICES: - ZOOKEEPER: - image: stratio/zookeeper:3.4.6 + image: jplock/zookeeper:3.5.1-alpha - MONGODB: image: stratio/mongo:3.0.4 sleep: 10 diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dbd5cc6a..707c91051 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ Only listing significant user-visible, not internal code cleanups and minor bug ## 1.8.0 (upcoming) * Pending changelog +* Updated dependencies with curator and stratio-commons-utils compatible with Zookeeper 3.5.x ## 1.7.0 (upcoming) diff --git a/pom.xml 
b/pom.xml index b0c16ac5f..c86a199f1 100644 --- a/pom.xml +++ b/pom.xml @@ -123,9 +123,9 @@ 10.10.1.1 3.2.10 1.10.19 - 0.5.0 + 0.7.0 18.0 - 2.11.0 + 3.2.0 2.11 2.11.8 From c525898ce15420ba147015e7cbf8240ec5928529 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Dom=C3=ADnguez=20Sanz?= Date: Mon, 31 Oct 2016 08:33:33 +0100 Subject: [PATCH 4/7] [CROSSDATA-686][testsAT]Added tests for count distinct and crossjoin (#761) * [CROSSDATA-686][testsAT]Added tests for count distinct and crossjoin --- .../stratio/tests/ATMicroCassandraXDTest.java | 4 +- .../MicroStrategy/MicroStrategy.feature | 44 ++++++++++++++++++- .../scripts/CassandraMicroScript.cql | 17 ++++++- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/testsAT/src/test/java/com/stratio/tests/ATMicroCassandraXDTest.java b/testsAT/src/test/java/com/stratio/tests/ATMicroCassandraXDTest.java index de1c2522a..a25866724 100644 --- a/testsAT/src/test/java/com/stratio/tests/ATMicroCassandraXDTest.java +++ b/testsAT/src/test/java/com/stratio/tests/ATMicroCassandraXDTest.java @@ -53,11 +53,13 @@ public void setUp() { logger.info("Connecting to Cassandra Cluster"); cassandra.connect(); logger.info("Checking if the catalog exists"); - if (cassandra.existsKeyspace(catalog, false)) { + if (cassandra.existsKeyspace(catalog, false) || cassandra.existsKeyspace("databasetest", false) ) { logger.info("The catalog exists"); cassandra.executeQuery("DROP KEYSPACE \"KUYA\""); + cassandra.executeQuery("DROP KEYSPACE databasetest"); logger.info("The catalog has benn dropped"); } + cassandra.loadTestData(catalog, "/scripts/CassandraMicroScript.cql"); List tables = cassandra.getTables(catalog); String connector = "Cassandra-Micro"; diff --git a/testsAT/src/test/resources/features/MicroStrategy/MicroStrategy.feature b/testsAT/src/test/resources/features/MicroStrategy/MicroStrategy.feature index ca98d0a44..d7d9f8358 100644 --- a/testsAT/src/test/resources/features/MicroStrategy/MicroStrategy.feature +++ 
b/testsAT/src/test/resources/features/MicroStrategy/MicroStrategy.feature @@ -24,8 +24,50 @@ Feature: [CROSSDATA-677] Microstrategy - Tests Queries When I execute 'select a11.DES_PRODMDV DES_PRODMDV,a11.COD_SEMAMES COD_SEMAMES,a11.DES_DTMM DES_DTMM,a11.DES_CBCMM DES_CBCMM,a11.DES_ZOMM DES_ZOMM,a11.DES_OFIMM DES_OFIMM,a11.DES_NIVMM DES_NIVMM,a11.COD_DATIMP COD_BANCSB,max(a11.COD_OFIMM) COD_OFICI,max(a11.DES_OFIPA) DES_OFIPA,max(a11.COD_BANCSBM) COD_BANCSB0,a11.DES_UNIMMDV DES_UNIMMDV,a11.DES_EVENMDV DES_EVENMDV,a11.COD_CONTIGO COD_CONTIGO,max(a12.DES_CONTIGO) DES_CONTIGO,max(a12.COD_ORDCONTI) COD_ORDCONTI,sum(a11.IMP_BASINCRE) WJXBFS1,sum(a11.IMP_BASTRANS) WJXBFS2,sum(a11.IMP_REINCRE) WJXBFS3,sum(a11.IMP_RETRANSA) WJXBFS4,sum(a11.QNU_TOPRODUC) WJXBFS5 from KUYA.TKUYAKPR a11 join KUYA.TKUYAKGO a12 on (a11.COD_CONTIGO = a12.COD_CONTIGO) where (a11.COD_SEMAMES in ('8.20a26JUN') and ((a11.COD_BANCSBM = '0182' and (a11.DES_OFIPA like '%0013%' or a11.COD_OFIMM = '0013')) or (a11.COD_BANCSBM = '0182' and (a11.DES_OFIPA like '%6051%' or a11.COD_OFIMM = '6051')))) group by a11.DES_PRODMDV,a11.COD_SEMAMES,a11.DES_DTMM,a11.DES_CBCMM,a11.DES_ZOMM,a11.DES_OFIMM,a11.DES_NIVMM,a11.COD_DATIMP,a11.DES_UNIMMDV,a11.DES_EVENMDV,a11.COD_CONTIGO' Then an exception 'IS NOT' thrown + Scenario: [CROSSDATA-728] MICRO SELECT COUNT(DISTINCT * ) + When I execute 'SELECT count(distinct ident) FROM databasetest.tabletest' + Then an exception 'IS NOT' thrown + Then The result has to have '1' rows + |_c0-long| + | 10 | + Scenario: [CROSSDATA-728] MICRO SELECT COUNT(DISTINCT * ) with alias + When I execute 'SELECT count(distinct ident) as count_distinct FROM databasetest.tabletest' + Then an exception 'IS NOT' thrown + Then The result has to have '1' rows + |count_distinct-long| + | 10 | + Scenario: [CROSSDATA-728] MICRO SELECT COUNT(DISTINCT * ) with alias + When I execute 'SELECT count(distinct ident) as count_distinct FROM databasetest.tabletest WHERE ident > 4' + Then an exception 'IS NOT' 
thrown + Then The result has to have '1' rows + |count_distinct-long| + | 5 | - + Scenario: [CROSSDATA-717] MICRO SELECT + CROSS JOIN + When I execute 'SELECT tabletest.ident ,tab1.name FROM databasetest.tabletest CROSS JOIN databasetest.tab1' + Then an exception 'IS NOT' thrown + Then The result has to have '20' rows + |ident-integer| name-string| + | 5 |name_5 | + | 5 |name_5 | + | 1 |name_1 | + | 1 |name_1 | + | 8 |name_8 | + | 8 |name_8 | + | 0 |name_0 | + | 0 |name_0 | + | 2 |name_2 | + | 2 |name_2 | + | 4 |name_4 | + | 4 |name_4 | + | 7 |name_7 | + | 7 |name_7 | + | 6 |name_6 | + | 6 |name_6 | + | 9 |name_9 | + | 9 |name_9 | + | 3 |name_3 | + | 3 |name_3 | diff --git a/testsAT/src/test/resources/scripts/CassandraMicroScript.cql b/testsAT/src/test/resources/scripts/CassandraMicroScript.cql index 95478b9c5..e1535a651 100644 --- a/testsAT/src/test/resources/scripts/CassandraMicroScript.cql +++ b/testsAT/src/test/resources/scripts/CassandraMicroScript.cql @@ -13,4 +13,19 @@ insert into "KUYA"."TKUYAKCL"("COD_PERSOCL","COD_OFIPALCL","DES_SUBSEGLO","COD_B insert into "KUYA"."TKUYAKCL"("COD_PERSOCL","COD_OFIPALCL","DES_SUBSEGLO","COD_BANCSBM","IMP_TRANINI","IMP_TRANFIN","IMP_VENTCRUZ","IMP_OBJPROP","COD_SEMAMES","COD_IDFISCAK","DES_NOCLIE","DES_BANMM","DES_EVENMDV","DES_UNIMMDV","DES_NIVMM","DES_DTMM","DES_ZOMM","DES_CBCMM","DES_OFIMM","COD_DATIMP","COD_TPVCL","COD_TARJECL","COD_RECIBCL","COD_EMIRECIB","COD_COMEX","COD_INGRESCL","COD_NETCL","COD_MOVIL","COD_MAILCL","COD_SEGUROS","COD_CNIVELCL","DES_OFIPA","AUD_TIM","COD_CONTIGO") VALUES ('eee','0016','eee','6051',5,5,5,5,'8.20a26JUN','eee','eee','eee','eee','eee','eee','eee','eee','eee','eee','eee',5,5,5,5,5,5,5,5,5,5,5,'060510','13-09-16 15:00',5); CREATE TABLE "KUYA"."TKUYAKEV"("COD_SEMAMES" varchar PRIMARY KEY,"DES_UNIREF" varchar,"DES_EVENMDV" varchar,"IMP_RETRANSA" int,"IMP_REINCRE" int,"DES_BANMM" varchar,"DES_UNIMMDV"varchar,"DES_NIVMM" VARCHAR,"DES_DTMM" varchar,"DES_ZOMM" varchar,"DES_CBCMM" 
varchar,"DES_OFIMM" varchar,"COD_DATIMP"varchar,"IMP_BASTRANS" int,"IMP_BASINCRE" int,"IMP_BATRAINI" int,"IMP_CUMPLIMI" int,"QNU_BASEVMDV" int,"DES_OFIPA" varchar,"DES_SEGCLM" varchar,"COD_BANCSBM" varchar,"COD_OFIMM" varchar,"DES_EVENCORT" varchar,"AUD_TIM" TIMESTAMP,"COD_CONTIGO" int); CREATE TABLE "KUYA"."TKUYAKSE"("COD_SEMAMES" VARCHAR PRIMARY KEY, "DES_UNIREF" varchar, "DES_EVENMDV" varchar, "QNU_RENUMMDV" int, "IMP_RECUMPLI" int, "IMP_RETRANSA" int, "IMP_RETRAINI" int, "IMP_REINCRE" int, "QNU_BASEVMDV" int, "IMP_BASTRANS" int, "IMP_BATRAINI" int, "IMP_OBJPROP" int, "IMP_BASINCRE" int, "DES_BANMM" varchar, "IMP_CUMPLIMI" int, "DES_UNIMMDV" varchar, "DES_NIVMM" varchar, "DES_DTMM" varchar, "DES_ZOMM" varchar, "DES_CBCMM" varchar, "DES_OFIMM" varchar, "COD_DATIMP" varchar, "QNU_RAKEVMDV" int, "QNU_RAKNICON" int, "QNU_RAKTRANS" int, "QNU_INCRETO" int, "DES_OFIPA" varchar, "QNU_RANKMAX" int, "DES_SEGCLM" varchar, "COD_BANCSBM" varchar, "COD_OFIMM" varchar, "DES_EVENCORT" varchar, "AUD_TIM" TIMESTAMP, "COD_CONTIGO" int); -CREATE TABLE "KUYA"."TKUYAKPR" ("COD_SEMAMES" varchar primary key, "DES_UNIREF" varchar, "DES_EVENMDV" varchar, "IMP_RETRANSA" int, "IMP_REINCRE" int, "DES_BANMM" varchar, "DES_PRODMDV" varchar, "DES_UNIMMDV" varchar, "DES_NIVMM" VARCHAR, "DES_DTMM" varchar, "DES_ZOMM" varchar, "DES_CBCMM" varchar, "DES_OFIMM" varchar, "COD_DATIMP" varchar, "IMP_BASTRANS" int, "IMP_BASINCRE" int, "QNU_TOPRODUC" int, "DES_OFIPA" varchar, "COD_BANCSBM" varchar, "COD_OFIMM" varchar, "DES_EVENCORT" varchar, "AUD_TIM" TIMESTAMP, "COD_CONTIGO" int); \ No newline at end of file +CREATE TABLE "KUYA"."TKUYAKPR" ("COD_SEMAMES" varchar primary key, "DES_UNIREF" varchar, "DES_EVENMDV" varchar, "IMP_RETRANSA" int, "IMP_REINCRE" int, "DES_BANMM" varchar, "DES_PRODMDV" varchar, "DES_UNIMMDV" varchar, "DES_NIVMM" VARCHAR, "DES_DTMM" varchar, "DES_ZOMM" varchar, "DES_CBCMM" varchar, "DES_OFIMM" varchar, "COD_DATIMP" varchar, "IMP_BASTRANS" int, "IMP_BASINCRE" int, "QNU_TOPRODUC" 
int, "DES_OFIPA" varchar, "COD_BANCSBM" varchar, "COD_OFIMM" varchar, "DES_EVENCORT" varchar, "AUD_TIM" TIMESTAMP, "COD_CONTIGO" int); +CREATE KEYSPACE databasetest WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }; +CREATE TABLE databasetest.tabletest(ident INT PRIMARY KEY, name TEXT, money DOUBLE, new BOOLEAN, date Timestamp); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (0, 'name_0', 10.2 ,true,'1999-11-30'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (1, 'name_1', 11.2 ,true,'2001-01-01'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (2, 'name_2', 12.2 ,true,'2002-02-02'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (3, 'name_3', 13.2 ,true,'2003-03-03'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (4, 'name_4', 14.2 ,true,'2004-04-04'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (5, 'name_5', 15.2 ,true,'2005-05-05'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (6, 'name_6', 16.2 ,true,'2006-06-06'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (7, 'name_7', 17.2 ,true,'2007-07-07'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (8, 'name_8', 18.2 ,true,'2008-08-08'); +INSERT INTO databasetest.tabletest(ident,name,money,new,date) VALUES (9, 'name_9', 19.2 ,true,'2009-09-09'); +CREATE TABLE databasetest.tab1(name TEXT PRIMARY KEY); +INSERT INTO databasetest.tab1(name) VALUES ('name_0'); +INSERT INTO databasetest.tab1(name) VALUES ('name_1'); \ No newline at end of file From 5c3625c874361795817752621c057e4885a5a2cb Mon Sep 17 00:00:00 2001 From: Miguel Angel Fernandez Diaz Date: Mon, 31 Oct 2016 13:09:42 +0100 Subject: [PATCH 5/7] ES_HOST updated to ES_NODES (#755) ES_HOST is deprecated --- .../connector/elasticsearch/DefaultSourceESSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSourceESSpec.scala b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSourceESSpec.scala index 70a20f19b..66e9614ba 100644 --- a/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSourceESSpec.scala +++ b/elasticsearch/src/test/scala/com/stratio/crossdata/connector/elasticsearch/DefaultSourceESSpec.scala @@ -65,7 +65,7 @@ class DefaultSourceESSpec extends BaseXDTest with MockitoSugar { val item: Table = mock[Table] when(item.database).thenReturn(Some("index")) when(item.tableName).thenReturn("type") - val userOpts: Map[String, String] = Map(ES_HOST -> "localhost") + val userOpts: Map[String, String] = Map(ES_NODES -> "localhost") //Experimentation val result:Map[String, String] = defaultDatasource.generateConnectorOpts(item, userOpts) @@ -73,7 +73,7 @@ class DefaultSourceESSpec extends BaseXDTest with MockitoSugar { //Expectations result should not be null result.get(ES_RESOURCE).get should be ("index/type") - result.get(ES_HOST).get should be ("localhost") + result.get(ES_NODES).get should be ("localhost") } } From 274ff66d286b5783b0728852b081773346cfe01e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Francisco=20P=C3=A9rez=20Hidalgo?= Date: Mon, 31 Oct 2016 17:06:21 +0100 Subject: [PATCH 6/7] [CROSSDATA-666] HTTP stream response (#758) * Added streamed successful results and their reserializers * Streamed SQLResult: Non-tested * Completed server SQL response adaptation to streamed response. * HTTP Client - DRAFT - streamed response retrieval * Completed successful streamed result. 
* Fixed serialization test * Improved error management * Fix: No-SQL commands response should be delivered in a HTTP.OK response --- .../crossdata/common/result/results.scala | 13 ++-- .../serializers/CrossdataSerializer.scala | 2 +- ...treamedSuccessfulSQLResultSerializer.scala | 53 +++++++++++++ .../serializers/RowSerializerSpec.scala | 29 ++++--- .../stratio/crossdata/driver/HttpDriver.scala | 77 +++++++++++++++---- .../server/CrossdataHttpServer.scala | 31 +++++++- 6 files changed, 164 insertions(+), 41 deletions(-) create mode 100644 common/src/main/scala/com/stratio/crossdata/common/serializers/StreamedSuccessfulSQLResultSerializer.scala diff --git a/common/src/main/scala/com/stratio/crossdata/common/result/results.scala b/common/src/main/scala/com/stratio/crossdata/common/result/results.scala index 3add3c406..df2d395b8 100644 --- a/common/src/main/scala/com/stratio/crossdata/common/result/results.scala +++ b/common/src/main/scala/com/stratio/crossdata/common/result/results.scala @@ -96,9 +96,12 @@ case class ErrorSQLResult(message: String, cause: Option[Throwable] = None) exte cause.map(throwable => new RuntimeException(message, throwable)).getOrElse(new RuntimeException(message)) } +object StreamedSuccessfulSQLResult { + implicit def schema2streamed(schema: StructType): StreamedSuccessfulSQLResult = StreamedSchema(schema) + implicit def row2streamed(row: Row)(implicit providedSchema: StructType): StreamedSuccessfulSQLResult = + StreamedRow(row, Some(providedSchema)) +} - - - - - +sealed trait StreamedSuccessfulSQLResult +case class StreamedSchema(schema: StructType) extends StreamedSuccessfulSQLResult +case class StreamedRow(row: Row, providedSchema: Option[StructType] = None) extends StreamedSuccessfulSQLResult \ No newline at end of file diff --git a/common/src/main/scala/com/stratio/crossdata/common/serializers/CrossdataSerializer.scala b/common/src/main/scala/com/stratio/crossdata/common/serializers/CrossdataSerializer.scala index 548715d6f..7c6152e4e 
100644 --- a/common/src/main/scala/com/stratio/crossdata/common/serializers/CrossdataSerializer.scala +++ b/common/src/main/scala/com/stratio/crossdata/common/serializers/CrossdataSerializer.scala @@ -26,7 +26,7 @@ trait CrossdataCommonSerializer { implicit val json4sJacksonFormats: Formats = DefaultFormats + SQLResultSerializer + UUIDSerializer + - StructTypeSerializer + FiniteDurationSerializer + CommandSerializer + + StructTypeSerializer + FiniteDurationSerializer + CommandSerializer + StreamedSuccessfulSQLResultSerializer + AkkaMemberStatusSerializer + AkkaClusterMemberSerializer + new SortedSetSerializer[Member]() } diff --git a/common/src/main/scala/com/stratio/crossdata/common/serializers/StreamedSuccessfulSQLResultSerializer.scala b/common/src/main/scala/com/stratio/crossdata/common/serializers/StreamedSuccessfulSQLResultSerializer.scala new file mode 100644 index 000000000..157daedbd --- /dev/null +++ b/common/src/main/scala/com/stratio/crossdata/common/serializers/StreamedSuccessfulSQLResultSerializer.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.stratio.crossdata.common.serializers + +import com.stratio.crossdata.common.result.{StreamedRow, StreamedSchema, StreamedSuccessfulSQLResult} +import org.json4s.JsonAST.{JField, JObject} +import org.json4s.{CustomSerializer, Extraction, Formats} +import StreamedSuccessfulSQLResultSerializerHelper._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.StructType +private[serializers] object StreamedSuccessfulSQLResultSerializerHelper { + val SchemaLabel = "streamedSchema" + val RowLabel = "streamedRow" +} + +object StreamedSuccessfulSQLResultSerializer extends CustomSerializer[StreamedSuccessfulSQLResult]( + formats => ( + { + case JObject(JField(SchemaLabel, jSchema)::Nil) => + implicit val _: Formats = formats + StreamedSchema(jSchema.extract[StructType]) + }, + { + case StreamedSchema(schema) => JObject(JField(SchemaLabel, Extraction.decompose(schema)(formats))) + case StreamedRow(row, Some(providedSchema)) => + JObject(JField(RowLabel, Extraction.decompose(row)(formats + RowSerializer(providedSchema)))) + } + ) +) + +class StreamedRowSerializer(schema: StructType) extends CustomSerializer[StreamedSuccessfulSQLResult]( + formats => ( + { + case JObject(JField(RowLabel, jRow)::Nil) => + implicit val _: Formats = formats + new RowSerializer(schema) + StreamedRow(jRow.extract[Row]) + }, + PartialFunction.empty + ) +) diff --git a/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala b/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala index 4d3b6ede4..4f205096e 100644 --- a/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala +++ b/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala @@ -19,15 +19,15 @@ import com.stratio.crossdata.common.serializers.XDSerializationTest.TestCase import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import 
org.apache.spark.sql.types._ -import org.json4s.jackson.JsonMethods._ -import org.json4s.Extraction import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner +import scala.collection.mutable.WrappedArray + @RunWith(classOf[JUnitRunner]) class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSerializer { - val schema = StructType(List( + lazy val schema = StructType(List( StructField("int",IntegerType,true), StructField("bigint",LongType,true), StructField("long",LongType,true), @@ -52,14 +52,14 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer StructField("structofstruct",StructType(StructField("field1",TimestampType,true)::StructField("field2", IntegerType, true)::StructField("struct1",StructType(StructField("structField1",StringType,true)::StructField("structField2",IntegerType,true)::Nil),true)::Nil),true) )) - val values: Array[Any] = Array( + lazy val values: Array[Any] = Array( 2147483647, 9223372036854775807L, 9223372036854775807L, "string", true, - 3.3, - 3.3F, + 3.0, + 3.0F, Decimal(12), Decimal(22), Decimal(32.0), @@ -68,16 +68,15 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0"), 12.toShort, "abcde".getBytes, - new GenericArrayData(Array(4, 42)), - new GenericArrayData(Array("hello", "world")), + WrappedArray make Array(4, 42), + WrappedArray make Array("hello", "world"), ArrayBasedMapData(Map("b" -> 2)), ArrayBasedMapData(Map("a" -> "A", "b" -> "B")), - new GenericRowWithSchema(Array(99,98), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil)), - new GenericArrayData( - Array( + new GenericRowWithSchema(Array(99,98), StructType(StructField("field1", IntegerType) + ::StructField("field2", IntegerType)::Nil)), + WrappedArray make Array( new GenericRowWithSchema(Array(1,2), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil)), new 
GenericRowWithSchema(Array(3,4), StructType(StructField("field1", IntegerType)::StructField("field2", IntegerType)::Nil)) - ) ), new GenericRowWithSchema( Array( @@ -98,11 +97,11 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer ) ) - val rowWithNoSchema = Row.fromSeq(values) - val rowWithSchema = new GenericRowWithSchema(values, schema) + lazy val rowWithNoSchema = Row.fromSeq(values) + lazy val rowWithSchema = new GenericRowWithSchema(values, schema) - implicit val formats = json4sJacksonFormats + implicit val formats = json4sJacksonFormats + new RowSerializer(schema) override def testCases: Seq[TestCase] = Seq( diff --git a/driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala b/driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala index 0607372a7..81819218e 100644 --- a/driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala +++ b/driver/src/main/scala/com/stratio/crossdata/driver/HttpDriver.scala @@ -15,35 +15,39 @@ */ package com.stratio.crossdata.driver -import java.io.{FileInputStream, InputStream} -import java.security.{KeyStore, SecureRandom} +import java.security.SecureRandom import java.util.UUID import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLException, TrustManagerFactory} +import akka.NotUsed import akka.actor.ActorRef import akka.cluster.ClusterEvent.CurrentClusterState import akka.http.scaladsl.{Http, HttpExt, HttpsConnectionContext} import akka.http.scaladsl.marshalling.{Marshal, Marshaller} -import akka.http.scaladsl.model.{HttpMethod, HttpRequest, RequestEntity, ResponseEntity} -import akka.stream.{ActorMaterializer, StreamTcpException, TLSClientAuth} +import akka.http.scaladsl.model._ +import akka.stream.{ActorMaterializer, TLSClientAuth} import com.stratio.crossdata.common.result._ import com.stratio.crossdata.common.security.{KeyStoreUtils, Session} import com.stratio.crossdata.driver.config.DriverConf import com.stratio.crossdata.driver.session.{Authentication, 
SessionManager} import org.slf4j.{Logger, LoggerFactory} import akka.http.scaladsl.model.HttpMethods._ -import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import akka.http.scaladsl.unmarshalling.{Unmarshaller, _} +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.util.ByteString import com.stratio.crossdata.common._ -import com.stratio.crossdata.common.serializers.CrossdataCommonSerializer +import com.stratio.crossdata.common.serializers.{CrossdataCommonSerializer, StreamedRowSerializer} import com.stratio.crossdata.driver.actor.HttpSessionBeaconActor import com.stratio.crossdata.driver.exceptions.TLSInvalidAuthException +import org.apache.spark.sql.Row import org.json4s.jackson +import scala.collection.generic.SeqFactory import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Try} class HttpDriver private[driver](driverConf: DriverConf, @@ -145,22 +149,61 @@ class HttpDriver private[driver](driverConf: DriverConf, val sqlCommand = new SQLCommand(query, retrieveColNames = driverConf.getFlattenTables) - val response = simpleRequest( - securitizeCommand(sqlCommand), - s"query/${sqlCommand.requestId}", - { - case SQLReply(_, result: SQLResult) => - result - } : PartialFunction[SQLReply, SQLResult] - ) + // Performs the request to server + val response = Marshal(securitizeCommand(sqlCommand)).to[RequestEntity] flatMap { requestEntity => + val request = HttpRequest(POST, s"$protocol://$serverHttp/query/${sqlCommand.requestId}", entity = requestEntity) + http.singleRequest(request) flatMap { httpResponse => + + if(httpResponse.status == StatusCodes.OK) { // OK Responses will be served through streaming + + val bytesSource = httpResponse.entity.dataBytes // This is the stream of bytes of the answer data... 
+ val framesSource = bytesSource.filterNot(bs => bs.isEmpty || bs == ByteString("\n")) //...empty lines get removed... + val rawSchemaAndRawRowsSource = framesSource.prefixAndTail[ByteString](1) //remaining get transformed to ByteStrings. + + // From the raw lines stream, a new stream providing the first one and a stream of the remaining ones is created + val sink = Sink.head[(Seq[ByteString], Source[ByteString, NotUsed])] //Its single elements get extracted by future... + + for { /*.. which, once completed, + provides a ByteString with the serialized schema and the stream of remaining lines: + The bulk of serialized rows.*/ + (Seq(rawSchema), rawRows) <- rawSchemaAndRawRowsSource.toMat(sink)(Keep.right).run + StreamedSchema(schema) <- Unmarshal(HttpEntity(ContentTypes.`application/json`, rawSchema)).to[StreamedSuccessfulSQLResult] + + // Having de-serialized the schema, it can be used to deserialize each row at the un-marshalling phase + rrows <- { + implicit val json4sJacksonFormats = this.json4sJacksonFormats + new StreamedRowSerializer(schema) + + val um: Unmarshaller[ResponseEntity, StreamedSuccessfulSQLResult] = json4sUnmarshaller + + rawRows.mapAsync(1) { bs => /* TODO: Study the implications of increasing the level of parallelism in + * the unmarshalling phase. 
*/ + val entity = HttpEntity(ContentTypes.`application/json`, bs) + um(entity) + } + }.runFold(List.empty[Row]) { case (acc: List[Row], StreamedRow(row, None)) => row::acc } + + } yield SuccessfulSQLResult(rrows.reverse toArray, schema) /* TODO: Performance could be increased if + `SuccessfulSQLResult`#resultSet were of type `Seq[Row]`*/ + + } else { + + Unmarshal(httpResponse.entity).to[SQLReply] map { + case SQLReply(_, result: SQLResult) => result + } + + } + + } + } new SQLResponse(sqlCommand.requestId, response) { override def cancelCommand(): Future[QueryCancelledReply] = { val command = CancelQueryExecution(sqlCommand.queryId) simpleRequest( securitizeCommand(command), - s"query/${command.requestId}", - { case reply: QueryCancelledReply => reply }: PartialFunction[SQLReply, QueryCancelledReply] + s"query/${command.requestId}", { + case reply: QueryCancelledReply => reply + }: PartialFunction[SQLReply, QueryCancelledReply] ) } } diff --git a/server/src/main/scala/com/stratio/crossdata/server/CrossdataHttpServer.scala b/server/src/main/scala/com/stratio/crossdata/server/CrossdataHttpServer.scala index e7cfd3f3b..ef797c3db 100644 --- a/server/src/main/scala/com/stratio/crossdata/server/CrossdataHttpServer.scala +++ b/server/src/main/scala/com/stratio/crossdata/server/CrossdataHttpServer.scala @@ -19,27 +19,33 @@ import java.io.File import java.util.UUID import java.util.concurrent.TimeUnit +import akka.NotUsed import akka.actor.{ActorRef, ActorSystem} import akka.pattern.ask import akka.cluster.pubsub.DistributedPubSub import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, SendToAll} +import akka.http.scaladsl.common.EntityStreamingSupport import akka.http.scaladsl.model._ import akka.http.scaladsl.model.Multipart.BodyPart import akka.http.scaladsl.server.Directive import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.unmarshalling._ import akka.stream.ActorMaterializer -import akka.stream.scaladsl.FileIO -import akka.util.Timeout 
+import akka.stream.scaladsl.{FileIO, Flow, Source} +import akka.util.{ByteString, Timeout} import com.stratio.crossdata.common.security.Session import com.stratio.crossdata.common.util.akka.keepalive.LiveMan.HeartBeat import com.stratio.crossdata.common._ +import com.stratio.crossdata.common.result.{ErrorSQLResult, StreamedSchema, StreamedSuccessfulSQLResult, SuccessfulSQLResult} import com.stratio.crossdata.server.actors.ResourceManagerActor import com.stratio.crossdata.server.config.ServerConfig import com.stratio.crossdata.util.HdfsUtils import com.typesafe.config.{Config, ConfigException} import org.apache.log4j.Logger +import org.apache.spark.sql.Row import org.apache.spark.sql.crossdata.XDContext import org.apache.spark.sql.crossdata.serializers.CrossdataSerializer +import org.apache.spark.sql.types.StructType import org.json4s.jackson import scala.concurrent.Future @@ -131,9 +137,28 @@ class CrossdataHttpServer(config: Config, serverActor: ActorRef, implicit val sy case Success(reply: ServerReply) => reply match { case qcr: QueryCancelledReply => complete(qcr) + case SQLReply(_, SuccessfulSQLResult(resultSet, schema)) => + + implicit val jsonStreamingSupport = EntityStreamingSupport.json() + .withFramingRenderer( + Flow[ByteString].intersperse(ByteString("\n")) + ) + + implicit val _: StructType = schema + + val responseStream: Source[StreamedSuccessfulSQLResult, NotUsed] = + Source.fromIterator(() => resultSet.toIterator).map( + row => row: StreamedSuccessfulSQLResult + ) prepend Source.single(schema) + + complete(responseStream) + case _ => complete(reply) + } - case other => complete(StatusCodes.ServerError, s"Internal XD server error: $other") + case other => + val httpErrorReply = SQLReply(rq.cmd.requestId, ErrorSQLResult(s"Internal XD server error: $other")) + complete(StatusCodes.ServerError -> httpErrorReply) } } From 9f2e3a1d23971d4533a4f961871a80e44df9c8a2 Mon Sep 17 00:00:00 2001 From: David Arroyo Cazorla Date: Mon, 31 Oct 2016 18:53:44 +0100 
Subject: [PATCH 7/7] [CROSSDATA-808] Fix Map serialization (#760) * fix 'MapType' serialization: serialize any kind of key supported by spark apart from string * Merge Fix --- .../common/serializers/RowSerializer.scala | 21 ++++++++++--------- .../serializers/RowSerializerSpec.scala | 7 +++++-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala b/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala index cb8bd7f55..2e8f6cfcc 100644 --- a/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala +++ b/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala @@ -60,10 +60,10 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] { case (ArrayType(ty, _), JArray(arr)) => mutable.WrappedArray make arr.map(extractField(ty, _)).toArray /* Maps will be serialized as sub-objects so keys are constrained to be strings */ - case (MapType(StringType, vt, _), JObject(fields)) => - val (keys, values) = fields.unzip - val unserValues = values map (jval => extractField(vt, jval)) - ArrayBasedMapDataNotDeprecated(keys.toArray, unserValues.toArray) + case (MapType(kt, vt, _), JObject(JField("map", JObject(JField("keys", JArray(mapKeys)) :: JField("values", JArray(mapValues)) :: _) ) :: _)) => + val unserKeys = mapKeys map (jval => extractField(kt, jval)) + val unserValues = mapValues map (jval => extractField(vt, jval)) + ArrayBasedMapDataNotDeprecated(unserKeys.toArray, unserValues.toArray) case (st: StructType, JObject(JField("values",JArray(values))::_)) => deserializeWithSchema(st, values, true) } @@ -106,14 +106,15 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] { case v: ArrayDataNotDeprecated => JArray(v.array.toList.map(v => Extraction.decompose(v))) case v: mutable.WrappedArray[_] => JArray(v.toList.map(v => Extraction.decompose(v))) } - case
(MapType(StringType, vt, _), v: MapDataNotDeprecated) => + case (MapType(kt, vt, _), v: MapDataNotDeprecated) => /* Maps will be serialized as sub-objects so keys are constrained to be strings */ - val serKeys = v.keyArray().array.map(v => v.toString) + val serKeys = v.keyArray().array.map(v => serializeField(kt -> v)) val serValues = v.valueArray.array.map(v => serializeField(vt -> v)) - JObject( - (v.keyArray.array zip serValues) map { - case (k: String, v) => JField(k, v) - } toList + JField("map", + JObject( + JField("keys", JArray(serKeys.toList)), + JField("values", JArray(serValues.toList)) + ) ) case (st: StructType, v: Row) => serializeWithSchema(st, v, true) } diff --git a/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala b/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala index 4f205096e..a7666e5dd 100644 --- a/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala +++ b/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala @@ -19,6 +19,7 @@ import com.stratio.crossdata.common.serializers.XDSerializationTest.TestCase import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -47,6 +48,7 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer StructField("arraystring",ArrayType(StringType,true),true), StructField("mapstringint",MapType(StringType,IntegerType,true),true), StructField("mapstringstring",MapType(StringType,StringType,true),true), + StructField("maptimestampinteger",MapType(TimestampType,IntegerType,true),true), StructField("struct",StructType(StructField("field1",IntegerType,true)::StructField("field2",IntegerType,true) ::Nil), true), 
StructField("arraystruct",ArrayType(StructType(StructField("field1",IntegerType,true)::StructField("field2", IntegerType,true)::Nil),true),true), StructField("structofstruct",StructType(StructField("field1",TimestampType,true)::StructField("field2", IntegerType, true)::StructField("struct1",StructType(StructField("structField1",StringType,true)::StructField("structField2",IntegerType,true)::Nil),true)::Nil),true) @@ -72,6 +74,7 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer WrappedArray make Array("hello", "world"), ArrayBasedMapData(Map("b" -> 2)), ArrayBasedMapData(Map("a" -> "A", "b" -> "B")), + ArrayBasedMapData(Map(java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0") -> 25, java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0") -> 12)), new GenericRowWithSchema(Array(99,98), StructType(StructField("field1", IntegerType) ::StructField("field2", IntegerType)::Nil)), WrappedArray make Array( @@ -102,11 +105,11 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer implicit val formats = json4sJacksonFormats + new RowSerializer(schema) - + override def testCases: Seq[TestCase] = Seq( TestCase("marshall & unmarshall a row with no schema", rowWithNoSchema), TestCase("marshall & unmarshall a row with schema", rowWithSchema) ) -} +} \ No newline at end of file