Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16811 Sliding window approach to calculate non-zero punctuate-ratio metric #16162

Open
wants to merge 13 commits into
base: trunk
Choose a base branch
from

Conversation

ganesh-sadanala
Copy link
Contributor

@ganesh-sadanala ganesh-sadanala commented Jun 1, 2024

This pull request changes the method to calculate the punctuate-ratio metric. The current implementation calculates the metric after the last record of the poll loop. After a puntuate, the value is close to 1, but there is little chance that metric is sampled at this time. So its value is almost always 0.

The updated implementation calculates the metric value over the window of last 30 seconds.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ganesh-sadanala ganesh-sadanala marked this pull request as ready for review June 1, 2024 03:45
@ganesh-sadanala ganesh-sadanala changed the title Sliding window approach to calculate non-zero punctuate-ratio metric KAFKA-16811 Sliding window approach to calculate non-zero punctuate-ratio metric Jun 1, 2024
@ganesh-sadanala ganesh-sadanala marked this pull request as draft June 1, 2024 03:47
@ganesh-sadanala ganesh-sadanala marked this pull request as ready for review June 2, 2024 01:04
@ganesh-sadanala
Copy link
Contributor Author

ganesh-sadanala commented Jun 2, 2024

I have completed the implementation using the SlidingWindow approach with x=30 seconds for testing. Here are the changes: #16162

I have followed these steps to test the changes, but I still see the puncutate-ratio as zero for all the instances of example Demo class. 

  1. Start ZooKeeper, Kafka Broker.
  2. Created input and output topics with 3 partitions (for the sake of having active tasks distributed to multiple instances of WordCountProcessorDemo stream class) 
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1 
  1. Run the 3 instances of Kafka Streams Demo Application in different terminals/processors:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo 
  1. Produce and consume data
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning

 5. Open the jconsole and watch the metrics

I see that all the metrics are getting calculated. When I run the debugger, I see that in this code tasks.activeTasks() is an empty list. Because of that punctuated values is becoming zero, hence the punctuate ratio.

TaskExecutor.java

int punctuate() {
        int punctuated = 0;

        for (final Task task : tasks.activeTasks()) {
            try {
                if (executionMetadata.canPunctuateTask(task)) {
                    if (task.maybePunctuateStreamTime()) {
                        punctuated++;
                    }
                    if (task.maybePunctuateSystemTime()) {
                        punctuated++;
                    }
                }
            } catch (final TaskMigratedException e) {
                log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
                    "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            } catch (final StreamsException e) {
                log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
                e.setTaskId(task.id());
                throw e;
            } catch (final KafkaException e) {
                log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
                throw new StreamsException(e, task.id());
            }
        }

        return punctuated;
    }
} 

Is there a way to make active tasks list non-empty, thus I can test the changes and write some unit tests?

Is this behaviour normal in the local environment?

@mjsax
Copy link
Member

mjsax commented Jun 5, 2024

Following up from Jira.

With 3 input topic partitions, there should be 3 tasks, one for each partition. But the application must first join the consumer group and the rebalancing must finish, before tasks are created. Do you might want to wait until the app goes into RUNNING state?

However, this condition should never be true (https://github.com/apache/kafka/pull/16162/files#diff-76f629d0df8bd30b2593cbcf4a2dc80de3167ebf55ef8b5558e6e6285a057496R1028) w/o a punctuator and thus the metric should stay at zero.

In the end, we should have some unit test anyway? Build it failing right now, due to missing license header in the newly added files. You need to fix this. -- Did you locally run tests? I would hope we have some test coverage already, and would expect that these test would need to get updated, too?

@ganesh-sadanala
Copy link
Contributor Author

With 3 input topic partitions, there should be 3 tasks, one for each partition. But the application must first join the consumer group and the rebalancing must finish, before tasks are created. Do you might want to wait until the app goes into RUNNING state?

Thank you! However, I made a mistake running the wrong file with older implementation, thus it was still showing ratio as 0.

Ran the update file, and with all setup, voila! it woks now!
Screenshot 2024-06-05 at 7 20 57 PM

In the end, we should have some unit test anyway? Build it failing right now, due to missing license header in the newly added files. You need to fix this. -- Did you locally run tests? I would hope we have some test coverage already, and would expect that these test would need to get updated, too?

License headers are fixed.

Regarding Unit tests, I don't see existing test needs any revisal since there is no test already existing testing the punctuate-ratio metric. The existing tests test if all the setup and configuration are done well. Since punctuate-ratio can always be >=0, we can't add a test saying it is never 0.

Do you have any specific details to be added as a test for checking if any metric value is valid/invalid?

public double getAverageRatio() {
return ratioQueue.stream()
.mapToDouble(RatioTimeStamp::getRatio)
.average()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we are computing an average over a ratio. Is this mathematically sound? I believe not.

When we record punctuation ratio, the "time frame" over with the ration is computed is not guaranteed to be of a fixed size. Thus, the different ratios would need to be weighted differently for a correct computation?

Seem, instead of keeping window of ratio samples, we should rather keep the raw latency and runtime values, ie, a window of pairs totalPunctuateLatency and runOnceLatency , including a pre-computed sumTotalPunctuateLatency and sumRunOnceLatency and compute this result as sumTotalPunctuateLatency / sumRunOnceLatency ?

When updating this queue, we can also update both running sums by adding new and removing old values.

@mjsax
Copy link
Member

mjsax commented Jun 19, 2024

Do we need to also update/refine the description of the metric in docs/ops.html? -- Even wondering if this might need a KIP (strictly speaking) as we change the semantics of the metric? \cc @cadonna WDYT?

@cadonna
Copy link
Contributor

cadonna commented Jun 20, 2024

I agree with @mjsax that this would need a KIP. KIP-444 states that this metric is a dynamic gauge.

I was also wondering whether it would be possible to use a MeasureableStat that are provided by Kafka (e.g WindowedSum) instead of implementing a separate computation in Streams.

@ganesh-sadanala
Copy link
Contributor Author

@mjsax @cadonna Thanks for the insights. I will make the necessary changes and would love to write KIP for this change. I recently requested the confluence account for another KIP, and it got approved.

This is one of my first two KIPs. If anything you want to share that would help me, please do it. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
4 participants