Skip to content

Commit

Permalink
Merge f28c9ff into 45093b8
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Nov 5, 2020
2 parents 45093b8 + f28c9ff commit 966b9df
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ object ConsumerFlow {
}
}

val unsubscribe = log[F] flatMap { log =>
log.info("unsubscribing") *> consumer.unsubscribe
}

val subscription = Resource.make(subscribe) { _ => unsubscribe }

def poll =
consumer.poll(config.pollTimeout) flatTap { consumerRecords =>
flows.toList traverse { case (topic, flow) =>
Expand All @@ -121,7 +127,7 @@ object ConsumerFlow {
}

def stream = for {
_ <- Stream.lift(subscribe)
_ <- Stream.fromResource(subscription)
records <- Stream.repeat(poll)
// we process empty polls to trigger timers, but do not return them
if records.values.nonEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ trait Consumer[F[_]] {

def subscribe(topic: Topic, listener: RebalanceListener[F]): F[Unit]

def unsubscribe: F[Unit]

def poll(timeout: FiniteDuration): F[ConsRecords]

def commit(offsets: NonEmptyMap[TopicPartition, OffsetAndMetadata]): F[Unit]
Expand All @@ -39,6 +41,9 @@ object Consumer {
def subscribe(topic: Topic, listener: RebalanceListener[F]) =
consumer.subscribe(NonEmptySet.of(topic), listener.some)

def unsubscribe =
consumer.unsubscribe

def poll(timeout: FiniteDuration) =
consumer.poll(timeout)

Expand All @@ -50,4 +55,4 @@ object Consumer {

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class KafkaFlowSpec extends FunSuite {
actions = List(
Action.ReleaseConsumer,
Action.ReleaseTopicFlow,
Action.Unsubscribe,
Action.Poll(consumerRecords(consumerRecord(partition = 0, offset = 0))),
Action.Poll(ConsumerRecords.empty),
Action.Poll(ConsumerRecords.empty),
Expand All @@ -69,6 +70,7 @@ class KafkaFlowSpec extends FunSuite {
actions = List(
Action.ReleaseConsumer,
Action.ReleaseTopicFlow,
Action.Unsubscribe,
Action.Poll(consumerRecords(consumerRecord(partition = 0, offset = 0))),
Action.Poll(ConsumerRecords.empty),
Action.Subscribe(topic)(RebalanceListener.empty),
Expand All @@ -77,6 +79,7 @@ class KafkaFlowSpec extends FunSuite {
Action.RetryOnError(Error, OnError.Decision.retry(1.millis)),
Action.ReleaseConsumer,
Action.ReleaseTopicFlow,
Action.Unsubscribe,
Action.Poll(ConsumerRecords.empty),
Action.Subscribe(topic)(RebalanceListener.empty),
Action.AcquireTopicFlow,
Expand All @@ -100,6 +103,7 @@ class KafkaFlowSpec extends FunSuite {
actions = List(
Action.ReleaseConsumer,
Action.ReleaseTopicFlow,
Action.Unsubscribe,
Action.Poll(consumerRecords(consumerRecord(partition = 0, offset = 0))),
Action.RemovePartitions(NonEmptySet.of(Partition.unsafe(1))),
Action.Poll(ConsumerRecords.empty),
Expand Down Expand Up @@ -162,22 +166,17 @@ object KafkaFlowSpec {
def subscribe(topic: Topic, listener: RebalanceListener[F]) =
state update (_ + Action.Subscribe(topic)(listener))

def unsubscribe =
state update (_ + Action.Unsubscribe)

def poll(timeout: FiniteDuration) =
state modify {
case state @ State(Nil, _) => (state, none[Command])
case state @ State(head :: tail, _) => (state.copy(commands = tail), Some(head))
} flatMap {
case None => ConsRecords.empty.pure[F]
case Some(Command.ProduceRecords(records)) => records.pure[F]
case Some(Command.RemovePartitions(partitions)) =>
state.get flatMap { state =>
val revoke = state.actions collectFirst { case action: Action.Subscribe =>
action.listener.onPartitionsRevoked(
partitions map { partition => TopicPartition(action.topic, partition) }
)
}
revoke.sequence_ *> poll(timeout)
}
case Some(Command.RemovePartitions(partitions)) => revoke(partitions) *> poll(timeout)
case Some(Command.Fail(error)) => error.raiseError[F, ConsRecords]
}

Expand All @@ -187,6 +186,16 @@ object KafkaFlowSpec {
def position(partition: TopicPartition) =
Offset.min.pure[F]

def revoke(partitions: NonEmptySet[Partition]) =
state.get flatMap { state =>
val revoke = state.actions collectFirst { case action: Action.Subscribe =>
action.listener.onPartitionsRevoked(
partitions map { partition => TopicPartition(action.topic, partition) }
)
}
revoke.sequence_
}

}

val result: F[(Consumer[F], F[Unit])] = state modify { s =>
Expand Down Expand Up @@ -250,6 +259,7 @@ object KafkaFlowSpec {
case object ReleaseTopicFlow extends Action
case object AcquireConsumer extends Action
case object ReleaseConsumer extends Action
case object Unsubscribe extends Action
final case class RemovePartitions(partitions: NonEmptySet[Partition]) extends Action
final case class AddPartitions(partitions: NonEmptySet[(Partition, Offset)]) extends Action
final case class Subscribe(topic: Topic)(val listener: RebalanceListener[F]) extends Action
Expand Down

0 comments on commit 966b9df

Please sign in to comment.