[CARBONDATA-2335] Fix auto handoff for preaggregate table with streaming #2160
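
Summary of the change: CarbonAppendableStreamSink previously created a fresh OperationContext for every micro-batch, and the auto-handoff thread was started with yet another new OperationContext. State that listeners attach to the context at stream setup, such as the load commands generated for pre-aggregate child tables when LoadMetadataEvent fires, therefore never reached the handoff. The fix creates one OperationContext in StreamSinkFactory, fires a LoadMetadataEvent on it, and threads that same instance through the sink constructor into StreamHandoffRDD.startStreamingHandoffThread.

A minimal sketch (not part of the PR; the property key is illustrative) of why sharing one OperationContext matters:

import org.apache.carbondata.events.OperationContext

object SharedContextSketch {
  def main(args: Array[String]): Unit = {
    // One context, created once at stream setup (StreamSinkFactory).
    val operationContext = new OperationContext
    // Listeners invoked at setup (e.g. for LoadMetadataEvent) can stash state
    // in it, such as load commands for pre-aggregate child tables.
    // "childLoadCommands" is an illustrative key, not a CarbonData constant.
    operationContext.setProperty("childLoadCommands", "generated-at-stream-setup")
    // The handoff thread now receives the same instance, so that state is
    // still visible; with `new OperationContext` per batch/handoff it was lost.
    assert(operationContext.getProperty("childLoadCommands") != null)
  }
}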

StreamSinkFactory.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer

@@ -110,6 +110,9 @@ object StreamSinkFactory {
     val segmentId = getStreamSegmentId(carbonTable)
     carbonLoadModel.setSegmentId(segmentId)

+    // Used to generate load commands for child tables in case auto-handoff is fired.
+    val loadMetaEvent = new LoadMetadataEvent(carbonTable, false)
+    OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)
     // start server if necessary
     val server = startDictionaryServer(
       sparkSession,

@@ -120,14 +123,16 @@
     } else {
       carbonLoadModel.setUseOnePass(false)
     }
+
     // default is carbon appended stream sink
     val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
       sparkSession,
       carbonTable,
       segmentId,
       parameters,
       carbonLoadModel,
-      server)
+      server,
+      operationContext)

     // fire post event before streaming is started
     val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
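
The LoadMetadataEvent fired above is the hook that lets pre-aggregate support generate child-table load commands before any handoff runs. A hedged sketch of such a listener follows; the class name and property key are hypothetical (the real listener ships with CarbonData's preaggregate datamap support), and only the event-bus types come from the diff:

import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent

// Hypothetical listener: reacts to LoadMetadataEvent by parking child-table
// load state in the shared OperationContext for the handoff thread to use.
class ChildLoadCommandListener extends OperationEventListener {
  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    event match {
      case _: LoadMetadataEvent =>
        // Illustrative key; the real listener stores generated load commands.
        operationContext.setProperty("childLoadCommands", "generated-at-stream-setup")
      case _ => // ignore other events
    }
  }
}

// One-time registration against the listener bus, mirroring how CarbonData
// wires its own listeners:
// OperationListenerBus.getInstance()
//   .addListener(classOf[LoadMetadataEvent], new ChildLoadCommandListener)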

CarbonAppendableStreamSink.scala
@@ -62,7 +62,8 @@ class CarbonAppendableStreamSink(
     var currentSegmentId: String,
     parameters: Map[String, String],
     carbonLoadModel: CarbonLoadModel,
-    server: Option[DictionaryServer]) extends Sink {
+    server: Option[DictionaryServer],
+    operationContext: OperationContext) extends Sink {

   private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
   private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)

@@ -110,16 +111,14 @@ class CarbonAppendableStreamSink(

     // fire pre event on every batch add
     // in case of streaming options and optionsFinal can be same
-    val operationContext = new OperationContext
     val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
       carbonTable.getCarbonTableIdentifier,
       carbonLoadModel,
       carbonLoadModel.getFactFilePath,
       false,
       parameters.asJava,
       parameters.asJava,
-      false
-    )
+      false)
     OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
     checkOrHandOffSegment()

@@ -180,7 +179,7 @@ class CarbonAppendableStreamSink(
     if (enableAutoHandoff) {
       StreamHandoffRDD.startStreamingHandoffThread(
         carbonLoadModel,
-        new OperationContext,
+        operationContext,
         sparkSession,
         false)
     }
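
With the sink changes above, the thread started by StreamHandoffRDD.startStreamingHandoffThread now observes the same OperationContext that was populated at stream setup, so listeners (for example those keyed on the newly imported LoadTablePreStatusUpdateEvent) can find the child-table load commands when a streaming segment is handed off.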

TestStreamingTableOperation.scala
@@ -179,7 +179,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult("Alter table change datatype is not allowed for streaming table")(changeDataTypeException.getMessage)
   }

-  override def afterAll {
+  override def afterAll {
     dropTable()
     sql("USE default")
     sql("DROP DATABASE IF EXISTS streaming CASCADE")

@@ -1454,6 +1454,35 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }

+  test("test autohandoff with preaggregate tables") {
+    sql("drop table if exists maintable")
+    createTable(tableName = "maintable", streaming = true, withBatchLoad = false)
+    sql("create datamap p1 on table maintable using 'preaggregate' as select name, sum(id) from maintable group by name")
+    executeStreamingIngest(
+      tableName = "maintable",
+      batchNums = 2,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1L,
+      autoHandoff = false)
+    executeStreamingIngest(
+      tableName = "maintable",
+      batchNums = 2,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1L,
+      autoHandoff = true)
+    checkAnswer(sql("select count(*) from maintable_p1"), Seq(Row(200)))
+  }
+
   test("block drop streaming table while streaming is in progress") {
     val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
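
The new test exercises both paths: it streams two batches of 100 rows into maintable (which carries a preaggregate datamap p1) with autoHandoff = false, repeats the ingest with autoHandoff = true and handoffSize = 1 so that a handoff is actually triggered, and then checks the row count of the child table maintable_p1.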