Skip to content

Commit

Permalink
Use Oracle library only if the recon source is Oracle (#532)
Browse files: browse the repository at this point in the history
Use Oracle library for recon job only if the recon source is Oracle
  • Loading branch information
bishwajit-db committed Jul 8, 2024
1 parent 6aef33f commit 65aef2b
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 13 deletions.
15 changes: 13 additions & 2 deletions src/databricks/labs/remorph/helpers/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from databricks.sdk.service import compute
from databricks.sdk.service.jobs import Task, PythonWheelTask, JobCluster, JobSettings

from databricks.labs.remorph.config import ReconcileConfig
from databricks.labs.remorph.reconcile.constants import SourceType

logger = logging.getLogger(__name__)

TEST_JOBS_PURGE_TIMEOUT = timedelta(hours=1, minutes=15)
Expand Down Expand Up @@ -48,11 +51,13 @@ def __init__(
installation: Installation,
install_state: InstallState,
product_info: ProductInfo,
recon_config: ReconcileConfig,
):
self._ws = workspace_client
self._installation = installation
self._install_state = install_state
self._product_info = product_info
self._recon_config = recon_config

def deploy_job(self) -> str:
logger.info("Deploying reconciliation job.")
Expand Down Expand Up @@ -133,11 +138,17 @@ def _job_settings(self, job_name: str, task_key: str, description: str) -> dict[
}

def _job_recon_task(self, jobs_task: Task) -> Task:
# TODO: fetch a version list for `ojdbc8` and use the second latest version instead of hardcoding
libraries = [
compute.Library(pypi=compute.PythonPyPiLibrary("databricks-labs-remorph")),
compute.Library(maven=compute.MavenLibrary("com.oracle.database.jdbc:ojdbc8:23.4.0.24.05")),
]
source = self._recon_config.data_source
if source == SourceType.ORACLE.value:
# TODO: Automatically fetch a version list for `ojdbc8` and
# use the second latest version instead of hardcoding
libraries.append(
compute.Library(maven=compute.MavenLibrary("com.oracle.database.jdbc:ojdbc8:23.4.0.24.05")),
)

return dataclasses.replace(
jobs_task,
libraries=libraries,
Expand Down
1 change: 1 addition & 0 deletions src/databricks/labs/remorph/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ def _deploy_reconcile_job(self):
self._installation,
self._state,
self._product_info,
self._config.reconcile,
)
job_id = job_deployer.deploy_job()
self._config.reconcile.job_id = job_id
Expand Down
61 changes: 50 additions & 11 deletions tests/unit/helpers/test_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from databricks.sdk.service.workspace import ObjectInfo

import databricks.labs.remorph.resources
from databricks.labs.remorph.config import ReconcileConfig
from databricks.labs.remorph.config import ReconcileConfig, DatabaseConfig, ReconcileMetadataConfig
from databricks.labs.remorph.helpers.deployment import TableDeployer, JobDeployer

PRODUCT_INFO = ProductInfo.from_class(ReconcileConfig)
Expand Down Expand Up @@ -53,6 +53,45 @@ def installation_with_jobs():
)


@pytest.fixture
def oracle_recon_config() -> ReconcileConfig:
    """Provide a reconcile configuration whose ``data_source`` is ``"oracle"``.

    The database settings carry only a source schema plus the target
    catalog/schema; no ``source_catalog`` is supplied for this fixture.
    """
    # Source/target database coordinates used by the reconcile run.
    db_settings = DatabaseConfig(
        source_schema="tpch_sf10009",
        target_catalog="tpch9",
        target_schema="1000gb9",
    )
    # Metadata location: catalog, schema and volume.
    metadata_settings = ReconcileMetadataConfig(
        catalog="remorph9",
        schema="reconcile9",
        volume="reconcile_volume9",
    )
    return ReconcileConfig(
        data_source="oracle",
        report_type="all",
        secret_scope="remorph_oracle9",
        database_config=db_settings,
        metadata_config=metadata_settings,
    )


@pytest.fixture
def snowflake_recon_config() -> ReconcileConfig:
    """Provide a reconcile configuration whose ``data_source`` is ``"snowflake"``.

    Unlike a schema-only setup, the database settings here also name a
    ``source_catalog``.
    """
    # Source/target database coordinates, including the source catalog.
    db_settings = DatabaseConfig(
        source_schema="tpch_sf10009",
        target_catalog="tpch9",
        target_schema="1000gb9",
        source_catalog="snowflake_sample_data9",
    )
    # Metadata location: catalog, schema and volume.
    metadata_settings = ReconcileMetadataConfig(
        catalog="remorph9",
        schema="reconcile9",
        volume="reconcile_volume9",
    )
    return ReconcileConfig(
        data_source="snowflake",
        report_type="all",
        secret_scope="remorph_snowflake9",
        database_config=db_settings,
        metadata_config=metadata_settings,
    )


def test_deploy_recon_table():
sql_backend = MockBackend()
table_deployer = TableDeployer(sql_backend, "test_catalog", "test_schema")
Expand All @@ -63,44 +102,44 @@ def test_deploy_recon_table():
assert expected_query in sql_backend.queries


def test_deploy_job(ws, installation):
def test_deploy_job(ws, installation, oracle_recon_config):
install_state = InstallState.from_installation(installation)
job_deployer = JobDeployer(ws, installation, install_state, PRODUCT_INFO)
job_deployer = JobDeployer(ws, installation, install_state, PRODUCT_INFO, oracle_recon_config)
job_id = job_deployer.deploy_job()
assert job_id == "123"


def test_deploy_job_with_valid_state(ws, installation_with_jobs):
def test_deploy_job_with_valid_state(ws, installation_with_jobs, snowflake_recon_config):
install_state = InstallState.from_installation(installation_with_jobs)
job_deployer = JobDeployer(ws, installation_with_jobs, install_state, PRODUCT_INFO)
job_deployer = JobDeployer(ws, installation_with_jobs, install_state, PRODUCT_INFO, snowflake_recon_config)
job_id = job_deployer.deploy_job()
assert ws.jobs.reset.called
assert job_id == "123"


def test_deploy_job_with_invalid_state(ws, installation_with_jobs):
def test_deploy_job_with_invalid_state(ws, installation_with_jobs, snowflake_recon_config):
install_state = InstallState.from_installation(installation_with_jobs)
ws.jobs.reset.side_effect = InvalidParameterValue("Job not found")
job_deployer = JobDeployer(ws, installation_with_jobs, install_state, PRODUCT_INFO)
job_deployer = JobDeployer(ws, installation_with_jobs, install_state, PRODUCT_INFO, snowflake_recon_config)
job_id = job_deployer.deploy_job()
assert ws.jobs.create.called
assert job_id == "123"


def test_deploy_job_in_test_mode(ws, installation):
def test_deploy_job_in_test_mode(ws, installation, snowflake_recon_config):
product_info = create_autospec(ProductInfo)
product_info.product_name.return_value = "test_product"
install_state = InstallState.from_installation(installation)
job_deployer = JobDeployer(ws, installation, install_state, product_info)
job_deployer = JobDeployer(ws, installation, install_state, product_info, snowflake_recon_config)
job_id = job_deployer.deploy_job()
assert job_id == "123"


def test_deploy_job_in_gcp(ws, installation):
def test_deploy_job_in_gcp(ws, installation, snowflake_recon_config):
ws.config.is_aws = False
ws.config.is_azure = False
ws.config.is_gcp = True
install_state = InstallState.from_installation(installation)
job_deployer_gcp = JobDeployer(ws, installation, install_state, PRODUCT_INFO)
job_deployer_gcp = JobDeployer(ws, installation, install_state, PRODUCT_INFO, snowflake_recon_config)
job_id = job_deployer_gcp.deploy_job()
assert job_id == "123"

0 comments on commit 65aef2b

Please sign in to comment.