From 65aef2b964a443fd17c90dc38975001803e9dc59 Mon Sep 17 00:00:00 2001 From: Bishwajit <147722855+bishwajit-db@users.noreply.github.com> Date: Mon, 8 Jul 2024 13:46:04 +0530 Subject: [PATCH] Use Oracle library only if the recon source is Oracle (#532) Use Oracle library for recon job only if the recon source is Oracle --- .../labs/remorph/helpers/deployment.py | 15 ++++- src/databricks/labs/remorph/install.py | 1 + tests/unit/helpers/test_deployment.py | 61 +++++++++++++++---- 3 files changed, 64 insertions(+), 13 deletions(-) diff --git a/src/databricks/labs/remorph/helpers/deployment.py b/src/databricks/labs/remorph/helpers/deployment.py index 36def87d2..cbeca1910 100644 --- a/src/databricks/labs/remorph/helpers/deployment.py +++ b/src/databricks/labs/remorph/helpers/deployment.py @@ -15,6 +15,9 @@ from databricks.sdk.service import compute from databricks.sdk.service.jobs import Task, PythonWheelTask, JobCluster, JobSettings +from databricks.labs.remorph.config import ReconcileConfig +from databricks.labs.remorph.reconcile.constants import SourceType + logger = logging.getLogger(__name__) TEST_JOBS_PURGE_TIMEOUT = timedelta(hours=1, minutes=15) @@ -48,11 +51,13 @@ def __init__( installation: Installation, install_state: InstallState, product_info: ProductInfo, + recon_config: ReconcileConfig, ): self._ws = workspace_client self._installation = installation self._install_state = install_state self._product_info = product_info + self._recon_config = recon_config def deploy_job(self) -> str: logger.info("Deploying reconciliation job.") @@ -133,11 +138,17 @@ def _job_settings(self, job_name: str, task_key: str, description: str) -> dict[ } def _job_recon_task(self, jobs_task: Task) -> Task: - # TODO: fetch a version list for `ojdbc8` and use the second latest version instead of hardcoding libraries = [ compute.Library(pypi=compute.PythonPyPiLibrary("databricks-labs-remorph")), - compute.Library(maven=compute.MavenLibrary("com.oracle.database.jdbc:ojdbc8:23.4.0.24.05")), ] + source = self._recon_config.data_source + if source == SourceType.ORACLE.value: + # TODO: Automatically fetch a version list for `ojdbc8` and + # use the second latest version instead of hardcoding + libraries.append( + compute.Library(maven=compute.MavenLibrary("com.oracle.database.jdbc:ojdbc8:23.4.0.24.05")), + ) + return dataclasses.replace( jobs_task, libraries=libraries, diff --git a/src/databricks/labs/remorph/install.py b/src/databricks/labs/remorph/install.py index 12f870681..e24264568 100644 --- a/src/databricks/labs/remorph/install.py +++ b/src/databricks/labs/remorph/install.py @@ -475,6 +475,7 @@ def _deploy_reconcile_job(self): self._installation, self._state, self._product_info, + self._config.reconcile, ) job_id = job_deployer.deploy_job() self._config.reconcile.job_id = job_id diff --git a/tests/unit/helpers/test_deployment.py b/tests/unit/helpers/test_deployment.py index 86991bf54..2a669d282 100644 --- a/tests/unit/helpers/test_deployment.py +++ b/tests/unit/helpers/test_deployment.py @@ -14,7 +14,7 @@ from databricks.sdk.service.workspace import ObjectInfo import databricks.labs.remorph.resources -from databricks.labs.remorph.config import ReconcileConfig +from databricks.labs.remorph.config import ReconcileConfig, DatabaseConfig, ReconcileMetadataConfig from databricks.labs.remorph.helpers.deployment import TableDeployer, JobDeployer PRODUCT_INFO = ProductInfo.from_class(ReconcileConfig) @@ -53,6 +53,45 @@ def installation_with_jobs(): ) +@pytest.fixture +def oracle_recon_config() -> ReconcileConfig: + return ReconcileConfig( + data_source="oracle", + report_type="all", + secret_scope="remorph_oracle9", + database_config=DatabaseConfig( + source_schema="tpch_sf10009", + target_catalog="tpch9", + target_schema="1000gb9", + ), + metadata_config=ReconcileMetadataConfig( + catalog="remorph9", + schema="reconcile9", + volume="reconcile_volume9", + ), + ) + + +@pytest.fixture +def snowflake_recon_config() -> ReconcileConfig: + return ReconcileConfig( + data_source="snowflake", + report_type="all", + secret_scope="remorph_snowflake9", + database_config=DatabaseConfig( + source_schema="tpch_sf10009", + target_catalog="tpch9", + target_schema="1000gb9", + source_catalog="snowflake_sample_data9", + ), + metadata_config=ReconcileMetadataConfig( + catalog="remorph9", + schema="reconcile9", + volume="reconcile_volume9", + ), + ) + + def test_deploy_recon_table(): sql_backend = MockBackend() table_deployer = TableDeployer(sql_backend, "test_catalog", "test_schema") @@ -63,44 +102,44 @@ def test_deploy_recon_table(): assert expected_query in sql_backend.queries -def test_deploy_job(ws, installation): +def test_deploy_job(ws, installation, oracle_recon_config): install_state = InstallState.from_installation(installation) - job_deployer = JobDeployer(ws, installation, install_state, PRODUCT_INFO) + job_deployer = JobDeployer(ws, installation, install_state, PRODUCT_INFO, oracle_recon_config) job_id = job_deployer.deploy_job() assert job_id == "123" -def test_deploy_job_with_valid_state(ws, installation_with_jobs): +def test_deploy_job_with_valid_state(ws, installation_with_jobs, snowflake_recon_config): install_state = InstallState.from_installation(installation_with_jobs) - job_deployer = JobDeployer(ws, installation_with_jobs, install_state, PRODUCT_INFO) + job_deployer = JobDeployer(ws, installation_with_jobs, install_state, PRODUCT_INFO, snowflake_recon_config) job_id = job_deployer.deploy_job() assert ws.jobs.reset.called assert job_id == "123" -def test_deploy_job_with_invalid_state(ws, installation_with_jobs): +def test_deploy_job_with_invalid_state(ws, installation_with_jobs, snowflake_recon_config): install_state = InstallState.from_installation(installation_with_jobs) ws.jobs.reset.side_effect = InvalidParameterValue("Job not found") - job_deployer = JobDeployer(ws, installation_with_jobs, install_state, PRODUCT_INFO) + job_deployer = JobDeployer(ws, installation_with_jobs, install_state, PRODUCT_INFO, snowflake_recon_config) job_id = job_deployer.deploy_job() assert ws.jobs.create.called assert job_id == "123" -def test_deploy_job_in_test_mode(ws, installation): +def test_deploy_job_in_test_mode(ws, installation, snowflake_recon_config): product_info = create_autospec(ProductInfo) product_info.product_name.return_value = "test_product" install_state = InstallState.from_installation(installation) - job_deployer = JobDeployer(ws, installation, install_state, product_info) + job_deployer = JobDeployer(ws, installation, install_state, product_info, snowflake_recon_config) job_id = job_deployer.deploy_job() assert job_id == "123" -def test_deploy_job_in_gcp(ws, installation): +def test_deploy_job_in_gcp(ws, installation, snowflake_recon_config): ws.config.is_aws = False ws.config.is_azure = False ws.config.is_gcp = True install_state = InstallState.from_installation(installation) - job_deployer_gcp = JobDeployer(ws, installation, install_state, PRODUCT_INFO) + job_deployer_gcp = JobDeployer(ws, installation, install_state, PRODUCT_INFO, snowflake_recon_config) job_id = job_deployer_gcp.deploy_job() assert job_id == "123"