Skip to content

Commit

Permalink
Merge 912e13e into c7d0420
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Jul 17, 2023
2 parents c7d0420 + 912e13e commit 4a2976a
Showing 1 changed file with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,33 @@ import scala.util.control.NoStackTrace

object HeadCacheConsumption {

/** Streams records from Kafka topic with error handling and retries.
*
* If a consumer fails then the it will be recreated, so the consumption is
* continued. The retry procedure will happen with some random jitter to
* ensure different nodes do not intefere with each other.
*
* Note, that `pointer` parameter is also wrapped in `F[_]`, i.e. the latest
* partition offsets will be used on each consumer failure, so head cache
* does not have to read records, which were already seen.
*
* @param topic
* Kafka topic where journal events are stored.
* @param pointers
* Partition offsets to start the reading from. These offsets, usually,
* come from the cache itself, which gets prepopulated by the information
* received from Cassandra. If partition is not included into a `Map` then
* [[Offset#min]] will be used, instead.
* @param consumer
* Kafka consumer factory. The consumer might be recreated in case of the
* error.
* @param log
* Log to write the consumer failures to.
* @return
* Records from Kafka topic for all the partitions in `topic`. The method
* does not raise erros, but tries to restart consumer on failure until it
* succeeds.
*/
def apply[F[_]: BracketThrowable: Sleep](
topic: Topic,
pointers: F[Map[Partition, Offset]],
Expand Down Expand Up @@ -79,6 +106,11 @@ object HeadCacheConsumption {
} yield records
}


/** Consumer did not return any partitions.
*
* This consumer does not use consumer groups, i.e. all partitions should
* have been returned, so the likely reason could be that the topic is not
* properly initialized yet.
*/
case object NoPartitionsError extends RuntimeException("No partitions") with NoStackTrace
}

0 comments on commit 4a2976a

Please sign in to comment.