[CARBONDATA-2736][CARBONSTORE] Kafka integration with Carbon StreamSQL
Modifications in this PR:
1. Pass the source table properties to streamReader.load()
2. Do not pass a schema when calling sparkSession.readStream
3. Remove the querySchema validation against the sink, since a dataFrame created from a kafka source does not carry the table schema (the data is written into the 'value' column of the kafka schema)
4. Extract the dataframe that contains the actual data schema from the kafka source at writeStream (see the sketch below)

This closes #2495
ajithme authored and jackylk committed Jul 18, 2018
1 parent 96fe233 commit 4b96ed8
Showing 4 changed files with 59 additions and 33 deletions.
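
For context, the gist of the kafka handling introduced by this change, as a minimal Spark structured streaming sketch in Scala (the broker address and topic name are illustrative assumptions, not values from this commit): the stream is read without a user-supplied schema, the source table properties become reader options, and the payload is pulled out of the fixed kafka 'value' column before it reaches the sink.

// Illustrative sketch only; option values below are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-stream-sketch").getOrCreate()

// A kafka source exposes a fixed schema (key, value, topic, partition, offset, ...),
// so no .schema(...) is set on the reader.
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
  .option("subscribe", "carbon_topic")                 // assumed topic name
  .load()

// The record payload is the binary 'value' column; cast it to STRING before
// handing the dataframe to the carbondata sink.
val data = raw.selectExpr("CAST(value AS STRING)")
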
@@ -1123,4 +1123,13 @@ private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) {
table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
}
}

/**
* Return the format value defined in the table properties
* @return the format value from table properties, or null if it is not defined
*/
public String getFormat() {
return getTableInfo().getFactTable().getTableProperties()
.get("format");
}
}
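
A hypothetical call site for the new getFormat() accessor (the helper name is made up for illustration); the stream job manager change further down performs the same check:

import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Sketch: detect a kafka-backed streaming source table via the new accessor;
// getFormat() returns null when the 'format' tblproperty is absent.
def isKafkaSource(table: CarbonTable): Boolean =
  Option(table.getFormat).exists(_ == "kafka")
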
@@ -738,9 +738,6 @@ class TableNewProcessor(cm: TableModel) {
tableInfo.setFactTable(tableSchema)
val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT)
if (format.isDefined) {
if (!format.get.equalsIgnoreCase("csv")) {
CarbonException.analysisException(s"Currently we only support csv as external file format")
}
tableInfo.setFormat(format.get)
val formatProperties = cm.tableProperties.filter(pair =>
pair._1.startsWith(s"${format.get.toLowerCase}.")).asJava
@@ -52,19 +52,23 @@ object StreamJobManager {
}
}

private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
private def validateSinkTable(validateQuerySchema: Boolean,
querySchema: StructType, sink: CarbonTable): Unit = {
if (!sink.isStreamingSink) {
throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
"streaming sink table " +
"('streaming' tblproperty is not 'sink' or 'true')")
}
val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
StructField(column.getColName,
CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
}
if (!querySchema.equals(StructType(fields))) {
throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
s"does not match query output")
// TODO: validate the query schema against the sink for kafka sources (the schema cannot be obtained directly)
if (validateQuerySchema) {
val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
StructField(column.getColName,
CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
}
if (!querySchema.equals(StructType(fields))) {
throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
s"does not match query output")
}
}
}

@@ -102,14 +106,22 @@
}

validateSourceTable(sourceTable)
validateSinkTable(streamDf.schema, sinkTable)

// a kafka source always has a fixed schema, so the actual data schema needs to be extracted
val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")
val dataFrame = if (isKafka) {
streamDf.selectExpr("CAST(value as STRING)")
} else {
streamDf
}
validateSinkTable(!isKafka, dataFrame.schema, sinkTable)

// start a new thread to run the streaming ingest job, the job will be running
// until user stops it by STOP STREAM JOB
val thread = new Thread(new Runnable {
override def run(): Unit = {
try {
job = streamDf.writeStream
job = dataFrame.writeStream
.format("carbondata")
.trigger(options.trigger)
.option("checkpointLocation", options.checkpointLocation(sinkTable.getTablePath))
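
Why the schema check has to be skipped for kafka, as a small self-contained illustration (the sink columns are made up): after value extraction the query schema of a kafka stream is a single string column, so a direct StructType comparison with the sink columns can never succeed.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Schema of a kafka stream dataframe after CAST(value AS STRING)
val kafkaQuerySchema = StructType(Seq(StructField("value", StringType)))

// Hypothetical sink table columns
val sinkSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("city", StringType)))

// The old code applied this comparison unconditionally; for a kafka source it
// always fails, hence the new validateQuerySchema flag.
assert(!kafkaQuerySchema.equals(sinkSchema))
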
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.DataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.{StringType, StructType}

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -53,20 +52,21 @@ case class CarbonCreateStreamCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
val df = sparkSession.sql(query)
var sourceTable: CarbonTable = null
var dataFrame: Option[DataFrame] = None

// find the streaming source table in the query
// and replace it with StreamingRelation
val streamLp = df.logicalPlan transform {
// Prepare the dataframe from the stream source table
df.logicalPlan transform {
case r: LogicalRelation
if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
throw new MalformedCarbonCommandException(
"Stream query on more than one stream source table is not supported")
}
sourceTable = source
streamingRelation
dataFrame = Option(resolvedFrame)
r
case plan: LogicalPlan => plan
}

@@ -82,24 +82,37 @@
sourceTable = sourceTable,
sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
query = query,
streamDf = Dataset.ofRows(sparkSession, streamLp),
streamDf = dataFrame.getOrElse(Dataset.ofRows(sparkSession, df.logicalPlan)),
options = new StreamingOption(optionMap)
)
Seq(Row(streamName, jobId, "RUNNING"))
}

private def prepareStreamingRelation(
/**
* Create a streaming dataframe from the stream source table of the given logicalRelation
* @param sparkSession spark session
* @param logicalRelation logical relation that wraps the stream source table
* @return the source CarbonTable and its stream dataFrame
*/
private def prepareDataFrame(
sparkSession: SparkSession,
r: LogicalRelation): (CarbonTable, StreamingRelation) = {
val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
val sourceTable = logicalRelation.relation
.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
val format = tblProperty.get("format")
if (format == null) {
throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
}
val streamReader = sparkSession.readStream
.schema(getSparkSchema(sourceTable))
.format(format)
val streamReader = if (format != "kafka") {
sparkSession.readStream
.schema(getSparkSchema(sourceTable))
.format(format)
} else {
// a kafka source has a fixed schema, so a custom schema cannot be set on it
sparkSession.readStream
.format(format)
}
val dataFrame = format match {
case "csv" | "text" | "json" | "parquet" =>
if (!tblProperty.containsKey("path")) {
@@ -108,16 +121,11 @@ case class CarbonCreateStreamCommand(
}
streamReader.load(tblProperty.get("path"))
case "kafka" | "socket" =>
streamReader.load()
streamReader.options(tblProperty).load()
case other =>
throw new MalformedCarbonCommandException(s"Streaming from $format is not supported")
}
val streamRelation = dataFrame.logicalPlan.asInstanceOf[StreamingRelation]

// Since SparkSQL analyzer will match the UUID in attribute,
// create a new StreamRelation and re-use the same attribute from LogicalRelation
(sourceTable,
StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, r.output))
(sourceTable, dataFrame)
}

private def getSparkSchema(sourceTable: CarbonTable): StructType = {
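
Putting the pieces together, a rough end-to-end usage sketch under stated assumptions: the 'streaming' and 'format' table properties mirror this diff, the kafka option keys rely on source table properties now being passed verbatim to the stream reader, and the CREATE TABLE / CREATE STREAM forms follow the CarbonData streaming guide (exact DDL keywords may differ by version); broker, topic, sink schema and trigger interval are illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("streamsql-kafka-sketch").getOrCreate()

// Streaming source table backed by kafka; its tblproperties are handed to
// sparkSession.readStream.format("kafka") as options by this change.
spark.sql(
  """CREATE TABLE kafka_source(value STRING)
    |STORED AS carbondata
    |TBLPROPERTIES(
    |  'streaming'='source',
    |  'format'='kafka',
    |  'kafka.bootstrap.servers'='localhost:9092',
    |  'subscribe'='carbon_topic')""".stripMargin)

// Streaming sink table and the stream job itself.
spark.sql(
  """CREATE TABLE sink(value STRING)
    |STORED AS carbondata
    |TBLPROPERTIES('streaming'='sink')""".stripMargin)

spark.sql(
  """CREATE STREAM ingest ON TABLE sink
    |STMPROPERTIES('trigger'='ProcessingTime', 'interval'='5 seconds')
    |AS SELECT * FROM kafka_source""".stripMargin)
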
