
[CARBONDATA-2759] Add Bad_Records_Options to STMPROPERTIES for Streaming Table
Indhumathi27 committed Jul 20, 2018
1 parent 4a37e05 commit e231e40
Showing 4 changed files with 150 additions and 1 deletion.
StreamingOption.scala

@@ -22,7 +22,8 @@ import scala.collection.mutable
 import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
@@ -53,6 +54,30 @@ class StreamingOption(val userInputMap: Map[String, String]) {
     userInputMap.getOrElse(CarbonStreamParser.CARBON_STREAM_PARSER,
       CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
 
+  lazy val badRecordsPath: String =
+    userInputMap
+      .getOrElse("bad_records_path", CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+
+  lazy val badRecordsAction: String =
+    userInputMap
+      .getOrElse("bad_records_action", CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+
+  lazy val badRecordsLogger: String =
+    userInputMap
+      .getOrElse("bad_records_logger_enable", CarbonProperties.getInstance()
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))
+
+  lazy val isEmptyBadRecord: String =
+    userInputMap
+      .getOrElse("is_empty_bad_record", CarbonProperties.getInstance()
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))
+
   lazy val remainingOption: Map[String, String] = {
     // copy the user input map and remove the fix options
     val mutableMap = mutable.Map[String, String]() ++= userInputMap
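The four new lazy vals all follow the same two-level fallback: a key supplied in STMPROPERTIES wins, otherwise the instance-wide CarbonProperties value applies, otherwise the hard-coded default. Below is a minimal, self-contained sketch of that precedence in plain Scala, with a Map standing in for CarbonProperties; the object name and property values are illustrative, not from the commit.

object OptionPrecedenceSketch {
  // Stand-in for CarbonProperties.getInstance().getProperty(key, default).
  val globalProps = Map("carbon.bad.records.action" -> "IGNORE")

  def resolve(userInputMap: Map[String, String], streamKey: String,
      globalKey: String, default: String): String =
    userInputMap.getOrElse(streamKey,
      globalProps.getOrElse(globalKey, default))

  def main(args: Array[String]): Unit = {
    // STMPROPERTIES key present: the per-stream value wins.
    println(resolve(Map("bad_records_action" -> "FORCE"),
      "bad_records_action", "carbon.bad.records.action", "FAIL")) // FORCE
    // Key absent: falls back to the global property, then the default.
    println(resolve(Map.empty,
      "bad_records_action", "carbon.bad.records.action", "FAIL")) // IGNORE
  }
}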
StreamJobManager.scala

@@ -118,6 +118,10 @@ object StreamJobManager {
           .option("carbon.stream.parser", options.rowParser)
           .option("dbName", sinkTable.getDatabaseName)
           .option("tableName", sinkTable.getTableName)
+          .option("bad_record_path", options.badRecordsPath)
+          .option("bad_records_action", options.badRecordsAction)
+          .option("bad_records_logger_enable", options.badRecordsLogger)
+          .option("is_empty_bad_record", options.isEmptyBadRecord)
          .options(options.remainingOption)
          .start()
        latch.countDown()
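These .option calls sit on Spark's DataStreamWriter, so the resolved values reach the CarbonData streaming sink as ordinary writer options. A hedged sketch of the equivalent direct call path, assuming a running SparkSession; the schema, paths, and option values here are illustrative, not taken from the commit (note the singular "bad_record_path" key, matching the diff above).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType

object DirectWriterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("stream-sketch").getOrCreate()
    val schema = new StructType().add("id", "int").add("name", "string")
    // Illustrative CSV source directory.
    val source = spark.readStream.schema(schema).csv("/tmp/streamSql")
    source.writeStream
      .format("carbondata")                         // CarbonData streaming sink
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .option("checkpointLocation", "/tmp/sink/checkpoint") // illustrative path
      .option("dbName", "default")
      .option("tableName", "sink")
      .option("bad_record_path", "/tmp/badrecords")
      .option("bad_records_action", "FORCE")
      .option("bad_records_logger_enable", "false")
      .option("is_empty_bad_record", "false")
      .start()
      .awaitTermination()
  }
}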
CarbonDatasourceHadoopRelation.scala

@@ -96,6 +96,10 @@ case class CarbonDatasourceHadoopRelation(
         var ifGetArrayItemExists = s
         breakable({
           while (ifGetArrayItemExists.containsChild != null) {
+            if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
+              arrayTypeExists = s.childSchema.toString().contains("ArrayType")
+              break
+            }
             if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
               arrayTypeExists = s.childSchema.toString().contains("ArrayType")
               break
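For readers less familiar with Scala's non-local loop exit, the guard above relies on scala.util.control.Breaks: break throws a control exception that the enclosing breakable block catches. A minimal runnable illustration of the same walk-up-and-stop pattern, using a toy Node type rather than Spark's expression tree:

import scala.util.control.Breaks._

object BreakableWalkSketch {
  case class Node(schema: String, child: Option[Node])

  def main(args: Array[String]): Unit = {
    var arrayTypeExists = false
    var cur: Option[Node] =
      Some(Node("StructType(...)", Some(Node("ArrayType(IntegerType)", None))))
    breakable {
      while (cur.isDefined) {
        if (cur.get.schema.contains("ArrayType")) {
          arrayTypeExists = true
          break // leaves the breakable block, like the new guard above
        }
        cur = cur.get.child
      }
    }
    println(arrayTypeExists) // true
  }
}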
TestStreamingTableOperation.scala

@@ -1744,6 +1744,122 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS sink")
   }
 
+  test("StreamSQL: create and drop a stream with Load options") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+
+    var rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    val csvDataDir = integrationPath + "/spark2/target/streamSql"
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent DOUBLE,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         | 'streaming'='source',
+         | 'format'='csv',
+         | 'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent DOUBLE,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE STREAM stream123 ON TABLE sink
+         |STMPROPERTIES(
+         | 'trigger'='ProcessingTime',
+         | 'interval'='1 seconds',
+         | 'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
+         | 'BAD_RECORDS_ACTION' = 'FORCE',
+         | 'BAD_RECORDS_PATH'='$warehouse')
+         |AS
+         |  SELECT *
+         |  FROM source
+         |  WHERE id % 2 = 1
+      """.stripMargin).show(false)
+    sql(
+      s"""
+         |CREATE STREAM IF NOT EXISTS stream123 ON TABLE sink
+         |STMPROPERTIES(
+         | 'trigger'='ProcessingTime',
+         | 'interval'='1 seconds',
+         | 'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
+         | 'BAD_RECORDS_ACTION' = 'FORCE',
+         | 'BAD_RECORDS_PATH'='$warehouse')
+         |AS
+         |  SELECT *
+         |  FROM source
+         |  WHERE id % 2 = 1
+      """.stripMargin).show(false)
+    Thread.sleep(200)
+    sql("select * from sink").show
+
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append)
+    Thread.sleep(5000)
+
+    // after 2 mini-batches, there should be 10 rows added (filter condition: id % 2 = 1)
+    checkAnswer(sql("select count(*) from sink"), Seq(Row(10)))
+
+    val row = sql("select * from sink order by id").head()
+    val expectedRow = Row(11, "name_11", "city_11", 110000.0, BigDecimal.valueOf(0.01),
+      80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"),
+      Timestamp.valueOf("2010-01-01 10:01:01.0"))
+    assertResult(expectedRow)(row)
+
+    sql("SHOW STREAMS").show(false)
+
+    rows = sql("SHOW STREAMS").collect()
+    assertResult(1)(rows.length)
+    assertResult("stream123")(rows.head.getString(0))
+    assertResult("RUNNING")(rows.head.getString(2))
+    assertResult("streaming.source")(rows.head.getString(3))
+    assertResult("streaming.sink")(rows.head.getString(4))
+
+    rows = sql("SHOW STREAMS ON TABLE sink").collect()
+    assertResult(1)(rows.length)
+    assertResult("stream123")(rows.head.getString(0))
+    assertResult("RUNNING")(rows.head.getString(2))
+    assertResult("streaming.source")(rows.head.getString(3))
+    assertResult("streaming.sink")(rows.head.getString(4))
+
+    sql("DROP STREAM stream123")
+    sql("DROP STREAM IF EXISTS stream123")
+
+    rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+  }
+
   test("StreamSQL: create stream without interval ") {
     sql("DROP TABLE IF EXISTS source")
     sql("DROP TABLE IF EXISTS sink")
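Because of the fallback order in StreamingOption, these options can also be set once per JVM instead of per stream. A hedged sketch, assuming the standard CarbonProperties API; the property value shown is illustrative.

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

object GlobalDefaultSketch {
  def main(args: Array[String]): Unit = {
    // Instance-wide default: any CREATE STREAM that omits BAD_RECORDS_ACTION
    // in STMPROPERTIES now resolves to IGNORE; a stream that sets the key
    // still wins, per the getOrElse chain in StreamingOption.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "IGNORE")
  }
}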
