
Commit ae97002
[CARBONDATA-1612][CARBONDATA-1615][Streaming] Support delete segment for streaming table

This closes apache#1497
jackylk authored and anubhav100 committed Jun 22, 2018
1 parent cedf40d commit ae97002
Showing 2 changed files with 55 additions and 4 deletions.
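
The new tests below exercise segment deletion on a streaming table. For quick reference, these are the two statement forms the tests issue (a sketch reusing the tests' table name; the ids in the IN list are hypothetical, the cutoff timestamp is the one used by the by-date test):

  // Delete segments by id, and by load start time, on a streaming table.
  sql("delete from table streaming.stream_table_delete where segment.id in (0,1)")
  sql("delete from table streaming.stream_table_delete where segment.starttime before '2999-10-01 01:00:00'")
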
@@ -119,10 +119,10 @@ case class ShowLoadsCommand(
   extends Command {
 
   override def output: Seq[Attribute] = {
-    Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
+    Seq(AttributeReference("Segment Id", StringType, nullable = false)(),
       AttributeReference("Status", StringType, nullable = false)(),
       AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = false)(),
+      AttributeReference("Load End Time", TimestampType, nullable = true)(),
       AttributeReference("Merged To", StringType, nullable = false)())
   }
 }
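
A plausible reading of this schema change (an inference, not stated in the commit): a streaming segment that is still accepting rows has no load end time yet, so Load End Time must become nullable, and SegmentSequenceId is renamed to the friendlier Segment Id. A consumer of SHOW SEGMENTS output would then need a null check, e.g. (hypothetical snippet, column positions per the schema above):

  // Load End Time is column 3 and may now be null for an open streaming segment.
  sql("show segments for table streaming.stream_table_delete").collect().foreach { row =>
    val endTime = if (row.isNullAt(3)) "still streaming" else row.getTimestamp(3).toString
    println(s"segment ${row.getString(0)} ended: $endTime")
  }
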
@@ -19,6 +19,7 @@ package org.apache.spark.carbondata
 
 import java.io.{File, PrintWriter}
 import java.net.ServerSocket
+import java.util.{Calendar, Date}
 import java.util.concurrent.Executors
 
 import scala.collection.mutable
@@ -103,6 +104,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     // 10. fault tolerant
     createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
+
+    // 11. table for delete segment test
+    createTable(tableName = "stream_table_delete", streaming = true, withBatchLoad = false)
   }
 
   test("validate streaming property") {
@@ -181,6 +185,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_compact")
     sql("drop table if exists streaming.stream_table_new")
     sql("drop table if exists streaming.stream_table_tolerant")
+    sql("drop table if exists streaming.stream_table_delete")
   }
 
   // normal table does not support streaming ingest
@@ -578,8 +583,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       badRecordAction = "force",
       handoffSize = 1024L * 200
     )
-    sql("show segments for table streaming.stream_table_new").show(100, false)
-
     assert(sql("show segments for table streaming.stream_table_new").count() == 4)
 
     checkAnswer(
@@ -588,6 +591,51 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("test deleting streaming segment by ID while ingesting") {
+    executeStreamingIngest(
+      tableName = "stream_table_delete",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 3,
+      intervalOfIngest = 5,
+      continueSeconds = 15,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200
+    )
+    val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
+    val segmentId = beforeDelete.map(_.getString(0)).mkString(",")
+    sql(s"delete from table streaming.stream_table_delete where segment.id in ($segmentId) ")
+
+    val rows = sql("show segments for table streaming.stream_table_delete").collect()
+    rows.foreach { row =>
+      assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
+    }
+  }
+
+  test("test deleting streaming segment by date while ingesting") {
+    executeStreamingIngest(
+      tableName = "stream_table_delete",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 3,
+      intervalOfIngest = 5,
+      continueSeconds = 15,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200
+    )
+    val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
+
+    sql(s"delete from table streaming.stream_table_delete where segment.starttime before '2999-10-01 01:00:00'")
+
+    val rows = sql("show segments for table streaming.stream_table_delete").collect()
+    assertResult(beforeDelete.length)(rows.length)
+    rows.foreach { row =>
+      assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
+    }
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
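
A note on the two delete tests above: deleting a segment does not remove it from SHOW SEGMENTS output. The segment stays listed (the by-date test asserts the row count is unchanged) and only its Status column flips to MARKED_FOR_DELETE; the by-date variant passes the far-future cutoff '2999-10-01 01:00:00' so that every segment qualifies. A narrower check on a single segment might look like this (hypothetical segment id "0", same APIs as the tests):

  // Delete one segment by id and verify that row is marked for delete.
  sql("delete from table streaming.stream_table_delete where segment.id in (0)")
  sql("show segments for table streaming.stream_table_delete").collect().foreach { row =>
    if (row.getString(0) == "0") {
      assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
    }
  }
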
@@ -675,6 +723,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  /**
+   * Start the ingestion thread: write `rowNumsEachBatch` rows repeatedly, `batchNums` times.
+   */
   def executeStreamingIngest(
       tableName: String,
       batchNums: Int,
