feat(ingest): add bigquery sink connector lineage in kafka connect source #3590

Merged
11 changes: 9 additions & 2 deletions metadata-ingestion/source_docs/kafka-connect.md
@@ -16,7 +16,9 @@ This plugin extracts the following:

Current limitations:

- Currently works for JDBC and Debezium source connectors only.
- Works only for:
- JDBC and Debezium source connectors
- BigQuery sink connector

## Quickstart recipe

@@ -31,6 +33,10 @@ source:
# Coordinates
connect_uri: "http://localhost:8083"
cluster_name: "connect-cluster"
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb

# Credentials
username: admin
@@ -46,10 +52,11 @@ Note that a `.` is used to denote nested fields in the YAML recipe.

| Field | Required | Default | Description |
| -------------------------- | -------- | -------------------------- | ------------------------------------------------------- |
| `connect_uri` | | `"http://localhost:8083/"` | URI to connect to. |
| `connect_uri` | | `"http://localhost:8083/"` | URI to connect to. |
| `username` | | | Kafka Connect username. |
| `password` | | | Kafka Connect password. |
| `cluster_name` | | `"connect-cluster"` | Cluster to ingest from. |
| `provided_configs` | | | Config provider values (`provider`, `path_key`, `value`) used to resolve placeholders such as `${env:MYSQL_CONNECTION_URL}` in connector configs. |
| `construct_lineage_workunits` | | `True` | Whether to create the input and output Dataset entities |
| `connector_patterns.deny` | | | List of regex patterns for connectors to exclude from ingestion. |
| `connector_patterns.allow` | | | List of regex patterns for connectors to include in ingestion. |
204 changes: 191 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -1,7 +1,7 @@
import logging
import re
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional, Tuple

import jpype
import jpype.imports
@@ -19,6 +19,12 @@
logger = logging.getLogger(__name__)


class ProvidedConfig(ConfigModel):
provider: str
path_key: str
value: str


class KafkaConnectSourceConfig(ConfigModel):
# See the Connect REST Interface for details
# https://docs.confluent.io/platform/current/connect/references/restapi.html#
@@ -29,6 +35,7 @@ class KafkaConnectSourceConfig(ConfigModel):
env: str = builder.DEFAULT_ENV
construct_lineage_workunits: bool = True
connector_patterns: AllowDenyPattern = AllowDenyPattern.allow_all()
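# Externally provided values used to resolve config provider placeholders in connector configs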
provided_configs: Optional[List[ProvidedConfig]] = None


@dataclass
@@ -533,6 +540,166 @@ def _extract_lineages(self):
self.connector_manifest.lineages = lineages


@dataclass
class BigQuerySinkConnector:
connector_manifest: ConnectorManifest
report: KafkaConnectSourceReport

def __init__(
self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
) -> None:
self.connector_manifest = connector_manifest
self.report = report
self._extract_lineages()

@dataclass
class BQParser:
project: str
target_platform: str
sanitizeTopics: bool
topicsToTables: Optional[str] = None
datasets: Optional[str] = None
defaultDataset: Optional[str] = None
version: str = "v1"

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_parser(
self,
connector_manifest: ConnectorManifest,
) -> BQParser:
project = connector_manifest.config["project"]
sanitizeTopics = connector_manifest.config.get("sanitizeTopics", "false")

if "defaultDataset" in connector_manifest.config:
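# "defaultDataset" appears only in newer (2.x) BigQuery sink connector configs, so treat this as the v2 config format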
defaultDataset = connector_manifest.config["defaultDataset"]
return self.BQParser(
project=project,
defaultDataset=defaultDataset,
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
version="v2",
)
else:
# version 1.6.x and similar configs supported
datasets = connector_manifest.config["datasets"]
topicsToTables = connector_manifest.config.get("topicsToTables")

return self.BQParser(
project=project,
topicsToTables=topicsToTables,
datasets=datasets,
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
)

def get_list(self, property: str) -> Iterable[Tuple[str, str]]:
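# e.g. "topic1=dataset1,topic2=dataset2" -> ("topic1", "dataset1"), ("topic2", "dataset2")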
entries = property.split(",")
for entry in entries:
key, val = entry.rsplit("=", maxsplit=1)
yield (key.strip(), val.strip())

def get_dataset_for_topic_v1(self, topic: str, parser: BQParser) -> Optional[str]:
topicregex_dataset_map: Dict[str, str] = dict(self.get_list(parser.datasets)) # type: ignore
from java.util.regex import Pattern

for pattern, dataset in topicregex_dataset_map.items():
patternMatcher = Pattern.compile(pattern).matcher(topic)
if patternMatcher.matches():
return dataset
return None

def sanitize_table_name(self, table_name):
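# Replace characters that are not alphanumeric or "_" with "_", and prefix "_" if the name does not start with a letter or underscore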
table_name = re.sub("[^a-zA-Z0-9_]", "_", table_name)
if re.match("^[^a-zA-Z_].*", table_name):
table_name = "_" + table_name

return table_name

def get_dataset_table_for_topic(
self, topic: str, parser: BQParser
) -> Optional[str]:
if parser.version == "v2":
dataset = parser.defaultDataset
parts = topic.split(":")
if len(parts) == 2:
dataset = parts[0]
table = parts[1]
else:
table = parts[0]
else:
dataset = self.get_dataset_for_topic_v1(topic, parser)
if dataset is None:
return None

table = topic
if parser.topicsToTables:
topicregex_table_map: Dict[str, str] = dict(
self.get_list(parser.topicsToTables) # type: ignore
)
from java.util.regex import Pattern

for pattern, tbl in topicregex_table_map.items():
patternMatcher = Pattern.compile(pattern).matcher(topic)
if patternMatcher.matches():
table = tbl
break

if parser.sanitizeTopics:
table = self.sanitize_table_name(table)
return f"{dataset}.{table}"

def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
if not parser:
return lineages
target_platform = parser.target_platform
project = parser.project

self.connector_manifest.flow_property_bag = self.connector_manifest.config

# Mask/Remove properties that may reveal credentials
if "keyfile" in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag["keyfile"]

for topic in self.connector_manifest.topic_names:
dataset_table = self.get_dataset_table_for_topic(topic, parser)
if dataset_table is None:
self.report_warning(
self.connector_manifest.name,
f"could not find target dataset for topic {topic}, please check your connector configuration",
)
continue
target_dataset = f"{project}.{dataset_table}"

lineages.append(
KafkaConnectLineage(
source_dataset=topic,
source_platform="kafka",
target_dataset=target_dataset,
target_platform=target_platform,
)
)
self.connector_manifest.lineages = lineages
return
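
For reference, a minimal self-contained sketch of the topic-to-table resolution implemented above, using Python's re module instead of the JVM regex classes; the config values ("kafka_data", "my-topic-(.*)", "events") are hypothetical examples, not connector defaults.

import re

# Hypothetical v1-style properties: topic regexes mapped to datasets and tables
datasets = {".*": "kafka_data"}
topics_to_tables = {"my-topic-(.*)": "events"}

def resolve_v1(topic: str) -> str:
    # Dataset comes from the first "datasets" regex that fully matches the topic
    dataset = next(
        (ds for pattern, ds in datasets.items() if re.fullmatch(pattern, topic)),
        None,
    )
    # Table defaults to the topic name unless a "topicsToTables" regex matches
    table = next(
        (tbl for pattern, tbl in topics_to_tables.items() if re.fullmatch(pattern, topic)),
        topic,
    )
    return f"{dataset}.{table}"

def resolve_v2(topic: str, default_dataset: str = "kafka_data") -> str:
    # v2 topics may be written as "dataset:table"; otherwise defaultDataset is used
    parts = topic.split(":")
    if len(parts) == 2:
        return f"{parts[0]}.{parts[1]}"
    return f"{default_dataset}.{parts[0]}"

print(resolve_v1("my-topic-clicks"))   # kafka_data.events
print(resolve_v2("clicks"))            # kafka_data.clicks
print(resolve_v2("analytics:clicks"))  # analytics.clicks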


def transform_connector_config(
connector_config: Dict, provided_configs: List[ProvidedConfig]
) -> None:
"""Resolve config provider placeholders (e.g. ${env:MYSQL_CONNECTION_URL}) in connector config values using the provided configs, if any."""
lookupsByProvider = {}
for pconfig in provided_configs:
lookupsByProvider[f"${{{pconfig.provider}:{pconfig.path_key}}}"] = pconfig.value
for k, v in connector_config.items():
for key, value in lookupsByProvider.items():
if key in v:
connector_config[k] = v.replace(key, value)
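
A minimal usage sketch of the placeholder resolution above, assuming the ProvidedConfig model and transform_connector_config defined in this module; the connector config shown here is a hypothetical example.

provided = [
    ProvidedConfig(
        provider="env",
        path_key="MYSQL_CONNECTION_URL",
        value="jdbc:mysql://test_mysql:3306/librarydb",
    )
]
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "${env:MYSQL_CONNECTION_URL}",
}
transform_connector_config(config, provided)
assert config["connection.url"] == "jdbc:mysql://test_mysql:3306/librarydb"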


class KafkaConnectSource(Source):
"""The class for Kafka Connect source.

@@ -589,17 +756,22 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:

manifest = connector_response.json()
connector_manifest = ConnectorManifest(**manifest)
if self.config.provided_configs:
transform_connector_config(
connector_manifest.config, self.config.provided_configs
)
# Initialize connector lineages
connector_manifest.lineages = list()
connector_manifest.url = connector_url

topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()

connector_manifest.topic_names = topics[c]["topics"]

# Populate Source Connector metadata
if connector_manifest.type == "source":
topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()

connector_manifest.topic_names = topics[c]["topics"]

tasks = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/tasks",
@@ -629,11 +801,17 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
continue

if connector_manifest.type == "sink":
# TODO: Sink Connector not yet implemented
self.report.report_dropped(connector_manifest.name)
logger.warning(
f"Skipping connector {connector_manifest.name}. Lineage for Sink Connector not yet implemented"
)
if (
connector_manifest.config.get("connector.class")
== "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
):
connector_manifest = BigQuerySinkConnector(
connector_manifest=connector_manifest, report=self.report
).connector_manifest
else:
self.report.report_dropped(connector_manifest.name)
logger.warning(
f"Skipping connector {connector_manifest.name}. Lineage for this sink connector type is not yet implemented"
)
pass

connectors_manifest.append(connector_manifest)
@@ -688,7 +866,7 @@ def construct_job_workunits(
source_platform = lineage.source_platform
target_dataset = lineage.target_dataset
target_platform = lineage.target_platform
# job_property_bag = lineage.job_property_bag
job_property_bag = lineage.job_property_bag

job_id = (
source_dataset
@@ -713,7 +891,7 @@
name=f"{connector_name}:{job_id}",
type="COMMAND",
description=None,
# customProperties=job_property_bag
customProperties=job_property_bag
# externalUrl=job_url,
),
)
@@ -17,6 +17,7 @@ services:
# - ./../kafka-connect/setup/confluentinc-kafka-connect-jdbc-10.2.5:/usr/local/share/kafka/plugins/confluentinc-kafka-connect-jdbc-10.2.5
# - ./../kafka-connect/setup/confluentinc-connect-transforms-1.4.1:/usr/local/share/kafka/plugins/confluentinc-connect-transforms-1.4.1
# - ./../kafka-connect/setup/debezium-debezium-connector-mysql-1.7.0:/usr/local/share/kafka/plugins/debezium-debezium-connector-mysql-1.7.0
# - ./../kafka-connect/setup/gcp-bigquery-project-keyfile.json:/usr/local/share/gcp-bigquery-project-keyfile.json
command:
- bash
- -c
@@ -29,10 +30,15 @@
#
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
#
#confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
--strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
#
curl -k -SL "https://repo1.maven.org/maven2/io/strimzi/kafka-env-var-config-provider/0.1.1/kafka-env-var-config-provider-0.1.1.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/
#
echo "Launching Kafka Connect worker"
#
/etc/confluent/docker/run &
@@ -85,7 +85,7 @@
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
@@ -254,7 +254,7 @@
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"transforms.changetopic.field\": \"name\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"name\": \"mysql_source5\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"name\": \"mysql_source5\", \"transforms.changetopic.field\": \"name\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"table.whitelist\": \"book\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
@@ -5,8 +5,12 @@ run_id: kafka-connect-run
source:
type: "kafka-connect"
config:
connect_uri: "http://localhost:58083"
construct_lineage_workunits: False
connect_uri: "http://localhost:58083"
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb
construct_lineage_workunits: false

# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
@@ -4,15 +4,21 @@ CONNECT_GROUP_ID=kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs
CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets
CONNECT_STATUS_STORAGE_TOPIC=_connect-status
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
# CONNECT_INTERNAL_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
# CONNECT_INTERNAL_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
# CONNECT_INTERNAL_KEY_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
# CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_REST_ADVERTISED_HOST_NAME=test_connect
CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
# CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN=[%d] %p %X{connector.context}%m (%c:%L)%n
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins
CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins
CONNECT_CONFIG_PROVIDERS=env
CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider
MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb