[Kafka output] Add config validation for topic and topics fields #38058

Merged
merged 5 commits into from Mar 18, 2024

Conversation

belimawr
Contributor

@belimawr belimawr commented Feb 19, 2024

Proposed commit message

This commit adds configuration validation for topic and topics fields from the Kafka output. Validation is performed for both cases: standalone Beat and running under Elastic-Agent. The latter does not support dynamic topics, hence setting topics is rejected by the validation.
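The two rules described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the `kafkaConfig` struct is a trimmed-down stand-in, the `underAgent` flag and the `validate` method name are hypothetical, and the wording of the second error message is an assumption (only the first message appears in the logs quoted in this description).

```go
package main

import (
	"errors"
	"fmt"
)

// kafkaConfig is a hypothetical, trimmed-down stand-in for the Kafka output
// configuration. The real struct has many more fields, and Topics holds
// selector rules rather than plain strings.
type kafkaConfig struct {
	Topic  string
	Topics []string
}

// validate applies the two rules from the commit message.
// underAgent is an assumed flag saying whether the Beat runs under Elastic-Agent.
func (c *kafkaConfig) validate(underAgent bool) error {
	// Rule 1: at least one of the two fields must be set.
	if c.Topic == "" && len(c.Topics) == 0 {
		return errors.New("either 'topic' or 'topics' must be defined")
	}
	// Rule 2: dynamic topics are not supported under Elastic-Agent.
	// The message text below is an assumption made for this sketch.
	if underAgent && len(c.Topics) != 0 {
		return errors.New("'topics' is not supported when running under Elastic-Agent")
	}
	return nil
}

func main() {
	cases := []struct {
		name       string
		cfg        kafkaConfig
		underAgent bool
	}{
		{"standalone, nothing set", kafkaConfig{}, false},
		{"standalone, topics set", kafkaConfig{Topics: []string{"logs"}}, false},
		{"agent, topic set", kafkaConfig{Topic: "logs"}, true},
		{"agent, topics set", kafkaConfig{Topic: "logs", Topics: []string{"logs"}}, true},
	}
	for _, tc := range cases {
		fmt.Printf("%s: %v\n", tc.name, tc.cfg.validate(tc.underAgent))
	}
}
```

Keeping both checks in one function means the standalone and Elastic-Agent paths report the same "must be defined" error, which matches the two log samples shown in the testing instructions.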

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

How to test this PR locally

For both test cases, first start a Kafka cluster (instructions below).

Standalone Beats (Filebeat):

Start Filebeat using the following configuration file

filebeat.yml

filebeat.inputs:
    - id: filestream-input-id
      type: filestream
      paths:
        - /tmp/flog.log
output:
    kafka:
        hosts:
            - <YOUR LOCAL IP>:9091

queue.mem:
  flush.timeout: 1s

logging:
  level: debug
  selectors:
    - kafka

Validate that

Exiting: error initializing publisher: either 'topic' or 'topics' must be defined accessing 'output.kafka' (source:'filebeat.yml')

is logged

Under Elastic-Agent

  1. Build Filebeat
  2. Download and extract the Elastic-Agent
  3. Replace its Filebeat binary with the one you built
  4. Generate some logs in the file /tmp/flog.log
  5. Start a standalone Elastic-Agent using the configuration below
elastic-agent.yml

outputs:
  default:
    type: kafka
    hosts:
        - <YOUR IP>:9091

inputs:
  - type: filestream
    id: your-input-id
    logging.level: debug
    streams:
      - id: your-filestream-stream-id
        data_stream:
          dataset: generic
        paths:
          - /tmp/flog.log

Validate you see an error log like:

{"log.level":"error","@timestamp":"2024-02-19T15:28:52.127Z","message":"could not start output","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"filestream-default","type":"filestream"},"log":{"source":"filestream-default"},"log.logger":"centralmgmt.V2-manager","log.origin":{"file.line":631,"file.name":"management/managerV2.go","function":"github.com/elastic/beats/v7/x-pack/libbeat/management.(*BeatV2Manager).reload"},"service.name":"filebeat","error":{"message":"failed to reload output: either 'topic' or 'topics' must be defined accessing 'kafka'"},"ecs.version":"1.6.0","ecs.version":"1.6.0"}

Running a Kafka cluster

  1. Copy the docker-compose.yml provided below
  2. Replace <YOUR LOCAL IP> with your IP address (e.g. 10.0.0.42). Do not use a loopback address
  3. Start the containers: docker-compose up
  4. Open Kafdrop (http://localhost:9000/) on your browser. You can use it to manage topics and look at the messages.
docker-compose.yml

version: '3'
services:
  zookeeper:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://<YOUR LOCAL IP>:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka2:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://<YOUR LOCAL IP>:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 2
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper 
  kafka3:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://<YOUR LOCAL IP>:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091,kafka2:19092,kafka3:19093"
    depends_on:
      - kafka1
      - kafka2
      - kafka3

Tutorial on running a Kafka cluster with Docker: https://betterprogramming.pub/a-simple-apache-kafka-cluster-with-docker-kafdrop-and-python-cf45ab99e2b9


@belimawr belimawr requested a review from a team as a code owner February 19, 2024 15:18
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Feb 19, 2024
Contributor

mergify bot commented Feb 19, 2024

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @belimawr? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

@pierrehilbert pierrehilbert added the Team:Elastic-Agent Label for the Agent team label Feb 19, 2024
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Feb 19, 2024
@elasticmachine
Collaborator

elasticmachine commented Feb 19, 2024

💚 Build Succeeded


Build stats

  • Duration: 133 min 35 sec

❕ Flaky test report

No test was executed to be analysed.

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@belimawr belimawr requested a review from faec February 21, 2024 14:26
@cmacknz cmacknz added the backport-v8.13.0 Automated backport with mergify label Feb 21, 2024
@@ -169,6 +175,20 @@ func (c *kafkaConfig) Validate() error {
			return fmt.Errorf("compression_level must be between 0 and 9")
		}
	}

	if c.Topic == "" && len(c.Topics) == 0 {
Member

Would it be possible to attempt to evaluate c.Topic as a format string and log a warning if we think it is one?

That would help people figure out why their topic routing is failing. I don't think the %{ syntax is a valid topic name, so the only way we'd get that is if someone is trying to use the Beat topic syntax with Agent.

Contributor

Contributor Author

The code that converts the %{foo} notation is deep within a helper from the outputs (outil.BuildSelectorFromConfig), and it would return errors that might not be what we want.

I could either search for %{ in the topic name and return an error, or use a regexp validation with the characters Lee pointed to. I prefer the latter, returning an error message that states the valid characters.

Member

Agreed, just use the character set Lee linked and link back to that file in our code.
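
A rough sketch of the regexp approach agreed on above, for illustration only. The function and variable names are hypothetical, and the character set used here (ASCII letters, digits, `.`, `_` and `-`) is my reading of Kafka's legal topic characters; check it against the Kafka source before relying on it. Other Kafka rules, such as length limits, are left out.

```go
package main

import (
	"fmt"
	"regexp"
)

// legalTopic matches names made only of ASCII letters, digits, '.', '_' and '-'.
// Assumption: this mirrors the character set Kafka accepts for topic names.
var legalTopic = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`)

// validateTopicName returns an error that states the valid characters when the
// topic contains anything else, which includes the Beat %{...} format syntax.
func validateTopicName(topic string) error {
	if !legalTopic.MatchString(topic) {
		return fmt.Errorf("topic %q is invalid: only ASCII alphanumerics, '.', '_' and '-' are allowed", topic)
	}
	return nil
}

func main() {
	for _, topic := range []string{"filebeat-logs", "%{[fields.topic]}"} {
		fmt.Printf("%q -> %v\n", topic, validateTopicName(topic))
	}
}
```

Because `%`, `{`, `[` and `]` are all outside the character set, a format string is rejected without any need to parse it, which sidesteps the helper mentioned in the previous comment.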

Contributor

mergify bot commented Feb 22, 2024

This pull request is now in conflicts. Could you fix it? 🙏
To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b validation-for-kafka-config upstream/validation-for-kafka-config
git merge upstream/main
git push upstream validation-for-kafka-config

@belimawr belimawr force-pushed the validation-for-kafka-config branch from 05f96f7 to 50738a1 Compare March 7, 2024 12:19
Contributor

@leehinman leehinman left a comment

LGTM

Contributor

mergify bot commented Mar 14, 2024

This pull request is now in conflicts. Could you fix it? 🙏
To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b validation-for-kafka-config upstream/validation-for-kafka-config
git merge upstream/main
git push upstream validation-for-kafka-config

This commit adds configuration validation for `topic` and `topics`
fields from the Kafka output. Validation is performed for both cases:
standalone Beat and running under Elastic-Agent. The latter does not
support dynamic topics, hence setting `topics` is rejected by the
validation.

This commit adds validation for the topic when running under
Elastic-Agent. The topic is validated using a regexp taken from the
Kafka source code:
https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33

This commit adds a test case making explicit that topics containing
the syntax `%{}` are not supported under Elastic-Agent.
@belimawr belimawr force-pushed the validation-for-kafka-config branch from c24b4c3 to e6280db Compare March 14, 2024 19:36
@elasticmachine
Collaborator

💚 Build Succeeded

cc @belimawr

@elasticmachine
Collaborator

elasticmachine commented Mar 14, 2024

💔 Build Failed

cc @belimawr

@elasticmachine
Collaborator

elasticmachine commented Mar 14, 2024

💔 Build Failed

cc @belimawr

@elasticmachine
Collaborator

💚 Build Succeeded

cc @belimawr

@elasticmachine
Collaborator

💚 Build Succeeded

cc @belimawr

@elasticmachine
Collaborator

elasticmachine commented Mar 14, 2024

💔 Build Failed

cc @belimawr

@elasticmachine
Collaborator

elasticmachine commented Mar 14, 2024

💔 Build Failed

cc @belimawr

@elasticmachine
Collaborator

💚 Build Succeeded

cc @belimawr

@elasticmachine
Collaborator

💚 Build Succeeded

cc @belimawr

@elasticmachine
Collaborator

💚 Build Succeeded

cc @belimawr

@belimawr belimawr requested a review from cmacknz March 18, 2024 07:34
@belimawr belimawr merged commit 8289144 into elastic:main Mar 18, 2024
106 of 109 checks passed
mergify bot pushed a commit that referenced this pull request Mar 18, 2024
)

This commit adds configuration validation for `topic` and `topics`
fields from the Kafka output. Validation is performed for both cases:
standalone Beat and running under Elastic-Agent. The latter does not
support dynamic topics, hence setting `topics` is rejected by the
validation.

When running under Elastic-Agent, the topic is validated using a regexp
taken from the Kafka source code:
https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33

(cherry picked from commit 8289144)
pierrehilbert pushed a commit that referenced this pull request Mar 19, 2024
) (#38381)

This commit adds configuration validation for `topic` and `topics`
fields from the Kafka output. Validation is performed for both cases:
standalone Beat and running under Elastic-Agent. The latter does not
support dynamic topics, hence setting `topics` is rejected by the
validation.

When running under Elastic-Agent, the topic is validated using a regexp
taken from the Kafka source code:
https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33

(cherry picked from commit 8289144)

Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
Labels
backport-v8.13.0 Automated backport with mergify Team:Elastic-Agent Label for the Agent team
Development

Successfully merging this pull request may close these issues.

No data for Kafka output under topic for System integration.
6 participants