Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-2759]Add Bad_Records_Options to STMPROPERTIES for Streaming Table #2532

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import scala.collection.mutable
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser

Expand Down Expand Up @@ -53,6 +54,30 @@ class StreamingOption(val userInputMap: Map[String, String]) {
userInputMap.getOrElse(CarbonStreamParser.CARBON_STREAM_PARSER,
CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)

/**
 * Location where bad records of this stream are written.
 * Resolution order: the user-supplied 'bad_records_path' STMPROPERTIES entry,
 * otherwise the system-wide CARBON_BADRECORDS_LOC carbon property (with its
 * built-in default). The pattern match keeps the property lookup lazy, exactly
 * like getOrElse's by-name default.
 */
lazy val badRecordsPath: String =
  userInputMap.get("bad_records_path") match {
    case Some(userPath) => userPath
    case None =>
      CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_BADRECORDS_LOC,
        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
  }

/**
 * Action to take when a bad record is encountered (e.g. FORCE/IGNORE/FAIL —
 * interpreted downstream by the load flow). Resolution order: the
 * 'bad_records_action' STMPROPERTIES entry, otherwise the system-wide
 * CARBON_BAD_RECORDS_ACTION carbon property (with its built-in default).
 * The pattern match keeps the property lookup lazy, like getOrElse.
 */
lazy val badRecordsAction: String =
  userInputMap.get("bad_records_action") match {
    case Some(userAction) => userAction
    case None =>
      CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
  }

/**
 * Whether bad-record logging is enabled for this stream (string-valued flag,
 * passed through to the sink writer as-is). Resolution order: the
 * 'bad_records_logger_enable' STMPROPERTIES entry, otherwise the
 * CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE carbon property (with its
 * built-in default). The pattern match keeps the lookup lazy.
 */
lazy val badRecordsLogger: String =
  userInputMap.get("bad_records_logger_enable") match {
    case Some(userFlag) => userFlag
    case None =>
      CarbonProperties.getInstance().getProperty(
        CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
        CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)
  }

/**
 * Whether an empty value should be treated as a bad record (string-valued
 * flag, passed through to the sink writer as-is). Resolution order: the
 * 'is_empty_bad_record' STMPROPERTIES entry, otherwise the
 * CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD carbon property (with its
 * built-in default). The pattern match keeps the lookup lazy.
 */
lazy val isEmptyBadRecord: String =
  userInputMap.get("is_empty_bad_record") match {
    case Some(userFlag) => userFlag
    case None =>
      CarbonProperties.getInstance().getProperty(
        CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
        CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)
  }

lazy val remainingOption: Map[String, String] = {
// copy the user input map and remove the fix options
val mutableMap = mutable.Map[String, String]() ++= userInputMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ object StreamJobManager {
.option("carbon.stream.parser", options.rowParser)
.option("dbName", sinkTable.getDatabaseName)
.option("tableName", sinkTable.getTableName)
.option("bad_record_path", options.badRecordsPath)
.option("bad_records_action", options.badRecordsAction)
.option("bad_records_logger_enable", options.badRecordsLogger)
.option("is_empty_bad_record", options.isEmptyBadRecord)
.options(options.remainingOption)
.start()
latch.countDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ case class CarbonDatasourceHadoopRelation(
var ifGetArrayItemExists = s
breakable({
while (ifGetArrayItemExists.containsChild != null) {
if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
arrayTypeExists = s.childSchema.toString().contains("ArrayType")
break
}
if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
arrayTypeExists = s.childSchema.toString().contains("ArrayType")
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,122 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS sink")
}

// Verifies that CREATE STREAM accepts bad-records load options in STMPROPERTIES
// (BAD_RECORDS_LOGGER_ENABLE / BAD_RECORDS_ACTION / BAD_RECORDS_PATH) and that
// the stream still ingests, filters and drops correctly with them present.
test("StreamSQL: create and drop a stream with Load options") {
  sql("DROP TABLE IF EXISTS source")
  sql("DROP TABLE IF EXISTS sink")

  // Precondition: no streams registered yet.
  var rows = sql("SHOW STREAMS").collect()
  assertResult(0)(rows.length)

  val csvDataDir = integrationPath + "/spark2/target/streamSql"
  // streaming ingest 10 rows
  generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)

  sql(
    s"""
       |CREATE TABLE source(
       | id INT,
       | name STRING,
       | city STRING,
       | salary FLOAT,
       | tax DECIMAL(8,2),
       | percent double,
       | birthday DATE,
       | register TIMESTAMP,
       | updated TIMESTAMP
       |)
       |STORED AS carbondata
       |TBLPROPERTIES (
       | 'streaming'='source',
       | 'format'='csv',
       | 'path'='$csvDataDir'
       |)
      """.stripMargin)

  sql(
    s"""
       |CREATE TABLE sink(
       | id INT,
       | name STRING,
       | city STRING,
       | salary FLOAT,
       | tax DECIMAL(8,2),
       | percent double,
       | birthday DATE,
       | register TIMESTAMP,
       | updated TIMESTAMP
       | )
       |STORED AS carbondata
       |TBLPROPERTIES('streaming'='sink')
      """.stripMargin)

  // Create the stream with bad-records options supplied via STMPROPERTIES.
  sql(
    s"""
       |CREATE STREAM stream123 ON TABLE sink
       |STMPROPERTIES(
       | 'trigger'='ProcessingTime',
       | 'interval'='1 seconds',
       | 'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
       | 'BAD_RECORDS_ACTION' = 'FORCE',
       | 'BAD_RECORDS_PATH'='$warehouse')
       |AS
       | SELECT *
       | FROM source
       | WHERE id % 2 = 1
      """.stripMargin).show(false)
  // A second CREATE STREAM with IF NOT EXISTS on the same name must be a no-op.
  sql(
    s"""
       |CREATE STREAM IF NOT EXISTS stream123 ON TABLE sink
       |STMPROPERTIES(
       | 'trigger'='ProcessingTime',
       | 'interval'='1 seconds',
       | 'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
       | 'BAD_RECORDS_ACTION' = 'FORCE',
       | 'BAD_RECORDS_PATH'='$warehouse')
       |AS
       | SELECT *
       | FROM source
       | WHERE id % 2 = 1
      """.stripMargin).show(false)
  Thread.sleep(200)
  sql("select * from sink").show

  generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append)
  Thread.sleep(5000)

  // After two mini-batches, 10 rows should have been added (filter: id % 2 = 1).
  checkAnswer(sql("select count(*) from sink"), Seq(Row(10)))

  // Spot-check the first row's full content.
  val row = sql("select * from sink order by id").head()
  val expectedRow = Row(11, "name_11", "city_11", 110000.0, BigDecimal.valueOf(0.01), 80.01,
    Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"),
    Timestamp.valueOf("2010-01-01 10:01:01.0"))
  assertResult(expectedRow)(row)

  sql("SHOW STREAMS").show(false)

  // The stream must be visible and RUNNING, both globally and per-table.
  rows = sql("SHOW STREAMS").collect()
  assertResult(1)(rows.length)
  assertResult("stream123")(rows.head.getString(0))
  assertResult("RUNNING")(rows.head.getString(2))
  assertResult("streaming.source")(rows.head.getString(3))
  assertResult("streaming.sink")(rows.head.getString(4))

  rows = sql("SHOW STREAMS ON TABLE sink").collect()
  assertResult(1)(rows.length)
  assertResult("stream123")(rows.head.getString(0))
  assertResult("RUNNING")(rows.head.getString(2))
  assertResult("streaming.source")(rows.head.getString(3))
  assertResult("streaming.sink")(rows.head.getString(4))

  // Plain DROP removes the stream; DROP ... IF EXISTS must then be a no-op.
  sql("DROP STREAM stream123")
  sql("DROP STREAM IF EXISTS stream123")

  rows = sql("SHOW STREAMS").collect()
  assertResult(0)(rows.length)

  sql("DROP TABLE IF EXISTS source")
  sql("DROP TABLE IF EXISTS sink")
}

test("StreamSQL: create stream without interval ") {
sql("DROP TABLE IF EXISTS source")
sql("DROP TABLE IF EXISTS sink")
Expand Down