Skip to content

Commit

Permalink
Simplified a code a bit.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Nov 5, 2020
1 parent 9e42599 commit f28c9ff
Showing 1 changed file with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,7 @@ object KafkaFlowSpec {
} flatMap {
case None => ConsRecords.empty.pure[F]
case Some(Command.ProduceRecords(records)) => records.pure[F]
case Some(Command.RemovePartitions(partitions)) =>
state.get flatMap { state =>
val revoke = state.actions collectFirst { case action: Action.Subscribe =>
action.listener.onPartitionsRevoked(
partitions map { partition => TopicPartition(action.topic, partition) }
)
}
revoke.sequence_ *> poll(timeout)
}
case Some(Command.RemovePartitions(partitions)) => revoke(partitions) *> poll(timeout)
case Some(Command.Fail(error)) => error.raiseError[F, ConsRecords]
}

Expand All @@ -194,6 +186,16 @@ object KafkaFlowSpec {
def position(partition: TopicPartition) =
Offset.min.pure[F]

def revoke(partitions: NonEmptySet[Partition]) =
state.get flatMap { state =>
val revoke = state.actions collectFirst { case action: Action.Subscribe =>
action.listener.onPartitionsRevoked(
partitions map { partition => TopicPartition(action.topic, partition) }
)
}
revoke.sequence_
}

}

val result: F[(Consumer[F], F[Unit])] = state modify { s =>
Expand Down

0 comments on commit f28c9ff

Please sign in to comment.