diff --git a/databricks_cli/click_types.py b/databricks_cli/click_types.py
index 7d3a0cf8..18852792 100644
--- a/databricks_cli/click_types.py
+++ b/databricks_cli/click_types.py
@@ -95,6 +95,11 @@ class PipelineSpecClickType(ParamType):
     help = 'The path to the pipelines deployment spec file'
 
 
+class PipelineIdClickType(ParamType):
+    name = 'PIPELINE_ID'
+    help = 'Delta Pipeline ID'
+
+
 class OneOfOption(Option):
     def __init__(self, *args, **kwargs):
         self.one_of = kwargs.pop('one_of')
diff --git a/databricks_cli/pipelines/api.py b/databricks_cli/pipelines/api.py
index b9226a5e..31f6fc92 100644
--- a/databricks_cli/pipelines/api.py
+++ b/databricks_cli/pipelines/api.py
@@ -31,11 +31,6 @@
 from databricks_cli.dbfs.api import DbfsApi
 from databricks_cli.dbfs.dbfs_path import DbfsPath
 
-# These imports are specific to the credentials part
-from databricks_cli.configure.config import get_profile_from_context
-from databricks_cli.configure.provider import get_config, ProfileConfigProvider
-from databricks_cli.utils import InvalidConfigurationError
-
 BUFFER_SIZE = 1024 * 64
 base_pipelines_dir = 'dbfs:/pipelines/code'
 supported_lib_types = {'jar', 'whl'}
@@ -53,14 +48,19 @@ def deploy(self, spec, headers=None):
         spec['libraries'] = LibraryObject.to_json(external_lib_objects +
                                                   self._upload_local_libraries(local_lib_objects))
-        spec['credentials'] = self._get_credentials_for_request()
         self.client.client.perform_query('PUT', '/pipelines/{}'.format(spec['id']), data=spec,
                                          headers=headers)
 
     def delete(self, pipeline_id, headers=None):
-        self.client.delete(pipeline_id, self._get_credentials_for_request(), headers)
+        self.client.delete(pipeline_id, headers)
+
+    def get(self, pipeline_id, headers=None):
+        return self.client.get(pipeline_id, headers)
+
+    def reset(self, pipeline_id, headers=None):
+        self.client.reset(pipeline_id, headers)
 
     @staticmethod
     def _identify_local_libraries(lib_objects):
@@ -130,25 +130,6 @@ def _get_hashed_path(path):
         path = '{}/{}.{}'.format(base_pipelines_dir, file_hash, extension)
         return path
 
-    @staticmethod
-    def _get_credentials_for_request():
-        """
-        Only required while the deploy/delete APIs require credentials in the body as well
-        as the header. Once the API requirement is relaxed, we can remove this function.
-        """
-        profile = get_profile_from_context()
-        if profile:
-            config = ProfileConfigProvider.get_config(profile)
-        else:
-            config = get_config()
-        if not config or not config.is_valid:
-            raise InvalidConfigurationError.for_profile(profile)
-
-        if config.is_valid_with_token:
-            return {'token': config.token}
-        else:
-            return {'user': config.username, 'password': config.password}
-
 
 class LibraryObject(object):
     def __init__(self, lib_type, lib_path):
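With the credential plumbing removed, PipelinesApi methods take only a pipeline ID and optional headers; authentication comes entirely from the ApiClient. A minimal sketch of the updated surface, assuming a placeholder host and token (not part of this patch):

    # Hypothetical usage of the slimmed-down PipelinesApi; host/token are placeholders.
    from databricks_cli.pipelines.api import PipelinesApi
    from databricks_cli.sdk.api_client import ApiClient

    client = ApiClient(host='https://example.cloud.databricks.com', token='<token>')
    pipelines_api = PipelinesApi(client)

    status = pipelines_api.get('1234')  # current spec and status
    pipelines_api.reset('1234')         # truncate tables, recreate checkpoints
    pipelines_api.delete('1234')        # stop the pipeline and clean up resources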
diff --git a/databricks_cli/pipelines/cli.py b/databricks_cli/pipelines/cli.py
index 4c7d905e..fa261d50 100644
--- a/databricks_cli/pipelines/cli.py
+++ b/databricks_cli/pipelines/cli.py
@@ -26,7 +26,7 @@
 
 import click
 
-from databricks_cli.click_types import PipelineSpecClickType
-from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS
+from databricks_cli.click_types import PipelineSpecClickType, PipelineIdClickType
+from databricks_cli.utils import eat_exceptions, pretty_format, CONTEXT_SETTINGS
 from databricks_cli.version import print_version_callback, version
 from databricks_cli.pipelines.api import PipelinesApi
@@ -36,18 +36,24 @@
 @click.command(context_settings=CONTEXT_SETTINGS,
                short_help='Deploys a delta pipeline according to the pipeline specification')
 @click.argument('spec_arg', default=None, required=False)
-@click.option('--spec', default=None, help=PipelineSpecClickType.help)
+@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
 @debug_option
 @profile_option
 @eat_exceptions
 @provide_api_client
 def deploy_cli(api_client, spec_arg, spec):
     """
-    Deploys a delta pipeline according to the pipeline specification.
-    * The pipeline spec is a deployment specification that explains how to run a
-    Delta Pipeline on Databricks.
-    * The CLI simply forwards the spec to Databricks.
-    * All the local libraries referenced in the spec are uploaded to DBFS.
+    Deploys a delta pipeline according to the pipeline specification. The pipeline spec is a
+    deployment specification that describes how to run a Delta Pipeline on Databricks. All local
+    libraries referenced in the spec are uploaded to DBFS.
+
+    Usage:
+
+    databricks pipelines deploy example.json
+
+    OR
+
+    databricks pipelines deploy --spec example.json
     """
     if bool(spec_arg) == bool(spec):
         raise RuntimeError('The spec should be provided either by an option or argument')
@@ -57,30 +63,97 @@ def deploy_cli(api_client, spec_arg, spec):
 
 
 @click.command(context_settings=CONTEXT_SETTINGS,
-               short_help='Stops a delta pipeline and cleans '
-                          'up Databricks resources associated with it')
+               short_help='Stops a delta pipeline and deletes its associated Databricks resources')
 @click.argument('spec_arg', default=None, required=False)
-@click.option('--spec', default=None, help=PipelineSpecClickType.help)
-@click.option('--pipeline-id', default=None,
-              help='id associated with the pipeline to be stopped')
+@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
+@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
+              help=PipelineIdClickType.help)
 @debug_option
 @profile_option
 @eat_exceptions
 @provide_api_client
 def delete_cli(api_client, spec_arg, spec, pipeline_id):
     """
-    Stops a delta pipeline and cleans up Databricks resources associated with it
+    Stops a delta pipeline and deletes its associated Databricks resources. The pipeline can be
+    resumed by deploying it again.
+
+    Usage:
+
+    databricks pipelines delete example.json
+
+    OR
+
+    databricks pipelines delete --spec example.json
+
+    OR
+
+    databricks pipelines delete --pipeline-id 1234
     """
-    # Only one out of spec/pipeline_id/spec_arg should be supplied
-    if bool(spec_arg) + bool(spec) + bool(pipeline_id) != 1:
-        raise RuntimeError('Either spec should be provided as an argument '
-                           'or option, or the pipeline-id should be provided')
-    if bool(spec_arg) or bool(spec):
-        src = spec_arg if bool(spec_arg) else spec
-        pipeline_id = _read_spec(src)["id"]
+    pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
     PipelinesApi(api_client).delete(pipeline_id)
 
 
+@click.command(context_settings=CONTEXT_SETTINGS,
+               short_help='Gets a delta pipeline\'s current spec and status')
+@click.argument('spec_arg', default=None, required=False)
+@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
+@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
+              help=PipelineIdClickType.help)
+@debug_option
+@profile_option
+@eat_exceptions
+@provide_api_client
+def get_cli(api_client, spec_arg, spec, pipeline_id):
+    """
+    Gets a delta pipeline's current spec and status.
+
+    Usage:
+
+    databricks pipelines get example.json
+
+    OR
+
+    databricks pipelines get --spec example.json
+
+    OR
+
+    databricks pipelines get --pipeline-id 1234
+    """
+    pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
+    click.echo(pretty_format(PipelinesApi(api_client).get(pipeline_id)))
+
+
+@click.command(context_settings=CONTEXT_SETTINGS,
+               short_help='Resets a delta pipeline so data can be reprocessed from scratch')
+@click.argument('spec_arg', default=None, required=False)
+@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
+@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
+              help=PipelineIdClickType.help)
+@debug_option
+@profile_option
+@eat_exceptions
+@provide_api_client
+def reset_cli(api_client, spec_arg, spec, pipeline_id):
+    """
+    Resets a delta pipeline by truncating tables and creating new checkpoint folders so that
+    data is reprocessed from scratch.
+
+    Usage:
+
+    databricks pipelines reset example.json
+
+    OR
+
+    databricks pipelines reset --spec example.json
+
+    OR
+
+    databricks pipelines reset --pipeline-id 1234
+    """
+    pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
+    PipelinesApi(api_client).reset(pipeline_id)
+
+
 def _read_spec(src):
     """
     Reads the spec at src as a JSON if no file extension is provided, or if in the extension format
@@ -95,6 +168,21 @@ def _read_spec(src):
         raise RuntimeError('The provided file extension for the spec is not supported')
 
 
+def _get_pipeline_id(spec_arg, spec, pipeline_id):
+    """
+    Ensures that exactly one of the spec (provided as an argument or via --spec) or the pipeline
+    ID has been supplied, and returns the pipeline ID to use.
+    """
+    # Only one out of spec/pipeline_id/spec_arg should be supplied
+    if bool(spec_arg) + bool(spec) + bool(pipeline_id) != 1:
+        raise RuntimeError('Exactly one of a spec (as an argument or with --spec) '
+                           'or --pipeline-id should be provided')
+    if bool(spec_arg) or bool(spec):
+        src = spec_arg if bool(spec_arg) else spec
+        pipeline_id = _read_spec(src)["id"]
+    return pipeline_id
+
+
 @click.group(context_settings=CONTEXT_SETTINGS,
              short_help='Utility to interact with the Databricks Delta Pipelines.')
 @click.option('--version', '-v', is_flag=True, callback=print_version_callback,
@@ -110,3 +198,5 @@ def pipelines_group():
 
 pipelines_group.add_command(deploy_cli, name='deploy')
 pipelines_group.add_command(delete_cli, name='delete')
+pipelines_group.add_command(get_cli, name='get')
+pipelines_group.add_command(reset_cli, name='reset')
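All three ID-taking commands resolve the pipeline ID through _get_pipeline_id: exactly one of the spec argument, --spec, or --pipeline-id must be supplied, and when a spec is given the ID is read from its 'id' field. A sketch of a minimal spec that satisfies the helper (field values are illustrative only):

    # Hypothetical minimal spec: only the 'id' field is consumed by _get_pipeline_id;
    # the remaining fields illustrate a typical deployment spec.
    import json

    spec = {
        'id': '1234',
        'name': 'example-pipeline',
        'libraries': [{'jar': 'dbfs:/some/path/example.jar'}],
    }
    with open('example.json', 'w') as f:
        json.dump(spec, f)

    # Any of the following then target pipeline 1234:
    #   databricks pipelines get example.json
    #   databricks pipelines get --spec example.json
    #   databricks pipelines get --pipeline-id 1234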
diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py
index b61eb393..10eb045e 100644
--- a/databricks_cli/sdk/service.py
+++ b/databricks_cli/sdk/service.py
@@ -796,9 +796,8 @@ class DeltaPipelinesService(object):
     def __init__(self, client):
         self.client = client
 
-    def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=None,
-               clusters=None, libraries=None, transformations=None, credentials=None,
-               headers=None):
+    def deploy(self, pipeline_id=None, id=None, name=None, storage=None, configuration=None,
+               clusters=None, libraries=None, transformations=None, filters=None, headers=None):
         _data = {}
         if pipeline_id is not None:
             _data['pipeline_id'] = pipeline_id
@@ -808,28 +807,35 @@ def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=Non
             _data['name'] = name
         if storage is not None:
             _data['storage'] = storage
-        if filters is not None:
-            _data['filters'] = filters
-            if not isinstance(filters, dict):
-                raise TypeError('Expected databricks.Filters() or dict for field filters')
+        if configuration is not None:
+            _data['configuration'] = configuration
         if clusters is not None:
             _data['clusters'] = clusters
         if libraries is not None:
             _data['libraries'] = libraries
         if transformations is not None:
             _data['transformations'] = transformations
-        if credentials is not None:
-            _data['credentials'] = credentials
-            if not isinstance(credentials, dict):
-                raise TypeError('Expected databricks.Credentials() or dict for field credentials')
-        return self.client.perform_query('PUT', '/pipelines/{}'.format(pipeline_id), data=_data, headers=headers)
+        if filters is not None:
+            _data['filters'] = filters
+            if not isinstance(filters, dict):
+                raise TypeError('Expected databricks.Filters() or dict for field filters')
+        return self.client.perform_query('PUT', '/pipelines/{}'.format(pipeline_id), data=_data,
+                                         headers=headers)
 
-    def delete(self, pipeline_id=None, credentials=None, headers=None):
+    def delete(self, pipeline_id=None, headers=None):
         _data = {}
         if pipeline_id is not None:
             _data['pipeline_id'] = pipeline_id
-        if credentials is not None:
-            _data['credentials'] = credentials
-            if not isinstance(credentials, dict):
-                raise TypeError('Expected databricks.Credentials() or dict for field credentials')
-        return self.client.perform_query('DELETE', '/pipelines/{}'.format(pipeline_id), data=_data, headers=headers)
+        return self.client.perform_query('DELETE', '/pipelines/{}'.format(pipeline_id),
+                                         data=_data, headers=headers)
+
+    def get(self, pipeline_id=None, headers=None):
+        _data = {}
+        if pipeline_id is not None:
+            _data['pipeline_id'] = pipeline_id
+        return self.client.perform_query('GET', '/pipelines/{}'.format(pipeline_id), data=_data,
+                                         headers=headers)
+
+    def reset(self, pipeline_id=None, headers=None):
+        _data = {}
+        if pipeline_id is not None:
+            _data['pipeline_id'] = pipeline_id
+        return self.client.perform_query('POST', '/pipelines/{}/reset'.format(pipeline_id),
+                                         data=_data, headers=headers)
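Each DeltaPipelinesService method issues a single REST call, with the pipeline ID interpolated into the URL via str.format and echoed in the request body. A sketch of calling the service directly, assuming placeholder credentials:

    # Hypothetical direct use of DeltaPipelinesService; host/token are placeholders.
    from databricks_cli.sdk.api_client import ApiClient
    from databricks_cli.sdk.service import DeltaPipelinesService

    service = DeltaPipelinesService(ApiClient(host='https://example.cloud.databricks.com',
                                              token='<token>'))
    service.get(pipeline_id='1234')     # GET    /pipelines/1234
    service.reset(pipeline_id='1234')   # POST   /pipelines/1234/reset
    service.delete(pipeline_id='1234')  # DELETE /pipelines/1234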
+ """ + # Only one out of spec/pipeline_id/spec_arg should be supplied + if bool(spec_arg) + bool(spec) + bool(pipeline_id) != 1: + raise RuntimeError('Either spec should be provided as an argument ' + 'or option, or the pipeline-id should be provided') + if bool(spec_arg) or bool(spec): + src = spec_arg if bool(spec_arg) else spec + pipeline_id = _read_spec(src)["id"] + return pipeline_id + + @click.group(context_settings=CONTEXT_SETTINGS, short_help='Utility to interact with the Databricks Delta Pipelines.') @click.option('--version', '-v', is_flag=True, callback=print_version_callback, @@ -110,3 +198,5 @@ def pipelines_group(): pipelines_group.add_command(deploy_cli, name='deploy') pipelines_group.add_command(delete_cli, name='delete') +pipelines_group.add_command(get_cli, name='get') +pipelines_group.add_command(reset_cli, name='reset') diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index b61eb393..10eb045e 100644 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -796,9 +796,8 @@ class DeltaPipelinesService(object): def __init__(self, client): self.client = client - def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=None, - clusters=None, libraries=None, transformations=None, credentials=None, - headers=None): + def deploy(self, pipeline_id=None, id=None, name=None, storage=None, configuration=None, + clusters=None, libraries=None, transformations=None, filters=None, headers=None): _data = {} if pipeline_id is not None: _data['pipeline_id'] = pipeline_id @@ -808,28 +807,35 @@ def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=Non _data['name'] = name if storage is not None: _data['storage'] = storage - if filters is not None: - _data['filters'] = filters - if not isinstance(filters, dict): - raise TypeError('Expected databricks.Filters() or dict for field filters') + if configuration is not None: + _data['configuration'] = configuration if clusters is not None: _data['clusters'] = clusters if libraries is not None: _data['libraries'] = libraries if transformations is not None: _data['transformations'] = transformations - if credentials is not None: - _data['credentials'] = credentials - if not isinstance(credentials, dict): - raise TypeError('Expected databricks.Credentials() or dict for field credentials') - return self.client.perform_query('PUT', '/pipelines/{}'.format(pipeline_id), data=_data, headers=headers) + if filters is not None: + _data['filters'] = filters + if not isinstance(filters, dict): + raise TypeError('Expected databricks.Filters() or dict for field filters') + return self.client.perform_query('PUT', '/pipelines/{pipeline_id}', data=_data, headers=headers) - def delete(self, pipeline_id=None, credentials=None, headers=None): + def delete(self, pipeline_id=None, headers=None): _data = {} if pipeline_id is not None: _data['pipeline_id'] = pipeline_id - if credentials is not None: - _data['credentials'] = credentials - if not isinstance(credentials, dict): - raise TypeError('Expected databricks.Credentials() or dict for field credentials') - return self.client.perform_query('DELETE', '/pipelines/{}'.format(pipeline_id), data=_data, headers=headers) + return self.client.perform_query('DELETE', '/pipelines/{pipeline_id}', data=_data, headers=headers) + + def get(self, pipeline_id=None, headers=None): + _data = {} + if pipeline_id is not None: + _data['pipeline_id'] = pipeline_id + return self.client.perform_query('GET', '/pipelines/{pipeline_id}', data=_data, 
diff --git a/tests/pipelines/test_cli.py b/tests/pipelines/test_cli.py
index 00ee7d22..c5299977 100644
--- a/tests/pipelines/test_cli.py
+++ b/tests/pipelines/test_cli.py
@@ -174,3 +174,73 @@ def test_deploy_delete_cli_correct_spec_extensions(pipelines_api_mock, tmpdir):
         assert result.exit_code == 0
         assert pipelines_api_mock.delete.call_count == 1
         pipelines_api_mock.reset_mock()
+
+
+@provide_conf
+def test_reset_cli_spec_arg(pipelines_api_mock, tmpdir):
+    path = tmpdir.join('/spec.json').strpath
+    with open(path, 'w') as f:
+        f.write(DEPLOY_SPEC)
+    runner = CliRunner()
+    runner.invoke(cli.reset_cli, [path])
+    assert pipelines_api_mock.reset.call_args[0][0] == PIPELINE_ID
+
+
+@provide_conf
+def test_reset_cli_spec_option(pipelines_api_mock, tmpdir):
+    path = tmpdir.join('/spec.json').strpath
+    with open(path, 'w') as f:
+        f.write(DEPLOY_SPEC)
+    runner = CliRunner()
+    runner.invoke(cli.reset_cli, ['--spec', path])
+    assert pipelines_api_mock.reset.call_args[0][0] == PIPELINE_ID
+
+
+@provide_conf
+def test_reset_cli_id(pipelines_api_mock):
+    runner = CliRunner()
+    runner.invoke(cli.reset_cli, ['--pipeline-id', PIPELINE_ID])
+    assert pipelines_api_mock.reset.call_args[0][0] == PIPELINE_ID
+
+
+@provide_conf
+def test_reset_cli_no_id(pipelines_api_mock):
+    runner = CliRunner()
+    result = runner.invoke(cli.reset_cli, [])
+    assert result.exit_code == 1
+    assert pipelines_api_mock.reset.call_count == 0
+
+
+@provide_conf
+def test_get_cli_spec_arg(pipelines_api_mock, tmpdir):
+    path = tmpdir.join('/spec.json').strpath
+    with open(path, 'w') as f:
+        f.write(DEPLOY_SPEC)
+    runner = CliRunner()
+    runner.invoke(cli.get_cli, [path])
+    assert pipelines_api_mock.get.call_args[0][0] == PIPELINE_ID
+
+
+@provide_conf
+def test_get_cli_spec_option(pipelines_api_mock, tmpdir):
+    path = tmpdir.join('/spec.json').strpath
+    with open(path, 'w') as f:
+        f.write(DEPLOY_SPEC)
+    runner = CliRunner()
+    runner.invoke(cli.get_cli, ['--spec', path])
+    assert pipelines_api_mock.get.call_args[0][0] == PIPELINE_ID
+
+
+@provide_conf
+def test_get_cli_id(pipelines_api_mock):
+    runner = CliRunner()
+    runner.invoke(cli.get_cli, ['--pipeline-id', PIPELINE_ID])
+    assert pipelines_api_mock.get.call_args[0][0] == PIPELINE_ID
+
+
+@provide_conf
+def test_get_cli_no_id(pipelines_api_mock):
+    runner = CliRunner()
+    result = runner.invoke(cli.get_cli, [])
+    assert result.exit_code == 1
+    assert pipelines_api_mock.get.call_count == 0
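The new CLI tests all follow one pattern: write a spec into a temp dir (or pass --pipeline-id directly), invoke the command through Click's CliRunner, and assert against the mocked PipelinesApi. A condensed sketch of that pattern with the fixture inlined; the constants and the patch target mirror the test module, but this exact test is illustrative:

    # Hypothetical standalone version of the invoke-and-assert pattern used above.
    import mock
    from click.testing import CliRunner

    from databricks_cli.pipelines import cli
    from tests.utils import provide_conf

    PIPELINE_ID = '1234'

    @provide_conf
    def test_get_cli_passes_id_through():
        with mock.patch('databricks_cli.pipelines.cli.PipelinesApi') as api_mock:
            # return a JSON-serializable value so get_cli can pretty-print it
            api_mock.return_value.get.return_value = {'id': PIPELINE_ID}
            result = CliRunner().invoke(cli.get_cli, ['--pipeline-id', PIPELINE_ID])
            assert result.exit_code == 0
            # the resolved ID should be forwarded unchanged to PipelinesApi.get
            assert api_mock.return_value.get.call_args[0][0] == PIPELINE_ID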