fix auto handoff for preaggregate table with streaming
kunal642 authored and KanakaKumar committed Apr 11, 2018
1 parent 8a5369d commit cf18bd7
Showing 3 changed files with 42 additions and 10 deletions.
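In short, the fix threads a single OperationContext from StreamSinkFactory through CarbonAppendableStreamSink into the auto-handoff path, so listeners registered for pre-aggregate child tables (set up via the new LoadMetadataEvent) are still attached when the handoff thread runs; previously the sink and the handoff each created a fresh, listener-less OperationContext. Below is a minimal, simplified sketch of that pattern only — the StreamSink, addBatch, and listener types are illustrative stand-ins, not CarbonData's actual API:

import scala.collection.mutable

// Simplified stand-in for CarbonData's OperationContext/OperationListenerBus:
// listeners registered once must see every event fired later in the session.
class OperationContext {
  private val listeners = mutable.ListBuffer[String => Unit]()
  def addListener(listener: String => Unit): Unit = listeners += listener
  def fireEvent(event: String): Unit = listeners.foreach(_(event))
}

// Illustrative sink: it keeps the context it was constructed with and passes
// that same context into the handoff step instead of allocating a new one.
class StreamSink(operationContext: OperationContext) {
  def addBatch(): Unit = {
    operationContext.fireEvent("LoadTablePreExecutionEvent")
    checkOrHandOffSegment(operationContext) // previously: checkOrHandOffSegment() with `new OperationContext` inside
  }
  private def checkOrHandOffSegment(ctx: OperationContext): Unit = {
    // With a fresh context here, pre-aggregate listeners were never invoked
    // during auto handoff; reusing the caller's context keeps them attached.
    ctx.fireEvent("LoadTablePreStatusUpdateEvent")
  }
}

object HandoffContextDemo extends App {
  val ctx = new OperationContext
  ctx.addListener(e => println(s"pre-aggregate listener handled: $e")) // registered once
  new StreamSink(ctx).addBatch() // both events now reach the registered listener
}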
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
@@ -110,6 +110,8 @@ object StreamSinkFactory {
     val segmentId = getStreamSegmentId(carbonTable)
     carbonLoadModel.setSegmentId(segmentId)

+    val loadMetaEvent = new LoadMetadataEvent(carbonTable, false)
+    OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)
     // start server if necessary
     val server = startDictionaryServer(
       sparkSession,
@@ -120,14 +122,16 @@ object StreamSinkFactory {
     } else {
       carbonLoadModel.setUseOnePass(false)
     }

     // default is carbon appended stream sink
     val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
       sparkSession,
       carbonTable,
       segmentId,
       parameters,
       carbonLoadModel,
-      server)
+      server,
+      operationContext)

     // fire post event before streamin is started
     val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
@@ -62,7 +62,8 @@ class CarbonAppendableStreamSink(
     var currentSegmentId: String,
     parameters: Map[String, String],
     carbonLoadModel: CarbonLoadModel,
-    server: Option[DictionaryServer]) extends Sink {
+    server: Option[DictionaryServer],
+    operationContext: OperationContext) extends Sink {

   private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
   private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
@@ -110,18 +111,16 @@ class CarbonAppendableStreamSink(

       // fire pre event on every batch add
       // in case of streaming options and optionsFinal can be same
-      val operationContext = new OperationContext
       val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
         carbonTable.getCarbonTableIdentifier,
         carbonLoadModel,
         carbonLoadModel.getFactFilePath,
         false,
         parameters.asJava,
         parameters.asJava,
-        false
-      )
+        false)
       OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
-      checkOrHandOffSegment()
+      checkOrHandOffSegment(operationContext)

       // committer will record how this spark job commit its output
       val committer = FileCommitProtocol.instantiate(
@@ -163,7 +162,7 @@ class CarbonAppendableStreamSink(
   /**
    * if the directory size of current segment beyond the threshold, hand off new segment
    */
-  private def checkOrHandOffSegment(): Unit = {
+  private def checkOrHandOffSegment(operationContext: OperationContext): Unit = {
     // get streaming segment, if not exists, create new streaming segment
     val segmentId = StreamSegment.open(carbonTable)
     if (segmentId.equals(currentSegmentId)) {
@@ -180,7 +179,7 @@ class CarbonAppendableStreamSink(
       if (enableAutoHandoff) {
         StreamHandoffRDD.startStreamingHandoffThread(
           carbonLoadModel,
-          new OperationContext,
+          operationContext,
           sparkSession,
           false)
       }
@@ -179,7 +179,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult("Alter table change datatype is not allowed for streaming table")(changeDataTypeException.getMessage)
   }

-  override def afterAll {
+  override def afterAll {
     dropTable()
     sql("USE default")
     sql("DROP DATABASE IF EXISTS streaming CASCADE")
@@ -1454,6 +1454,35 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }

+  test("test autohandoff with preaggregate tables") {
+    sql("drop table if exists maintable")
+    createTable(tableName = "maintable", streaming = true, withBatchLoad = false)
+    sql("create datamap p1 on table maintable using 'preaggregate' as select name, sum(id) from maintable group by name")
+    executeStreamingIngest(
+      tableName = "maintable",
+      batchNums = 2,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1L,
+      autoHandoff = false)
+    executeStreamingIngest(
+      tableName = "maintable",
+      batchNums = 2,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1L,
+      autoHandoff = true)
+    checkAnswer(sql("select count(*) from maintable_p1"), Seq(Row(200)))
+  }
+
   test("block drop streaming table while streaming is in progress") {
     val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
