Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PYTHON] Add support for MSK IAM authentication with a new transport #2478

Merged

Conversation

mattiabertorello
Copy link
Contributor

@mattiabertorello mattiabertorello commented Feb 28, 2024

Problem

The AWS MSK has an IAM authentication type that uses the OAuth Kafka authentication type but needs a specific code to generate the token.
That makes creating a custom transport necessary, at least in Python.

Solution

Use the library aws-msk-iam-sasl-signer-python to generate the OAuth token and create a new transport called msk-aim this will avoid the users of OpenLineage to create a custom one.

Additional configurations are:

  • region: The region of the MSK cluster [Mandatory]
    The next ones exclude each other and cannot be used at the same time.
  • aws_profile: the profile to select if there is more than the default one. [Optional]
  • role_arn: Generate the IAM credentials by assuming the provided role arn. This one can be useful when you need to assume another role in a different account to connect to MSK [Optional]

The credentials will be loaded from the environment with the boto3 library.

Example:

transport:
  type: msk-iam
  config:
    bootstrap.servers: mybroker
    acks: all
    retries: 3
  topic: my_topic
  flush: true
  region: us-west-1

Or for cross account

transport:
  type: msk-iam
  config:
    bootstrap.servers: mybroker
    acks: all
    retries: 3
  topic: my_topic
  flush: true
  region: us-west-1
  role_arn: arn:aws:iam::12345:role/my_role_different_account

One-line summary:

Makes easier to publish events to MSK with IAM authentication.

Manual integration test

import datetime
import uuid

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState
from openlineage.client.transport import MSKIAMTransport
from openlineage.client.transport.msk_iam import MSKIAMConfig


if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.DEBUG)
    config = MSKIAMConfig(
        config={
            "bootstrap.servers": "b-2.xxx.c2.kafka.eu-west-2.amazonaws.com:9098,b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098"
        },
        topic="my_test_topic",
        region="eu-west-2",
        flush=True,
    )
    transport = MSKIAMTransport(config)
    client = OpenLineageClient(transport=transport)
    event = RunEvent(
        eventType=RunState.START,
        eventTime=datetime.datetime.now().isoformat(),
        run=Run(runId=str(uuid.uuid4())),
        job=Job(namespace="kafka", name="test"),
        producer="prod",
        schemaURL="schema/RunEvent",
    )

    client.emit(event)
    client.transport.producer.flush(timeout=1)
    print("Messages sent")

Logs

2024-02-29T12:14:47.560294645Z DEBUG:openlineage.client.transport.kafka:TOPIC [rdkafka#producer-1] [thrd:app]: New local topic: my_test_topic
2024-02-29T12:14:47.560297342Z DEBUG:openlineage.client.transport.kafka:TOPPARNEW [rdkafka#producer-1] [thrd:app]: NEW my_test_topic [-1] 0x5598e047bbf0 refcnt 0x5598e047bc80 (at rd_kafka_topic_new0:472)
2024-02-29T12:14:47.560300475Z DEBUG:openlineage.client.transport.kafka:BRKMAIN [rdkafka#producer-1] [thrd:app]: Waking up waiting broker threads after setting OAUTHBEARER token
2024-02-29T12:14:47.560303259Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Wake-up: OAUTHBEARER token update
2024-02-29T12:14:47.560306334Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: Wake-up sent to 1 broker thread in state >= TRY_CONNECT: OAUTHBEARER token update
2024-02-29T12:14:47.560309239Z DEBUG:openlineage.client.transport.kafka:CONNECT [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: broker in state TRY_CONNECT connecting
2024-02-29T12:14:47.560312101Z DEBUG:openlineage.client.transport.kafka:STATE [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
...
DEBUG:openlineage.client.transport.kafka:PRODUCE [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: my_test_topic [0]: Produce MessageSet with 1 message(s) (349 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
2024-02-29T12:14:48.326364842Z DEBUG:openlineage.client.transport.kafka:SEND [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Sent ProduceRequest (v7, 454 bytes @ 0, CorrId 5)
2024-02-29T12:14:48.382471756Z DEBUG:openlineage.client.transport.kafka:RECV [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Received ProduceResponse (v7, 102 bytes, CorrId 5, rtt 55.99ms)
2024-02-29T12:14:48.382517219Z DEBUG:openlineage.client.transport.kafka:MSGSET [rdkafka#producer-1] [thrd:sasl_ssl://b-1.xxx.c2.kafka.eu-west-2]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: my_test_topic [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
2024-02-29T12:14:48.382623532Z DEBUG:openlineage.client.transport.kafka:Send message <cimpl.Message object at 0x7fb116fcde40>
2024-02-29T12:14:48.382648622Z DEBUG:openlineage.client.transport.kafka:Amount of messages left in Kafka buffers after flush 0
2024-02-29T12:14:48.382730647Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: sasl_ssl://b-1.xxx.c2.kafka.eu-west-2.amazonaws.com:9098/1: Wake-up: flushing
2024-02-29T12:14:48.382747018Z DEBUG:openlineage.client.transport.kafka:WAKEUP [rdkafka#producer-1] [thrd:app]: Wake-up sent to 1 broker thread in state >= UP: flushing
2024-02-29T12:14:48.382752798Z Messages sent

Checklist

  • You've signed-off your work
  • Your pull request title follows our guidelines
  • Your changes are accompanied by tests (if relevant)
  • Your change contains a small diff and is self-contained
  • You've updated any relevant documentation (if relevant)
  • Your comment includes a one-liner for the changelog about the specific purpose of the change (if necessary)
  • You've versioned the core OpenLineage model or facets according to SchemaVer (if relevant)
  • You've added a header to source files (if relevant)

SPDX-License-Identifier: Apache-2.0
Copyright 2018-2023 contributors to the OpenLineage project

@boring-cyborg boring-cyborg bot added the area:client/python openlineage-python label Feb 28, 2024
@mattiabertorello mattiabertorello force-pushed the python/add_msk_aim_transporter branch 2 times, most recently from 9489cfd to 4ff38d0 Compare February 29, 2024 12:21
client/python/openlineage/client/transport/msk_iam.py Outdated Show resolved Hide resolved


def _oauth_cb(config: MSKIAMConfig, *_: Any) -> tuple[str, float]:
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider # type: ignore[import]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local import 👍

@codecov-commenter
Copy link

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 84.54%. Comparing base (840dd35) to head (4ff38d0).

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2478   +/-   ##
=======================================
  Coverage   84.54%   84.54%           
=======================================
  Files          59       59           
  Lines        3351     3351           
=======================================
  Hits         2833     2833           
  Misses        518      518           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@mattiabertorello mattiabertorello marked this pull request as ready for review March 1, 2024 14:09
…t additional custom ones

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>
Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>
Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>
Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>
Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>
@mobuchowski mobuchowski merged commit 16f8d8f into OpenLineage:main Mar 6, 2024
31 checks passed
@mattiabertorello mattiabertorello deleted the python/add_msk_aim_transporter branch March 11, 2024 12:33
Ruihua98 pushed a commit to Ruihua98/OpenLineage that referenced this pull request Mar 15, 2024
…penLineage#2478)

* Add the MSK IAM transport to support AWS MSK cluster instances without additional custom ones

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

* Remove support to get the default from instance metadata

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

* Remove test for instance metadata

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

* Use only debug level logs for the transport

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

* Remove redundant checks

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

---------

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>
Signed-off-by: Ruihua Wang <ruihuawang@microsoft.com>
blacklight pushed a commit to blacklight/OpenLineage that referenced this pull request Apr 4, 2024
…penLineage#2478)

* Add the MSK IAM transport to support AWS MSK cluster instances without additional custom ones

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

* Remove support to get the default from instance metadata

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

* Remove test for instance metadata

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

* Use only debug level logs for the transport

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

* Remove redundant checks

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>

---------

Signed-off-by: Mattia Bertorello <mattia.bertorello@booking.com>
Signed-off-by: Fabio Manganiello <fabio@manganiello.tech>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:client/python openlineage-python
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants