diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
index 58f51bd4235..857a7ae7627 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.examples
 
-import java.io.File
 import java.net.ServerSocket
 
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -26,13 +25,9 @@ import org.apache.carbondata.examples.util.ExampleUtils
 object StreamSQLExample {
   def main(args: Array[String]) {
 
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-      + "../../../..").getCanonicalPath
-
     val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
-
     val requireCreateTable = true
+    val recordFormat = "json" // can be "json" or "csv"
 
     if (requireCreateTable) {
       // drop table if exists previously
@@ -45,7 +40,6 @@ object StreamSQLExample {
           | CREATE TABLE sink(
           | id INT,
           | name STRING,
-          | city STRING,
           | salary FLOAT,
           | file struct<school:array<string>, age:int>
           | )
@@ -56,11 +50,10 @@ object StreamSQLExample {
     }
 
     spark.sql(
-      """
+      s"""
        | CREATE TABLE source (
        | id INT,
        | name STRING,
-       | city STRING,
        | salary FLOAT,
        | file struct<school:array<string>, age:int>
        | )
@@ -69,7 +62,9 @@ object StreamSQLExample {
        | 'streaming'='source',
        | 'format'='socket',
        | 'host'='localhost',
-       | 'port'='7071')
+       | 'port'='7071',
+       | 'record_format'='$recordFormat'
+       | )
      """.stripMargin)
 
     val serverSocket = new ServerSocket(7071)
@@ -86,7 +81,7 @@ object StreamSQLExample {
 
     // start writing data into the socket
     import StructuredStreamingExample.{showTableCount, writeSocket}
-    val thread1 = writeSocket(serverSocket)
+    val thread1 = writeSocket(serverSocket, recordFormat)
     val thread2 = showTableCount(spark, "sink")
 
     System.out.println("type enter to interrupt streaming")
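For reference, a minimal sketch (not part of the patch) of the two record shapes that the new 'record_format' property selects between. Both samples mirror what writeSocket below emits, with fields in the order of the source table schema (id, name, salary, file); "csv" is the default used when the property is absent.

```scala
// Illustrative sample records only; values mirror the generator in
// StructuredStreamingExample.writeSocket below.

// 'record_format'='csv' (the default): one delimited line per row,
// the last field carrying the complex 'file' column.
val csvRecord = "1,name_1,10000.0,school_1:school_11$1"

// 'record_format'='json': one JSON object per line, nested struct included.
val jsonRecord = """{"id":1,"name":"s","salary":4.3,"file":{"school":["a","b"],"age":6}}"""
```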
index + ":school_" + index + index + "$" + index) + case "json" => + socketWriter.println( + s"""{"id":$index,"name":"s","salary":4.3,"file":{"school":["a","b"],"age":6}}""") + } } socketWriter.flush() Thread.sleep(1000) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala index 94e063b03aa..1f8bde2ba1a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala @@ -80,11 +80,11 @@ case class CarbonCreateStreamCommand( val updatedQuery = if (format.equals("kafka")) { shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable) shouldHaveProperty(tblProperty, "subscribe", sourceTable) - createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty) + createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty.asScala) } else if (format.equals("socket")) { shouldHaveProperty(tblProperty, "host", sourceTable) shouldHaveProperty(tblProperty, "port", sourceTable) - createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty) + createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty.asScala) } else { // Replace the logical relation with a streaming relation created // from the stream source table @@ -138,7 +138,7 @@ case class CarbonCreateStreamCommand( inputQuery: DataFrame, sourceTable: CarbonTable, sourceName: String, - tblProperty: util.Map[String, String]): LogicalPlan = { + tblProperty: mutable.Map[String, String]): LogicalPlan = { // We follow 3 steps to generate new plan // 1. replace the logical relation in stream query with streaming relation // 2. 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index 94e063b03aa..1f8bde2ba1a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -80,11 +80,11 @@ case class CarbonCreateStreamCommand(
     val updatedQuery = if (format.equals("kafka")) {
       shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable)
       shouldHaveProperty(tblProperty, "subscribe", sourceTable)
-      createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty)
+      createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty.asScala)
     } else if (format.equals("socket")) {
       shouldHaveProperty(tblProperty, "host", sourceTable)
       shouldHaveProperty(tblProperty, "port", sourceTable)
-      createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty)
+      createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty.asScala)
     } else {
       // Replace the logical relation with a streaming relation created
       // from the stream source table
@@ -138,7 +138,7 @@ case class CarbonCreateStreamCommand(
       inputQuery: DataFrame,
       sourceTable: CarbonTable,
       sourceName: String,
-      tblProperty: util.Map[String, String]): LogicalPlan = {
+      tblProperty: mutable.Map[String, String]): LogicalPlan = {
     // We follow 3 steps to generate new plan
     // 1. replace the logical relation in stream query with streaming relation
     // 2. collect the new ExprId generated
@@ -151,32 +151,42 @@ case class CarbonCreateStreamCommand(
       s"case when size(_values) > $i then _values[$i] else null end AS $columnName"
     }
 
-    val delimiter = tblProperty.asScala.getOrElse("delimiter", ",")
     val aliasMap = new util.HashMap[String, ExprId]()
     val updatedQuery = inputQuery.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
         // for kafka stream source, get the 'value' column and split it by using UDF
-        val kafkaPlan = sparkSession.readStream
+        val plan = sparkSession.readStream
          .format(sourceName)
          .options(tblProperty)
          .load()
          .selectExpr("CAST(value as string) as _value")
-          .selectExpr(
-            s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
-          .selectExpr(exprList: _*)
-          .logicalPlan
+        val recordFormat = tblProperty.getOrElse("record_format", "csv")
+        val newPlan = recordFormat match {
+          case "csv" =>
+            val delimiter = tblProperty.getOrElse("delimiter", ",")
+            plan.selectExpr(
+              s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
+              .selectExpr(exprList: _*)
+              .logicalPlan
+          case "json" =>
+            import org.apache.spark.sql.functions._
+            plan
+              .select(from_json(col("_value"), Util.convertToSparkSchema(sourceTable)) as "_data")
+              .select("_data.*")
+              .logicalPlan
+        }
         // collect the newly generated ExprId
-        kafkaPlan collect {
+        newPlan collect {
          case p@Project(projectList, child) =>
            projectList.map { expr =>
              aliasMap.put(expr.name, expr.exprId)
            }
            p
        }
-        kafkaPlan
+        newPlan
       case plan: LogicalPlan => plan
     }
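The "json" branch above relies on Spark's from_json with the schema derived from the source table via Util.convertToSparkSchema. Below is a minimal, batch-mode sketch of that parsing step; the SparkSession setup and the hand-written schema literal are assumptions for illustration, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[2]").appName("from_json sketch").getOrCreate()
import spark.implicits._

// Hand-written equivalent of Util.convertToSparkSchema(sourceTable) for the
// example's source table: id INT, name STRING, salary FLOAT,
// file struct<school:array<string>, age:int>.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("salary", FloatType),
  StructField("file", StructType(Seq(
    StructField("school", ArrayType(StringType)),
    StructField("age", IntegerType))))))

// One raw line, shaped like the json records writeSocket emits.
val lines = Seq("""{"id":1,"name":"s","salary":4.3,"file":{"school":["a","b"],"age":6}}""").toDF("_value")

// Same shape as the streaming plan: parse the raw line into a struct column,
// then flatten it back into top-level columns with "_data.*".
val parsed = lines
  .select(from_json(col("_value"), schema) as "_data")
  .select("_data.*")

parsed.printSchema()
parsed.show(false)
```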