Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6786] Added Kafka components, 3rd time's the charm #12388

Closed
wants to merge 14 commits into from

Conversation

dferguson992
Copy link

Dear Airflow Maintainers,

Please accept the following PR that

Add the KafkaProducerHook.
Add the KafkaConsumerHook.
Add the KafkaSensor which listens to messages with a specific topic.
Related Issue:
#1311

Issue link: AIRFLOW-6786

Make sure to mark the boxes below before creating PR: [x]

Description above provides context of the change
Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
Unit tests coverage for changes (not needed for documentation changes)
Commits follow "How to write a good git commit message"
Relevant documentation is updated including usage instructions.
I will engage committers as explained in Contribution Workflow Example.
For document-only changes commit message can start with [AIRFLOW-XXXX].
Reminder to contributors:

You must add an Apache License header to all new files
Please squash your commits when possible and follow the 7 rules of good Git commits
I am new to the community, I am not sure the files are at the right place or missing anything.

The sensor could be used as the first node of a dag where the second node can be a TriggerDagRunOperator. The messages are polled in a batch and the dag runs are dynamically generated.

Thanks!

Note, as per denied PR #1415, it is important to mention these integrations are not suitable for low-latency/high-throughput/streaming. For reference, #1415 (comment).

Co-authored-by: Dan Ferguson dferguson992@gmail.com
Co-authored-by: YuanfΞi Zhu

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

docs/installation.rst Outdated Show resolved Hide resolved
@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@dferguson992
Copy link
Author

@mik-laj can i please get a review of this PR? It's passed all of the tests and just needs 1 approving review.

@mik-laj
Copy link
Member

mik-laj commented Nov 17, 2020

It seems to me that a few unit tests might have helped us prevent regression in the future. Can you add it?

@github-actions
Copy link

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@rotten
Copy link

rotten commented Dec 18, 2020

Do you need help resolving the (documentation) merge conflicts? Is that the only thing blocking this getting merged?

@dferguson992
Copy link
Author

@rotten to my knowledge yes. Now its so outdated, and the structure of this project seems to change overnight. I missed the 2.0 release and frankly I'm just so over it. This community is great, but contributing to this project while you work and have other obligations, even if its literally three tiny objects, is really difficult for me.

@rotten
Copy link

rotten commented Dec 21, 2020

The documentation conflicts are easy to resolve and I'm happy to resolve them to see this move along. I can either push the changes to your branch in your repo directly if you want to invite me to that project, or I can fork your fork, and then pr back to you to then pr back to here.

Or, I can fork your repo, we can close this PR, and then PR a "try #4". Let me know how you'd like to proceed.

@ashb
Copy link
Member

ashb commented Dec 21, 2020

The structure looks good -- I'll try and rebase this for you.

@kubatyszko
Copy link
Contributor

@ashb do you need any help with this ? @rotten and I are happy to provide assistance.

@mik-laj
Copy link
Member

mik-laj commented Jan 6, 2021

I have looked at this library and it seems to me that it might complicate the tests a bit. Have you thought to use unittest.mock for unit tests and to set up a separate Docker container from Kafka for integration tests? This sounds a lot more stable and more maintainable to me than adding a library which may have further dependency issues.

Here is an example of adding integration tests that use a separate container.
#13195


DEFAULT_HOST = 'kafka1'
DEFAULT_PORT = 9092
templated_fields = ('topic', 'host', 'port', ß)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
templated_fields = ('topic', 'host', 'port', ß)
templated_fields = ('topic', 'host', 'port')

Comment on lines +1 to +44
# #
# # Licensed to the Apache Software Foundation (ASF) under one
# # or more contributor license agreements. See the NOTICE file
# # distributed with this work for additional information
# # regarding copyright ownership. The ASF licenses this file
# # to you under the Apache License, Version 2.0 (the
# # "License"); you may not use this file except in compliance
# # with the License. You may obtain a copy of the License at
# #
# # http://www.apache.org/licenses/LICENSE-2.0
# #
# # Unless required by applicable law or agreed to in writing,
# # software distributed under the License is distributed on an
# # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# # KIND, either express or implied. See the License for the
# # specific language governing permissions and limitations
# # under the License.
#
# from cached_property import cached_property
#
# from airflow.providers.apache.kafka.hooks.kafka_consumer import KafkaConsumerHook
# from airflow.sensors.base_sensor_operator import BaseSensorOperator
# from airflow.utils.decorators import apply_defaults
#
#
# class KafkaSensor(BaseSensorOperator):
# """Consumes the Kafka message with the specific topic"""
#
# DEFAULT_HOST = 'kafka1'
# DEFAULT_PORT = 9092
# templated_fields = ('topic', 'host', 'port', ß)
#
# @apply_defaults
# def __init__(self, topic, host=DEFAULT_HOST, port=DEFAULT_PORT, *args, **kwargs):
# """
# Initialize the sensor, the connection establish
# is put off to it's first time usage.
#
# :param topic:
# :param host:
# :param port:
# :param args:
# :param kwargs:
# """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why all the code is under comment ?

@luup2k
Copy link

luup2k commented Feb 20, 2021

Is there a good reason to use kafka lib, https://pypi.org/project/kafka/ ? this is a frozen release: dpkp/kafka-python#1726 . if we wish to go with a pure-python lib i guess that we'll need to use: https://pypi.org/project/kafka-python/ instead.

As a side topic: why we don't use confluent-kafka: https://github.com/confluentinc/confluent-kafka-python ??, this lib bind librdkafka and has a superior performance.


def get_messages(self, timeout_ms=5000) -> dict:
"""
Get all the messages haven't been consumed, it doesn't
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Get all the messages haven't been consumed,"

If we use poll() without max_records, the behavior is returns at most "max_poll_records" #records. "max_poll_records" is setted to 500 by default at Consumer Init config.

So, we're not going to consume "all" message except we put a very high number as max_poll_records(could be a memory bomb) or we have a low number of message in the topic.

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.poll

@github-actions
Copy link

github-actions bot commented Apr 8, 2021

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Apr 8, 2021
@github-actions github-actions bot closed this Apr 13, 2021
@shkolnik
Copy link

@dferguson992 @luup2k @RosterIn @ashb @mik-laj this seems like a very useful capability and a lot of effort went into building it. What would it take to wrap it up and get it merged?

@ashb ashb reopened this May 21, 2021
@ashb
Copy link
Member

ashb commented May 21, 2021

Yup, I'll pick this up and finish it off/get it over the line.

Sorry I forgot about it (AGAIN!)

@ashb ashb self-assigned this May 21, 2021
@eladkal eladkal removed the stale Stale PRs per the .github/workflows/stale.yml policy file label May 21, 2021
@shkolnik
Copy link

shkolnik commented Jun 1, 2021

@ashb Thanks!

@flolas
Copy link
Contributor

flolas commented Jun 2, 2021

I'm interested in this PR! I can help if needed. @ashb @shkolnik

@ashb
Copy link
Member

ashb commented Jun 4, 2021

Yup, I'll pick this up and finish it off/get it over the line.

Sorry I forgot about it (AGAIN!)

I've taken a look at this PR, and it's further from being ready to merge than I thought.

  • there's no unit tests to speak of (or at all realy)
  • The Sensor takes a host and port, but not connection id parameters
  • The consumer Hook takes host and port but ignores these (it shouldnt' accept them)
  • Commented out code doesn't belong in git.

In short: there is more work here than I thought to get this in a state to be merged, so I don't have time right now. Sorry.

@ashb ashb removed their assignment Jun 10, 2021

if messages:
self.log.info('Got messages during poking: %s', str(messages))
return messages

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried out this patch, though as I understand it the poke is supposed to return a true/false - as the return value is discarded in base sensor - https://github.com/apache/airflow/blob/main/airflow/sensors/base.py#L234 - so you'd need to have something else also read from kafka in the dag ?

I suspect you might want to either turn this into a "peek" where you're checking for unread offsets from the consumer group (or messages like the above, only if there's no consumer group) and then have something else pull the actual messages. (perhaps this needs a wider example)

@DestroyerAlpha
Copy link

What is the update on this? Is it a work in progress? If so, is there an expected horizon? If not, what are the alternatives we could try for a task that needs to Check a Kafka topic for new events and trigger a task?

@rotten
Copy link

rotten commented Jul 2, 2021

My suggestion, as an interim solution, is to spin up a container running a faust app, and have it read the messages and then post to the airflow api to trigger a job.

@DestroyerAlpha
Copy link

My suggestion, as an interim solution, is to spin up a container running a faust app, and have it read the messages and then post to the airflow api to trigger a job.

Would using a 'confluent-kafka' consumer instead do the trick?

@CoinCoderBuffalo
Copy link

My suggestion, as an interim solution, is to spin up a container running a faust app, and have it read the messages and then post to the airflow api to trigger a job.

Would using a 'confluent-kafka' consumer instead do the trick?

I don't think so. You could use that to create a Kafka consumer, but it will not work like an Airflow sensor. A shame this sensor is not ready yet, as a workaround I'm looking at using either Faust (as suggested above) or Kafka Streaming.

@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 27, 2021
@github-actions github-actions bot closed this Sep 1, 2021
@serge-salamanka
Copy link

unbelievable (!) Ariflow does not support Kafka out of the box yet !?

@potiuk
Copy link
Member

potiuk commented Dec 2, 2021

@serge-salamanka-1pt - maybe you would like to contribute it ? Airflow is created by >1800 contributors and you can become one and add Kafka support! The OSS world works this way.

@potiuk
Copy link
Member

potiuk commented Dec 2, 2021

unbelivable (!) you have not done it yet @serge-salamanka-1pt !

@debashis-das
Copy link

@potiuk I have a same requirement. If I am able to implement it. Will raise a PR.

@potiuk
Copy link
Member

potiuk commented Dec 6, 2021

@potiuk I have a same requirement. If I am able to implement it. Will raise a PR.

Cool!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:dev-tools kind:documentation stale Stale PRs per the .github/workflows/stale.yml policy file
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet