10 changes: 5 additions & 5 deletions tests/ducktape/consumer_benchmark_metrics.py
@@ -58,17 +58,17 @@
        self.operation_attempts += 1
        self.operation_latencies.append(operation_latency_ms)

    def record_timeout(self, topic: str = "unknown"):
[SonarQube warning, tests/ducktape/consumer_benchmark_metrics.py#L61: Remove the unused function parameter "topic".]
        """Record a consumer operation timeout"""
        self.operation_timeouts += 1

    def record_error(self, error_msg: str, topic: str = "unknown"):
[SonarQube warning, tests/ducktape/consumer_benchmark_metrics.py#L65: Remove the unused function parameter "topic".]
        """Record a consumer error"""
        self.operation_errors += 1
        self.error_messages.append(error_msg)

    def record_processed_message(self, message_size: int, topic: str, partition: int,
                                 offset: int, operation_latency_ms: float):
[SonarQube warning, tests/ducktape/consumer_benchmark_metrics.py#L71: Remove the unused function parameter "operation_latency_ms".]
        """Record a successfully processed message"""
        self.messages_consumed += 1
        self.total_bytes += message_size
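One way the three unused-parameter warnings above could be resolved, assuming the signatures are otherwise free to change, is sketched below: drop the parameter where no caller needs it, or rename it with a leading underscore (a convention many linters honor for intentionally unused arguments) where call sites must stay stable. A minimal sketch, not what the PR actually does:

    # Sketch only: drop the parameter when callers do not pass it...
    def record_timeout(self):
        """Record a consumer operation timeout"""
        self.operation_timeouts += 1

    # ...or keep the signature for compatibility and underscore the name
    # so static analysis treats it as intentionally unused.
    def record_error(self, error_msg: str, _topic: str = "unknown"):
        """Record a consumer error"""
        self.operation_errors += 1
        self.error_messages.append(error_msg)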
@@ -119,7 +119,7 @@
            return sorted_data[f]
        return sorted_data[f] * (1 - c) + sorted_data[f + 1] * c

    def get_summary(self) -> Dict[str, Any]:
[SonarQube failure, tests/ducktape/consumer_benchmark_metrics.py#L122: Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed.]
        """Get consumer-specific metrics summary"""
        if not self.start_time or not self.end_time:
            return {}
@@ -132,10 +132,10 @@

        # Operation metrics (generic for poll/consume)
        operation_error_rate = (self.operation_errors / self.operation_attempts
                                if self.operation_attempts > 0 else 0)
-       operation_success_rate = ((self.operation_attempts - self.operation_timeouts -
-                                 self.operation_errors) / self.operation_attempts
-                                 if self.operation_attempts > 0 else 0)
+       operation_success_rate = ((self.operation_attempts - self.operation_timeouts -
+                                 self.operation_errors) / self.operation_attempts
+                                 if self.operation_attempts > 0 else 0)

        # Operation latency analysis
        if self.operation_latencies:
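A plausible way to bring get_summary() back under the allowed complexity, sketched here with a hypothetical _safe_ratio helper that is not part of this diff, is to factor the repeated guarded divisions out of the method; each extracted ternary removes one branch from the function's complexity score.

    def _safe_ratio(numerator: float, denominator: float) -> float:
        """Return numerator / denominator, or 0 when there is nothing to divide by."""
        return numerator / denominator if denominator > 0 else 0

    # get_summary() could then read:
    #   operation_error_rate = _safe_ratio(self.operation_errors, self.operation_attempts)
    #   operation_success_rate = _safe_ratio(
    #       self.operation_attempts - self.operation_timeouts - self.operation_errors,
    #       self.operation_attempts)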
@@ -282,12 +282,12 @@
    messages_per_consume = metrics.get('messages_per_consume', 0)
    if messages_per_consume < bounds.min_messages_per_consume:
        violations.append(f"Messages per consume {messages_per_consume:.2f} "
-                         f"below minimum {bounds.min_messages_per_consume}")
+                         f"below minimum {bounds.min_messages_per_consume}")

    empty_consume_rate = metrics.get('empty_consume_rate', 0)
    if empty_consume_rate > bounds.max_empty_consume_rate:
        violations.append(f"Empty consume rate {empty_consume_rate:.3f} "
-                         f"exceeds maximum {bounds.max_empty_consume_rate}")
+                         f"exceeds maximum {bounds.max_empty_consume_rate}")

    # For poll operations, we skip batch efficiency validation since they're single-message operations
    is_valid = len(violations) == 0
2 changes: 1 addition & 1 deletion tests/ducktape/test_consumer.py
@@ -9,7 +9,7 @@

from tests.ducktape.services.kafka import KafkaClient
from tests.ducktape.consumer_benchmark_metrics import (ConsumerMetricsCollector, ConsumerMetricsBounds,
-                                                      validate_consumer_metrics, print_consumer_metrics_report)
+                                                      validate_consumer_metrics, print_consumer_metrics_report)
from tests.ducktape.consumer_strategy import SyncConsumerStrategy, AsyncConsumerStrategy
from confluent_kafka import Producer

@@ -28,7 +28,7 @@
self.logger.info("Verifying connection to external Kafka at localhost:9092")

if not self.kafka.verify_connection():
raise Exception("Cannot connect to Kafka at localhost:9092. "

Check warning on line 31 in tests/ducktape/test_consumer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/ducktape/test_consumer.py#L31

Replace this generic exception class with a more specific one.
"Please ensure Kafka is running.")

self.logger.info("Successfully connected to Kafka")
@@ -80,7 +80,7 @@

topic_name = f"test-{consumer_type}-consumer-topic"
test_duration = 5.0 # 5 seconds
# TODO: clean up this magic number

Check notice on line 83 in tests/ducktape/test_consumer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/ducktape/test_consumer.py#L83

Complete the task associated to this "TODO" comment.
num_messages = 1500000 # 1.5M messages for sustained 5-second consumption at ~300K msg/s
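The TODO's magic number could be derived from the throughput figure already stated in the comment rather than hard-coded; a sketch under that assumption (the constant name is illustrative, not from the PR):

        # Sizing assumption made explicit: 1.5M = 300K msg/s * 5 s.
        TARGET_CONSUME_RATE_MSG_S = 300_000
        num_messages = int(TARGET_CONSUME_RATE_MSG_S * test_duration)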

        # Create topic