
@Gezi-lzq Gezi-lzq commented Sep 10, 2025

Background

  • Goal: Validate the full Table Topic pipeline end to end: Avro production → Kafka write → Iceberg commit → Iceberg query. The tests assert that record counts are consistent and print diagnostic information on failure.
  • Scenario: Use a REST Iceberg Catalog and a Schema Registry, started the way a production-like deployment would start them, for end-to-end verification.

Components

  • Kafka Cluster: KafkaService (KRaft mode, 3 nodes).
  • External Dependencies: DockerComposeService starts the Schema Registry (10.5.1.3:8081) and the Iceberg Catalog (10.5.1.4:8181).
  • Load Test Producer: AutoMQPerformanceService wraps bin/automq-perf-test.sh (org.apache.kafka.tools.automq.PerfCommand).

Core Flow

  1. Startup Order: Iceberg Catalog → Kafka → Schema Registry (injecting KAFKA_BOOTSTRAP_SERVERS).
  2. Health Checks: _wait_for_service polls the Catalog's /v1/config endpoint and the Registry's root path; the test proceeds once both are READY.
  3. Production: AutoMQPerformanceService sends Avro data, with the topic and producer parameters that TableTopic requires.
  4. Produced-Record Counting: The Kafka-side end offsets (_sum_kafka_end_offsets_stable) serve as the produced count; produceCountTotal from the perf tool is used only for log comparison.
  5. Iceberg Verification: Poll the REST metadata, parse total-records, and compare it with the expected record count; on failure, print detailed context.
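
The polling and verification steps above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: wait_until and total_records are hypothetical helper names, and the JSON layout (a "metadata" object holding "current-snapshot-id" and a "snapshots" list whose "summary" carries "total-records" as a string) is assumed to match the catalog's load-table response.

```python
import time
from typing import Callable, Optional


def wait_until(check: Callable[[], bool], timeout_s: float, interval_s: float = 1.0) -> bool:
    """Poll check() until it returns True or timeout_s elapses (hypothetical helper)."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if check():
                return True
        except Exception:
            # Treat errors such as "connection refused" as "not ready yet".
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


def total_records(load_table_response: dict) -> Optional[int]:
    """Return total-records of the current snapshot, or None if no snapshot exists yet.

    Assumed layout: response["metadata"]["snapshots"][i]["summary"]["total-records"].
    """
    metadata = load_table_response.get("metadata", {})
    current_id = metadata.get("current-snapshot-id")
    if current_id is None:
        return None
    for snapshot in metadata.get("snapshots", []):
        if snapshot.get("snapshot-id") == current_id:
            value = snapshot.get("summary", {}).get("total-records")
            return int(value) if value is not None else None
    return None


# Example with a hand-written response; a real test would fetch this over REST.
sample = {
    "metadata": {
        "current-snapshot-id": 2,
        "snapshots": [
            {"snapshot-id": 1, "summary": {"total-records": "10"}},
            {"snapshot-id": 2, "summary": {"total-records": "25"}},
        ],
    }
}
expected = 25  # stands in for the summed Kafka end offsets
matched = wait_until(lambda: total_records(sample) == expected, timeout_s=1.0, interval_s=0.1)
print("counts match:", matched)
```

Polling rather than asserting once matters here because Iceberg commits happen on an interval, so total-records can lag behind the Kafka end offsets for a while after production stops.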

New test cases added (asserting consistency: Kafka offsets == Iceberg total-records)

  • Minimal test: automq_tabletopic_e2e_test.py::test_tabletopic_avro_e2e_flow
  • Parameter matrix: commit.interval.ms ∈ {500, 2000, 10000}; topics ∈ {1, 10, 100}
  • Periodic restart: automq_tabletopic_broker_restart_test.py::test_broker_periodic_restart_10m
  • Schema evolution: automq_tabletopic_schema_evolution_test.py::test_schema_evolution_4_phases
  • Partition migration: automq_tabletopic_partition_reassignment_test.py::test_partition_reassignment_during_produce
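
For a sense of the size of the parameter matrix, the sketch below enumerates it with plain Python. It assumes the matrix is a full cross product of the two value sets (the description lists only the sets, not how they are combined), and matrix_cases is a hypothetical name, not a function from the test suite.

```python
from itertools import product

# Value sets taken from the PR description.
COMMIT_INTERVALS_MS = (500, 2000, 10000)
TOPIC_COUNTS = (1, 10, 100)


def matrix_cases():
    """Enumerate every (commit interval, topic count) pair, assuming a full cross product."""
    return [
        {"commit_interval_ms": interval, "topic_count": topics}
        for interval, topics in product(COMMIT_INTERVALS_MS, TOPIC_COUNTS)
    ]


cases = matrix_cases()
for case in cases:
    print(case)
print(f"{len(cases)} combinations")
```

Under that assumption the matrix yields nine test runs, each of which would end with the same Kafka offsets == Iceberg total-records assertion.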

@Gezi-lzq Gezi-lzq force-pushed the feat/table-topic-e2e branch from d445cf8 to 25d1466 September 11, 2025 06:36
@Gezi-lzq Gezi-lzq marked this pull request as ready for review September 12, 2025 02:32
@Gezi-lzq Gezi-lzq requested a review from Copilot September 12, 2025 06:21

@Copilot Copilot AI left a comment


Pull Request Overview

This PR adds comprehensive end-to-end testing capability for the TableTopic feature in AutoMQ. It introduces random Avro value generation, establishes a base test framework with external service management, and implements multiple test scenarios to validate TableTopic functionality under various conditions.

  • Adds random Avro value generation to PerfCommand for schema-based testing without requiring predefined value files
  • Creates a comprehensive TableTopic test framework with base services and helper methods
  • Implements multiple test scenarios including schema evolution, partition reassignment, broker restarts, and matrix testing

Reviewed Changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java | Adds random Avro value generation functionality with comprehensive schema type support |
| tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java | Removes --values-random option, defaulting to random generation when no values file is provided |
| tests/kafkatest/tests/core/automq_tabletopic_base.py | Base test framework providing service lifecycle management and common test utilities |
| tests/kafkatest/tests/core/automq_tabletopic_e2e_test.py | Basic end-to-end test validating core TableTopic functionality |
| tests/kafkatest/tests/core/automq_tabletopic_schema_evolution_test.py | Tests schema evolution across multiple compatible Avro schema versions |
| tests/kafkatest/tests/core/automq_tabletopic_partition_reassignment_test.py | Tests partition reassignment during active data production |
| tests/kafkatest/tests/core/automq_tabletopic_broker_restart_test.py | Tests broker restart scenarios with continuous data flow |
| tests/kafkatest/tests/core/automq_tabletopic_matrix_test.py | Matrix testing for different configurations and topic counts |
| tests/kafkatest/services/performance/automq_performance.py | Performance service wrapper for AutoMQ PerfCommand with Avro support |
| tests/kafkatest/services/external_services.py | Docker Compose service management for external dependencies |
| tests/docker/ | Docker configuration files for Schema Registry and Iceberg Catalog services |
| tests/docker/ducker-ak | Modified to enable Docker-in-Docker for ducker01 node |
| tests/docker/Dockerfile | Adds Docker installation to test container |


env_prefix += f"{key}='{value}' "

# Use sudo -E to preserve environment variables for the docker compose command.
cmd = f"{env_prefix} sudo -E docker compose -f {self.compose_file_path} {command}"

Copilot AI Sep 12, 2025


Using shell=True with user-controlled input in subprocess.check_call is a security risk. Consider using subprocess.run with a list of arguments instead of shell=True.
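
One possible shape for that suggestion is sketched below. It is not the change made in this PR: build_compose_command and run_compose are hypothetical names, and the sketch assumes the compose subcommand arrives as a single string such as "up -d". Environment values are passed through the env mapping instead of being interpolated into a shell string, and sudo -E is kept from the quoted line.

```python
import os
import shlex
import subprocess
from typing import Dict, List, Optional


def build_compose_command(compose_file_path: str, command: str) -> List[str]:
    """Build an argument list so no shell ever parses the path or the subcommand."""
    return ["sudo", "-E", "docker", "compose", "-f", compose_file_path, *shlex.split(command)]


def run_compose(
    compose_file_path: str,
    command: str,
    extra_env: Optional[Dict[str, str]] = None,
) -> subprocess.CompletedProcess:
    """Run docker compose without shell=True; extra_env is merged into the process environment."""
    env = dict(os.environ)
    env.update(extra_env or {})
    return subprocess.run(build_compose_command(compose_file_path, command), env=env, check=True)


# Only the argument list is built here; nothing is executed.
argv = build_compose_command("/tmp/my compose.yaml", "up -d")
print(argv)
```

Because each argument is a separate list element, a path containing spaces or shell metacharacters reaches docker compose unchanged instead of being re-interpreted by a shell.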


@Gezi-lzq Gezi-lzq merged commit d269d1a into main Sep 15, 2025
6 checks passed
@Gezi-lzq Gezi-lzq deleted the feat/table-topic-e2e branch September 15, 2025 03:15
Gezi-lzq added a commit that referenced this pull request Sep 15, 2025
* feat(perf): implement random Avro value generation for performance testing

* feat(e2e): implement end-to-end tests for Table Topic feature with Avro messages

* feat(tests): add end-to-end tests for Table Topic feature with schema evolution and broker restart scenarios

* fix(docker): streamline Docker installation in Dockerfile and update run command in ducker-ak

* fix(config): update S3 bucket name for Iceberg catalog in configuration