Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package za.co.absa.kafkacase.examples.reader
import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.reader.ReaderImpl
import za.co.absa.kafkacase.reader.Reader

object ReaderCustomResourceHandling {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
withResource(ReaderImpl[T](readerConf, topicName))(reader => {
withResource(Reader[T](readerConf, topicName))(reader => {
for (item <- reader)
println(item)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.reader.ReaderImpl
import za.co.absa.kafkacase.reader.Reader

object ReaderManualResourceHandling {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
val reader = ReaderImpl[T](readerConf, topicName)
val reader = Reader[T](readerConf, topicName)
try {
for (item <- reader)
println(item)
Expand Down
62 changes: 53 additions & 9 deletions reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package za.co.absa.kafkacase.reader
import com.typesafe.config.Config
import io.circe.Decoder

import java.time.Duration
import java.util.Properties

trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoCloseable

object Reader {
def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = ReaderImpl[T](readerProps, topicName)
private val DEFAULT_TIMEOUT = Duration.ofSeconds(6)
private val DEFAULT_NEVER_ENDING = false

def readOnce[T: Decoder](reader: Reader[T], work: ((String, Either[String, T])) => Unit): Unit = {
try {
for (item <- reader)
work(item)
Expand All @@ -34,13 +37,54 @@ object Reader {
}
}

def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = ReaderImpl[T](readerConf, topicName)
try {
for (item <- reader)
work(item)
} finally {
reader.close()
// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods
// Primary method that contains default arguments
def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit, neverEnding: Boolean = DEFAULT_NEVER_ENDING): Unit =
readOnce(apply[T](readerProps, topicName, neverEnding = neverEnding), work)

// With never ending
def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit, neverEnding: Boolean): Unit =
readOnce(apply[T](readerConf, topicName, neverEnding = neverEnding), work)

// Without never ending
def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit =
readOnce(apply[T](readerConf, topicName, neverEnding = DEFAULT_NEVER_ENDING), work)

// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods
// Primary method that contains default arguments
def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = DEFAULT_NEVER_ENDING): Reader[TType] = {
if (neverEnding)
new ReaderNeverEnding[TType](props, topic, timeout)
else
new ReaderEnding[TType](props, topic, timeout)
}

// Overloaded method with Config and all optional arguments
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration, neverEnding: Boolean): Reader[TType] = {
val props = convertConfigToProperties(config)
apply[TType](props, topic, timeout, neverEnding)
}

// Overloaded method with Config and neverEnding optional argument
def apply[TType: Decoder](config: Config, topic: String, neverEnding: Boolean): Reader[TType] = {
apply[TType](config, topic, DEFAULT_TIMEOUT, neverEnding)
}

// Overloaded method with Config and timeout optional argument
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration): Reader[TType] = {
apply[TType](config, topic, timeout, DEFAULT_NEVER_ENDING)
}

// Overloaded method with Config and none of optional arguments
def apply[TType: Decoder](config: Config, topic: String): Reader[TType] = {
apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVER_ENDING)
}

private def convertConfigToProperties(config: Config): Properties = {
val properties = new Properties()
config.entrySet().forEach { entry =>
properties.put(entry.getKey, config.getString(entry.getKey))
}
properties
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.reader

import io.circe.Decoder
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import za.co.absa.kafkacase.reader.ReaderEnding.log
import za.co.absa.kafkacase.reader.ReaderTools.parseRecord

import java.time.Duration
import java.util
import java.util.Properties

class ReaderEnding[TType: Decoder](props: Properties, topic: String, timeout: Duration) extends Reader[TType] {
private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
log.info("Fetching initial batch")
private var singlePollIterator = consumer.poll(timeout).iterator()

override def hasNext: Boolean = singlePollIterator.hasNext

override def next(): (String, Either[String, TType]) = {
log.info("Fetching next item")
val nextItem = singlePollIterator.next()
// Fetch next batch before return, so hasNext can answer correctly
if (!singlePollIterator.hasNext) {
log.info("Fetching next batch")
singlePollIterator = consumer.poll(timeout).iterator()
}
parseRecord(nextItem)
}

def close(): Unit = consumer.close()
}

object ReaderEnding {
private val log = LoggerFactory.getLogger(this.getClass)
}
101 changes: 0 additions & 101 deletions reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.reader

import io.circe.Decoder
import org.slf4j.LoggerFactory
import org.apache.kafka.clients.consumer.KafkaConsumer
import za.co.absa.kafkacase.reader.ReaderNeverEnding.log
import za.co.absa.kafkacase.reader.ReaderTools.parseRecord

import java.time.Duration
import java.util
import java.util.Properties

class ReaderNeverEnding[TType: Decoder](props: Properties, topic: String, timeout: Duration) extends Reader[TType] {
private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
log.info("Fetching initial batch")
private var singlePollIterator = consumer.poll(timeout).iterator()

override def hasNext: Boolean = true

override def next(): (String, Either[String, TType]) = {
while(!singlePollIterator.hasNext) {
log.info("(Re)Fetching next batch")
singlePollIterator = consumer.poll(timeout).iterator()
}
log.info("Fetching next item")
val nextItem = singlePollIterator.next()
parseRecord(nextItem)
}

def close(): Unit = consumer.close()
}

object ReaderNeverEnding {
private val log = LoggerFactory.getLogger(this.getClass)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.reader

import io.circe.Decoder
import io.circe.jawn.decode
import org.apache.kafka.clients.consumer.ConsumerRecord

object ReaderTools {
def parseRecord[TType: Decoder](record: ConsumerRecord[String, String]): (String, Either[String, TType]) = {
val maybeTyped = decode[TType](record.value()) match {
case Left(_) => Left(s"Cannot parse ${record.value()}")
case Right(item) => Right(item)
}
record.key() -> maybeTyped
}
}
Loading