Airflow Provider for OpenTelemetry #37628

Open
2 tasks done
howardyoo opened this issue Feb 22, 2024 · 10 comments
@howardyoo (Contributor) commented Feb 22, 2024

Description

OpenTelemetry (https://opentelemetry.io/) is quickly becoming the standard for sending telemetry data (metrics, logs, and traces) to any target that supports its protocol. Airflow, as a popular choice for data pipelines, needs to be able to send OpenTelemetry data during operation so that pipelines can be observed by the many monitoring/observability tools currently available. This OpenTelemetry provider will contain plugins, listeners, and hooks that make it easy for users to send out their own telemetry data via OpenTelemetry standards.

Use case/motivation

By providing an 'opentelemetry provider' for Airflow, users can easily use a provider that contains:

  1. listeners - to listen for task runs and DAG runs, and emit them as OpenTelemetry traces to the configured OTEL endpoint (a minimal listener sketch follows below).
  2. OpenTelemetry hooks - for users to send out traces and logs using OpenTelemetry from within their DAG source code.
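
To make item 1 concrete, here is a minimal sketch (not the provider's actual implementation) of what such a listener could look like, using Airflow's listener API and the standard OpenTelemetry SDK; the endpoint and tracer name are illustrative assumptions:

# Hypothetical listener sketch; in practice it would be registered via a
# plugin's `listeners` attribute so the scheduler/worker picks it up.
from airflow.listeners import hookimpl
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

_provider = TracerProvider()
_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces"))
)
_tracer = _provider.get_tracer("airflow.otel_provider")

@hookimpl
def on_task_instance_success(previous_state, task_instance, session):
    # Emit one span per finished task instance, back-dated to its actual start.
    start_ns = int(task_instance.start_date.timestamp() * 1e9)
    end_ns = int(task_instance.end_date.timestamp() * 1e9)
    span = _tracer.start_span(task_instance.task_id, start_time=start_ns)
    span.set_attribute("airflow.dag_id", task_instance.dag_id)
    span.set_attribute("airflow.run_id", task_instance.run_id)
    span.end(end_time=end_ns)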

Motivation for this feature

AIP-49 has finally been released and is now part of native Airflow. With the new OTEL tracing capability in place (enabled via configuration; see the sketch after this list), users can monitor:

  • The performance and detailed trace of events happening around a DAG run. Users can see when each task inside the DAG run happened and what its outcome was. Additionally, this information can now be sent to an OTLP-compatible endpoint for collection, storage, and analysis in the OpenTelemetry backend of choice.
  • In addition, Airflow processes such as the scheduler, triggerer, and executor emit traces showing when each heartbeat took place and how long it took, how long DAG processing took, whether there were any errors, and how task instances and executors interact, so that users can see in much more detail how DAG runs were queued and executed.
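
For reference, enabling the native tracing is a configuration switch; a sketch of the relevant airflow.cfg section (option names are based on my reading of Airflow 2.10 and should be verified against the docs):

[traces]
otel_on = True
otel_host = localhost
otel_port = 4318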

But what about custom spans and attributes?

A DAG run can now be expressed as a trace, which would look like the following:
[screenshot: example trace of a DAG run]
These traces capture which task ran at which time and in what order, which should be good enough for most purposes when observing how well a DAG run went.

However, this does not capture per-task details that could describe additional information, such as:

  • What were the business context and details specific to a certain run?
  • Any additional information that cannot be collected as part of the basic run itself, such as:
    • data quality information the user may want to instrument
    • execution or runtime information specific to certain operators or external systems
    • additional logs that can be recorded as span events

For example, with the proposed OTEL hook, a user could instrument a task like this:
import time

from airflow.operators.python import PythonOperator
from otel_hook import OtelHook

# get the OTEL hook (uses the provider's configured OTLP connection)
otel_hook = OtelHook()

...

    def setup(**dag_context):
        with otel_hook.start_as_current_span(name="do_setup", dag_context=dag_context) as s:
            s.set_attribute("data quality", "fair")
            s.set_attribute("description", "You can add attributes in the OTEL hook to record business- or data-specific details on top of the existing task instance span.")
            with otel_hook.start_as_current_span(name="do_sleep") as ss:
                ss.set_attribute("sleep for", "one second")
                time.sleep(1)

    # simple setup operator in Python
    t0 = PythonOperator(
        task_id="setup",
        python_callable=setup,
    )

Providing a means for users to instrument DAG-run-specific spans or contexts and make them part of the task instance would tremendously help in gaining deeper insights. Imagine if users could easily add their own spans inside their DAG files to record certain activities:
[screenshots: custom spans shown nested under the task instance span]

The OTEL provider will include an OTEL hook that helps you do this. Using the hook, you can create an active span and conveniently attach it to the current DAG run's task instance.

Supporting lower versions of Airflow to emit DAG run traces

Another feature of the OpenTelemetry provider is a way to capture DAG runs when the Airflow version is lower than v2.10.0, the first version with OTEL tracing built in. In that case, with its more limited capability, the OTEL provider can generate traces from task instances, complementing Airflow deployments that are either on a lower version or have the OTEL capability disabled. The provider can detect this automatically and start generating its own traces regardless.

[screenshot: DAG run trace generated by the provider's own listener]

This can also be combined with users' custom instrumentation, so that the instrumentation keeps working whether Airflow has native OTEL enabled or is using the OTEL provider.

  • The OTEL provider will implement an event listener.
  • This event listener will only work when it is enabled via configuration.
  • If the event listener detects that the Airflow version is >= 2.10.0, it will check whether OTEL instrumentation is enabled. If so, it will use that configuration (endpoint) to emit traces; if not, it will default to the configuration provided by its connection info (see the sketch after this list).
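
A minimal sketch of that decision logic, assuming a hypothetical helper inside the listener (the config option names mirror Airflow 2.10's [traces] section; the connection handling is illustrative):

# Hypothetical endpoint resolution for the listener; names are illustrative.
from packaging.version import Version
from airflow import __version__ as airflow_version
from airflow.configuration import conf

def resolve_otlp_endpoint(connection_endpoint):
    # On Airflow >= 2.10.0 with native OTEL tracing on, reuse its configuration.
    if Version(airflow_version) >= Version("2.10.0") and conf.getboolean(
        "traces", "otel_on", fallback=False
    ):
        host = conf.get("traces", "otel_host", fallback="localhost")
        port = conf.get("traces", "otel_port", fallback="4318")
        return f"http://{host}:{port}"
    # Otherwise fall back to the endpoint from the provider's connection info.
    return connection_endpoint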

However, the event listener can only detect and collect what is available via DagRun and TaskInstance, and therefore will not be able to collect all the traces (e.g. scheduler jobs) that Airflow's native OTEL tracing provides.

This feature could also be useful if a user is unable to upgrade to Airflow v2.10 but still wants to leverage OTEL tracing.

Related issues

None

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@howardyoo added the kind:feature (Feature Requests) and needs-triage (new issues we didn't triage yet) labels on Feb 22, 2024
@ferruzzi (Contributor) commented:

I'm not sure I see the benefit here. We have already discussed and plan to implement traces and logging Some Day ™️, so what is gained by adding a provider? A user can already add custom metrics to their operators and hooks by using:

from airflow.stats import Stats
Stats.incr("some_new_metric_name", tags=stats_tags)

@howardyoo (Contributor, Author) commented:

@ferruzzi , there are a couple of things.

  • Sometimes users want to use hooks to send OTEL metrics to multiple separate targets, using Airflow connections rather than the OTEL configuration centrally set up by admins (a connection-based sketch follows this list).
  • Providing listeners that capture DAG and task runs can be useful when a user does not need or want detailed traces of how Airflow itself operates (schedulers, webservers, workers, executors, etc.), but rather wants to capture DAG runs separately from Airflow's main OTEL instrumentation.
  • OTEL listeners can also be helpful where Airflow is not or cannot be configured with OTEL, but users still want to instrument it with OTEL separately.
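
As a sketch of the first point, a hook could resolve its OTLP target from an ordinary Airflow connection instead of the central config (the connection id and URL scheme here are assumptions):

# Hypothetical sketch: one OTLP endpoint per Airflow connection, not per deployment.
from airflow.hooks.base import BaseHook

def endpoint_from_connection(conn_id="otel_default"):
    conn = BaseHook.get_connection(conn_id)  # standard Airflow connection lookup
    host = conn.host or "localhost"
    port = conn.port or 4318
    return f"http://{host}:{port}/v1/traces"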

I did think about this for some time, and it looks like some vendors, e.g. Datadog, support DogStatsD as well as Datadog hooks for sending arbitrary telemetry data as needed. This seemed like a reasonable precedent.

@rutvikjshah commented Mar 14, 2024

@ferruzzi , @howardyoo

I am trying to do a POC: basically I want to capture the status of my Airflow DAG runs (and even task runs), send these metrics to OpenTelemetry, and show them as graphs in a Grafana dashboard. I followed the Medium blog you provided, Better Apache Airflow Observability using OpenTelemetry, but I have so far failed to achieve the results.

facilitate Denial of Service attacks   {"kind": "receiver", "name": "otlp", "pipeline": "metrics", "documentation": "https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/security-best-practices.md#safeguards-against-denial-of-service-attacks"}
breeze-otel-collector  | 2024-03-14T12:14:59.105Z       info    otlpreceiver@v0.70.0/otlp.go:112        Starting HTTP server    {"kind": "receiver", "name": "otlp", "pipeline": "metrics", "endpoint": "0.0.0.0:4318"}
breeze-otel-collector  | 2024-03-14T12:14:59.105Z       info    service/pipelines.go:118        Receiver started.       {"kind": "receiver", "name": "otlp", "pipeline": "metrics"}
breeze-otel-collector  | 2024-03-14T12:14:59.105Z       info    service/pipelines.go:114        Receiver is starting... {"kind": "receiver", "name": "otlp", "pipeline": "traces"}
breeze-otel-collector  | 2024-03-14T12:14:59.105Z       info    service/pipelines.go:118        Receiver started.       {"kind": "receiver", "name": "otlp", "pipeline": "traces"}
breeze-otel-collector  | 2024-03-14T12:14:59.105Z       info    service/service.go:145  Everything is ready. Begin running and processing data.

When I run the otel-collector service with the YAML you attached in the Medium blog, it generates these logs, which means the collector is up and running. But I am unable to send metrics to it. How can I do that? The information in your blog is not sufficient. Can you please provide a quick solution?

@ferruzzi (Contributor) commented Mar 14, 2024

Hey @rutvikjshah, thanks for your interest in checking it out, and thanks for having a look at the blog post. This is definitely not the right place for me to help you with that, though. Join us over on the community Slack and start a thread (maybe in #user-troubleshooting) and we can have a chat and get you sorted out.

@josix (Contributor) commented Aug 7, 2024

Hi @howardyoo and @ferruzzi,
I'm wondering whether the OpenTelemetry provider would still fall under the scope of AIP-49. I noticed there's a discussion about the provider proposal on the dev list. Could we move this topic into a Discussion? I'm currently helping to triage the open issues.
Thanks!

@howardyoo (Contributor, Author) commented Aug 7, 2024 via email

@josix (Contributor) commented Aug 7, 2024

Thanks @howardyoo for the quick response. Would you mind helping remove the AIP-49 label? I probably don't have permission to do that myself. I think we can leave the issue open, and I'll keep its status in sync with the email thread once a conclusion has been reached.

@howardyoo (Contributor, Author) commented Aug 7, 2024 via email

@howardyoo (Contributor, Author) commented:

Hi folks! As part of the items raised during the discussion on the dev mailing list, I updated this issue's description to further clarify the requirements and the reasoning behind an OpenTelemetry provider for Airflow.

@ferruzzi removed the AIP-49 label on Aug 16, 2024
@swythan (Contributor) commented Sep 10, 2024

I'd be really interested in having access to this, so I thought I would post my use case.

What I want to do is:

  • Add more detailed manual tracing to some of my tasks (as per @howardyoo's example in the main description)
  • Use the shared connection when setting up library instrumentations.
  • See how my Airflow DAG is interacting with other systems for which I have tracing

Stupid example:

import requests

from airflow import DAG
from airflow.operators.python import PythonOperator
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from otel_hook import OtelHook

# get the Open Telemetry hook
otel_hook = OtelHook()

# Set up `requests` instrumentation using the shared OTLP connection
RequestsInstrumentor().instrument(tracer_provider=otel_hook.tracer_provider)

@otel_hook.span
def initialise_impl():
    with otel_hook.start_as_current_span(name="Call FooService"):
        r = requests.get('https://fooservice.mycompany/info')
    with otel_hook.start_as_current_span(name="Call BarService"):
        r = requests.get('https://barservice.mycompany/more_info')

# Snip

with DAG(
    'test_otel_tracing',
) as dag:
    initialise_task = PythonOperator(
        task_id='initialise_task',
        python_callable=initialise_impl,
        provide_context=True,
    )

With the proposed provider my trace would be:

Airflow: test_otel_tracing
|- Airflow: initialise_task
   |- Airflow: initialise_impl
      |- Airflow: Call FooService
      |  |- Airflow: GET 'https://fooservice.mycompany/info'
      |     |- FooService: GET '/info'
      |        |- <More traces from FooService and its dependencies here>
      |- Airflow: Call BarService
         |- Airflow: GET 'https://barservice.mycompany/more_info'
            |- BarService: GET '/more_info'
               |- <More traces from BarService and its dependencies here>

This really helps diagnostics if the problem you're looking into is in e.g. FooService (or one of its dependencies).

I've verified this works with the code Howard has posted here: https://github.com/howardyoo/airflow_otel_provider
