From 4dd66be654684630570ffa512a9790798b67fdee Mon Sep 17 00:00:00 2001 From: Fredrik Sannholm Date: Mon, 5 Dec 2022 23:09:58 +0200 Subject: [PATCH] feat(ingest/kafka-connect): support MongoSourceConnector (#6416) Co-authored-by: John Joyce Co-authored-by: Tamas Nemeth --- .../datahub/ingestion/source/kafka_connect.py | 73 +++++++++++++++- .../kafka-connect/docker-compose.override.yml | 19 ++++ .../kafka_connect_mongo_mces_golden.json | 72 ++++++++++++++++ .../kafka-connect/setup/conf/mongo-init.sh | 22 +++++ .../kafka-connect/test_kafka_connect.py | 86 +++++++++++++++++++ 5 files changed, 270 insertions(+), 2 deletions(-) create mode 100644 metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mongo_mces_golden.json create mode 100644 metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.sh diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index 431f90643d017..da11d62aea692 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -75,7 +75,7 @@ class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase): ) generic_connectors: List[GenericConnectorConfig] = Field( default=[], - description="Provide lineage graph for sources connectors other than Confluent JDBC Source Connector or Debezium Source Connector", + description="Provide lineage graph for sources connectors other than Confluent JDBC Source Connector, Debezium Source Connector, and Mongo Source Connector", ) @@ -558,6 +558,68 @@ def _extract_lineages(self): return +@dataclass +class MongoSourceConnector: + # https://www.mongodb.com/docs/kafka-connector/current/source-connector/ + + connector_manifest: ConnectorManifest + + def __init__( + self, connector_manifest: ConnectorManifest, config: KafkaConnectSourceConfig + ) -> None: + self.connector_manifest = connector_manifest + self.config = config + self._extract_lineages() + + @dataclass + class MongoSourceParser: + db_connection_url: Optional[str] + source_platform: str + database_name: Optional[str] + topic_prefix: Optional[str] + transforms: List[str] + + def get_parser( + self, + connector_manifest: ConnectorManifest, + ) -> MongoSourceParser: + parser = self.MongoSourceParser( + db_connection_url=connector_manifest.config.get("connection.uri"), + source_platform="mongodb", + database_name=connector_manifest.config.get("database"), + topic_prefix=connector_manifest.config.get("topic_prefix"), + transforms=connector_manifest.config["transforms"].split(",") + if "transforms" in connector_manifest.config + else [], + ) + + return parser + + def _extract_lineages(self): + lineages: List[KafkaConnectLineage] = list() + parser = self.get_parser(self.connector_manifest) + source_platform = parser.source_platform + topic_naming_pattern = r"mongodb\.(\w+)\.(\w+)" + + if not self.connector_manifest.topic_names: + return lineages + + for topic in self.connector_manifest.topic_names: + found = re.search(re.compile(topic_naming_pattern), topic) + + if found: + table_name = get_dataset_name(found.group(1), None, found.group(2)) + + lineage = KafkaConnectLineage( + source_dataset=table_name, + source_platform=source_platform, + target_dataset=topic, + target_platform="kafka", + ) + lineages.append(lineage) + self.connector_manifest.lineages = lineages + + @dataclass class DebeziumSourceConnector: connector_manifest: ConnectorManifest @@ -848,7 +910,7 @@ class 
KafkaConnectSource(Source): Current limitations: - works only for - - JDBC and Debezium source connectors + - JDBC, Debezium, and Mongo source connectors - Generic connectors with user-defined lineage graph - BigQuery sink connector """ @@ -943,6 +1005,13 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]: connector_manifest = DebeziumSourceConnector( connector_manifest=connector_manifest, config=self.config ).connector_manifest + elif ( + connector_manifest.config.get("connector.class", "") + == "com.mongodb.kafka.connect.MongoSourceConnector" + ): + connector_manifest = MongoSourceConnector( + connector_manifest=connector_manifest, config=self.config + ).connector_manifest else: # Find the target connector object in the list, or log an error if unknown. target_connector = None diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml index 77128adccaaae..e33e776be0d13 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml +++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml @@ -11,6 +11,7 @@ services: - zookeeper - broker - mysqldb + - mongo ports: - "58083:58083" # volumes: @@ -34,6 +35,8 @@ services: # #confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8 # + confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0 + # curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \ | tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \ --strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar @@ -71,5 +74,21 @@ services: ports: - "5432:5432" + mongo: + hostname: mongo + image: mongo:4.2.9 + container_name: "test_mongo" + ports: + - "27017:27017" + command: --replSet rs0 + environment: + - MONGO_INITDB_ROOT_USERNAME=admin + - MONGO_INITDB_ROOT_PASSWORD=admin + - MONGO_INITDB_DATABASE=test_db + - MONGO_INITDB_USERNAME=kafka-connector + - MONGO_INITDB_PASSWORD=password + volumes: + - ./../kafka-connect/setup/conf/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro + volumes: test_zkdata: \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mongo_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mongo_mces_golden.json new file mode 100644 index 0000000000000..d95529b32b9f6 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mongo_mces_golden.json @@ -0,0 +1,72 @@ +[ + { + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"source_mongodb_connector\", \"description\": \"Source connector using `com.mongodb.kafka.connect.MongoSourceConnector` plugin.\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-run" + } + }, + { + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "value": "{\"customProperties\": {}, \"name\": \"source_mongodb_connector:test_db.purchases\", \"type\": {\"string\": \"COMMAND\"}}", + "contentType": "application/json" + }, + 
"systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-run" + } + }, + { + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)\"]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-run" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "value": "{\"platform\": \"urn:li:dataPlatform:kafka\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-run" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "value": "{\"platform\": \"urn:li:dataPlatform:mongodb\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-run" + } + } +] diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.sh b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.sh new file mode 100644 index 0000000000000..acd8424e5e7c2 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS + conn = new Mongo(); + db = conn.getDB("test_db"); + db.purchases.insertOne({ _id: 3, item: "lamp post", price: 12 }); + db.purchases.insertOne({ _id: 4, item: "lamp post", price: 13 }); +EOJS + + +{ +sleep 3 && +mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS + var rootUser = '$MONGO_INITDB_ROOT_USERNAME'; + var rootPassword = '$MONGO_INITDB_ROOT_PASSWORD'; + var admin = db.getSiblingDB('admin'); + admin.auth(rootUser, rootPassword); +EOJS +} & + + + diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py index 0547d354e1670..75a4ca89e5466 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py +++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py @@ -53,6 +53,7 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc ).status_code == 200, ) + # Creating MySQL source with no transformations , only topic prefix r = requests.post( "http://localhost:58083/connectors", @@ -252,3 +253,88 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc golden_path=test_resources_dir / "kafka_connect_mces_golden.json", ignore_paths=[], ) + + +@freeze_time(FROZEN_TIME) +@pytest.mark.integration_batch_1 +def test_kafka_connect_mongosourceconnect_ingest( + docker_compose_runner, pytestconfig, tmp_path, mock_time +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect" + test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka" + + # Share Compose configurations between files and projects + # https://docs.docker.com/compose/extends/ + 
docker_compose_file = [ + str(test_resources_dir_kafka / "docker-compose.yml"), + str(test_resources_dir / "docker-compose.override.yml"), + ] + with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services: + time.sleep(10) + # Run the setup.sql file to populate the database. + command = 'docker exec test_mongo mongo admin -u admin -p admin --eval "rs.initiate();"' + ret = subprocess.run( + command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + assert ret.returncode == 0 + time.sleep(10) + + wait_for_port(docker_services, "test_broker", 59092, timeout=120) + wait_for_port(docker_services, "test_connect", 58083, timeout=120) + docker_services.wait_until_responsive( + timeout=30, + pause=1, + check=lambda: requests.get( + "http://localhost:58083/connectors", + ).status_code + == 200, + ) + + # Creating MongoDB source + r = requests.post( + "http://localhost:58083/connectors", + headers={"Content-Type": "application/json"}, + data=r"""{ + "name": "source_mongodb_connector", + "config": { + "tasks.max": "1", + "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", + "connection.uri": "mongodb://admin:admin@test_mongo:27017", + "topic.prefix": "mongodb", + "database": "test_db", + "collection": "purchases", + "copy.existing": true, + "copy.existing.namespace.regex": "test_db.purchases", + "change.stream.full.document": "updateLookup", + "topic.creation.enable": "true", + "topic.creation.default.replication.factor": "-1", + "topic.creation.default.partitions": "-1", + "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "key.converter.schemas.enable": false, + "value.converter.schemas.enable": false, + "output.format.key": "schema", + "output.format.value": "json", + "output.schema.infer.value": false, + "publish.full.document.only":true + } + }""", + ) + r.raise_for_status() + assert r.status_code == 201 # Created + + # Give time for connectors to process the table data + time.sleep(60) + + # Run the metadata ingestion pipeline. + config_file = (test_resources_dir / "kafka_connect_to_file.yml").resolve() + run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path) + + # Verify the output. + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "kafka_connect_mces.json", + golden_path=test_resources_dir / "kafka_connect_mongo_mces_golden.json", + ignore_paths=[], + )
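
Note for reviewers (not part of the patch): the sketch below illustrates, in isolation, the topic-to-lineage mapping that the new MongoSourceConnector._extract_lineages() performs. The topic_to_lineage helper and the simplified get_dataset_name stand-in are hypothetical names used only for this example; the hardcoded "mongodb" prefix in the regex mirrors the topic.prefix configured in the integration test, and the real connector reads topic names and config from the Kafka Connect manifest.

import re
from typing import Optional

# Standalone sketch mirroring MongoSourceConnector._extract_lineages() above.
# Topics produced by the Mongo source connector are expected to look like
# "<topic.prefix>.<database>.<collection>"; the pattern below assumes the
# "mongodb" prefix used in the integration test.
TOPIC_NAMING_PATTERN = r"mongodb\.(\w+)\.(\w+)"


def get_dataset_name(database: str, schema: Optional[str], table: str) -> str:
    # Simplified stand-in for the helper used in kafka_connect.py:
    # join the non-empty name parts with dots.
    return ".".join(part for part in (database, schema, table) if part)


def topic_to_lineage(topic: str) -> Optional[dict]:
    # Map a matching Kafka topic to a (source dataset, target dataset) pair,
    # or return None when the topic does not follow the expected pattern.
    found = re.search(TOPIC_NAMING_PATTERN, topic)
    if not found:
        return None
    table_name = get_dataset_name(found.group(1), None, found.group(2))
    return {
        "source_platform": "mongodb",
        "source_dataset": table_name,  # e.g. "test_db.purchases"
        "target_platform": "kafka",
        "target_dataset": topic,       # e.g. "mongodb.test_db.purchases"
    }


if __name__ == "__main__":
    print(topic_to_lineage("mongodb.test_db.purchases"))  # lineage dict
    print(topic_to_lineage("unrelated_topic"))            # None

For the topic "mongodb.test_db.purchases" this yields the mongodb source dataset "test_db.purchases" and the kafka target dataset "mongodb.test_db.purchases", which is the same dataset pair asserted in kafka_connect_mongo_mces_golden.json.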