Skip to content

Commit

Permalink
Added locks around checkpoint state.
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksey Nikiforov committed May 10, 2018
1 parent 99ff873 commit 9d130be
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions src/main/scala/com/contxt/kinesis/RecordProcessorImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,33 @@ import scala.util.control.NonFatal
import scala.collection.JavaConversions._

private[kinesis] class ShardCheckpointTracker(shardCheckpointConfig: ShardCheckpointConfig) {
private val lock = new Object

private var unprocessedInFlightRecords = Queue.empty[KinesisRecord]
private var lastCheckpointedAt = ZonedDateTime.now()
private var lastProcessedButNotCheckpointed = Option.empty[KinesisRecord]
private var processedButNotCheckpointedCount = 0

def nrOfInFlightRecords: Int = {
def nrOfInFlightRecords: Int = lock.synchronized {
unprocessedInFlightRecords.size + processedButNotCheckpointedCount
}
def nrOfProcessedUncheckpointedRecords: Int = {
def nrOfProcessedUncheckpointedRecords: Int = lock.synchronized {
popProcessedRecords()
processedButNotCheckpointedCount
}

def watchForCompletion(records: Iterable[KinesisRecord]): Unit = {
def watchForCompletion(records: Iterable[KinesisRecord]): Unit = lock.synchronized {
unprocessedInFlightRecords ++= records
}

def shouldCheckpoint: Boolean = {
def shouldCheckpoint: Boolean = lock.synchronized {
popProcessedRecords()

processedButNotCheckpointedCount >= shardCheckpointConfig.checkpointAfterProcessingNrOfRecords ||
durationSinceLastCheckpoint() >= shardCheckpointConfig.checkpointPeriod
}

def checkpointLastProcessedRecord(checkpointLogic: KinesisRecord => Unit): Unit = {
def checkpointLastProcessedRecord(checkpointLogic: KinesisRecord => Unit): Unit = lock.synchronized {
popProcessedRecords()

lastProcessedButNotCheckpointed.foreach { kinesisRecord =>
Expand All @@ -56,9 +58,11 @@ private[kinesis] class ShardCheckpointTracker(shardCheckpointConfig: ShardCheckp
}
}

def allInFlightRecordsProcessed: Boolean = unprocessedInFlightRecords.forall(_.completionFuture.isCompleted)
def allInFlightRecordsProcessed: Boolean = lock.synchronized {
unprocessedInFlightRecords.forall(_.completionFuture.isCompleted)
}

def allInFlightRecordsProcessedFuture(implicit ec: ExecutionContext): Future[Done] = {
def allInFlightRecordsProcessedFuture(implicit ec: ExecutionContext): Future[Done] = lock.synchronized {
Future
.sequence(unprocessedInFlightRecords.map(_.completionFuture))
.map(_ => Done)
Expand Down

0 comments on commit 9d130be

Please sign in to comment.