feat(ingest/kafka-connect): support MongoSourceConnector (#6416)
Co-authored-by: John Joyce <john@acryl.io>
Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
3 people committed Dec 5, 2022
1 parent e5a823e commit 4dd66be
Showing 5 changed files with 270 additions and 2 deletions.
73 changes: 71 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -75,7 +75,7 @@ class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase):
)
generic_connectors: List[GenericConnectorConfig] = Field(
default=[],
description="Provide lineage graph for sources connectors other than Confluent JDBC Source Connector or Debezium Source Connector",
description="Provide lineage graph for sources connectors other than Confluent JDBC Source Connector, Debezium Source Connector, and Mongo Source Connector",
)


@@ -558,6 +558,68 @@ def _extract_lineages(self):
return


@dataclass
class MongoSourceConnector:
    # https://www.mongodb.com/docs/kafka-connector/current/source-connector/

    connector_manifest: ConnectorManifest

    def __init__(
        self, connector_manifest: ConnectorManifest, config: KafkaConnectSourceConfig
    ) -> None:
        self.connector_manifest = connector_manifest
        self.config = config
        self._extract_lineages()

    @dataclass
    class MongoSourceParser:
        db_connection_url: Optional[str]
        source_platform: str
        database_name: Optional[str]
        topic_prefix: Optional[str]
        transforms: List[str]

    def get_parser(
        self,
        connector_manifest: ConnectorManifest,
    ) -> MongoSourceParser:
        parser = self.MongoSourceParser(
            db_connection_url=connector_manifest.config.get("connection.uri"),
            source_platform="mongodb",
            database_name=connector_manifest.config.get("database"),
            topic_prefix=connector_manifest.config.get("topic_prefix"),
            transforms=connector_manifest.config["transforms"].split(",")
            if "transforms" in connector_manifest.config
            else [],
        )

        return parser

    def _extract_lineages(self):
        lineages: List[KafkaConnectLineage] = list()
        parser = self.get_parser(self.connector_manifest)
        source_platform = parser.source_platform
        topic_naming_pattern = r"mongodb\.(\w+)\.(\w+)"

        if not self.connector_manifest.topic_names:
            return lineages

        for topic in self.connector_manifest.topic_names:
            found = re.search(re.compile(topic_naming_pattern), topic)

            if found:
                table_name = get_dataset_name(found.group(1), None, found.group(2))

                lineage = KafkaConnectLineage(
                    source_dataset=table_name,
                    source_platform=source_platform,
                    target_dataset=topic,
                    target_platform="kafka",
                )
                lineages.append(lineage)
        self.connector_manifest.lineages = lineages
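Standalone sketch (not part of this commit) of the topic-name mapping that `_extract_lineages` performs above: with the connector's default naming, topics look like `<topic.prefix>.<database>.<collection>`, and the `mongodb` prefix is hard-coded in the pattern just as in the class. Function and constant names here are illustrative only.

    import re
    from typing import Optional, Tuple

    TOPIC_NAMING_PATTERN = re.compile(r"mongodb\.(\w+)\.(\w+)")

    def mongo_source_dataset_for_topic(topic: str) -> Optional[Tuple[str, str]]:
        # Returns (source_dataset, target_topic) when the topic follows the default
        # "<topic.prefix>.<database>.<collection>" naming, else None.
        found = TOPIC_NAMING_PATTERN.search(topic)
        if not found:
            return None
        database, collection = found.group(1), found.group(2)
        return f"{database}.{collection}", topic

    assert mongo_source_dataset_for_topic("mongodb.test_db.purchases") == (
        "test_db.purchases",
        "mongodb.test_db.purchases",
    )
    assert mongo_source_dataset_for_topic("unrelated.topic") is None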


@dataclass
class DebeziumSourceConnector:
connector_manifest: ConnectorManifest
@@ -848,7 +910,7 @@ class KafkaConnectSource(Source):
Current limitations:
- works only for
- JDBC and Debezium source connectors
- JDBC, Debezium, and Mongo source connectors
- Generic connectors with user-defined lineage graph
- BigQuery sink connector
"""
@@ -943,6 +1005,13 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
connector_manifest = DebeziumSourceConnector(
connector_manifest=connector_manifest, config=self.config
).connector_manifest
elif (
connector_manifest.config.get("connector.class", "")
== "com.mongodb.kafka.connect.MongoSourceConnector"
):
connector_manifest = MongoSourceConnector(
connector_manifest=connector_manifest, config=self.config
).connector_manifest
else:
# Find the target connector object in the list, or log an error if unknown.
target_connector = None
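
Not part of this commit: once a MongoSourceConnector is registered in Kafka Connect, the kafka-connect source can also be run programmatically rather than via the CLI recipe (`kafka_connect_to_file.yml`) used by the integration test. A minimal sketch, assuming a local Connect REST endpoint and a file sink; the `connect_uri` value and file paths are assumptions for illustration.

    from datahub.ingestion.run.pipeline import Pipeline

    # Minimal recipe equivalent to the kafka_connect_to_file.yml used by the test.
    pipeline = Pipeline.create(
        {
            "source": {
                "type": "kafka-connect",
                "config": {"connect_uri": "http://localhost:58083"},
            },
            "sink": {
                "type": "file",
                "config": {"filename": "./kafka_connect_mces.json"},
            },
        }
    )
    pipeline.run()
    pipeline.raise_from_status()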
@@ -11,6 +11,7 @@ services:
- zookeeper
- broker
- mysqldb
- mongo
ports:
- "58083:58083"
# volumes:
@@ -34,6 +35,8 @@ services:
#
#confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
#
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
--strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
@@ -71,5 +74,21 @@ services:
ports:
- "5432:5432"

  mongo:
    hostname: mongo
    image: mongo:4.2.9
    container_name: "test_mongo"
    ports:
      - "27017:27017"
    command: --replSet rs0
    environment:
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin
      - MONGO_INITDB_DATABASE=test_db
      - MONGO_INITDB_USERNAME=kafka-connector
      - MONGO_INITDB_PASSWORD=password
    volumes:
      - ./../kafka-connect/setup/conf/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro

volumes:
test_zkdata:
@@ -0,0 +1,72 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"source_mongodb_connector\", \"description\": \"Source connector using `com.mongodb.kafka.connect.MongoSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"source_mongodb_connector:test_db.purchases\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:kafka\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:mongodb\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
}
]
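
For reference, the URNs asserted in the golden file above follow DataHub's standard dataset URN shape. An illustrative helper (DataHub ships its own builders, e.g. datahub.emitter.mce_builder.make_dataset_urn, which produce the same form):

    def dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
        # urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)
        return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"

    assert dataset_urn("mongodb", "test_db.purchases") == (
        "urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)"
    )
    assert dataset_urn("kafka", "mongodb.test_db.purchases") == (
        "urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)"
    )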
@@ -0,0 +1,22 @@
#!/bin/bash

mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS
conn = new Mongo();
db = conn.getDB("test_db");
db.purchases.insertOne({ _id: 3, item: "lamp post", price: 12 });
db.purchases.insertOne({ _id: 4, item: "lamp post", price: 13 });
EOJS


{
sleep 3 &&
mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS
var rootUser = '$MONGO_INITDB_ROOT_USERNAME';
var rootPassword = '$MONGO_INITDB_ROOT_PASSWORD';
var admin = db.getSiblingDB('admin');
admin.auth(rootUser, rootPassword);
EOJS
} &
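
The same seed data could be written with pymongo instead of the mongo shell; a hypothetical equivalent, assuming the container's port 27017 is published on localhost and the image's root user is used, as configured in the compose service above.

    from pymongo import MongoClient

    # Hypothetical: connection string and credentials mirror the docker-compose environment.
    client = MongoClient("mongodb://admin:admin@localhost:27017/")
    purchases = client["test_db"]["purchases"]
    purchases.insert_many(
        [
            {"_id": 3, "item": "lamp post", "price": 12},
            {"_id": 4, "item": "lamp post", "price": 13},
        ]
    )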



@@ -53,6 +53,7 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
).status_code
== 200,
)

# Creating MySQL source with no transformations, only topic prefix
r = requests.post(
"http://localhost:58083/connectors",
@@ -252,3 +253,88 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
golden_path=test_resources_dir / "kafka_connect_mces_golden.json",
ignore_paths=[],
)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_mongosourceconnect_ingest(
    docker_compose_runner, pytestconfig, tmp_path, mock_time
):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect"
    test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka"

    # Share Compose configurations between files and projects
    # https://docs.docker.com/compose/extends/
    docker_compose_file = [
        str(test_resources_dir_kafka / "docker-compose.yml"),
        str(test_resources_dir / "docker-compose.override.yml"),
    ]
    with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services:
        time.sleep(10)
        # Initiate the MongoDB replica set (required for change streams).
        command = 'docker exec test_mongo mongo admin -u admin -p admin --eval "rs.initiate();"'
        ret = subprocess.run(
            command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        assert ret.returncode == 0
        time.sleep(10)

        wait_for_port(docker_services, "test_broker", 59092, timeout=120)
        wait_for_port(docker_services, "test_connect", 58083, timeout=120)
        docker_services.wait_until_responsive(
            timeout=30,
            pause=1,
            check=lambda: requests.get(
                "http://localhost:58083/connectors",
            ).status_code
            == 200,
        )

        # Creating MongoDB source
        r = requests.post(
            "http://localhost:58083/connectors",
            headers={"Content-Type": "application/json"},
            data=r"""{
                "name": "source_mongodb_connector",
                "config": {
                    "tasks.max": "1",
                    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
                    "connection.uri": "mongodb://admin:admin@test_mongo:27017",
                    "topic.prefix": "mongodb",
                    "database": "test_db",
                    "collection": "purchases",
                    "copy.existing": true,
                    "copy.existing.namespace.regex": "test_db.purchases",
                    "change.stream.full.document": "updateLookup",
                    "topic.creation.enable": "true",
                    "topic.creation.default.replication.factor": "-1",
                    "topic.creation.default.partitions": "-1",
                    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
                    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
                    "key.converter.schemas.enable": false,
                    "value.converter.schemas.enable": false,
                    "output.format.key": "schema",
                    "output.format.value": "json",
                    "output.schema.infer.value": false,
                    "publish.full.document.only": true
                }
            }""",
        )
        r.raise_for_status()
        assert r.status_code == 201  # Created

        # Give time for connectors to process the table data
        time.sleep(60)

        # Run the metadata ingestion pipeline.
        config_file = (test_resources_dir / "kafka_connect_to_file.yml").resolve()
        run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "kafka_connect_mces.json",
            golden_path=test_resources_dir / "kafka_connect_mongo_mces_golden.json",
            ignore_paths=[],
        )
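
The fixed `time.sleep(60)` above gives the connector time to copy the existing collection into Kafka. An alternative (not part of this commit) is to poll the Kafka Connect status endpoint until the connector and its tasks report RUNNING; a sketch, with hypothetical helper name and parameters:

    import time
    import requests

    def wait_for_connector_running(base_url: str, name: str, timeout: float = 120.0) -> None:
        # Poll GET /connectors/<name>/status until the connector and all tasks are RUNNING.
        deadline = time.time() + timeout
        while time.time() < deadline:
            resp = requests.get(f"{base_url}/connectors/{name}/status")
            if resp.status_code == 200:
                status = resp.json()
                states = [status["connector"]["state"]] + [
                    t["state"] for t in status.get("tasks", [])
                ]
                if states and all(state == "RUNNING" for state in states):
                    return
            time.sleep(2)
        raise TimeoutError(f"Connector {name} not RUNNING within {timeout}s")

    # Usage (hypothetical):
    # wait_for_connector_running("http://localhost:58083", "source_mongodb_connector")

Note this only confirms the connector is running, not that the copy.existing snapshot has finished, so some wait before ingestion may still be needed.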
