diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index c302b2bec1c..f16d1bb5b8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1123,4 +1123,13 @@ private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) {
       table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
     }
   }
+
+  /**
+   * Return the format value defined in table properties
+   * @return format name from table properties, null if not defined
+   */
+  public String getFormat() {
+    return getTableInfo().getFactTable().getTableProperties()
+        .get("format");
+  }
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index a6413290324..6cb0dcfb68f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -738,9 +738,6 @@ class TableNewProcessor(cm: TableModel) {
     tableInfo.setFactTable(tableSchema)
     val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT)
     if (format.isDefined) {
-      if (!format.get.equalsIgnoreCase("csv")) {
-        CarbonException.analysisException(s"Currently we only support csv as external file format")
-      }
       tableInfo.setFormat(format.get)
       val formatProperties = cm.tableProperties.filter(pair =>
         pair._1.startsWith(s"${format.get.toLowerCase}.")).asJava
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 59e924da786..470d89a260d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -52,19 +52,23 @@ object StreamJobManager {
     }
   }
 
-  private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
+  private def validateSinkTable(validateQuerySchema: Boolean,
+      querySchema: StructType, sink: CarbonTable): Unit = {
     if (!sink.isStreamingSink) {
       throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
                                                 "streaming sink table " +
                                                 "('streaming' tblproperty is not 'sink' or 'true')")
     }
-    val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
-      StructField(column.getColName,
-        CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
-    }
-    if (!querySchema.equals(StructType(fields))) {
-      throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
-                                                s"does not match query output")
+    // TODO: validate query schema against the sink for kafka sources (schema cannot be fetched directly)
+    if (validateQuerySchema) {
+      val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
+        StructField(column.getColName,
+          CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
+      }
+      if (!querySchema.equals(StructType(fields))) {
+        throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
+                                                  s"does not match query output")
+      }
     }
   }
 
@@ -102,14 +106,22 @@ object StreamJobManager {
     }
 
     validateSourceTable(sourceTable)
-    validateSinkTable(streamDf.schema, sinkTable)
+
+    // kafka source always has a fixed schema, extract the actual data from the value column
+    val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")
+    val dataFrame = if (isKafka) {
+      streamDf.selectExpr("CAST(value as STRING)")
+    } else {
+      streamDf
+    }
+    validateSinkTable(!isKafka, dataFrame.schema, sinkTable)
 
     // start a new thread to run the streaming ingest job, the job will be running
     // until user stops it by STOP STREAM JOB
     val thread = new Thread(new Runnable {
       override def run(): Unit = {
         try {
-          job = streamDf.writeStream
+          job = dataFrame.writeStream
             .format("carbondata")
             .trigger(options.trigger)
             .option("checkpointLocation", options.checkpointLocation(sinkTable.getTablePath))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index d3b178cdd22..c413a62ae0c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.DataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.{StringType, StructType}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -53,20 +52,21 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None
 
-    // find the streaming source table in the query
-    // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    // Prepare the dataframe from the stream source table
+    df.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
         if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
           throw new MalformedCarbonCommandException(
             "Stream query on more than one stream source table is not supported")
         }
         sourceTable = source
-        streamingRelation
+        dataFrame = Option(resolvedFrame)
+        r
       case plan: LogicalPlan => plan
     }
 
@@ -82,24 +82,37 @@ case class CarbonCreateStreamCommand(
       sourceTable = sourceTable,
       sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
       query = query,
-      streamDf = Dataset.ofRows(sparkSession, streamLp),
+      streamDf = dataFrame.getOrElse(Dataset.ofRows(sparkSession, df.logicalPlan)),
       options = new StreamingOption(optionMap)
     )
     Seq(Row(streamName, jobId, "RUNNING"))
   }
 
-  private def prepareStreamingRelation(
+  /**
+   * Create a streaming dataframe from the source table of the logicalRelation
+   * @param sparkSession
+   * @param logicalRelation
+   * @return sourceTable and its stream dataFrame
+   */
+  private def prepareDataFrame(
       sparkSession: SparkSession,
-      r: LogicalRelation): (CarbonTable, StreamingRelation) = {
-    val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+      logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
+    val sourceTable = logicalRelation.relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
     val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
     val format = tblProperty.get("format")
    if (format == null) {
       throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
     }
-    val streamReader = sparkSession.readStream
-      .schema(getSparkSchema(sourceTable))
-      .format(format)
+    val streamReader = if (format != "kafka") {
+      sparkSession.readStream
+        .schema(getSparkSchema(sourceTable))
+        .format(format)
+    } else {
+      // kafka source has a fixed schema, it cannot be set to a custom schema
+      sparkSession.readStream
+        .format(format)
+    }
     val dataFrame = format match {
       case "csv" | "text" | "json" | "parquet" =>
         if (!tblProperty.containsKey("path")) {
@@ -108,16 +121,11 @@ case class CarbonCreateStreamCommand(
         }
         streamReader.load(tblProperty.get("path"))
       case "kafka" | "socket" =>
-        streamReader.load()
+        streamReader.options(tblProperty).load()
       case other =>
         throw new MalformedCarbonCommandException(s"Streaming from $format is not supported")
     }
-    val streamRelation = dataFrame.logicalPlan.asInstanceOf[StreamingRelation]
-
-    // Since SparkSQL analyzer will match the UUID in attribute,
-    // create a new StreamRelation and re-use the same attribute from LogicalRelation
-    (sourceTable,
-      StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, r.output))
+    (sourceTable, dataFrame)
   }
 
   private def getSparkSchema(sourceTable: CarbonTable): StructType = {
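
An end-to-end usage sketch of the feature, driven through SQL from Scala. The DDL follows the CarbonData streaming DDL (CREATE STREAM ... ON TABLE ... STMPROPERTIES ... AS SELECT), but the concrete names and property keys below ('kafka.bootstrap.servers', 'subscribe', topic, host, stream and table names, and the STORED AS clause) are illustrative assumptions, chosen so that streamReader.options(tblProperty) picks them up as Spark kafka source options; adjust them to your CarbonData version and environment.

// assumes an existing SparkSession ("spark") with CarbonData enabled
spark.sql(
  """
    | CREATE TABLE source(value STRING)
    | STORED AS carbondata
    | TBLPROPERTIES(
    |   'streaming'='source',
    |   'format'='kafka',
    |   'kafka.bootstrap.servers'='kafkahost:9092',
    |   'subscribe'='test_topic')
  """.stripMargin)

spark.sql(
  """
    | CREATE TABLE sink(value STRING)
    | STORED AS carbondata
    | TBLPROPERTIES('streaming'='sink')
  """.stripMargin)

spark.sql(
  """
    | CREATE STREAM job_kafka ON TABLE sink
    | STMPROPERTIES('trigger'='ProcessingTime', 'interval'='1 seconds')
    | AS SELECT * FROM source
  """.stripMargin)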
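
For reviewers who want to exercise the kafka path outside of the CREATE STREAM command, the sketch below only approximates what prepareDataFrame() and StreamJobManager.startStream() now do for a 'format'='kafka' source table; the broker address, topic, checkpoint path and sink table identity are assumptions, not values from this patch.

import org.apache.spark.sql.SparkSession

object KafkaToCarbonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-to-carbon-sketch")
      .getOrCreate()

    // kafka source: no user-defined schema is allowed, so the reader is built without
    // .schema(...); the table properties are passed straight through as source options
    val streamDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafkahost:9092") // assumed broker
      .option("subscribe", "test_topic")                   // assumed topic
      .load()

    // kafka rows have a fixed schema (key, value, topic, partition, offset, ...),
    // so the job projects the value column to a string before writing to the sink
    val dataFrame = streamDf.selectExpr("CAST(value as STRING)")

    val job = dataFrame.writeStream
      .format("carbondata")
      .option("checkpointLocation", "/tmp/kafka_sink_checkpoint") // assumed path
      .option("dbName", "default")                                // assumed sink database
      .option("tableName", "sink")                                // assumed sink table
      .start()

    job.awaitTermination()
  }
}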