
Merge 947128d into 561c144
rtar committed Jul 13, 2023
2 parents 561c144 + 947128d commit 59501b3
Showing 4 changed files with 205 additions and 3 deletions.
@@ -113,9 +113,18 @@ object HeadCache {
}


/** Lightweight wrapper over [[EventualJournal]].
*
* Allows easier stubbing in unit tests.
*/
trait Eventual[F[_]] {

/** Gets the last replicated offset for a topic partition.
*
* @see [[EventualJournal#offset]] for more details.
*/
def pointer(topic: Topic, partition: Partition): F[Option[Offset]]

}
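
A trait this small is easy to stub in tests. A minimal sketch (not part of this commit; assumes `cats` and the `Topic`/`Partition`/`Offset` types already imported by this file):

import cats.Applicative
import cats.syntax.applicative._

// Hypothetical stub: always reports the same replicated offset.
def const[F[_]: Applicative](offset: Option[Offset]): Eventual[F] =
  new Eventual[F] {
    def pointer(topic: Topic, partition: Partition): F[Option[Offset]] =
      offset.pure[F]
  }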

object Eventual {
Expand Down Expand Up @@ -183,14 +192,61 @@ object HeadCache {
}


/** Provides methods to update the metrics for [[HeadCache]] internals */
trait Metrics[F[_]] {

/** Report duration and result of a cache lookup, i.e. [[TopicCache#get]].
*
* @param topic
* Topic journal is being stored in.
* @param latency
* Duration of [[TopicCache#get]] call.
* @param result
* Result of the call, i.e. "ahead", "limited", "timeout" or "failure".
* @param now
* Whether the result was [[PartitionCache.Result.Now]], i.e. the entry was
* already in the cache.
*/
def get(topic: Topic, latency: FiniteDuration, result: String, now: Boolean): F[Unit]

/** Report health of all [[PartitionCache]] instances related to a topic.
*
* @param topic
* Topic which these [[PartitionCache]] instances are related to.
* @param entries
* Number of distinct journals stored in a topic cache. If it is too
* close to [[HeadCacheConfig.Partition#maxSize]] multiplied by the number of
* partitions, the cache might not work efficiently.
* @param listeners
* Number of listeners waiting after [[PartitionCache#get]] call. Too
* many of them might mean that cache is not being loaded fast enough.
*/
def meters(topic: Topic, entries: Int, listeners: Int): F[Unit]

/** Report the latency and number of records coming from Kafka.
*
* I.e. how long it took for the next element in the stream returned by
* [[HeadCacheConsumption#apply]] to get from a journal writer to this
* cache.
*
* @param topic
* Topic being read by [[HeadCacheConsumption]].
* @param age
* Time it took for an element to reach [[HeadCache]].
* @param diff
* The number of elements added to the cache by this batch, i.e. returned by
* [[PartitionCache#add]].
*/
def consumer(topic: Topic, age: FiniteDuration, diff: Long): F[Unit]

/** Report the number of records coming from Cassandra.
*
* @param topic
* Topic being read by [[Eventual]].
* @param diff
* The number of elements removed from the cache by this batch, i.e.
* returned by [[PartitionCache#remove]].
*/
def storage(topic: Topic, diff: Long): F[Unit]
}
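
When metrics are not wired up, a no-op instance can satisfy the interface. A minimal sketch (not part of this commit; assumes `cats.Applicative`):

import cats.Applicative

// Hypothetical no-op implementation: discards every measurement.
def empty[F[_]: Applicative]: Metrics[F] = new Metrics[F] {
  def get(topic: Topic, latency: FiniteDuration, result: String, now: Boolean): F[Unit] = Applicative[F].unit
  def meters(topic: Topic, entries: Int, listeners: Int): F[Unit] = Applicative[F].unit
  def consumer(topic: Topic, age: FiniteDuration, diff: Long): F[Unit] = Applicative[F].unit
  def storage(topic: Topic, diff: Long): F[Unit] = Applicative[F].unit
}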

@@ -14,7 +14,7 @@ import com.evolution.scache.Cache
import scala.concurrent.duration.FiniteDuration


- /** Maintains an information about all non-replicated Kafka records.
+ /** Maintains information about non-replicated Kafka records in a partition.
*
* The class itself does not read Kafka or poll Cassandra (or other long term
* storage); it relies on the information incoming through
@@ -636,7 +636,7 @@ object PartitionCache {
* Number of listeners waiting after [[PartitionCache#get]] call. Too many
* of them might mean that cache is not being loaded fast enough.
* @param entries
- * Number of distinct journal store in a cache. If it is too close to
+ * Number of distinct journals stored in a cache. If it is too close to
* maximum configured number, the cache might not work efficiently.
*/
final case class Meters(listeners: Int, entries: Int)
@@ -19,14 +19,62 @@ import com.evolution.scache.Cache

import scala.concurrent.duration._


/** Maintains information about non-replicated Kafka records in a topic.
*
* The implementation reads both Kafka and Cassandra by itself, continuously
* refreshing the information.
*/
trait TopicCache[F[_]] {

/** Gets the information about the state of a journal stored in the topic.
*
* @param id
* Journal id
* @param partition
* Partition where the journal is stored. The usual way to get the
* partition is to write a "marker" record to the Kafka topic and use the
* partition of the marker as the current one.
* @param offset
* Current [[Offset]], i.e. maximum offset where Kafka records related to a
* journal are located. The usual way to get such an offset is to write a
* "marker" record to Kafka patition and use the offset of the marker as a
* current one.
*
* @return
* [[PartitionCache.Result]] with either the current state or an indication
* of the reason why such a state is not present in the cache.
*
* @see
* [[PartitionCache.Result]] for more details on possible results.
*/
def get(id: String, partition: Partition, offset: Offset): F[PartitionCache.Result[F]]
}
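
The "marker" pattern described above might be used roughly as follows. This is a sketch, not code from this commit: `writeMarker` is a hypothetical helper that produces the marker's partition and offset.

import cats.Monad
import cats.syntax.all._

// Hypothetical: writes a "marker" record to the topic and returns
// the partition and offset the marker ended up at.
def writeMarker[F[_]](topic: Topic, id: String): F[(Partition, Offset)] = ???

def headState[F[_]: Monad](
  cache: TopicCache[F],
  topic: Topic,
  id: String
): F[PartitionCache.Result[F]] =
  for {
    marker <- writeMarker[F](topic, id)
    result <- cache.get(id, marker._1, marker._2)
  } yield result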

object TopicCache {

/** Creates [[TopicCache]] using configured parameters and data sources.
*
* @param eventual
* Cassandra data source.
* @param topic
* Topic stored in this cache.
* @param log
* Logger used to write debug logs.
* @param consumer
* Kafka data source.
* @param config
* [[HeadCache]] configuration.
* @param consRecordToActionHeader
* Function used to parse records coming from `consumer`. Only headers will
* be parsed, and the payload will be ignored.
* @param metrics
* Interface to report the metrics to.
* @return
* Resource which will configure a [[TopicCache]] with the passed
* parameters. Instances of `Resource[TopicCache]` are reusable, so there
* is no need to call [[TopicCache#of]] again if the parameters did not
* change.
*/
def of[F[_]: Async: Parallel: Runtime](
eventual: Eventual[F],
topic: Topic,
@@ -204,19 +252,50 @@ object TopicCache {
}
}

/** Lightweight wrapper over [[KafkaConsumer]].
*
* Allows easier stubbing in unit tests and provides a little bit more
* convenient [[TopicCache]]-specific API.
*/
trait Consumer[F[_]] {

/** Assigns specific topic partitions to a consumer.
*
* I.e. consumer groups will not be used.
*
* @see
* [[KafkaConsumer#assign]] for more details.
*/
def assign(topic: Topic, partitions: Nes[Partition]): F[Unit]

/** Moves the fetching position to different offsets.
*
* The read will start from the new offsets the next time [[#poll]] is
* called.
*
* @see
* [[KafkaConsumer#seek]] for more details.
*/
def seek(topic: Topic, offsets: Nem[Partition, Offset]): F[Unit]

/** Fetches data from the previously assigned partitions.
*
* @see
* [[KafkaConsumer#poll]] for more details.
*/
def poll: F[ConsumerRecords[String, Unit]]

/** Gets the set of partitions for a given topic.
*
* @see
* [[KafkaConsumer#partitions]] for more details.
*/
def partitions(topic: Topic): F[Set[Partition]]
}
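
Taken together, the calls are meant to be driven in an assign-seek-poll sequence. A sketch of the assumed flow (not the actual implementation; `Nem#keys` supplies the `Nes[Partition]` to assign):

import cats.Monad
import cats.syntax.all._

// Hypothetical driver loop: assign partitions, seek to the starting
// offsets, then poll and handle batches until the fiber is cancelled.
def consume[F[_]: Monad](
  consumer: Consumer[F],
  topic: Topic,
  offsets: Nem[Partition, Offset]
)(handle: ConsumerRecords[String, Unit] => F[Unit]): F[Unit] =
  for {
    _ <- consumer.assign(topic, offsets.keys)
    _ <- consumer.seek(topic, offsets)
    _ <- consumer.poll.flatMap(handle).foreverM[Unit]
  } yield ()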

object Consumer {

/** Stub implementation of [[Consumer]], which never returns any records. */
def empty[F[_]: Applicative]: Consumer[F] = {
class Empty
new Empty with Consumer[F] {
@@ -234,6 +313,11 @@

def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F

/** Wraps an existing [[KafkaConsumer]] into the [[Consumer]] API.
*
* @param consumer Previously created [[KafkaConsumer]].
* @param pollTimeout The timeout to use for [[KafkaConsumer#poll]].
*/
def apply[F[_]: Monad](
consumer: KafkaConsumer[F, String, Unit],
pollTimeout: FiniteDuration
@@ -262,6 +346,16 @@
}
}

/** Creates a new [[KafkaConsumer]] and wraps it into the [[Consumer]] API.
*
* @param config
* Kafka configuration in the form of [[ConsumerConfig]]. It is mostly used
* to get the Kafka address; some important parameters will be ignored, as
* these need to be set to specific values for the cache to work, i.e.
* `autoOffsetReset`, `groupId` and `autoCommit` will not be used.
* @param pollTimeout
* The timeout to use for [[KafkaConsumer#poll]].
*/
def of[F[_]: Monad: KafkaConsumerOf: FromTry](
config: ConsumerConfig,
pollTimeout: FiniteDuration = 10.millis
@@ -326,12 +420,35 @@
}


/** Cumulative average of a data stream.
*
* If one has to calculate an average of a large list of numbers, one does
* not have to keep all these numbers in memory. It is enough to keep their
* sum and count.
*
* @param sum
* Sum of all numbers seen.
* @param count
* Number of all numbers seen.
*
* Example:
* {{{
* scala> import cats.syntax.all._
* scala> (1L to 100L).toList.map(Sample(_)).combineAll.avg
* val res0: Option[Long] = Some(50)
* }}}
*
* @see
* https://en.wikipedia.org/wiki/Moving_average#Cumulative_average
*/
private final case class Sample(sum: Long, count: Int)

private object Sample {

/** Single number in a stream we are calculating the average for */
def apply(value: Long): Sample = Sample(sum = value, count = 1)

/** Initial state of cumulative average, i.e. no numbers registered */
val Empty: Sample = Sample(0L, 0)

implicit val monoidSample: Monoid[Sample] = new Monoid[Sample] {
@@ -346,6 +463,11 @@
}
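
The instance body is collapsed in this view; given the documented semantics, a plausible reconstruction simply adds the sums and counts (an assumption based on the scaladoc above, not the expanded diff):

implicit val monoidSample: Monoid[Sample] = new Monoid[Sample] {
  def empty: Sample = Sample.Empty
  def combine(a: Sample, b: Sample): Sample =
    Sample(sum = a.sum + b.sum, count = a.count + b.count)
}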

implicit class SampleOps(val self: Sample) extends AnyVal {

/** Average of all the numbers seen, or `None` if no numbers were added.
*
* @return Average of all numbers seen, rounded down.
*/
def avg: Option[Long] = {
if (self.count > 0) (self.sum / self.count).some else none
}
@@ -9,8 +9,32 @@ import com.evolutiongaming.skafka.Header
import com.evolutiongaming.skafka.consumer.ConsumerRecord
import scodec.bits.ByteVector

/** Parses a Kafka Journal-specific header from a generic Kafka record.
*
* Example:
* {{{
* def pollVersions[F[_]: Monad](
* kafkaConsumer: KafkaConsumer[F, String, _],
* consRecordToActionHeader: ConsRecordToActionHeader[F]
* ): F[List[Version]] =
* for {
* recordsByPartition <- kafkaConsumer.poll(10.seconds)
* flattenedRecords = recordsByPartition.values.values.toList.flatMap(_.toList)
* actions <- flattenedRecords.traverseFilter { record => consRecordToActionHeader(record).value }
* } yield actions.flatMap(_.version)
* }}}
*/
trait ConsRecordToActionHeader[F[_]] {

/** Converts a generic Kafka record to a Kafka Journal action.
*
* @param record
* The record received from Kafka.
* @return
* [[ActionHeader]] or `F[None]` if [[ActionHeader.key]] is not found. May
* raise a [[JournalError]] if the header is found, but could not be
* parsed.
*/
def apply[A](record: ConsumerRecord[String, A]): OptionT[F, ActionHeader]
}

