
[AIRFLOW-6786] Added Kafka components #10660

Closed
wants to merge 124 commits into from

Conversation

dferguson992

Dear Airflow Maintainers,

Please accept the following PR, which:

- Adds the KafkaProducerHook.
- Adds the KafkaConsumerHook.
- Adds the KafkaSensor, which listens for messages on a specific topic.
Related Issue:
#1311

Issue link: AIRFLOW-6786
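To give reviewers a concrete picture, here is a purely illustrative sketch of what a thin consumer hook built on confluent-kafka might look like. Only the class name comes from this PR; the import, constructor arguments, and method are assumptions, not the merged API.

```python
from confluent_kafka import Consumer


class KafkaConsumerHook:
    """Hypothetical sketch; the real hook in this PR may differ."""

    def __init__(self, config: dict, topic: str):
        # config is a librdkafka-style dict, e.g.
        # {"bootstrap.servers": "localhost:9092", "group.id": "airflow"}
        self.topic = topic
        self.consumer = Consumer(config)

    def get_messages(self, batch_size: int = 100, timeout: float = 1.0):
        """Poll up to batch_size messages from the topic in one batch."""
        self.consumer.subscribe([self.topic])
        msgs = self.consumer.consume(num_messages=batch_size, timeout=timeout)
        return [m.value() for m in msgs if m.error() is None]
```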

Make sure to mark the boxes below before creating PR:

- [x] Description above provides context of the change
- [x] Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Commits follow "How to write a good git commit message"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in Contribution Workflow Example.

* For document-only changes, commit message can start with [AIRFLOW-XXXX].
Reminder to contributors:

- You must add an Apache License header to all new files
- Please squash your commits when possible and follow the 7 rules of good Git commits

I am new to the community, so I am not sure whether the files are in the right place or whether anything is missing.

The sensor could be used as the first task of a DAG, with a TriggerDagRunOperator as the second. Messages are polled in batches, and the DAG runs are generated dynamically; a sketch of this pattern follows below.
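A minimal sketch of that pattern, assuming the KafkaSensor import path and parameters proposed here (neither is a merged API; `my_topic` and `processing_dag` are placeholders):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Hypothetical import path for the sensor added in this PR:
from airflow.contrib.sensors.kafka_sensor import KafkaSensor

with DAG(
    dag_id="kafka_listener",
    start_date=datetime(2020, 9, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Wait for a batch of messages on the topic...
    listen = KafkaSensor(
        task_id="listen_for_messages",
        topic="my_topic",  # assumed parameter name
        poke_interval=30,
    )
    # ...then kick off a downstream DAG to process them.
    trigger = TriggerDagRunOperator(
        task_id="trigger_processing",
        trigger_dag_id="processing_dag",
    )
    listen >> trigger
```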

Thanks!

Note: as discussed in the rejected PR #1415, it is important to mention that these integrations are not suitable for low-latency, high-throughput, or streaming use cases. For reference, see #1415 (comment).

Co-authored-by: Dan Ferguson dferguson992@gmail.com
Co-authored-by: YuanfΞi Zhu

@ryw (Member) left a comment:

I'm generally in favor of Airflow having Kafka integration, so +1 on this conceptually


ryw commented Sep 2, 2020

@mrrobby do you want to review / test this out?

potiuk and others added 22 commits September 11, 2020 08:29
Co-authored-by: Marco Aguiar <marco@DESKTOP-8IVSCHM.localdomain>
Co-authored-by: Kamil Breguła <mik-laj@users.noreply.github.com>
The requirements files are not needed any more. We replaced them
with a new, better "constraints" mechanism where constraints
are stored in separate, orphaned branches of the repository
and are automatically maintained by the CI process.

See more about our dependency management process here:

https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pinned-constraint-files
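For illustration, installing against one of those constraint branches looks roughly like this (the branch name and Python version in the URL are examples; see CONTRIBUTING.rst for the current scheme):

```bash
pip install apache-airflow \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-master/constraints-3.7.txt"
```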
Add DataprepGetJobGroupOperator and DataprepRunJobGroupOperator
for Dataprep service.

Co-authored-by: Tomek Urbaszek <tomasz.urbaszek@polidea.com>
Inspired by the Google Shell Guide, where they mention
separating package names with ::, I realized that this was
one of the missing pieces in our bash scripts.

While we already had packages (in the libraries folder),
it has been difficult to tell which function is where.

By introducing packages - named after the library file -
we are *almost* at the level of a structured language, and
it's easier to find functions when you are looking for them.

Way easier, in fact.

Part of #10576
* Add `log_id` field to log lines on ES handler

* Add `offset` field to log lines on ES handler

it will be set to the epoch timestamp in nanoseconds (this will just be
used for ordering log lines when displayed in the webserver UI).

* Update UPDATING.md

With information regarding log_id and offset fields in JSON log lines written to stdout
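The `offset` value described above is just a monotonically increasing sort key; a sketch of how such a field can be computed (illustrative, not the handler's actual code):

```python
import time

# Epoch timestamp in nanoseconds, used only to order log lines in the UI.
offset = int(time.time() * 1_000_000_000)
```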
* Remove airflow-pr tool

* Add PyGithub back in

* Remove gitpython
Addressing documentation errors

Addressing documentation errors

Adding kafka installation requirement to setup

Update docs/operators-and-hooks-ref.rst

Co-authored-by: Kamil Breguła <mik-laj@users.noreply.github.com>

Added dag start date back to example dag

???

????

Adds 'cncf.kubernetes' package back to backport provider packages. (#10659)

Display conf as a JSON in the DagRun list view (#10644)

Co-authored-by: Marco Aguiar <marco@DESKTOP-8IVSCHM.localdomain>

Unify error messages and complete type field in response (#10333)

Co-authored-by: Kamil Breguła <mik-laj@users.noreply.github.com>

Add howto doc for Salesforce connection (#10482)

Add example on airflow users create --help (#10662)

Added some documentation requirements

* Unify command names in CLI
turbaszek and others added 24 commits September 11, 2020 08:31
* Add connection caching to KubernetesHook

* Fix recursion and remove redundant docstrings
Co-authored-by: Kamil Breguła <mik-laj@users.noreply.github.com>
Someone was overzealous in building the initial ignore list -- "wil" is
not a word :)

And the spell checker does not catch repeated words.
`changes` -> `changed`
Ensures that airflow can run without downloading the
Kubernetes python client
An `assert` was missing, so the statement didn't actually test anything and would not fail when the condition was untrue.
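The class of bug being fixed: a bare comparison is evaluated and discarded, so the test can never fail. A minimal illustration (hypothetical function):

```python
def add(a, b):
    return a + b


def test_add():
    add(2, 2) == 4          # bug: result of the comparison is thrown away
    assert add(2, 2) == 4   # fix: a false condition now raises AssertionError
```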
* Rename "Beyond the Horizon" section and refactor content

* Rework copy
Once HA mode for the scheduler lands, we can no longer reset orphaned
tasks by looking at the tasks in (the memory of) the current executor.

This change keeps track of which (Scheduler)Job queued/scheduled a
TaskInstance (via the new "queued_by_job_id" column stored on the
TaskInstance table), so we can use the existing heartbeat
mechanism for jobs to notice when a TI should be reset.

As part of this, the existing implementation of
`reset_state_for_orphaned_tasks` has been moved out of BaseJob into
BackfillJob -- only this and SchedulerJob had these methods, and the
SchedulerJob version now operates differently.
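A rough sketch of the idea, not Airflow's actual implementation: with `queued_by_job_id` on TaskInstance, orphans are simply TIs whose owning job has stopped heartbeating (the model classes and timeout here are assumed):

```python
from datetime import datetime, timedelta

HEARTBEAT_GRACE = timedelta(seconds=30)  # assumed threshold


def find_orphaned_task_instances(session, TaskInstance, BaseJob):
    """Return queued/scheduled TIs whose queueing job missed its heartbeat."""
    cutoff = datetime.utcnow() - HEARTBEAT_GRACE
    return (
        session.query(TaskInstance)
        .join(BaseJob, TaskInstance.queued_by_job_id == BaseJob.id)
        .filter(TaskInstance.state.in_(["queued", "scheduled"]))
        .filter(BaseJob.latest_heartbeat < cutoff)
        .all()
    )
```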
The SmartSensor PR introduced slightly different behaviour in
list_py_files when given a file path directly.

Prior to that PR, if given a file path it would not include examples.

After that PR was merged, it would return that path plus the example dags
(assuming they were enabled).
Relative and absolute imports are functionally equivalent; the only
practical difference is that relative imports are shorter.

But they also make it less obvious what exactly is imported, and harder
to find such imports with simple tools (such as grep).

Thus we have decided that the Airflow house style is to use absolute
imports only.
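For example (module paths illustrative):

```python
# Relative import: shorter, but the source package is implicit
from .kafka_consumer_hook import KafkaConsumerHook

# Absolute import: unambiguous and easy to grep for (house style)
from airflow.contrib.hooks.kafka_consumer_hook import KafkaConsumerHook
```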
@@ -65,10 +65,16 @@
# Update options
# [START how_to_cloud_dataproc_updatemask_cluster_operator]
CLUSTER_UPDATE = {
"config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
A contributor commented:
How is adding kafka integration related to modifying google dag examples?

dferguson992 (Author) replied:

I don't know how those even got in. Whenever I can work on this, I rebase from master and a whole bunch of changes get added. I think I'm rebasing wrong, but I don't understand how. `git rebase -i <my_branch> master`?

I never have enough time to really isolate the issue with this PR, especially since every time I try to update my local branch I pick up other commits that I didn't write. I'm not sure what's wrong with this at all.

The contributor replied:

There is a really good guide in the contributing section:
https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#id9
On Slack there is a channel, airflow-how-to-pr, where you can ask for help if you get into trouble.
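For what it's worth, `git rebase` takes its arguments as `<upstream> <branch>`, so `git rebase -i <my_branch> master` rebases master onto the PR branch, which would explain the foreign commits. A typical update flow looks like this (remote names assumed: `apache` for apache/airflow, `origin` for the fork):

```bash
git fetch apache                          # pull the latest upstream history
git checkout my-branch
git rebase apache/master                  # replay only your commits on top
# resolve any conflicts, then: git rebase --continue
git push --force-with-lease origin my-branch
```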

dferguson992 (Author) replied:

I've read through this guide and I'm still not sure what's happening here. I've joined the Slack channel and raised my concerns there as well.

@mik-laj removed their request for review October 12, 2020 11:19

ryw commented Dec 2, 2020

@dferguson992 curious why you closed this?
