Skip to content

Commit

Permalink
Merge fbb0b2f into cb9b306
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Mar 10, 2021
2 parents cb9b306 + fbb0b2f commit dafadc4
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,50 +42,35 @@ object TimerFlowOf {
def register(touchedAt: Timestamp) =
timers.registerProcessing(touchedAt.clock plusMillis fireEvery.toMillis)

val acquire = for {
current <- timers.current
persistedAt <- timers.persistedAt
committedAt = persistedAt getOrElse current
_ <- context.hold(committedAt.offset)
_ <- register(committedAt)
} yield new TimerFlow[F] {
def onTimer = for {
val acquire = Resource.liftF {
for {
current <- timers.current
processedAt <- timers.processedAt
touchedAt = processedAt getOrElse committedAt
expiredAt = touchedAt.clock plusMillis maxIdle.toMillis
expired = current.clock isAfter expiredAt
offsetDifference = current.offset.value - touchedAt.offset.value
_ <- if (expired || offsetDifference > maxOffsetDifference) {
context.log.info(s"flush, offset difference: $offsetDifference") *>
persistence.flush *>
context.remove
} else {
register(touchedAt)
}
} yield ()
persistedAt <- timers.persistedAt
committedAt = persistedAt getOrElse current
_ <- context.hold(committedAt.offset)
_ <- register(committedAt)
} yield new TimerFlow[F] {
def onTimer = for {
current <- timers.current
processedAt <- timers.processedAt
touchedAt = processedAt getOrElse committedAt
expiredAt = touchedAt.clock plusMillis maxIdle.toMillis
expired = current.clock isAfter expiredAt
offsetDifference = current.offset.value - touchedAt.offset.value
_ <- if (expired || offsetDifference > maxOffsetDifference) {
context.log.info(s"flush, offset difference: $offsetDifference") *>
persistence.flush *>
context.remove
} else {
register(touchedAt)
}
} yield ()
}
}

val cancel = for {
holding <- context.holding
_ <- if (holding.isDefined && flushOnRevoke) {
context.log.info(s"flush on revoke, holding offset: $holding") *>
persistence.flush *>
context.remove
} else {
().pure[F]
}
} yield ()
val cancel = flushOnCancel.apply(context, persistence, timers)

Resource.makeCase(acquire) {
case (_, Completed) => cancel
case (_, Canceled) => cancel
// there is no point to try flushing if it failed with an error
// the state might not be consistend and storage not accessible
// plus this is a concurrent operation, and we do not want anything
// to happen concurrently for a specific key
case (_, _) => ().pure[F]
}
if (flushOnRevoke) acquire *> cancel else acquire

}

Expand All @@ -95,13 +80,14 @@ object TimerFlowOf {
*/
def persistPeriodically[F[_]: Monad](
fireEvery: FiniteDuration = 1.minute,
persistEvery: FiniteDuration = 1.minute
persistEvery: FiniteDuration = 1.minute,
flushOnRevoke: Boolean = false,
): TimerFlowOf[F] = { (context, persistence, timers) =>

def register(current: Timestamp) =
timers.registerProcessing(current.clock plusMillis fireEvery.toMillis)

Resource.liftF {
val acquire = Resource.liftF {
for {
current <- timers.current
persistedAt <- timers.persistedAt
Expand All @@ -124,5 +110,35 @@ object TimerFlowOf {
}
}

val cancel = flushOnCancel.apply(context, persistence, timers)

if (flushOnRevoke) acquire *> cancel else acquire

}

/** Performs flush when `Resource` is cancelled only */
def flushOnCancel[F[_]: Monad]: TimerFlowOf[F] = { (context, persistence, _) =>

val cancel = for {
holding <- context.holding
_ <- if (holding.isDefined) {
context.log.info(s"flush on revoke, holding offset: $holding") *>
persistence.flush *>
context.remove
} else {
().pure[F]
}
} yield ()

Resource.makeCase(TimerFlow.empty.pure) {
case (_, Completed) => cancel
case (_, Canceled) => cancel
// there is no point to try flushing if it failed with an error
// the state might not be consistend and storage not accessible
// plus this is a concurrent operation, and we do not want anything
// to happen concurrently for a specific key
case (_, _) => ().pure[F]
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class TimerFlowOfSpec extends FunSuite {
}
val result = program.runS(context).unsafeRunSync()

// Then("flush happens and remove do not happen")
// Then("neither flush or remove happens")
assertEquals(result.flushed, 0)
assertEquals(result.removed, 0)

Expand Down Expand Up @@ -142,7 +142,45 @@ class TimerFlowOfSpec extends FunSuite {
}
val result = program.runS(context).unsafeRunSync()

// Then("flush happens and remove do not happen")
// Then("neither flush or remove happens")
assertEquals(result.flushed, 0)
assertEquals(result.removed, 0)

}

test("unloadOrphaned flushes when resource is cancelled if configured to do so") {

val f = new ConstFixture

// Given("flow is configured to flush on revoke")
val context = Context(timestamps = TimestampState(f.timestamp))
val flowOf = TimerFlowOf.unloadOrphaned[F](flushOnRevoke = true)
val flow = flowOf(keyContext, flushBuffers, timerContext)

// When("flow is started and cancelled")
val program = flow use { _ => ().pure[F] }
val result = program.runS(context).unsafeRunSync()

// Then("state is flushed and removed")
assertEquals(result.flushed, 1)
assertEquals(result.removed, 1)

}

test("unloadOrphaned flushes when resource is cancelled if not configured to do so") {

val f = new ConstFixture

// Given("flow is configured to flush on revoke")
val context = Context(timestamps = TimestampState(f.timestamp))
val flowOf = TimerFlowOf.unloadOrphaned[F](flushOnRevoke = false)
val flow = flowOf(keyContext, flushBuffers, timerContext)

// When("flow is started and cancelled")
val program = flow use { _ => ().pure[F] }
val result = program.runS(context).unsafeRunSync()

// Then("neither flush or remove happens")
assertEquals(result.flushed, 0)
assertEquals(result.removed, 0)

Expand Down Expand Up @@ -228,6 +266,44 @@ class TimerFlowOfSpec extends FunSuite {

}

test("persistPeriodically flushes when resource is cancelled if configured to do so") {

val f = new ConstFixture

// Given("flow is configured to flush on revoke")
val context = Context(timestamps = TimestampState(f.timestamp))
val flowOf = TimerFlowOf.persistPeriodically[F](flushOnRevoke = true)
val flow = flowOf(keyContext, flushBuffers, timerContext)

// When("flow is started and cancelled")
val program = flow use { _ => ().pure[F] }
val result = program.runS(context).unsafeRunSync()

// Then("state is flushed and removed")
assertEquals(result.flushed, 1)
assertEquals(result.removed, 1)

}

test("persistPeriodically flushes when resource is cancelled if not configured to do so") {

val f = new ConstFixture

// Given("flow is configured to flush on revoke")
val context = Context(timestamps = TimestampState(f.timestamp))
val flowOf = TimerFlowOf.persistPeriodically[F](flushOnRevoke = false)
val flow = flowOf(keyContext, flushBuffers, timerContext)

// When("flow is started and cancelled")
val program = flow use { _ => ().pure[F] }
val result = program.runS(context).unsafeRunSync()

// Then("neither flush or remove happens")
assertEquals(result.flushed, 0)
assertEquals(result.removed, 0)

}

}
object TimerFlowSpec {

Expand Down

0 comments on commit dafadc4

Please sign in to comment.