Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions databricks_cli/click_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ class PipelineSpecClickType(ParamType):
help = 'The path to the pipelines deployment spec file'


class PipelineIdClickType(ParamType):
    # Click parameter type for a Delta Pipeline id; `help` is the shared
    # help text reused by every command that accepts --pipeline-id.
    name = 'PIPELINE_ID'
    help = 'Delta Pipeline ID'


class OneOfOption(Option):
def __init__(self, *args, **kwargs):
self.one_of = kwargs.pop('one_of')
Expand Down
33 changes: 7 additions & 26 deletions databricks_cli/pipelines/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@
from databricks_cli.dbfs.api import DbfsApi
from databricks_cli.dbfs.dbfs_path import DbfsPath

# These imports are specific to the credentials part
from databricks_cli.configure.config import get_profile_from_context
from databricks_cli.configure.provider import get_config, ProfileConfigProvider
from databricks_cli.utils import InvalidConfigurationError

BUFFER_SIZE = 1024 * 64
base_pipelines_dir = 'dbfs:/pipelines/code'
supported_lib_types = {'jar', 'whl'}
Expand All @@ -53,14 +48,19 @@ def deploy(self, spec, headers=None):

spec['libraries'] = LibraryObject.to_json(external_lib_objects +
self._upload_local_libraries(local_lib_objects))
spec['credentials'] = self._get_credentials_for_request()
self.client.client.perform_query('PUT',
'/pipelines/{}'.format(spec['id']),
data=spec,
headers=headers)

def delete(self, pipeline_id, headers=None):
self.client.delete(pipeline_id, self._get_credentials_for_request(), headers)
self.client.delete(pipeline_id, headers)

def get(self, pipeline_id, headers=None):
    """
    Fetches the current spec and status of the pipeline with the given id.

    :param pipeline_id: id of the pipeline to look up
    :param headers: optional HTTP headers forwarded with the request
    :return: the service response describing the pipeline
    """
    # Propagate the client's response: the original discarded it (implicitly
    # returning None), which made a "get" accessor useless to its callers.
    return self.client.get(pipeline_id, headers)

def reset(self, pipeline_id, headers=None):
    """
    Resets the pipeline with the given id so its data is reprocessed from scratch.

    :param pipeline_id: id of the pipeline to reset
    :param headers: optional HTTP headers forwarded with the request
    :return: the service response for the reset request
    """
    # Return the client's response instead of dropping it, so callers can
    # inspect the outcome (consistent with returning from get()).
    return self.client.reset(pipeline_id, headers)

@staticmethod
def _identify_local_libraries(lib_objects):
Expand Down Expand Up @@ -130,25 +130,6 @@ def _get_hashed_path(path):
path = '{}/{}.{}'.format(base_pipelines_dir, file_hash, extension)
return path

@staticmethod
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change seems unrelated - how come we don't need this any more?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the "Stop double-passing credentials to the Delta Pipelines service" part of the change. This method pulled CLI credentials from the config file so they could be included in the body of the request. Now that we don't need to include them in the body, we don't need this method. Arul wrote it nicely so that we could just remove this method once we fixed the credentials situation server-side.

Copy link
Copy Markdown
Contributor

@anew anew Dec 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I think it is better to make these kind of refactorings in a separate PR, but no biggie.

def _get_credentials_for_request():
"""
Only required while the deploy/delete APIs require credentials in the body as well
as the header. Once the API requirement is relaxed, we can remove this function"
"""
profile = get_profile_from_context()
if profile:
config = ProfileConfigProvider.get_config(profile)
else:
config = get_config()
if not config or not config.is_valid:
raise InvalidConfigurationError.for_profile(profile)

if config.is_valid_with_token:
return {'token': config.token}
else:
return {'user': config.username, 'password': config.password}


class LibraryObject(object):
def __init__(self, lib_type, lib_path):
Expand Down
130 changes: 110 additions & 20 deletions databricks_cli/pipelines/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import click

from databricks_cli.click_types import PipelineSpecClickType
from databricks_cli.click_types import PipelineSpecClickType, PipelineIdClickType
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS
from databricks_cli.version import print_version_callback, version
from databricks_cli.pipelines.api import PipelinesApi
Expand All @@ -36,18 +36,24 @@
@click.command(context_settings=CONTEXT_SETTINGS,
short_help='Deploys a delta pipeline according to the pipeline specification')
@click.argument('spec_arg', default=None, required=False)
@click.option('--spec', default=None, help=PipelineSpecClickType.help)
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def deploy_cli(api_client, spec_arg, spec):
"""
Deploys a delta pipeline according to the pipeline specification.
* The pipeline spec is a deployment specification that explains how to run a
Delta Pipeline on Databricks.
* The CLI simply forwards the spec to Databricks.
* All the local libraries referenced in the spec are uploaded to DBFS.
Deploys a delta pipeline according to the pipeline specification. The pipeline spec is a
specification that explains how to run a Delta Pipeline on Databricks. All local libraries
referenced in the spec are uploaded to DBFS.

Usage:

databricks pipelines deploy example.json

OR

databricks pipelines deploy --spec example.json
"""
if bool(spec_arg) == bool(spec):
raise RuntimeError('The spec should be provided either by an option or argument')
Expand All @@ -57,30 +63,97 @@ def deploy_cli(api_client, spec_arg, spec):


@click.command(context_settings=CONTEXT_SETTINGS,
short_help='Stops a delta pipeline and cleans '
'up Databricks resources associated with it')
short_help='Stops a delta pipeline and deletes its associated Databricks resources')
@click.argument('spec_arg', default=None, required=False)
@click.option('--spec', default=None, help=PipelineSpecClickType.help)
@click.option('--pipeline-id', default=None,
help='id associated with the pipeline to be stopped')
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
help=PipelineIdClickType.help)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def delete_cli(api_client, spec_arg, spec, pipeline_id):
"""
Stops a delta pipeline and cleans up Databricks resources associated with it
Stops a delta pipeline and deletes its associated Databricks resources. The pipeline can be
resumed by deploying it again.

Usage:

databricks pipelines delete example.json

OR

databricks pipelines delete --spec example.json

OR

databricks pipelines delete --pipeline-id 1234
"""
# Only one out of spec/pipeline_id/spec_arg should be supplied
if bool(spec_arg) + bool(spec) + bool(pipeline_id) != 1:
raise RuntimeError('Either spec should be provided as an argument '
'or option, or the pipeline-id should be provided')
if bool(spec_arg) or bool(spec):
src = spec_arg if bool(spec_arg) else spec
pipeline_id = _read_spec(src)["id"]
pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
PipelinesApi(api_client).delete(pipeline_id)


@click.command(context_settings=CONTEXT_SETTINGS,
short_help='Gets a delta pipeline\'s current spec and status')
@click.argument('spec_arg', default=None, required=False)
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to provide the spec to an API that retrieves the spec? Perhaps we should only accept a pipeline id as argument here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also gets the status. Arul and Michael and I debated a bunch when we first did this for delete, but we figured it was probably best to support all options there. I think we should do the same here, but check in with early customers and see if this is making it easier or harder to use.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Classic REST will assign a unique ID on resource creation and then use that ID for subsequent requests.

Accepting the entire spec for a delete() or similar call may seem harmless. But I have always believed that a good API has exactly one way to do a specific thing. Allowing to do it in multiple ways creates ambiguity, it can cause confusion, and it multiplies the amount of code that needs to be maintained (and kept backwards-compatible) in the future.

One drawback of accepting a spec for get or delete etc. is that the user may think the entire spec must match, when in reality everything but ID is ignored. For example, a user may expect that the spec returned is equal to the spec passed in - that would not be the case. Or, on delete, the user may expect that it only deletes resources that match the whole spec, including, say, the name. But it will delete a pipeline with a matching ID, regardless of name.

This kind of ambiguities can easily be avoided.

@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
help=PipelineIdClickType.help)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def get_cli(api_client, spec_arg, spec, pipeline_id):
    """
    Gets a delta pipeline's current spec and status.

    Usage:

    databricks pipelines get example.json

    OR

    databricks pipelines get --spec example.json

    OR

    databricks pipelines get --pipeline-id 1234
    """
    # Resolve the target pipeline from whichever one of the three mutually
    # exclusive inputs (positional spec, --spec, --pipeline-id) was supplied.
    resolved_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
    PipelinesApi(api_client).get(resolved_id)


@click.command(context_settings=CONTEXT_SETTINGS,
short_help='Resets a delta pipeline so data can be reprocessed from scratch')
@click.argument('spec_arg', default=None, required=False)
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here - I think the pipeline id is the only argument we should accept here.
And the same applies to delete...

@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
help=PipelineIdClickType.help)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def reset_cli(api_client, spec_arg, spec, pipeline_id):
    """
    Resets a delta pipeline by truncating tables and creating new checkpoint folders so data is
    reprocessed from scratch.

    Usage:

    databricks pipelines reset example.json

    OR

    databricks pipelines reset --spec example.json

    OR

    databricks pipelines reset --pipeline-id 1234
    """
    # Resolve the target pipeline from whichever one of the three mutually
    # exclusive inputs (positional spec, --spec, --pipeline-id) was supplied.
    resolved_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
    PipelinesApi(api_client).reset(resolved_id)


def _read_spec(src):
"""
Reads the spec at src as a JSON if no file extension is provided, or if in the extension format
Expand All @@ -95,6 +168,21 @@ def _read_spec(src):
raise RuntimeError('The provided file extension for the spec is not supported')


def _get_pipeline_id(spec_arg, spec, pipeline_id):
"""
Ensures that the user has either specified a spec (either through argument or option) or a
pipeline ID directly, and returns the pipeline id to use.
"""
# Only one out of spec/pipeline_id/spec_arg should be supplied
if bool(spec_arg) + bool(spec) + bool(pipeline_id) != 1:
raise RuntimeError('Either spec should be provided as an argument '
'or option, or the pipeline-id should be provided')
if bool(spec_arg) or bool(spec):
src = spec_arg if bool(spec_arg) else spec
pipeline_id = _read_spec(src)["id"]
return pipeline_id


@click.group(context_settings=CONTEXT_SETTINGS,
short_help='Utility to interact with the Databricks Delta Pipelines.')
@click.option('--version', '-v', is_flag=True, callback=print_version_callback,
Expand All @@ -110,3 +198,5 @@ def pipelines_group():

pipelines_group.add_command(deploy_cli, name='deploy')
pipelines_group.add_command(delete_cli, name='delete')
pipelines_group.add_command(get_cli, name='get')
pipelines_group.add_command(reset_cli, name='reset')
42 changes: 24 additions & 18 deletions databricks_cli/sdk/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,9 +796,8 @@ class DeltaPipelinesService(object):
def __init__(self, client):
self.client = client

def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=None,
clusters=None, libraries=None, transformations=None, credentials=None,
headers=None):
def deploy(self, pipeline_id=None, id=None, name=None, storage=None, configuration=None,
clusters=None, libraries=None, transformations=None, filters=None, headers=None):
_data = {}
if pipeline_id is not None:
_data['pipeline_id'] = pipeline_id
Expand All @@ -808,28 +807,35 @@ def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=Non
_data['name'] = name
if storage is not None:
_data['storage'] = storage
if filters is not None:
_data['filters'] = filters
if not isinstance(filters, dict):
raise TypeError('Expected databricks.Filters() or dict for field filters')
if configuration is not None:
_data['configuration'] = configuration
if clusters is not None:
_data['clusters'] = clusters
if libraries is not None:
_data['libraries'] = libraries
if transformations is not None:
_data['transformations'] = transformations
if credentials is not None:
_data['credentials'] = credentials
if not isinstance(credentials, dict):
raise TypeError('Expected databricks.Credentials() or dict for field credentials')
return self.client.perform_query('PUT', '/pipelines/{}'.format(pipeline_id), data=_data, headers=headers)
if filters is not None:
_data['filters'] = filters
if not isinstance(filters, dict):
raise TypeError('Expected databricks.Filters() or dict for field filters')
return self.client.perform_query('PUT', '/pipelines/{pipeline_id}', data=_data, headers=headers)

def delete(self, pipeline_id=None, credentials=None, headers=None):
def delete(self, pipeline_id=None, headers=None):
_data = {}
if pipeline_id is not None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd expect that pipeline_id must not be None.

_data['pipeline_id'] = pipeline_id
if credentials is not None:
_data['credentials'] = credentials
if not isinstance(credentials, dict):
raise TypeError('Expected databricks.Credentials() or dict for field credentials')
return self.client.perform_query('DELETE', '/pipelines/{}'.format(pipeline_id), data=_data, headers=headers)
return self.client.perform_query('DELETE', '/pipelines/{pipeline_id}', data=_data, headers=headers)

def get(self, pipeline_id=None, headers=None):
_data = {}
if pipeline_id is not None:
_data['pipeline_id'] = pipeline_id
Comment thread
mukulmurthy marked this conversation as resolved.
return self.client.perform_query('GET', '/pipelines/{pipeline_id}', data=_data, headers=headers)

def reset(self, pipeline_id=None, headers=None):
_data = {}
if pipeline_id is not None:
_data['pipeline_id'] = pipeline_id
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

return self.client.perform_query('POST', '/pipelines/{pipeline_id}/reset', data=_data, headers=headers)

32 changes: 19 additions & 13 deletions tests/pipelines/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
'id': PIPELINE_ID,
'name': 'test_pipeline'
}
CREDENTIALS = 'dummy_credentials'
HEADERS = 'dummy_headers'


Expand All @@ -64,9 +63,8 @@ def file_exists_stub(_, dbfs_path):

@mock.patch('databricks_cli.dbfs.api.DbfsApi.file_exists', file_exists_stub)
@mock.patch('databricks_cli.dbfs.dbfs_path.DbfsPath.validate')
@mock.patch('databricks_cli.pipelines.api.PipelinesApi._get_credentials_for_request')
@mock.patch('databricks_cli.dbfs.api.DbfsApi.put_file')
def test_deploy(put_file_mock, get_credentials_mock, dbfs_path_validate, pipelines_api, tmpdir):
def test_deploy(put_file_mock, dbfs_path_validate, pipelines_api, tmpdir):
"""
Scenarios Tested:
1. All three types of local file paths (absolute, relative, file: scheme)
Expand All @@ -77,7 +75,6 @@ def test_deploy(put_file_mock, get_credentials_mock, dbfs_path_validate, pipelin
A test local file which has '456' written to it is not present in Dbfs and therefore must be.
uploaded to dbfs.
"""
get_credentials_mock.return_value = CREDENTIALS
deploy_mock = pipelines_api.client.client.perform_query
# set-up the test
jar1 = tmpdir.join('jar1.jar').strpath
Expand Down Expand Up @@ -115,7 +112,6 @@ def test_deploy(put_file_mock, get_credentials_mock, dbfs_path_validate, pipelin
{'whl':
'dbfs:/pipelines/code/51eac6b471a284d3341d8c0c63d0f1a286262a18/wheel-name-conv.whl'}
]
expected_spec['credentials'] = CREDENTIALS

pipelines_api.deploy(spec)
assert dbfs_path_validate.call_count == 5
Expand All @@ -134,21 +130,31 @@ def test_deploy(put_file_mock, get_credentials_mock, dbfs_path_validate, pipelin
data=expected_spec, headers=HEADERS)


@mock.patch('databricks_cli.pipelines.api.PipelinesApi._get_credentials_for_request')
def test_delete(get_credentials_mock, pipelines_api):
get_credentials_mock.return_value = CREDENTIALS
def test_delete(pipelines_api):
pipelines_api.delete(PIPELINE_ID)
delete_mock = pipelines_api.client.delete
assert get_credentials_mock.call_count == 1
assert delete_mock.call_count == 1
assert delete_mock.call_args[0][0] == PIPELINE_ID
assert delete_mock.call_args[0][1] == CREDENTIALS
assert delete_mock.call_args[0][2] is None
assert delete_mock.call_args[0][1] is None

pipelines_api.delete(PIPELINE_ID, HEADERS)
assert delete_mock.call_args[0][0] == PIPELINE_ID
assert delete_mock.call_args[0][1] == CREDENTIALS
assert delete_mock.call_args[0][2] == HEADERS
assert delete_mock.call_args[0][1] == HEADERS


def test_get(pipelines_api):
    # get() should delegate exactly once to the underlying client with the id.
    pipelines_api.get(PIPELINE_ID)
    client_get = pipelines_api.client.get
    assert client_get.call_count == 1
    positional = client_get.call_args[0]
    assert positional[0] == PIPELINE_ID


def test_reset(pipelines_api):
    # reset() should forward the id plus default (None) headers to the client.
    pipelines_api.reset(PIPELINE_ID)
    client_reset = pipelines_api.client.reset
    assert client_reset.call_count == 1
    positional = client_reset.call_args[0]
    assert positional[0] == PIPELINE_ID
    assert positional[1] is None


def test_partition_local_remote(pipelines_api):
Expand Down
Loading