Commit d5c40d7: Merge 14271cc into 04084c7

jackylk committed Sep 18, 2018
2 parents 04084c7 + 14271cc

Showing 3 changed files with 37 additions and 28 deletions.
StreamSQLExample.scala

@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.examples
 
-import java.io.File
 import java.net.ServerSocket
 
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -26,13 +25,9 @@ import org.apache.carbondata.examples.util.ExampleUtils
 object StreamSQLExample {
   def main(args: Array[String]) {
 
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-      + "../../../..").getCanonicalPath
-
     val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
 
     val requireCreateTable = true
+    val recordFormat = "json" // can be "json" or "csv"
 
     if (requireCreateTable) {
       // drop table if exists previously
@@ -45,7 +40,6 @@ object StreamSQLExample {
         | CREATE TABLE sink(
         | id INT,
         | name STRING,
-        | city STRING,
         | salary FLOAT,
         | file struct<school:array<string>, age:int>
         | )
@@ -56,11 +50,10 @@
     }
 
     spark.sql(
-      """
+      s"""
         | CREATE TABLE source (
        | id INT,
        | name STRING,
-       | city STRING,
        | salary FLOAT,
        | file struct<school:array<string>, age:int>
        | )
@@ -69,7 +62,9 @@
        | 'streaming'='source',
        | 'format'='socket',
        | 'host'='localhost',
-       | 'port'='7071')
+       | 'port'='7071',
+       | 'record_format'='$recordFormat'
+       | )
      """.stripMargin)
 
    val serverSocket = new ServerSocket(7071)
@@ -86,7 +81,7 @@
 
    // start writing data into the socket
    import StructuredStreamingExample.{showTableCount, writeSocket}
-    val thread1 = writeSocket(serverSocket)
+    val thread1 = writeSocket(serverSocket, recordFormat)
    val thread2 = showTableCount(spark, "sink")
 
    System.out.println("type enter to interrupt streaming")
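The source table above holds no data; its TBLPROPERTIES describe a stream, and CarbonCreateStreamCommand (third file below) later forwards that property map to Spark's DataStreamReader. A minimal sketch of that hand-off, assuming a local SparkSession; the props map and wiring here are illustrative, not CarbonData API:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]").appName("StreamSourceSketch").getOrCreate()

    // the TBLPROPERTIES of the source table, as a plain Scala map
    val props = Map(
      "streaming" -> "source",
      "format" -> "socket",
      "host" -> "localhost",
      "port" -> "7071",
      "record_format" -> "json")

    // the command passes the whole map through .options(); the socket
    // source consumes 'host' and 'port', while 'record_format' is read
    // back by the command itself to decide how to parse each line
    val raw = spark.readStream
      .format(props("format"))
      .options(props)
      .load()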
StructuredStreamingExample.scala

@@ -52,7 +52,6 @@ object StructuredStreamingExample {
       | CREATE TABLE ${ streamTableName }(
       | id INT,
       | name STRING,
-      | city STRING,
       | salary FLOAT,
       | file struct<school:array<string>, age:int>
       | )
@@ -66,7 +65,6 @@ object StructuredStreamingExample {
       | CREATE TABLE ${ streamTableName }(
       | id INT,
       | name STRING,
-      | city STRING,
       | salary FLOAT
       | )
       | STORED BY 'carbondata'
@@ -176,7 +174,7 @@ object StructuredStreamingExample {
     thread
   }
 
-  def writeSocket(serverSocket: ServerSocket): Thread = {
+  def writeSocket(serverSocket: ServerSocket, recordFormat: String = "csv"): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        // wait for client to connection request and accept
@@ -187,9 +185,15 @@
        // write 5 records per iteration
        for (_ <- 0 to 1000) {
          index = index + 1
-          socketWriter.println(index.toString + ",name_" + index
-            + ",city_" + index + "," + (index * 10000.00).toString +
-            ",school_" + index + ":school_" + index + index + "$" + index)
+          recordFormat match {
+            case "csv" =>
+              socketWriter.println(index.toString + ",name_" + index
+                + "," + (index * 10000.00).toString +
+                ",school_" + index + ":school_" + index + index + "$" + index)
+            case "json" =>
+              socketWriter.println(
+                s"""{"id":$index,"name":"s","salary":4.3,"file":{"school":["a","b"],"age":6}}""")
+          }
        }
        socketWriter.flush()
        Thread.sleep(1000)
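For experimenting with the two wire formats outside the example, the writer thread reduces to a small standalone program. A minimal sketch assuming port 7071 is free; the object name and record contents are illustrative:

    import java.io.PrintWriter
    import java.net.ServerSocket

    object SocketWriterSketch {
      def main(args: Array[String]): Unit = {
        val recordFormat = if (args.nonEmpty) args(0) else "json" // "json" or "csv"
        val serverSocket = new ServerSocket(7071)
        val socket = serverSocket.accept() // blocks until the streaming job connects
        val writer = new PrintWriter(socket.getOutputStream)
        for (index <- 1 to 100) {
          recordFormat match {
            case "csv" =>
              // id,name,salary plus the struct fields, delimited as in the example
              writer.println(index.toString + ",name_" + index + "," +
                (index * 10000.00) + ",school_" + index + "$" + index)
            case "json" =>
              // one JSON object per line, matching the source table schema
              writer.println(
                s"""{"id":$index,"name":"name_$index","salary":${index * 10000.00},""" +
                  s""""file":{"school":["school_$index"],"age":$index}}""")
          }
          writer.flush()
          Thread.sleep(100)
        }
        writer.close()
        socket.close()
        serverSocket.close()
      }
    }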
CarbonCreateStreamCommand.scala

@@ -80,11 +80,11 @@ case class CarbonCreateStreamCommand(
     val updatedQuery = if (format.equals("kafka")) {
       shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable)
       shouldHaveProperty(tblProperty, "subscribe", sourceTable)
-      createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty)
+      createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty.asScala)
     } else if (format.equals("socket")) {
       shouldHaveProperty(tblProperty, "host", sourceTable)
       shouldHaveProperty(tblProperty, "port", sourceTable)
-      createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty)
+      createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty.asScala)
     } else {
       // Replace the logical relation with a streaming relation created
       // from the stream source table
@@ -138,7 +138,7 @@ case class CarbonCreateStreamCommand(
       inputQuery: DataFrame,
       sourceTable: CarbonTable,
       sourceName: String,
-      tblProperty: util.Map[String, String]): LogicalPlan = {
+      tblProperty: mutable.Map[String, String]): LogicalPlan = {
     // We follow 3 steps to generate new plan
     // 1. replace the logical relation in stream query with streaming relation
     // 2. collect the new ExprId generated
@@ -151,32 +151,42 @@
       s"case when size(_values) > $i then _values[$i] else null end AS $columnName"
     }
 
-    val delimiter = tblProperty.asScala.getOrElse("delimiter", ",")
     val aliasMap = new util.HashMap[String, ExprId]()
     val updatedQuery = inputQuery.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
         // for kafka stream source, get the 'value' column and split it by using UDF
-        val kafkaPlan = sparkSession.readStream
+        val plan = sparkSession.readStream
           .format(sourceName)
           .options(tblProperty)
           .load()
           .selectExpr("CAST(value as string) as _value")
-          .selectExpr(
-            s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
-          .selectExpr(exprList: _*)
-          .logicalPlan
+        val recordFormat = tblProperty.getOrElse("record_format", "csv")
+        val newPlan = recordFormat match {
+          case "csv" =>
+            val delimiter = tblProperty.getOrElse("delimiter", ",")
+            plan.selectExpr(
+              s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
+              .selectExpr(exprList: _*)
+              .logicalPlan
+          case "json" =>
+            import org.apache.spark.sql.functions._
+            plan
+              .select(from_json(col("_value"), Util.convertToSparkSchema(sourceTable)) as "_data")
+              .select("_data.*")
+              .logicalPlan
+        }
 
         // collect the newly generated ExprId
-        kafkaPlan collect {
+        newPlan collect {
           case p@Project(projectList, child) =>
             projectList.map { expr =>
               aliasMap.put(expr.name, expr.exprId)
             }
             p
         }
-        kafkaPlan
+        newPlan
       case plan: LogicalPlan => plan
     }
 
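Util.convertToSparkSchema is CarbonData-internal, but both parsing branches are plain Structured Streaming. A runnable sketch with an explicit StructType standing in for the converted table schema; the schema, object name, and socket settings are assumptions for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types._

    object RecordFormatSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]").appName("RecordFormatSketch").getOrCreate()

        // stand-in for Util.convertToSparkSchema(sourceTable)
        val schema = StructType(Seq(
          StructField("id", IntegerType),
          StructField("name", StringType),
          StructField("salary", FloatType)))

        val raw = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", "7071")
          .load()
          .selectExpr("CAST(value AS STRING) AS _value")

        val recordFormat = "json" // cf. tblProperty.getOrElse("record_format", "csv")
        val parsed = recordFormat match {
          case "csv" =>
            // split() treats its pattern as a regex, which is why the command
            // escapes the delimiter with delimiterConverter4Udf above
            raw.selectExpr("split(_value, ',') AS _values")
              .selectExpr(
                "CAST(_values[0] AS INT) AS id",
                "_values[1] AS name",
                "CAST(_values[2] AS FLOAT) AS salary")
          case "json" =>
            // one JSON object per line; lines that fail to parse become nulls
            raw.select(from_json(col("_value"), schema) as "_data")
              .select("_data.*")
        }

        parsed.writeStream.format("console").start().awaitTermination()
      }
    }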
