diff --git a/metadata-ingestion/source_docs/kafka-connect.md b/metadata-ingestion/source_docs/kafka-connect.md index db3458b000631..09dd23ddc8425 100644 --- a/metadata-ingestion/source_docs/kafka-connect.md +++ b/metadata-ingestion/source_docs/kafka-connect.md @@ -16,7 +16,9 @@ This plugin extracts the following: Current limitations: -- Currently works for JDBC and Debezium source connectors only. +- Works only for: + - JDBC and Debezium source connectors + - BigQuery sink connector ## Quickstart recipe @@ -31,6 +33,10 @@ source: # Coordinates connect_uri: "http://localhost:8083" cluster_name: "connect-cluster" + provided_configs: + - provider: env + path_key: MYSQL_CONNECTION_URL + value: jdbc:mysql://test_mysql:3306/librarydb # Credentials username: admin @@ -46,10 +52,11 @@ Note that a `.` is used to denote nested fields in the YAML recipe. | Field | Required | Default | Description | | -------------------------- | -------- | -------------------------- | ------------------------------------------------------- | -| `connect_uri` | | `"http://localhost:8083/"` | URI to connect to. | +| `connect_uri` | ✅ | `"http://localhost:8083/"` | URI to connect to. | | `username` | | | Kafka Connect username. | | `password` | | | Kafka Connect password. | | `cluster_name` | | `"connect-cluster"` | Cluster to ingest from. | +| `provided_configs` | | | Provided configurations used to resolve variable references (e.g. `${env:VAR}`) in connector configs. | | `construct_lineage_workunits` | | `True` | Whether to create the input and output Dataset entities | | `connector_patterns.deny` | | | List of regex patterns for connectors to exclude from ingestion. | | `connector_patterns.allow` | | | List of regex patterns for connectors to include in ingestion. 
| diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index 35eadbaede305..74d027d6d7542 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -1,7 +1,7 @@ import logging import re from dataclasses import dataclass, field -from typing import Dict, Iterable, List, Optional +from typing import Dict, Iterable, List, Optional, Tuple import jpype import jpype.imports @@ -19,6 +19,12 @@ logger = logging.getLogger(__name__) +class ProvidedConfig(ConfigModel): + provider: str + path_key: str + value: str + + class KafkaConnectSourceConfig(ConfigModel): # See the Connect REST Interface for details # https://docs.confluent.io/platform/current/connect/references/restapi.html# @@ -29,6 +35,7 @@ class KafkaConnectSourceConfig(ConfigModel): env: str = builder.DEFAULT_ENV construct_lineage_workunits: bool = True connector_patterns: AllowDenyPattern = AllowDenyPattern.allow_all() + provided_configs: Optional[List[ProvidedConfig]] = None @dataclass @@ -533,6 +540,166 @@ def _extract_lineages(self): self.connector_manifest.lineages = lineages +@dataclass +class BigQuerySinkConnector: + connector_manifest: ConnectorManifest + report: KafkaConnectSourceReport + + def __init__( + self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport + ) -> None: + self.connector_manifest = connector_manifest + self.report = report + self._extract_lineages() + + @dataclass + class BQParser: + project: str + target_platform: str + sanitizeTopics: str + topicsToTables: Optional[str] = None + datasets: Optional[str] = None + defaultDataset: Optional[str] = None + version: str = "v1" + + def report_warning(self, key: str, reason: str) -> None: + logger.warning(f"{key}: {reason}") + self.report.report_warning(key, reason) + + def get_parser( + self, + connector_manifest: ConnectorManifest, + ) -> 
BQParser: + project = connector_manifest.config["project"] + sanitizeTopics = connector_manifest.config.get("sanitizeTopics", "false") + + if "defaultDataset" in connector_manifest.config: + defaultDataset = connector_manifest.config["defaultDataset"] + return self.BQParser( + project=project, + defaultDataset=defaultDataset, + target_platform="bigquery", + sanitizeTopics=sanitizeTopics.lower() == "true", + version="v2", + ) + else: + # version 1.6.x and similar configs supported + datasets = connector_manifest.config["datasets"] + topicsToTables = connector_manifest.config.get("topicsToTables") + + return self.BQParser( + project=project, + topicsToTables=topicsToTables, + datasets=datasets, + target_platform="bigquery", + sanitizeTopics=sanitizeTopics.lower() == "true", + ) + + def get_list(self, property: str) -> Iterable[Tuple[str, str]]: + entries = property.split(",") + for entry in entries: + key, val = entry.rsplit("=") + yield (key.strip(), val.strip()) + + def get_dataset_for_topic_v1(self, topic: str, parser: BQParser) -> Optional[str]: + topicregex_dataset_map: Dict[str, str] = dict(self.get_list(parser.datasets)) # type: ignore + from java.util.regex import Pattern + + for pattern, dataset in topicregex_dataset_map.items(): + patternMatcher = Pattern.compile(pattern).matcher(topic) + if patternMatcher.matches(): + return dataset + return None + + def sanitize_table_name(self, table_name): + table_name = re.sub("[^a-zA-Z0-9_]", "_", table_name) + if re.match("^[^a-zA-Z_].*", table_name): + table_name = "_" + table_name + + return table_name + + def get_dataset_table_for_topic( + self, topic: str, parser: BQParser + ) -> Optional[str]: + if parser.version == "v2": + dataset = parser.defaultDataset + parts = topic.split(":") + if len(parts) == 2: + dataset = parts[0] + table = parts[1] + else: + table = parts[0] + else: + dataset = self.get_dataset_for_topic_v1(topic, parser) + if dataset is None: + return None + + table = topic + if 
parser.topicsToTables: + topicregex_table_map: Dict[str, str] = dict( + self.get_list(parser.topicsToTables) # type: ignore + ) + from java.util.regex import Pattern + + for pattern, tbl in topicregex_table_map.items(): + patternMatcher = Pattern.compile(pattern).matcher(topic) + if patternMatcher.matches(): + table = tbl + break + + if parser.sanitizeTopics: + table = self.sanitize_table_name(table) + return f"{dataset}.{table}" + + def _extract_lineages(self): + lineages: List[KafkaConnectLineage] = list() + parser = self.get_parser(self.connector_manifest) + if not parser: + return lineages + target_platform = parser.target_platform + project = parser.project + + self.connector_manifest.flow_property_bag = self.connector_manifest.config + + # Mask/Remove properties that may reveal credentials + if "keyfile" in self.connector_manifest.flow_property_bag: + del self.connector_manifest.flow_property_bag["keyfile"] + + for topic in self.connector_manifest.topic_names: + dataset_table = self.get_dataset_table_for_topic(topic, parser) + if dataset_table is None: + self.report_warning( + self.connector_manifest.name, + f"could not find target dataset for topic {topic}, please check your connector configuration", + ) + continue + target_dataset = f"{project}.{dataset_table}" + + lineages.append( + KafkaConnectLineage( + source_dataset=topic, + source_platform="kafka", + target_dataset=target_dataset, + target_platform=target_platform, + ) + ) + self.connector_manifest.lineages = lineages + return + + +def transform_connector_config( + connector_config: Dict, provided_configs: List[ProvidedConfig] +) -> None: + """This method will update provided configs in connector config values, if any""" + lookupsByProvider = {} + for pconfig in provided_configs: + lookupsByProvider[f"${{{pconfig.provider}:{pconfig.path_key}}}"] = pconfig.value + for k, v in connector_config.items(): + for key, value in lookupsByProvider.items(): + if key in v: + connector_config[k] = v.replace(key, 
value) + + class KafkaConnectSource(Source): """The class for Kafka Connect source. @@ -589,17 +756,22 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]: manifest = connector_response.json() connector_manifest = ConnectorManifest(**manifest) + if self.config.provided_configs: + transform_connector_config( + connector_manifest.config, self.config.provided_configs + ) # Initialize connector lineages connector_manifest.lineages = list() connector_manifest.url = connector_url + topics = self.session.get( + f"{self.config.connect_uri}/connectors/{c}/topics", + ).json() + + connector_manifest.topic_names = topics[c]["topics"] + # Populate Source Connector metadata if connector_manifest.type == "source": - topics = self.session.get( - f"{self.config.connect_uri}/connectors/{c}/topics", - ).json() - - connector_manifest.topic_names = topics[c]["topics"] tasks = self.session.get( f"{self.config.connect_uri}/connectors/{c}/tasks", @@ -629,11 +801,17 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]: continue if connector_manifest.type == "sink": - # TODO: Sink Connector not yet implemented - self.report.report_dropped(connector_manifest.name) - logger.warning( - f"Skipping connector {connector_manifest.name}. Lineage for Sink Connector not yet implemented" - ) + if connector_manifest.config.get("connector.class").__eq__( + "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector" + ): + connector_manifest = BigQuerySinkConnector( + connector_manifest=connector_manifest, report=self.report + ).connector_manifest + else: + self.report.report_dropped(connector_manifest.name) + logger.warning( + f"Skipping connector {connector_manifest.name}. 
Lineage for Connector not yet implemented" + ) pass connectors_manifest.append(connector_manifest) @@ -688,7 +866,7 @@ def construct_job_workunits( source_platform = lineage.source_platform target_dataset = lineage.target_dataset target_platform = lineage.target_platform - # job_property_bag = lineage.job_property_bag + job_property_bag = lineage.job_property_bag job_id = ( source_dataset @@ -713,7 +891,7 @@ def construct_job_workunits( name=f"{connector_name}:{job_id}", type="COMMAND", description=None, - # customProperties=job_property_bag + customProperties=job_property_bag # externalUrl=job_url, ), ) diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml index ae17e7c9ae48a..0df1a0bdd03de 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml +++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml @@ -17,6 +17,7 @@ services: # - ./../kafka-connect/setup/confluentinc-kafka-connect-jdbc-10.2.5:/usr/local/share/kafka/plugins/confluentinc-kafka-connect-jdbc-10.2.5 # - ./../kafka-connect/setup/confluentinc-connect-transforms-1.4.1:/usr/local/share/kafka/plugins/confluentinc-connect-transforms-1.4.1 # - ./../kafka-connect/setup/debezium-debezium-connector-mysql-1.7.0:/usr/local/share/kafka/plugins/debezium-debezium-connector-mysql-1.7.0 +# - ./../kafka-connect/setup/gcp-bigquery-project-keyfile.json:/usr/local/share/gcp-bigquery-project-keyfile.json command: - bash - -c @@ -29,10 +30,15 @@ services: # confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0 # + #confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8 + # curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \ | tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \ --strip-components=1 
mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar # + curl -k -SL "https://repo1.maven.org/maven2/io/strimzi/kafka-env-var-config-provider/0.1.1/kafka-env-var-config-provider-0.1.1.tar.gz" \ + | tar -xzf - -C /usr/share/confluent-hub-components/ + # echo "Launching Kafka Connect worker" # /etc/confluent/docker/run & diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json index 6cf123a1e7c11..97269ab24cf11 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json @@ -85,7 +85,7 @@ "changeType": "UPSERT", "aspectName": "dataFlowInfo", "aspect": { - "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", + "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", 
\"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", "contentType": "application/json" }, "systemMetadata": null @@ -254,7 +254,7 @@ "changeType": "UPSERT", "aspectName": "dataFlowInfo", "aspect": { - "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"transforms.changetopic.field\": \"name\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"name\": \"mysql_source5\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", + "value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"name\": \"mysql_source5\", \"transforms.changetopic.field\": \"name\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"table.whitelist\": \"book\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}", "contentType": "application/json" }, "systemMetadata": null diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml index 4974b20bae52f..c9d91d2523454 100644 --- 
a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml @@ -5,8 +5,12 @@ run_id: kafka-connect-run source: type: "kafka-connect" config: - connect_uri: "http://localhost:58083" - construct_lineage_workunits: False + connect_uri: "http://localhost:58083" + provided_configs: + - provider: env + path_key: MYSQL_CONNECTION_URL + value: jdbc:mysql://test_mysql:3306/librarydb + construct_lineage_workunits: false # see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation sink: diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env b/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env index c1088c2fe5738..a3ba004511c74 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env +++ b/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env @@ -4,15 +4,21 @@ CONNECT_GROUP_ID=kafka-connect CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets CONNECT_STATUS_STORAGE_TOPIC=_connect-status -CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter -CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter -CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter -CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter -# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081' +CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter +CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter +# CONNECT_INTERNAL_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter +# CONNECT_INTERNAL_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter +CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081' +CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081' +# 
CONNECT_INTERNAL_KEY_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081' +# CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081' CONNECT_REST_ADVERTISED_HOST_NAME=test_connect CONNECT_LOG4J_ROOT_LOGLEVEL=INFO # CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN=[%d] %p %X{connector.context}%m (%c:%L)%n CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 -CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins \ No newline at end of file +CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins +CONNECT_CONFIG_PROVIDERS=env +CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider +MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py index 91e3356317936..88c0fd52e7a7a 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py +++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py @@ -47,10 +47,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "test-mysql-jdbc-", - "connection.password": "datahub", - "connection.user": "foo", "tasks.max": "1", - "connection.url": "jdbc:mysql://test_mysql:3306/librarydb" + "connection.url": "${env:MYSQL_CONNECTION_URL}" } } """, @@ -66,10 +64,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", - "connection.password": "datahub", - "connection.user": "foo", "tasks.max": "1", - "connection.url": 
"jdbc:mysql://test_mysql:3306/librarydb", + "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "TotalReplacement", "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.TotalReplacement.regex": ".*(book)", @@ -90,10 +86,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc "mode": "incrementing", "incrementing.column.name": "id", "table.whitelist": "book", - "connection.password": "datahub", - "connection.user": "foo", "tasks.max": "1", - "connection.url": "jdbc:mysql://test_mysql:3306/librarydb", + "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "TotalReplacement", "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.TotalReplacement.regex": ".*", @@ -115,10 +109,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc "incrementing.column.name": "id", "query": "select * from member", "topic.prefix": "query-topic", - "connection.password": "datahub", - "connection.user": "foo", "tasks.max": "1", - "connection.url": "jdbc:mysql://test_mysql:3306/librarydb" + "connection.url": "${env:MYSQL_CONNECTION_URL}" } } """, @@ -134,12 +126,10 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", - "connection.password": "datahub", - "connection.user": "foo", "table.whitelist": "book", "topic.prefix": "test-mysql-jdbc2-", "tasks.max": "1", - "connection.url": "jdbc:mysql://test_mysql:3306/librarydb", + "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "changetopic", "transforms.changetopic.type": "io.confluent.connect.transforms.ExtractTopic$Value", "transforms.changetopic.field": "name" @@ -159,10 +149,8 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc "insert.mode": "insert", "auto.create": true, 
"topics": "my-topic", - "connection.password": "datahub", - "connection.user": "foo", "tasks.max": "1", - "connection.url": "jdbc:mysql://test_mysql:3306/librarydb" + "connection.url": "${env:MYSQL_CONNECTION_URL}" } } """, @@ -193,7 +181,7 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc assert r.status_code == 201 # Created # Give time for connectors to process the table data - time.sleep(60) + time.sleep(45) # Run the metadata ingestion pipeline. runner = CliRunner()