[CARBONDATA-2919] Support ingest from Kafka in StreamSQL #2695

Closed · wants to merge 2 commits
@@ -30,8 +30,18 @@ public abstract class AbstractCarbonLock implements ICarbonLock {

private int retryTimeout;

/**
* lockFilePath is the location of the lock file.
*/
protected String lockFilePath;

public abstract boolean lock();

@Override
public String getLockFilePath() {
return this.lockFilePath;
}

/**
* API for enabling the locking of file with retries.
*/
@@ -33,10 +33,6 @@ public class HdfsFileLock extends AbstractCarbonLock {

private static final LogService LOGGER =
LogServiceFactory.getLogService(HdfsFileLock.class.getName());
/**
* lockFilePath is the location of the lock file.
*/
private String lockFilePath;

/**
* lockFileDir is the directory of the lock file.
@@ -51,4 +51,9 @@ public interface ICarbonLock {
*/
boolean releaseLockManually(String lockFile);

/**
* Return the path to the lock file
* @return lock file path
*/
String getLockFilePath();
}
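For orientation, here is a minimal sketch (not part of this diff) of how a caller might use the new getLockFilePath() accessor. It assumes the usual CarbonLockFactory / AbsoluteTableIdentifier helpers exist with these signatures; the table path, database, table name, and demo object name are placeholders.

// Sketch only: exercising the new ICarbonLock.getLockFilePath() API added by this PR.
// The identifier values below are placeholders, not values from this change.
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

object LockPathDemo {
  def main(args: Array[String]): Unit = {
    val identifier = AbsoluteTableIdentifier.from("/tmp/store/default/t1", "default", "t1")
    val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.METADATA_LOCK)
    if (lock.lockWithRetries()) {
      try {
        // new in this PR: report which lock file protects the operation
        println(s"acquired lock file: ${lock.getLockFilePath}")
      } finally {
        lock.unlock()
      }
    }
  }
}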
@@ -35,11 +35,6 @@
* This will be handled using the file channel lock API.
*/
public class LocalFileLock extends AbstractCarbonLock {
/**
* lockFilePath is the location of the lock file.
*/
private String lockFilePath;

/**
* lockFileDir is the directory of the lock file.
*/
@@ -34,10 +34,6 @@ public class S3FileLock extends AbstractCarbonLock {

private static final LogService LOGGER =
LogServiceFactory.getLogService(S3FileLock.class.getName());
/**
* lockFilePath is the location of the lock file.
*/
private String lockFilePath;

/**
* lockFileDir is the directory of the lock file.
@@ -1185,6 +1185,14 @@ private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) {
}
}

/**
* Return the format value defined in table properties
* @return String as per table properties, null if not defined
*/
public String getFormat() {
return getTableInfo().getFactTable().getTableProperties().get("format");
}

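A short sketch, not from this diff, of how the new getFormat() accessor might be consulted when choosing a streaming source; the default value and the helper name are assumptions.

// Sketch only: getFormat() returns the 'format' table property, or null when it is not set,
// so fall back to the native carbondata format here. Helper name is hypothetical.
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

def resolveSourceFormat(table: CarbonTable): String =
  Option(table.getFormat).getOrElse("carbondata")

// resolveSourceFormat(sourceTable) == "kafka" would select the Kafka streaming source,
// "socket" the socket source used by the example later in this PR.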
/**
* Method to get the list of cached columns of the table.
* This method need to be used for Describe formatted like scenario where columns need to be
@@ -36,6 +36,7 @@
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
@@ -162,6 +163,7 @@ private boolean validateKeyValue(String key, String value) throws InvalidConfigurationException {
case CARBON_SEARCH_MODE_ENABLE:
case ENABLE_VECTOR_READER:
case ENABLE_UNSAFE_IN_QUERY_EXECUTION:
case ENABLE_AUTO_LOAD_MERGE:
isValid = CarbonUtil.validateBoolean(value);
if (!isValid) {
throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
6 changes: 6 additions & 0 deletions datamap/mv/plan/pom.xml
@@ -48,6 +48,12 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

4 changes: 4 additions & 0 deletions examples/spark2/pom.xml
@@ -53,6 +53,10 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.examples

import java.io.File
import java.net.ServerSocket

import org.apache.carbondata.examples.util.ExampleUtils

// scalastyle:off println
object StreamSQLExample {
A contributor commented:
How do I run this example? I am getting the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) L_ORDERKEY#188 missing from l_receiptdate#286,l_commitdate#285,l_orderkey#274,l_tax#281,l_returnflag#282,l_linenumber#277,l_discount#280,l_partkey#275,l_shipdate#284,l_shipmode#288,l_linestatus#283,l_shipinstruct#287,l_extendedprice#279,l_comment#289,l_quantity#278,l_suppkey#276 in operator !Filter ((L_ORDERKEY#188 % 2) = 0);;
Project [l_orderkey#274, l_partkey#275, l_suppkey#276, l_linenumber#277, l_quantity#278, l_extendedprice#279, l_discount#280, l_tax#281, l_returnflag#282, l_linestatus#283, l_shipdate#284, l_commitdate#285, l_receiptdate#286, l_shipinstruct#287, l_shipmode#288, l_comment#289]
+- !Filter ((L_ORDERKEY#188 % 2) = 0)
   +- SubqueryAlias source
      +- Project [CASE WHEN (size(_values#271) > 0) THEN _values#271[0] ELSE cast(null as string) END AS l_orderkey#274, CASE WHEN (size(_values#271) > 1) THEN _values#271[1] ELSE cast(null as string) END AS l_partkey#275, CASE WHEN (size(_values#271) > 2) THEN _values#271[2] ELSE cast(null as string) END AS l_suppkey#276, CASE WHEN (size(_values#271) > 3) THEN _values#271[3] ELSE cast(null as string) END AS l_linenumber#277, CASE WHEN (size(_values#271) > 4) THEN _values#271[4] ELSE cast(null as string) END AS l_quantity#278, CASE WHEN (size(_values#271) > 5) THEN _values#271[5] ELSE cast(null as string) END AS l_extendedprice#279, CASE WHEN (size(_values#271) > 6) THEN _values#271[6] ELSE cast(null as string) END AS l_discount#280, CASE WHEN (size(_values#271) > 7) THEN _values#271[7] ELSE cast(null as string) END AS l_tax#281, CASE WHEN (size(_values#271) > 8) THEN _values#271[8] ELSE cast(null as string) END AS l_returnflag#282, CASE WHEN (size(_values#271) > 9) THEN _values#271[9] ELSE cast(null as string) END AS l_linestatus#283, CASE WHEN (size(_values#271) > 10) THEN _values#271[10] ELSE cast(null as string) END AS l_shipdate#284, CASE WHEN (size(_values#271) > 11) THEN _values#271[11] ELSE cast(null as string) END AS l_commitdate#285, CASE WHEN (size(_values#271) > 12) THEN _values#271[12] ELSE cast(null as string) END AS l_receiptdate#286, CASE WHEN (size(_values#271) > 13) THEN _values#271[13] ELSE cast(null as string) END AS l_shipinstruct#287, CASE WHEN (size(_values#271) > 14) THEN _values#271[14] ELSE cast(null as string) END AS l_shipmode#288, CASE WHEN (size(_values#271) > 15) THEN _values#271[15] ELSE cast(null as string) END AS l_comment#289]
         +- Project [split(_value#268, \|) AS _values#271]
            +- Project [cast(value#254 as string) AS _value#268]
               +- StreamingRelation DataSource(org.apache.spark.sql.CarbonSession@5e6ad84c,kafka,List(),None,List(),None,Map(format -> kafka, streaming -> source, subscribe -> test, local_dictionary_enable -> false, kafka.bootstrap.servers -> localhost:9092, sort_columns -> l_suppkey,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment, comment -> , bad_records_path -> , delimiter -> |),None), kafka, [key#253, value#254, topic#255, partition#256, offset#257L, timestamp#258, timestampType#259]

jackylk (Contributor, Author) replied on Sep 11, 2018:
I have now changed this example to use a socket stream source instead of the Kafka source.

  def main(args: Array[String]) {

    // setup paths
    val rootPath = new File(this.getClass.getResource("/").getPath
      + "../../../..").getCanonicalPath

    val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)

    val requireCreateTable = true

    if (requireCreateTable) {
      // drop table if exists previously
      spark.sql(s"DROP TABLE IF EXISTS sink")
      spark.sql("DROP TABLE IF EXISTS source")

      // Create target carbon table and populate with initial data
      spark.sql(
        s"""
           | CREATE TABLE sink(
           | id INT,
           | name STRING,
           | city STRING,
           | salary FLOAT,
           | file struct<school:array<string>, age:int>
           | )
           | STORED AS carbondata
           | TBLPROPERTIES(
           | 'streaming'='true', 'sort_columns'='')
        """.stripMargin)
    }

    spark.sql(
      """
        | CREATE TABLE source (
        | id INT,
        | name STRING,
        | city STRING,
        | salary FLOAT,
        | file struct<school:array<string>, age:int>
        | )
        | STORED AS carbondata
        | TBLPROPERTIES(
        | 'streaming'='source',
        | 'format'='socket',
        | 'host'='localhost',
        | 'port'='7071')
      """.stripMargin)

    val serverSocket = new ServerSocket(7071)

    // start ingest streaming job
    spark.sql(
      s"""
         | CREATE STREAM ingest ON TABLE sink
         | STMPROPERTIES(
         | 'trigger' = 'ProcessingTime',
         | 'interval' = '3 seconds')
         | AS SELECT * FROM source
      """.stripMargin)

    // start writing data into the socket
    import StructuredStreamingExample.{showTableCount, writeSocket}
    val thread1 = writeSocket(serverSocket)
    val thread2 = showTableCount(spark, "sink")

    System.out.println("type enter to interrupt streaming")
    System.in.read()
    thread1.interrupt()
    thread2.interrupt()
    serverSocket.close()

    // stop streaming job
    spark.sql("DROP STREAM ingest").show

    spark.stop()
    System.out.println("streaming finished")
  }

}

// scalastyle:on println
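Since the example above now uses a socket source, the following sketch reconstructs what the Kafka variant could look like, based only on the table properties visible in the reviewer's error log (format, kafka.bootstrap.servers, subscribe, delimiter). It reuses the spark session and sink table from the example; the topic name, broker address, and table/stream names are placeholders.

// Sketch only: Kafka ingest through StreamSQL, reconstructed from the properties shown in
// the review comment above. Topic 'test' and broker 'localhost:9092' are placeholders.
spark.sql(
  s"""
     | CREATE TABLE kafka_source (
     | id INT,
     | name STRING,
     | city STRING,
     | salary FLOAT,
     | file struct<school:array<string>, age:int>
     | )
     | STORED AS carbondata
     | TBLPROPERTIES(
     | 'streaming'='source',
     | 'format'='kafka',
     | 'kafka.bootstrap.servers'='localhost:9092',
     | 'subscribe'='test',
     | 'delimiter'='|')
   """.stripMargin)

// start the ingest job against the same sink table as the socket example
spark.sql(
  s"""
     | CREATE STREAM kafka_ingest ON TABLE sink
     | STMPROPERTIES(
     | 'trigger' = 'ProcessingTime',
     | 'interval' = '3 seconds')
     | AS SELECT * FROM kafka_source
   """.stripMargin)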
@@ -130,6 +130,7 @@ object StructuredStreamingExample {
override def run(): Unit = {
for (_ <- 0 to 1000) {
spark.sql(s"select count(*) from $tableName").show(truncate = false)
spark.sql(s"show segments for table $tableName").show
Thread.sleep(1000 * 3)
}
}
@@ -22,6 +22,7 @@

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.hadoop.CarbonInputSplit;

@@ -83,6 +84,16 @@ private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
}
}

public static StructType convertToSparkSchema(CarbonTable table) {
List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
ColumnSchema[] schema = new ColumnSchema[columns.size()];
int i = 0;
for (CarbonColumn column : columns) {
schema[i++] = column.getColumnSchema();
}
return convertToSparkSchema(table, schema);
}
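The new overload simply collects all columns in create order and delegates to the existing (table, ColumnSchema[]) overload. A rough Scala equivalent is sketched below for orientation; the CarbonSparkDataSourceUtil package path is an assumption, the helper name is hypothetical, and table is a CarbonTable resolved elsewhere.

// Sketch only: roughly what the new convertToSparkSchema(CarbonTable) overload computes,
// expressed with the public converter from CarbonSparkDataSourceUtil (package path assumed).
import scala.collection.JavaConverters._
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.types.{StructField, StructType}

def sparkSchemaOf(table: CarbonTable): StructType = {
  val columns = table.getCreateOrderColumn(table.getTableName).asScala
  StructType(columns.map { column =>
    StructField(
      column.getColName,
      CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
  })
}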

public static StructType convertToSparkSchema(CarbonTable table, ColumnSchema[] carbonColumns) {
List<StructField> fields = new ArrayList<>(carbonColumns.length);
for (int i = 0; i < carbonColumns.length; i++) {
@@ -37,21 +37,23 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.segment.StreamSegment

object CarbonStore {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

def showSegments(
limit: Option[String],
tableFolderPath: String,
tablePath: String,
showHistory: Boolean): Seq[Row] = {
val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
val loadMetadataDetailsArray = if (showHistory) {
SegmentStatusManager.readLoadMetadata(tableFolderPath) ++
SegmentStatusManager.readLoadHistoryMetadata(tableFolderPath)
SegmentStatusManager.readLoadMetadata(metaFolder) ++
SegmentStatusManager.readLoadHistoryMetadata(metaFolder)
} else {
SegmentStatusManager.readLoadMetadata(tableFolderPath)
SegmentStatusManager.readLoadMetadata(metaFolder)
}

if (loadMetadataDetailsArray.nonEmpty) {
@@ -84,18 +86,31 @@

val startTime =
if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
null
"NA"
} else {
new java.sql.Timestamp(load.getLoadStartTime)
new java.sql.Timestamp(load.getLoadStartTime).toString
}

val endTime =
if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
null
"NA"
} else {
new java.sql.Timestamp(load.getLoadEndTime)
new java.sql.Timestamp(load.getLoadEndTime).toString
}

val (dataSize, indexSize) = if (load.getFileFormat == FileFormat.ROW_V1) {
// for streaming segment, we should get the actual size from the index file
// since it is continuously inserting data
val segmentDir = CarbonTablePath.getSegmentPath(tablePath, load.getLoadName)
val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir)
val indices = StreamSegment.readIndexFile(indexPath, FileFactory.getFileType(indexPath))
(indices.asScala.map(_.getFile_size).sum, FileFactory.getCarbonFile(indexPath).getSize)
} else {
// for batch segment, we can get the data size from table status file directly
(if (load.getDataSize == null) 0L else load.getDataSize.toLong,
if (load.getIndexSize == null) 0L else load.getIndexSize.toLong)
}

if (showHistory) {
Row(
load.getLoadName,
@@ -104,9 +119,9 @@
endTime,
mergedTo,
load.getFileFormat.toString,
load.getVisibility(),
Strings.formatSize(if (load.getDataSize == null) 0 else load.getDataSize.toFloat),
Strings.formatSize(if (load.getIndexSize == null) 0 else load.getIndexSize.toFloat))
load.getVisibility,
Strings.formatSize(dataSize.toFloat),
Strings.formatSize(indexSize.toFloat))
} else {
Row(
load.getLoadName,
@@ -115,8 +130,8 @@
endTime,
mergedTo,
load.getFileFormat.toString,
Strings.formatSize(if (load.getDataSize == null) 0 else load.getDataSize.toFloat),
Strings.formatSize(if (load.getIndexSize == null) 0 else load.getIndexSize.toFloat))
Strings.formatSize(dataSize.toFloat),
Strings.formatSize(indexSize.toFloat))
}
}.toSeq
} else {
@@ -410,6 +410,8 @@ object StreamHandoffRDD {
} else {
newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
newSegment.get.setLoadEndTime(System.currentTimeMillis())
CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newSegment.get, loadModel.getSegmentId,
loadModel.getCarbonDataLoadSchema.getCarbonTable)
}

// update streaming segment to compacted status
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.StreamHandoffRDD
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.types._

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
import org.apache.carbondata.core.metadata.datatype.{ArrayType => CarbonArrayType, DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, MapType => CarbonMapType, StructField => CarbonStructField, StructType => CarbonStructType}
import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -39,6 +39,20 @@ object CarbonSparkDataSourceUtil {
DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
dataType.asInstanceOf[CarbonDecimalType].getScale)
} else {
if (CarbonDataTypes.isStructType(dataType)) {
val struct = dataType.asInstanceOf[CarbonStructType]
return StructType(struct.getFields.asScala.map(x =>
StructField(x.getFieldName, convertCarbonToSparkDataType(x.getDataType)))
)
} else if (CarbonDataTypes.isArrayType(dataType)) {
val array = dataType.asInstanceOf[CarbonArrayType]
return ArrayType(convertCarbonToSparkDataType(array.getElementType))
} else if (CarbonDataTypes.isMapType(dataType)) {
val map = dataType.asInstanceOf[CarbonMapType]
return MapType(
convertCarbonToSparkDataType(map.getKeyType),
convertCarbonToSparkDataType(map.getValueType))
}
dataType match {
case CarbonDataTypes.STRING => StringType
case CarbonDataTypes.SHORT => ShortType
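To make the new complex-type branches concrete, here is a small hedged sketch that converts a Carbon struct type (matching the file struct<school:array<string>, age:int> column used in the example) to its Spark counterpart. The CarbonDataTypes factory calls, the CarbonSparkDataSourceUtil package path, and the demo object name are assumptions.

// Sketch only: exercising the new struct/array branches of convertCarbonToSparkDataType.
// Factory methods and package path are assumed from the carbondata-core / spark-datasource modules.
import scala.collection.JavaConverters._
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes, StructField => CarbonStructField}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil

object ComplexTypeConversionDemo {
  def main(args: Array[String]): Unit = {
    // carbon struct<school:array<string>, age:int>, as in the example table's 'file' column
    val carbonStruct = CarbonDataTypes.createStructType(List(
      new CarbonStructField("school", CarbonDataTypes.createArrayType(CarbonDataTypes.STRING)),
      new CarbonStructField("age", CarbonDataTypes.INT)).asJava)

    // expected roughly: StructType(StructField(school, ArrayType(StringType)), StructField(age, IntegerType))
    println(CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(carbonStruct))
  }
}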