
Approx 1000ms delay between Produce/Consume #1203

Closed · 4 of 8 tasks
TrevorDArcyEvans opened this issue Mar 3, 2020 · 4 comments
Labels: investigate further (we want to further investigate to understand properly)

Comments

@TrevorDArcyEvans
Description

Observed on Linux:

lsb_release -a
No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 18.04.4 LTS
Release:	18.04
Codename:	bionic

uname -r
5.3.0-40-generic

With the producer and consumer on the same system, there is roughly a 1000 ms (~1 s) delay between sending a message and receiving it.

How to reproduce

Test code:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace kafka.pubsub.console
{
  public class Program
  {
    // stolen from:
    //    https://github.com/confluentinc/confluent-kafka-dotnet
    public static async Task Main(string[] args)
    {
      const string BootstrapServers = "localhost:9092";
      const string Topic = "test-topic-output";

      const int NumMessages = 100;

      if (args.Length != 0 && args[0] == "-producer")
      {
        // In stream processing applications, where you would like to process many messages in rapid succession,
        // you would typically use the Produce method instead:
        var prodConf = new ProducerConfig
        {
          BootstrapServers = BootstrapServers,
          EnableDeliveryReports = false
        };

        Console.WriteLine("Sending messages...");

        var sw = Stopwatch.StartNew();
        using (var producer = new ProducerBuilder<Null, string>(prodConf).Build())
        {
          foreach (var _ in Enumerable.Range(0, NumMessages))
          {
            await producer.ProduceAsync(Topic, new Message<Null, string> { Value = DateTime.UtcNow.ToString("O") });
          }

          Console.WriteLine($"  Sent {NumMessages} in {sw.ElapsedMilliseconds} ms");

          // wait for up to 10 seconds for any inflight messages to be delivered.
          producer.Flush(TimeSpan.FromSeconds(10));
        }

        return;
      }


      // consumer
      {
        var consConf = new ConsumerConfig
        {
          GroupId = "test-consumer-group",
          BootstrapServers = BootstrapServers,
          // Note: The AutoOffsetReset property determines the start offset in the event
          // there are not yet any committed offsets for the consumer group for the
          // topic/partitions of interest. By default, offsets are committed
          // automatically, so in this example, consumption will only start from the
          // earliest message in the topic the first time you run the program.
          AutoOffsetReset = AutoOffsetReset.Earliest
        };

        var sw = Stopwatch.StartNew();
        using (var consumer = new ConsumerBuilder<Ignore, string>(consConf).Build())
        {
          consumer.Subscribe(Topic);

          Console.WriteLine();
          var cts = new CancellationTokenSource();
          Console.CancelKeyPress += (_, e) =>
          {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
          };
          Console.WriteLine("Ctrl-C to exit.");
          Console.WriteLine();

          var msgCount = 0;
          try
          {
            while (true)
            {
              try
              {
                var cr = consumer.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{DateTime.UtcNow.ToString("O")}'.");
                msgCount++;
              }
              catch (ConsumeException e)
              {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
              }
            }
          }
          catch (OperationCanceledException)
          {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            consumer.Close();
          }

          Console.WriteLine($"Received {msgCount} in {sw.ElapsedMilliseconds} ms");
        }
      }
    }
  }
}

Steps to reproduce:

  1. open two terminals
  2. in first terminal (consumer), run command:
    ./kafka.pubsub.console
  3. in second terminal (producer), run command:
    ./kafka.pubsub.console -producer
  4. inspect output in first terminal (consumer)

Observed result:
Similar to:

Consumed message '2020-03-03T08:16:23.4105607Z' at: '2020-03-03T08:16:24.4360593Z'.
Consumed message '2020-03-03T08:16:23.4105628Z' at: '2020-03-03T08:16:24.4360690Z'.
Consumed message '2020-03-03T08:16:23.4105649Z' at: '2020-03-03T08:16:24.4360778Z'.
Consumed message '2020-03-03T08:16:23.4105670Z' at: '2020-03-03T08:16:24.4360869Z'.
Consumed message '2020-03-03T08:16:23.4105692Z' at: '2020-03-03T08:16:24.4360955Z'.
Consumed message '2020-03-03T08:16:23.4105741Z' at: '2020-03-03T08:16:24.4361047Z'.
Consumed message '2020-03-03T08:16:23.4105763Z' at: '2020-03-03T08:16:24.4361130Z'.
Consumed message '2020-03-03T08:16:23.4105784Z' at: '2020-03-03T08:16:24.4361214Z'.
Consumed message '2020-03-03T08:16:23.4105805Z' at: '2020-03-03T08:16:24.4361298Z'.
Consumed message '2020-03-03T08:16:23.4105826Z' at: '2020-03-03T08:16:24.4361396Z'.
Consumed message '2020-03-03T08:16:23.4105847Z' at: '2020-03-03T08:16:24.4361479Z'.
Consumed message '2020-03-03T08:16:23.4105868Z' at: '2020-03-03T08:16:24.4361671Z'.
Consumed message '2020-03-03T08:16:23.4105889Z' at: '2020-03-03T08:16:24.4361760Z'.
Consumed message '2020-03-03T08:16:23.4105910Z' at: '2020-03-03T08:16:24.4361844Z'.
Consumed message '2020-03-03T08:16:23.4105932Z' at: '2020-03-03T08:16:24.4361928Z'.
Consumed message '2020-03-03T08:16:23.4105953Z' at: '2020-03-03T08:16:24.4362011Z'.
Consumed message '2020-03-03T08:16:23.4105974Z' at: '2020-03-03T08:16:24.4362095Z'.
Consumed message '2020-03-03T08:16:23.4105995Z' at: '2020-03-03T08:16:24.4362180Z'.
Consumed message '2020-03-03T08:16:23.4106016Z' at: '2020-03-03T08:16:24.4362262Z'.
Consumed message '2020-03-03T08:16:23.4106037Z' at: '2020-03-03T08:16:24.4362345Z'.

Note that there is about a 1000 ms delay between a message being produced and being consumed.

Expected result:
The delay between producing and consuming a message should be far smaller, on the order of 1-10 ms (?)
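For context, these are the client settings most commonly adjusted when chasing end-to-end latency. This is an illustrative sketch only (the property values are assumptions, and, as the discussion below shows, tuning these is not what resolved the fixed ~1 s delay):

```csharp
using Confluent.Kafka;

// Illustrative latency-oriented settings; values are examples, not the fix.
var prodConf = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    LingerMs = 0,               // do not hold messages back to build batches
    SocketNagleDisable = true   // disable Nagle's algorithm on broker sockets
};

var consConf = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "test-consumer-group",
    FetchWaitMaxMs = 10         // cap how long the broker may wait to fill a fetch
};
```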

Checklist

Please provide the following information:

  • [x] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [x] Confluent.Kafka nuget version: 1.4.0-RC2
  • [ ] Apache Kafka version.
  • [x] Client configuration.
  • [x] Operating system.
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration).
  • [ ] Provide broker log excerpts.
  • [ ] Critical issue.
mhowlett (Contributor) commented Mar 3, 2020

thanks for the example, I see the same thing. the additional delay is pretty much exactly 1000ms, which looks suspiciously like something operating as designed, but I don't know what that is without digging deeper. It reminds me of socket.blocking.max.ms issues in earlier versions, but that's deprecated. I notice the delay applies only to the first message: producing one message, waiting 1s, then producing 100, gives me ~2ms latency on all but the first message. Next step: @edenhill, do you know what this is off the top of your head?

some other misc notes:

  • you're producing messages synchronously, which limits throughput to the request/response round-trip time (roughly < 500 msg/s to localhost). producing asynchronously can reach as high as 500,000 msg/s.
  • the producer Flush is unnecessary because you're producing synchronously.
  • creating a client is very expensive, so your elapsed times will be distorted by this.
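The asynchronous pattern the first bullet refers to can be sketched as follows, reusing the broker address and topic from the repro (the delivery handler shown is illustrative, not the only option):

```csharp
using System;
using Confluent.Kafka;

class AsyncProduceSketch
{
    static void Main()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
        using var producer = new ProducerBuilder<Null, string>(config).Build();

        for (int i = 0; i < 100; i++)
        {
            // Produce returns immediately; delivery outcome arrives via the callback.
            producer.Produce("test-topic-output",
                new Message<Null, string> { Value = DateTime.UtcNow.ToString("O") },
                dr =>
                {
                    if (dr.Error.IsError)
                        Console.WriteLine($"Delivery failed: {dr.Error.Reason}");
                });
        }

        // With fire-and-forget produce, Flush IS needed: wait for outstanding deliveries.
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
```

Note that, unlike the synchronous repro, Flush now does real work: without it, the process could exit before queued messages reach the broker.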

mhowlett added the 'investigate further' label Mar 3, 2020
edenhill (Contributor) commented Mar 4, 2020


I suspect this is this: confluentinc/librdkafka#2865 which will be included in librdkafka v1.5.

mhowlett (Contributor)

closing because i believe this is resolved in v1.5. Please re-open if the problem persists.
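If the problem seems to persist, one quick sanity check is to confirm which librdkafka build your Confluent.Kafka package actually loads, since the fix landed in librdkafka v1.5. A minimal sketch using the client's static Library.VersionString property:

```csharp
using System;
using Confluent.Kafka;

class VersionCheck
{
    static void Main()
    {
        // Reports the version of the underlying native librdkafka library;
        // it should be 1.5.0 or later for the fix referenced above.
        Console.WriteLine($"librdkafka: {Library.VersionString}");
    }
}
```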
