feat(ingest): add bigquery sink connector lineage in kafka connect source (#3590)
mayurinehate committed Nov 18, 2021
1 parent ad340be commit 69036ac
Showing 7 changed files with 233 additions and 44 deletions.
11 changes: 9 additions & 2 deletions metadata-ingestion/source_docs/kafka-connect.md
@@ -16,7 +16,9 @@ This plugin extracts the following:

Current limitations:

- Currently works for JDBC and Debezium source connectors only.
- Works only for:
  - JDBC and Debezium source connectors
  - BigQuery sink connector

## Quickstart recipe

@@ -31,6 +33,10 @@ source:
# Coordinates
connect_uri: "http://localhost:8083"
cluster_name: "connect-cluster"
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb

# Credentials
username: admin
@@ -46,10 +52,11 @@ Note that a `.` is used to denote nested fields in the YAML recipe.

| Field | Required | Default | Description |
| -------------------------- | -------- | -------------------------- | ------------------------------------------------------- |
| `connect_uri`              |          | `"http://localhost:8083/"` | URI to connect to.                                       |
| `username` | | | Kafka Connect username. |
| `password` | | | Kafka Connect password. |
| `cluster_name` | | `"connect-cluster"` | Cluster to ingest from. |
| `provided_configs`         |          |                            | Provided configurations for resolving config-provider placeholders (e.g. `${env:VAR}`) in connector configs. |
| `construct_lineage_workunits` | | `True` | Whether to create the input and output Dataset entities |
| `connector_patterns.deny`  |          |                            | List of regex patterns for connectors to exclude from ingestion. |
| `connector_patterns.allow` |          |                            | List of regex patterns for connectors to include in ingestion. |
204 changes: 191 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -1,7 +1,7 @@
import logging
import re
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional, Tuple

import jpype
import jpype.imports
@@ -19,6 +19,12 @@
logger = logging.getLogger(__name__)


class ProvidedConfig(ConfigModel):
provider: str
path_key: str
value: str


class KafkaConnectSourceConfig(ConfigModel):
# See the Connect REST Interface for details
# https://docs.confluent.io/platform/current/connect/references/restapi.html#
@@ -29,6 +35,7 @@ class KafkaConnectSourceConfig(ConfigModel):
env: str = builder.DEFAULT_ENV
construct_lineage_workunits: bool = True
connector_patterns: AllowDenyPattern = AllowDenyPattern.allow_all()
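    # Optional mapping of config-provider placeholders (e.g. `${env:MYSQL_CONNECTION_URL}`)
    # to concrete values, so lineage can be extracted from externalized connector configs.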
provided_configs: Optional[List[ProvidedConfig]] = None


@dataclass
@@ -533,6 +540,166 @@ def _extract_lineages(self):
self.connector_manifest.lineages = lineages


@dataclass
class BigQuerySinkConnector:
connector_manifest: ConnectorManifest
report: KafkaConnectSourceReport

def __init__(
self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
) -> None:
self.connector_manifest = connector_manifest
self.report = report
self._extract_lineages()

@dataclass
class BQParser:
project: str
target_platform: str
sanitizeTopics: str
topicsToTables: Optional[str] = None
datasets: Optional[str] = None
defaultDataset: Optional[str] = None
version: str = "v1"

def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)

def get_parser(
self,
connector_manifest: ConnectorManifest,
) -> BQParser:
project = connector_manifest.config["project"]
sanitizeTopics = connector_manifest.config.get("sanitizeTopics", "false")

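        # Newer BigQuery sink connector configs use `defaultDataset` to pick the target
        # dataset; older (1.6.x-style) configs use `datasets` and, optionally, `topicsToTables`.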
if "defaultDataset" in connector_manifest.config:
defaultDataset = connector_manifest.config["defaultDataset"]
return self.BQParser(
project=project,
defaultDataset=defaultDataset,
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
version="v2",
)
else:
# version 1.6.x and similar configs supported
datasets = connector_manifest.config["datasets"]
topicsToTables = connector_manifest.config.get("topicsToTables")

return self.BQParser(
project=project,
topicsToTables=topicsToTables,
datasets=datasets,
target_platform="bigquery",
sanitizeTopics=sanitizeTopics.lower() == "true",
)

def get_list(self, property: str) -> Iterable[Tuple[str, str]]:
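        # Splits a comma-separated "key=value" property into (key, value) pairs,
        # e.g. "topic1=ds1,topic2=ds2" -> ("topic1", "ds1"), ("topic2", "ds2").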
entries = property.split(",")
for entry in entries:
key, val = entry.rsplit("=")
yield (key.strip(), val.strip())

def get_dataset_for_topic_v1(self, topic: str, parser: BQParser) -> Optional[str]:
topicregex_dataset_map: Dict[str, str] = dict(self.get_list(parser.datasets)) # type: ignore
from java.util.regex import Pattern

for pattern, dataset in topicregex_dataset_map.items():
patternMatcher = Pattern.compile(pattern).matcher(topic)
if patternMatcher.matches():
return dataset
return None

def sanitize_table_name(self, table_name):
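        # Approximates the sink connector's sanitizeTopics behaviour: non-alphanumeric
        # characters become "_" and names not starting with a letter or underscore get a
        # leading "_", e.g. "my-topic.v1" -> "my_topic_v1".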
table_name = re.sub("[^a-zA-Z0-9_]", "_", table_name)
if re.match("^[^a-zA-Z_].*", table_name):
table_name = "_" + table_name

return table_name

def get_dataset_table_for_topic(
self, topic: str, parser: BQParser
) -> Optional[str]:
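        # v2: a topic may be addressed as "<dataset>:<table>", otherwise defaultDataset is
        # used; v1: the dataset comes from the `datasets` regex map and the table defaults
        # to the topic name, optionally remapped via `topicsToTables`.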
if parser.version == "v2":
dataset = parser.defaultDataset
parts = topic.split(":")
if len(parts) == 2:
dataset = parts[0]
table = parts[1]
else:
table = parts[0]
else:
dataset = self.get_dataset_for_topic_v1(topic, parser)
if dataset is None:
return None

table = topic
if parser.topicsToTables:
topicregex_table_map: Dict[str, str] = dict(
self.get_list(parser.topicsToTables) # type: ignore
)
from java.util.regex import Pattern

for pattern, tbl in topicregex_table_map.items():
patternMatcher = Pattern.compile(pattern).matcher(topic)
if patternMatcher.matches():
table = tbl
break

if parser.sanitizeTopics:
table = self.sanitize_table_name(table)
return f"{dataset}.{table}"

def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
if not parser:
return lineages
target_platform = parser.target_platform
project = parser.project

self.connector_manifest.flow_property_bag = self.connector_manifest.config

# Mask/Remove properties that may reveal credentials
if "keyfile" in self.connector_manifest.flow_property_bag:
del self.connector_manifest.flow_property_bag["keyfile"]

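        # Map every consumed Kafka topic to its BigQuery destination, addressed as
        # project.dataset.table.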
for topic in self.connector_manifest.topic_names:
dataset_table = self.get_dataset_table_for_topic(topic, parser)
if dataset_table is None:
self.report_warning(
self.connector_manifest.name,
f"could not find target dataset for topic {topic}, please check your connector configuration",
)
continue
target_dataset = f"{project}.{dataset_table}"

lineages.append(
KafkaConnectLineage(
source_dataset=topic,
source_platform="kafka",
target_dataset=target_dataset,
target_platform=target_platform,
)
)
self.connector_manifest.lineages = lineages
return


def transform_connector_config(
connector_config: Dict, provided_configs: List[ProvidedConfig]
) -> None:
"""This method will update provided configs in connector config values, if any"""
lookupsByProvider = {}
for pconfig in provided_configs:
lookupsByProvider[f"${{{pconfig.provider}:{pconfig.path_key}}}"] = pconfig.value
for k, v in connector_config.items():
for key, value in lookupsByProvider.items():
if key in v:
connector_config[k] = v.replace(key, value)

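# Illustrative usage sketch (not part of the ingestion source itself); the config dict
# below is hypothetical and mirrors the recipe's provided_configs example:
#
#   >>> config = {"connection.url": "${env:MYSQL_CONNECTION_URL}", "tasks.max": "1"}
#   >>> transform_connector_config(
#   ...     config,
#   ...     [
#   ...         ProvidedConfig(
#   ...             provider="env",
#   ...             path_key="MYSQL_CONNECTION_URL",
#   ...             value="jdbc:mysql://test_mysql:3306/librarydb",
#   ...         )
#   ...     ],
#   ... )
#   >>> config["connection.url"]
#   'jdbc:mysql://test_mysql:3306/librarydb'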

class KafkaConnectSource(Source):
"""The class for Kafka Connect source.
@@ -589,17 +756,22 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:

manifest = connector_response.json()
connector_manifest = ConnectorManifest(**manifest)
if self.config.provided_configs:
transform_connector_config(
connector_manifest.config, self.config.provided_configs
)
# Initialize connector lineages
connector_manifest.lineages = list()
connector_manifest.url = connector_url

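            # Topics are fetched for every connector (sources and sinks alike) so that
            # sink connectors can also be mapped to lineage.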
topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()

connector_manifest.topic_names = topics[c]["topics"]

# Populate Source Connector metadata
if connector_manifest.type == "source":
topics = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/topics",
).json()

connector_manifest.topic_names = topics[c]["topics"]

tasks = self.session.get(
f"{self.config.connect_uri}/connectors/{c}/tasks",
@@ -629,11 +801,17 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
continue

if connector_manifest.type == "sink":
# TODO: Sink Connector not yet implemented
self.report.report_dropped(connector_manifest.name)
logger.warning(
f"Skipping connector {connector_manifest.name}. Lineage for Sink Connector not yet implemented"
)
                if connector_manifest.config.get("connector.class") == (
"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
):
connector_manifest = BigQuerySinkConnector(
connector_manifest=connector_manifest, report=self.report
).connector_manifest
else:
self.report.report_dropped(connector_manifest.name)
logger.warning(
f"Skipping connector {connector_manifest.name}. Lineage for Connector not yet implemented"
)
pass

connectors_manifest.append(connector_manifest)
@@ -688,7 +866,7 @@ def construct_job_workunits(
source_platform = lineage.source_platform
target_dataset = lineage.target_dataset
target_platform = lineage.target_platform
# job_property_bag = lineage.job_property_bag
job_property_bag = lineage.job_property_bag

job_id = (
source_dataset
@@ -713,7 +891,7 @@
name=f"{connector_name}:{job_id}",
type="COMMAND",
description=None,
# customProperties=job_property_bag
customProperties=job_property_bag
# externalUrl=job_url,
),
)
@@ -17,6 +17,7 @@ services:
# - ./../kafka-connect/setup/confluentinc-kafka-connect-jdbc-10.2.5:/usr/local/share/kafka/plugins/confluentinc-kafka-connect-jdbc-10.2.5
# - ./../kafka-connect/setup/confluentinc-connect-transforms-1.4.1:/usr/local/share/kafka/plugins/confluentinc-connect-transforms-1.4.1
# - ./../kafka-connect/setup/debezium-debezium-connector-mysql-1.7.0:/usr/local/share/kafka/plugins/debezium-debezium-connector-mysql-1.7.0
# - ./../kafka-connect/setup/gcp-bigquery-project-keyfile.json:/usr/local/share/gcp-bigquery-project-keyfile.json
command:
- bash
- -c
@@ -29,10 +30,15 @@
#
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
#
#confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
--strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
#
curl -k -SL "https://repo1.maven.org/maven2/io/strimzi/kafka-env-var-config-provider/0.1.1/kafka-env-var-config-provider-0.1.1.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/
#
echo "Launching Kafka Connect worker"
#
/etc/confluent/docker/run &
@@ -85,7 +85,7 @@
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"tasks.max\": \"1\", \"transforms\": \"TotalReplacement\", \"name\": \"mysql_source3\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"transforms.TotalReplacement.regex\": \".*\", \"transforms.TotalReplacement.type\": \"org.apache.kafka.connect.transforms.RegexRouter\", \"table.whitelist\": \"book\", \"transforms.TotalReplacement.replacement\": \"my-new-topic\"}, \"name\": \"mysql_source3\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
@@ -254,7 +254,7 @@
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"incrementing.column.name\": \"id\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"transforms.changetopic.field\": \"name\", \"table.whitelist\": \"book\", \"mode\": \"incrementing\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"name\": \"mysql_source5\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"value": "{\"customProperties\": {\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\", \"mode\": \"incrementing\", \"incrementing.column.name\": \"id\", \"topic.prefix\": \"test-mysql-jdbc2-\", \"transforms.changetopic.type\": \"io.confluent.connect.transforms.ExtractTopic$Value\", \"tasks.max\": \"1\", \"transforms\": \"changetopic\", \"name\": \"mysql_source5\", \"transforms.changetopic.field\": \"name\", \"connection.url\": \"mysql://test_mysql:3306/librarydb\", \"table.whitelist\": \"book\"}, \"name\": \"mysql_source5\", \"description\": \"Source connector using `io.confluent.connect.jdbc.JdbcSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": null
@@ -5,8 +5,12 @@ run_id: kafka-connect-run
source:
type: "kafka-connect"
config:
connect_uri: "http://localhost:58083"
construct_lineage_workunits: False
connect_uri: "http://localhost:58083"
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb
construct_lineage_workunits: false

# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
@@ -4,15 +4,21 @@ CONNECT_GROUP_ID=kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs
CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets
CONNECT_STATUS_STORAGE_TOPIC=_connect-status
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
# CONNECT_INTERNAL_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
# CONNECT_INTERNAL_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
# CONNECT_INTERNAL_KEY_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
# CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMA_REGISTRY_URL='http://schema-registry:8081'
CONNECT_REST_ADVERTISED_HOST_NAME=test_connect
CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
# CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN=[%d] %p %X{connector.context}%m (%c:%L)%n
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/plugins
CONNECT_CONFIG_PROVIDERS=env
CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider
MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb
