
Kafka consumer #57

Closed
z4f1r0v opened this issue Nov 26, 2015 · 1 comment

z4f1r0v commented Nov 26, 2015

I have a project that deals with a high volume of Twitter data. Kafka is used to queue tweets for later processing. I track two metrics of interest: one showing how long a tweet takes from entering the system until it is published to Kafka, and a second showing the total time from a tweet's creation until it is stored in a db after being fetched from Kafka. The first one is pretty much constant. The second, however, is slowly rising.

I'm investigating what the cause could be, and I ended up with the growing number of tweets in Kafka as the probable reason. That makes some sense, since the time shown by the second metric is proportional to the increasing number of tweets. It could be both the way Kafka is set up and the way I'm reading from the queue.

Something that may be of interest: I have implemented a simple partitionizer for the Producer, because I need the tweets in the order they are created with respect to a particular conversation.

// Creates a supervised Reactive Kafka producer ("subscriber") actor
private def createSupervisedSubscriberActor() = {
    val kafka = new ReactiveKafka()

    // Producer settings are read from the application config
    val subscriberProperties = ProducerProperties(
      brokerList = config.getStringList("kafka.brokers").toList.mkString(","),
      topic = config.getString("kafka.topic"),
      clientId = config.getString("kafka.clientId"),
      encoder = new StringEncoder(),
      partitionizer
    )

    val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties)
    context.actorOf(subscriberActorProps, subscriber)
}

// Derives the partition key from the message itself
private def partitionizer: String => Option[Array[Byte]] = (s: String) => Option(s.getBytes)
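For context on what the key bytes returned by a partitionizer do: the partition for a message is derived from a hash of its key, so equal keys always land on the same partition. This is a simplified sketch of that idea, not reactive-kafka's or Kafka's actual implementation (Kafka's default partitioner uses murmur2):

```scala
object PartitionSketch {
  // Maps key bytes to a partition index. java.util.Arrays.hashCode stands in
  // for Kafka's murmur2 hash here, purely for illustration.
  def partitionFor(key: Array[Byte], numPartitions: Int): Int =
    java.lang.Math.floorMod(java.util.Arrays.hashCode(key), numPartitions)

  def main(args: Array[String]): Unit = {
    val key = "conversation-42".getBytes("UTF-8")
    // The same key always maps to the same partition, which is what makes
    // per-conversation ordering possible within that partition.
    println(partitionFor(key, 8) == partitionFor(key, 8)) // true
  }
}
```

Note that ordering is only guaranteed within a single partition, so the key must be stable per conversation for this to work.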

On the consumer side I am simply reading from the queue. I expect that I'm just getting the latest message every time something is pushed to the end of the queue. Am I right? Can you suggest something I could do to optimize my reading from Kafka? How does the size of the queue affect the way the consumer reads from it?
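(On that expectation: a Kafka consumer does not receive "the latest message"; it reads each partition sequentially from its current offset. If tweets are produced faster than they are consumed, the gap between the log end offset and the consumer's offset, the consumer lag, grows, and with it the end-to-end latency of the second metric. A minimal sketch of that arithmetic, with illustrative rates rather than measured ones:)

```scala
object LagSketch {
  // Both offsets advance monotonically; lag is the gap between them.
  // If producedPerTick > consumedPerTick, lag grows linearly with time.
  def lagAfter(ticks: Int, producedPerTick: Int, consumedPerTick: Int): Long =
    ticks.toLong * (producedPerTick - consumedPerTick)

  def main(args: Array[String]): Unit = {
    // Producer writes 100 msgs/tick, consumer keeps up with only 90:
    println(lagAfter(10, 100, 90)) // 100, and still growing
  }
}
```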

Please let me know if you are missing any info.
Thanks for the help!


z4f1r0v commented Nov 26, 2015

Found the problem! The clock on the AWS instance had drifted 3 seconds :)
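(For anyone else who hits this symptom: a latency metric computed from timestamps taken on two different hosts silently includes the clock skew between them. A tiny illustration with hypothetical numbers:)

```scala
object SkewSketch {
  // created-at is stamped on one host, stored-at on another whose clock
  // has drifted ahead; the observed latency absorbs the whole skew.
  def observedLatencyMs(trueLatencyMs: Long, skewMs: Long): Long =
    trueLatencyMs + skewMs

  def main(args: Array[String]): Unit = {
    // A true 50 ms pipeline latency measured against a clock 3 s ahead:
    println(observedLatencyMs(50, 3000)) // 3050, dominated by the drift
  }
}
```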

z4f1r0v closed this as completed Nov 26, 2015
ennru added this to the invalid milestone Jun 7, 2018