From 823e844e44edaa013573edd978ab9a180ca49d3e Mon Sep 17 00:00:00 2001 From: Ege Elgun Date: Fri, 26 Feb 2021 12:41:03 +0000 Subject: [PATCH 01/26] Bump version to 0.4.3.dev0 --- databricks_cli/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks_cli/version.py b/databricks_cli/version.py index 51bf2021..d4bdcec8 100644 --- a/databricks_cli/version.py +++ b/databricks_cli/version.py @@ -21,7 +21,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -version = '0.14.2' # NOQA +version = '0.14.3.dev0' # NOQA def print_version_callback(ctx, param, value): # NOQA From dd9c7502aa5a377da3c4f2e83bf8a1bb852e41dc Mon Sep 17 00:00:00 2001 From: Ege E Date: Wed, 5 May 2021 12:57:58 +0100 Subject: [PATCH 02/26] Add draft changes. --- databricks_cli/dbfs/api.py | 13 +++------- databricks_cli/dbfs/cli.py | 44 ++++++++++++++++++++++++++++++++ databricks_cli/sdk/api_client.py | 12 ++++++--- databricks_cli/sdk/service.py | 12 +++++++-- 4 files changed, 67 insertions(+), 14 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index eb363a4a..11dd5543 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -114,15 +114,10 @@ def get_status(self, dbfs_path, headers=None): return FileInfo.from_json(json) def put_file(self, src_path, dbfs_path, overwrite, headers=None): - handle = self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)['handle'] - with open(src_path, 'rb') as local_file: - while True: - contents = local_file.read(BUFFER_SIZE_BYTES) - if len(contents) == 0: - break - # add_block should not take a bytes object. - self.client.add_block(handle, b64encode(contents).decode(), headers=headers) - self.client.close(handle, headers=headers) + self.client.put(dbfs_path.absolute_path, src_path=src_path, overwrite=overwrite, headers=headers) + + def put_content(self, content, dbfs_path, overwrite, headers=None): + self.client.put(dbfs_path.absolute_path, contents=content, overwrite=overwrite, headers=headers) def get_file(self, dbfs_path, dst_path, overwrite, headers=None): if os.path.exists(dst_path) and not overwrite: diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index 4355b6cc..1062a0c9 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -159,6 +159,48 @@ def cat_cli(api_client, src): DbfsApi(api_client).cat(src) +@click.command(context_settings=CONTEXT_SETTINGS) +@click.argument('src_path', type=click.Path(exists=True)) +@click.argument('dbfs_path', nargs=-1, type=DbfsPathClickType()) +@click.option('--overwrite', is_flag=True, default=False) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def put_file_cli(api_client, src_path, dbfs_path, overwrite, headers=None): + """ + Put file to DBFS with multipart form post. + """ + if len(dbfs_path) == 0: + dbfs_path = DbfsPath('dbfs:/') + elif len(dbfs_path) == 1: + dbfs_path = dbfs_path[0] + else: + error_and_quit('ls can take a maximum of one path.') + DbfsApi(api_client).put_file(src_path, dbfs_path, overwrite, headers=headers) + + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.argument('content') +@click.argument('dbfs_path', nargs=-1, type=DbfsPathClickType()) +@click.option('--overwrite', is_flag=True, default=False) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def put_content_cli(api_client, content, dbfs_path, overwrite, headers=None): + """ + Put contents to a file in DBFS with multipart form post. + """ + if len(dbfs_path) == 0: + dbfs_path = DbfsPath('dbfs:/') + elif len(dbfs_path) == 1: + dbfs_path = dbfs_path[0] + else: + error_and_quit('ls can take a maximum of one path.') + DbfsApi(api_client).put_content(content, dbfs_path, overwrite, headers=headers) + + dbfs_group.add_command(configure_cli, name='configure') dbfs_group.add_command(ls_cli, name='ls') dbfs_group.add_command(mkdirs_cli, name='mkdirs') @@ -166,3 +208,5 @@ def cat_cli(api_client, src): dbfs_group.add_command(cp_cli, name='cp') dbfs_group.add_command(mv_cli, name='mv') dbfs_group.add_command(cat_cli, name='cat') +dbfs_group.add_command(put_file_cli, name='put_file') +dbfs_group.add_command(put_content_cli, name='put_content') \ No newline at end of file diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index 70501e43..0cea8ef2 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -109,7 +109,7 @@ def close(self): # helper functions starting here - def perform_query(self, method, path, data = {}, headers = None): + def perform_query(self, method, path, data = {}, files=None, headers = None): """set up connection and perform query""" if headers is None: headers = self.default_headers @@ -125,8 +125,14 @@ def perform_query(self, method, path, data = {}, headers = None): resp = self.session.request(method, self.url + path, params = translated_data, verify = self.verify, headers = headers) else: - resp = self.session.request(method, self.url + path, data = json.dumps(data), - verify = self.verify, headers = headers) + if files is None: + resp = self.session.request(method, self.url + path, data = json.dumps(data), + verify = self.verify, headers = headers) + else: + print("here") + print(files) + resp = self.session.post(self.url + path, files=files, data=data, verify=self.verify, + headers=headers) try: resp.raise_for_status() except requests.exceptions.HTTPError as e: diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index c3cc1a7b..3b25b9fc 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -23,6 +23,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os + + class JobsService(object): def __init__(self, client): self.client = client @@ -519,15 +522,20 @@ def list_test(self, path, headers=None): _data['path'] = path return self.client.perform_query('GET', '/dbfs-testing/list', data=_data, headers=headers) - def put(self, path, contents=None, overwrite=None, headers=None): + def put(self, path, src_path=None, contents=None, overwrite=None, headers=None): _data = {} + _files = None if path is not None: _data['path'] = path if contents is not None: _data['contents'] = contents + elif src_path is not None: + filename = os.path.basename(src_path) + _files = {"file": (filename, open(src_path, 'rb'))} if overwrite is not None: _data['overwrite'] = overwrite - return self.client.perform_query('POST', '/dbfs/put', data=_data, headers=headers) + print("query") + return self.client.perform_query('POST', '/dbfs/put', data=_data, files=_files, headers=headers) def put_test(self, path, contents=None, overwrite=None, headers=None): _data = {} From df78ca1a7a50209a33b7913985ad1c49e74e353c Mon Sep 17 00:00:00 2001 From: Ege E Date: Fri, 7 May 2021 14:06:26 +0100 Subject: [PATCH 03/26] Add working prototype with two interfaces. --- databricks_cli/dbfs/cli.py | 19 ++++--------------- databricks_cli/sdk/api_client.py | 6 ++---- databricks_cli/sdk/service.py | 20 +++++++++++++++----- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index 1062a0c9..e3fd98d1 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -160,8 +160,8 @@ def cat_cli(api_client, src): @click.command(context_settings=CONTEXT_SETTINGS) -@click.argument('src_path', type=click.Path(exists=True)) -@click.argument('dbfs_path', nargs=-1, type=DbfsPathClickType()) +@click.argument('src_path', type=click.Path(exists=True, resolve_path=True)) +@click.argument('dbfs_path', type=DbfsPathClickType()) @click.option('--overwrite', is_flag=True, default=False) @debug_option @profile_option @@ -171,18 +171,13 @@ def put_file_cli(api_client, src_path, dbfs_path, overwrite, headers=None): """ Put file to DBFS with multipart form post. """ - if len(dbfs_path) == 0: - dbfs_path = DbfsPath('dbfs:/') - elif len(dbfs_path) == 1: - dbfs_path = dbfs_path[0] - else: - error_and_quit('ls can take a maximum of one path.') + print(src_path) DbfsApi(api_client).put_file(src_path, dbfs_path, overwrite, headers=headers) @click.command(context_settings=CONTEXT_SETTINGS) @click.argument('content') -@click.argument('dbfs_path', nargs=-1, type=DbfsPathClickType()) +@click.argument('dbfs_path', type=DbfsPathClickType()) @click.option('--overwrite', is_flag=True, default=False) @debug_option @profile_option @@ -192,12 +187,6 @@ def put_content_cli(api_client, content, dbfs_path, overwrite, headers=None): """ Put contents to a file in DBFS with multipart form post. """ - if len(dbfs_path) == 0: - dbfs_path = DbfsPath('dbfs:/') - elif len(dbfs_path) == 1: - dbfs_path = dbfs_path[0] - else: - error_and_quit('ls can take a maximum of one path.') DbfsApi(api_client).put_content(content, dbfs_path, overwrite, headers=headers) diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index 0cea8ef2..c1b4e344 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -129,10 +129,8 @@ def perform_query(self, method, path, data = {}, files=None, headers = None): resp = self.session.request(method, self.url + path, data = json.dumps(data), verify = self.verify, headers = headers) else: - print("here") - print(files) - resp = self.session.post(self.url + path, files=files, data=data, verify=self.verify, - headers=headers) + resp = self.session.request(method, self.url + path, files=files, data=data, + verify=self.verify, headers=headers) try: resp.raise_for_status() except requests.exceptions.HTTPError as e: diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index 3b25b9fc..c1b39469 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -23,6 +23,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import base64 +import json import os @@ -528,13 +530,21 @@ def put(self, path, src_path=None, contents=None, overwrite=None, headers=None): if path is not None: _data['path'] = path if contents is not None: - _data['contents'] = contents - elif src_path is not None: - filename = os.path.basename(src_path) - _files = {"file": (filename, open(src_path, 'rb'))} + # Because terminal might add trailing newlines, they need to be encoded properly. + encoded_contents = base64.b64encode(contents.encode('utf-8')) + _data['contents'] = encoded_contents.decode("utf-8") if overwrite is not None: _data['overwrite'] = overwrite - print("query") + if src_path is not None: + # @self.client sets Content-Type 'text/json' by default. + # For multipart/form-data POST Content-Type should be set automatically + # to decode 'Boundary' parameter. + headers = {'Content-Type': None} + filename = os.path.basename(src_path) + _files = { + 'file': (filename, open(src_path, 'rb'), 'multipart/form-data') + } + # headers = {'Content-Type': 'multipart/form-data'} return self.client.perform_query('POST', '/dbfs/put', data=_data, files=_files, headers=headers) def put_test(self, path, contents=None, overwrite=None, headers=None): From ff1c4f14b440b92ff56e4e77726e078e97a28c7a Mon Sep 17 00:00:00 2001 From: Ege E Date: Fri, 7 May 2021 14:30:52 +0100 Subject: [PATCH 04/26] Update put_test. --- databricks_cli/sdk/service.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index c1b39469..72938a15 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -541,21 +541,28 @@ def put(self, path, src_path=None, contents=None, overwrite=None, headers=None): # to decode 'Boundary' parameter. headers = {'Content-Type': None} filename = os.path.basename(src_path) - _files = { - 'file': (filename, open(src_path, 'rb'), 'multipart/form-data') - } - # headers = {'Content-Type': 'multipart/form-data'} + _files = {'file': (filename, open(src_path, 'rb'), 'multipart/form-data')} return self.client.perform_query('POST', '/dbfs/put', data=_data, files=_files, headers=headers) - def put_test(self, path, contents=None, overwrite=None, headers=None): + def put_test(self, path, src_path=None, contents=None, overwrite=None, headers=None): _data = {} + _files = None if path is not None: _data['path'] = path if contents is not None: - _data['contents'] = contents + # Because terminal might add trailing newlines, they need to be encoded properly. + encoded_contents = base64.b64encode(contents.encode('utf-8')) + _data['contents'] = encoded_contents.decode("utf-8") if overwrite is not None: _data['overwrite'] = overwrite - return self.client.perform_query('POST', '/dbfs-testing/put', data=_data, headers=headers) + if src_path is not None: + # @self.client sets Content-Type 'text/json' by default. + # For multipart/form-data POST Content-Type should be set automatically + # to decode 'Boundary' parameter. + headers = {'Content-Type': None} + filename = os.path.basename(src_path) + _files = {'file': (filename, open(src_path, 'rb'), 'multipart/form-data')} + return self.client.perform_query('POST', '/dbfs/put', data=_data, files=_files, headers=headers) def mkdirs(self, path, headers=None): _data = {} From d396ec9cc3ff05866224f6f2639bf7e6ba433d9c Mon Sep 17 00:00:00 2001 From: Ege E Date: Fri, 7 May 2021 16:55:52 +0100 Subject: [PATCH 05/26] Remove interfaces. --- databricks_cli/dbfs/cli.py | 31 ------------------------------- databricks_cli/sdk/api_client.py | 2 +- databricks_cli/sdk/service.py | 3 +-- 3 files changed, 2 insertions(+), 34 deletions(-) diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index e3fd98d1..834a9ffd 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -159,37 +159,6 @@ def cat_cli(api_client, src): DbfsApi(api_client).cat(src) -@click.command(context_settings=CONTEXT_SETTINGS) -@click.argument('src_path', type=click.Path(exists=True, resolve_path=True)) -@click.argument('dbfs_path', type=DbfsPathClickType()) -@click.option('--overwrite', is_flag=True, default=False) -@debug_option -@profile_option -@eat_exceptions -@provide_api_client -def put_file_cli(api_client, src_path, dbfs_path, overwrite, headers=None): - """ - Put file to DBFS with multipart form post. - """ - print(src_path) - DbfsApi(api_client).put_file(src_path, dbfs_path, overwrite, headers=headers) - - -@click.command(context_settings=CONTEXT_SETTINGS) -@click.argument('content') -@click.argument('dbfs_path', type=DbfsPathClickType()) -@click.option('--overwrite', is_flag=True, default=False) -@debug_option -@profile_option -@eat_exceptions -@provide_api_client -def put_content_cli(api_client, content, dbfs_path, overwrite, headers=None): - """ - Put contents to a file in DBFS with multipart form post. - """ - DbfsApi(api_client).put_content(content, dbfs_path, overwrite, headers=headers) - - dbfs_group.add_command(configure_cli, name='configure') dbfs_group.add_command(ls_cli, name='ls') dbfs_group.add_command(mkdirs_cli, name='mkdirs') diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index c1b4e344..f9cb4301 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -109,7 +109,7 @@ def close(self): # helper functions starting here - def perform_query(self, method, path, data = {}, files=None, headers = None): + def perform_query(self, method, path, data = {}, headers = None, files=None): """set up connection and perform query""" if headers is None: headers = self.default_headers diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index 72938a15..21cc892f 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -24,7 +24,6 @@ # limitations under the License. # import base64 -import json import os @@ -524,7 +523,7 @@ def list_test(self, path, headers=None): _data['path'] = path return self.client.perform_query('GET', '/dbfs-testing/list', data=_data, headers=headers) - def put(self, path, src_path=None, contents=None, overwrite=None, headers=None): + def put(self, path, contents=None, overwrite=None, headers=None, src_path=None): _data = {} _files = None if path is not None: From 24da3ad0aadfffc392f236070df93b5a83546d2e Mon Sep 17 00:00:00 2001 From: Ege E Date: Mon, 10 May 2021 10:36:46 +0100 Subject: [PATCH 06/26] Remove asserts in put test for open, add_block, close. --- tests/dbfs/test_api.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index c8de6899..4d01001f 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -133,13 +133,9 @@ def test_put_file(self, dbfs_api, tmpdir): api_mock = dbfs_api.client test_handle = 0 api_mock.create.return_value = {'handle': test_handle} + # Should succeed. dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - assert api_mock.add_block.call_count == 1 - assert test_handle == api_mock.add_block.call_args[0][0] - assert b64encode(b'test').decode() == api_mock.add_block.call_args[0][1] - assert api_mock.close.call_count == 1 - assert test_handle == api_mock.close.call_args[0][0] def test_get_file_check_overwrite(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') From 270d6d10ccc36729e70c60fea1b2def108444fb5 Mon Sep 17 00:00:00 2001 From: Ege E Date: Mon, 10 May 2021 10:41:23 +0100 Subject: [PATCH 07/26] Remove usesr facing add_command additions. Nit changes. --- databricks_cli/dbfs/api.py | 8 +++++--- databricks_cli/dbfs/cli.py | 4 +--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index 11dd5543..524b58f7 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -21,7 +21,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from base64 import b64encode, b64decode +from base64 import b64decode import os import shutil @@ -114,10 +114,12 @@ def get_status(self, dbfs_path, headers=None): return FileInfo.from_json(json) def put_file(self, src_path, dbfs_path, overwrite, headers=None): - self.client.put(dbfs_path.absolute_path, src_path=src_path, overwrite=overwrite, headers=headers) + self.client.put(dbfs_path.absolute_path, src_path=src_path, + overwrite=overwrite, headers=headers) def put_content(self, content, dbfs_path, overwrite, headers=None): - self.client.put(dbfs_path.absolute_path, contents=content, overwrite=overwrite, headers=headers) + self.client.put(dbfs_path.absolute_path, contents=content, + overwrite=overwrite, headers=headers) def get_file(self, dbfs_path, dst_path, overwrite, headers=None): if os.path.exists(dst_path) and not overwrite: diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index 834a9ffd..213499cd 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -165,6 +165,4 @@ def cat_cli(api_client, src): dbfs_group.add_command(rm_cli, name='rm') dbfs_group.add_command(cp_cli, name='cp') dbfs_group.add_command(mv_cli, name='mv') -dbfs_group.add_command(cat_cli, name='cat') -dbfs_group.add_command(put_file_cli, name='put_file') -dbfs_group.add_command(put_content_cli, name='put_content') \ No newline at end of file +dbfs_group.add_command(cat_cli, name='cat') \ No newline at end of file From 46dd267963d5d8c3e53c9f69527dd19d2b7f85f5 Mon Sep 17 00:00:00 2001 From: Ege E Date: Mon, 10 May 2021 10:45:06 +0100 Subject: [PATCH 08/26] Remove extra empty line. --- tests/dbfs/test_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 4d01001f..aba44565 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -136,7 +136,6 @@ def test_put_file(self, dbfs_api, tmpdir): # Should succeed. dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - def test_get_file_check_overwrite(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') with open(test_file_path, 'w') as f: From c6216da2a657fbfc9af45cb27105b2775e02b900 Mon Sep 17 00:00:00 2001 From: Ege E Date: Tue, 11 May 2021 19:35:57 +0100 Subject: [PATCH 09/26] Draft changes for autogen --- databricks_cli/dbfs/api.py | 3 +++ databricks_cli/dbfs/cli.py | 2 +- databricks_cli/sdk/api_client.py | 3 ++- databricks_cli/sdk/service.py | 9 ++------- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index 524b58f7..98d52f86 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -113,10 +113,13 @@ def get_status(self, dbfs_path, headers=None): json = self.client.get_status(dbfs_path.absolute_path, headers=headers) return FileInfo.from_json(json) + # Two variations of put implemented. See https://docs.databricks.com/dev-tools/api/latest/dbfs.html#put + # @put_file() is for multipart-form-data file upload. def put_file(self, src_path, dbfs_path, overwrite, headers=None): self.client.put(dbfs_path.absolute_path, src_path=src_path, overwrite=overwrite, headers=headers) + # @put_content() is for sending base64 content string. def put_content(self, content, dbfs_path, overwrite, headers=None): self.client.put(dbfs_path.absolute_path, contents=content, overwrite=overwrite, headers=headers) diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index 213499cd..4355b6cc 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -165,4 +165,4 @@ def cat_cli(api_client, src): dbfs_group.add_command(rm_cli, name='rm') dbfs_group.add_command(cp_cli, name='cp') dbfs_group.add_command(mv_cli, name='mv') -dbfs_group.add_command(cat_cli, name='cat') \ No newline at end of file +dbfs_group.add_command(cat_cli, name='cat') diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index f9cb4301..865f1209 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -125,10 +125,11 @@ def perform_query(self, method, path, data = {}, headers = None, files=None): resp = self.session.request(method, self.url + path, params = translated_data, verify = self.verify, headers = headers) else: - if files is None: + if 'file' not in data: resp = self.session.request(method, self.url + path, data = json.dumps(data), verify = self.verify, headers = headers) else: + # Multipart file upload resp = self.session.request(method, self.url + path, files=files, data=data, verify=self.verify, headers=headers) try: diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index 21cc892f..3c4b3410 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -529,19 +529,14 @@ def put(self, path, contents=None, overwrite=None, headers=None, src_path=None): if path is not None: _data['path'] = path if contents is not None: - # Because terminal might add trailing newlines, they need to be encoded properly. - encoded_contents = base64.b64encode(contents.encode('utf-8')) - _data['contents'] = encoded_contents.decode("utf-8") + _data['contents'] = contents if overwrite is not None: _data['overwrite'] = overwrite if src_path is not None: - # @self.client sets Content-Type 'text/json' by default. - # For multipart/form-data POST Content-Type should be set automatically - # to decode 'Boundary' parameter. headers = {'Content-Type': None} filename = os.path.basename(src_path) _files = {'file': (filename, open(src_path, 'rb'), 'multipart/form-data')} - return self.client.perform_query('POST', '/dbfs/put', data=_data, files=_files, headers=headers) + return self.client.perform_query('POST', '/dbfs/put', data=_data, headers=headers, files=_files) def put_test(self, path, src_path=None, contents=None, overwrite=None, headers=None): _data = {} From 52437ac85f05a177fec111efabe889972d90ef4e Mon Sep 17 00:00:00 2001 From: Ege E Date: Wed, 12 May 2021 17:33:19 +0100 Subject: [PATCH 10/26] Paste auto-generated put(...) method to service.py. Fix api_client.py to check files argument. --- databricks_cli/dbfs/api.py | 9 ++------- databricks_cli/sdk/api_client.py | 2 +- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index 98d52f86..4494d3c3 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -113,17 +113,12 @@ def get_status(self, dbfs_path, headers=None): json = self.client.get_status(dbfs_path.absolute_path, headers=headers) return FileInfo.from_json(json) - # Two variations of put implemented. See https://docs.databricks.com/dev-tools/api/latest/dbfs.html#put - # @put_file() is for multipart-form-data file upload. + # Single variation of put implemented. See https://docs.databricks.com/dev-tools/api/latest/dbfs.html#put + # @put_file() is for multipart file upload. def put_file(self, src_path, dbfs_path, overwrite, headers=None): self.client.put(dbfs_path.absolute_path, src_path=src_path, overwrite=overwrite, headers=headers) - # @put_content() is for sending base64 content string. - def put_content(self, content, dbfs_path, overwrite, headers=None): - self.client.put(dbfs_path.absolute_path, contents=content, - overwrite=overwrite, headers=headers) - def get_file(self, dbfs_path, dst_path, overwrite, headers=None): if os.path.exists(dst_path) and not overwrite: raise LocalFileExistsException('{} exists already.'.format(dst_path)) diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index 865f1209..ba07a082 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -125,7 +125,7 @@ def perform_query(self, method, path, data = {}, headers = None, files=None): resp = self.session.request(method, self.url + path, params = translated_data, verify = self.verify, headers = headers) else: - if 'file' not in data: + if files is None: resp = self.session.request(method, self.url + path, data = json.dumps(data), verify = self.verify, headers = headers) else: From 08f451b51dafb403245504f79af6155e56a18b6a Mon Sep 17 00:00:00 2001 From: Ege E Date: Thu, 13 May 2021 14:30:14 +0100 Subject: [PATCH 11/26] Add fall-back to put method if files are larger than 2gb. --- databricks_cli/dbfs/api.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index 4494d3c3..b051ffe4 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -21,7 +21,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from base64 import b64decode +from base64 import b64encode, b64decode import os import shutil @@ -116,8 +116,20 @@ def get_status(self, dbfs_path, headers=None): # Single variation of put implemented. See https://docs.databricks.com/dev-tools/api/latest/dbfs.html#put # @put_file() is for multipart file upload. def put_file(self, src_path, dbfs_path, overwrite, headers=None): - self.client.put(dbfs_path.absolute_path, src_path=src_path, - overwrite=overwrite, headers=headers) + # If file size is >2Gb use streaming upload. + if os.path.getsize(src_path) <= 2147483648: + self.client.put(dbfs_path.absolute_path, src_path=src_path, + overwrite=overwrite, headers=headers) + else: + handle = self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)['handle'] + with open(src_path, 'rb') as local_file: + while True: + contents = local_file.read(BUFFER_SIZE_BYTES) + if len(contents) == 0: + break + # add_block should not take a bytes object. + self.client.add_block(handle, b64encode(contents).decode(), headers=headers) + self.client.close(handle, headers=headers) def get_file(self, dbfs_path, dst_path, overwrite, headers=None): if os.path.exists(dst_path) and not overwrite: From 09d61d208afe8cc1d5e41e9ca6a5678516226acb Mon Sep 17 00:00:00 2001 From: Ege E Date: Mon, 17 May 2021 10:23:57 +0100 Subject: [PATCH 12/26] Update put_file tests. --- tests/dbfs/test_api.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index aba44565..a5bdb4c9 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -129,13 +129,28 @@ def test_put_file(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') with open(test_file_path, 'wt') as f: f.write('test') + api_mock = dbfs_api.client + test_handle = 0 + api_mock.create.return_value = {'handle': test_handle} + dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) + # Files > 2GB should use open, add_block, close stream upload. + def test_put_large_file(self, dbfs_api, tmpdir): + test_file_path = os.path.join(tmpdir.strpath, 'test') + with open(test_file_path, 'wt') as f: + # 2.1 GB file. + f.write('\0' * 2254856676) api_mock = dbfs_api.client test_handle = 0 api_mock.create.return_value = {'handle': test_handle} - # Should succeed. dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) + assert api_mock.add_block.call_count > 1 + assert test_handle == api_mock.add_block.call_args[0][0] + assert b64encode(b'\0').decode() == api_mock.add_block.call_args[0][1] + assert api_mock.close.call_count == 1 + assert test_handle == api_mock.close.call_args[0][0] + def test_get_file_check_overwrite(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') with open(test_file_path, 'w') as f: From 947389e1997623253ec5f843466974d00e67ca6d Mon Sep 17 00:00:00 2001 From: Ege E Date: Mon, 17 May 2021 16:19:48 +0100 Subject: [PATCH 13/26] Edit test assert for large file upload. --- tests/dbfs/test_api.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index a5bdb4c9..81ba9aaa 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -144,10 +144,9 @@ def test_put_large_file(self, dbfs_api, tmpdir): test_handle = 0 api_mock.create.return_value = {'handle': test_handle} dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - - assert api_mock.add_block.call_count > 1 + add_block_expected = 2254856676 / (2*820) + assert api_mock.add_block.call_count == add_block_expected assert test_handle == api_mock.add_block.call_args[0][0] - assert b64encode(b'\0').decode() == api_mock.add_block.call_args[0][1] assert api_mock.close.call_count == 1 assert test_handle == api_mock.close.call_args[0][0] From dcd51a7cc0d8e3df7ed0748e9c74699f69696e23 Mon Sep 17 00:00:00 2001 From: Ege E Date: Tue, 18 May 2021 12:46:06 +0100 Subject: [PATCH 14/26] Fix expected add_block counts. --- tests/dbfs/test_api.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 81ba9aaa..7578ca80 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -22,6 +22,7 @@ # limitations under the License. # pylint:disable=redefined-outer-name +import math from base64 import b64encode import os @@ -144,7 +145,7 @@ def test_put_large_file(self, dbfs_api, tmpdir): test_handle = 0 api_mock.create.return_value = {'handle': test_handle} dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - add_block_expected = 2254856676 / (2*820) + add_block_expected = math.ceil(2254856676 / (2*20)) assert api_mock.add_block.call_count == add_block_expected assert test_handle == api_mock.add_block.call_args[0][0] assert api_mock.close.call_count == 1 From 1abe4ff7a83554870516c536477168d5d1a25d8c Mon Sep 17 00:00:00 2001 From: Ege E Date: Wed, 19 May 2021 10:18:48 +0100 Subject: [PATCH 15/26] Fix typo in power operator. --- tests/dbfs/test_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 7578ca80..6298651e 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -145,7 +145,7 @@ def test_put_large_file(self, dbfs_api, tmpdir): test_handle = 0 api_mock.create.return_value = {'handle': test_handle} dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - add_block_expected = math.ceil(2254856676 / (2*20)) + add_block_expected = math.ceil(2254856676 / (2**20)) assert api_mock.add_block.call_count == add_block_expected assert test_handle == api_mock.add_block.call_args[0][0] assert api_mock.close.call_count == 1 From a62e262b4bd93d81de9e78358496d2764cbcca71 Mon Sep 17 00:00:00 2001 From: Ege E Date: Wed, 19 May 2021 21:39:33 +0100 Subject: [PATCH 16/26] Edit test endpoint. --- databricks_cli/sdk/service.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index 3c4b3410..3820f9a4 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -23,7 +23,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import base64 import os @@ -544,19 +543,14 @@ def put_test(self, path, src_path=None, contents=None, overwrite=None, headers=N if path is not None: _data['path'] = path if contents is not None: - # Because terminal might add trailing newlines, they need to be encoded properly. - encoded_contents = base64.b64encode(contents.encode('utf-8')) - _data['contents'] = encoded_contents.decode("utf-8") + _data['contents'] = contents if overwrite is not None: _data['overwrite'] = overwrite if src_path is not None: - # @self.client sets Content-Type 'text/json' by default. - # For multipart/form-data POST Content-Type should be set automatically - # to decode 'Boundary' parameter. headers = {'Content-Type': None} filename = os.path.basename(src_path) _files = {'file': (filename, open(src_path, 'rb'), 'multipart/form-data')} - return self.client.perform_query('POST', '/dbfs/put', data=_data, files=_files, headers=headers) + return self.client.perform_query('POST', '/dbfs/put', data=_data, headers=headers, files=_files) def mkdirs(self, path, headers=None): _data = {} From de5cbb7d58288b01b825067af1cc3a0a88222c03 Mon Sep 17 00:00:00 2001 From: Ege E Date: Thu, 20 May 2021 10:51:23 +0100 Subject: [PATCH 17/26] Remove large file test because of failure. --- tests/dbfs/test_api.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 6298651e..5fb1d85c 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -135,21 +135,22 @@ def test_put_file(self, dbfs_api, tmpdir): api_mock.create.return_value = {'handle': test_handle} dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - # Files > 2GB should use open, add_block, close stream upload. - def test_put_large_file(self, dbfs_api, tmpdir): - test_file_path = os.path.join(tmpdir.strpath, 'test') - with open(test_file_path, 'wt') as f: - # 2.1 GB file. - f.write('\0' * 2254856676) - api_mock = dbfs_api.client - test_handle = 0 - api_mock.create.return_value = {'handle': test_handle} - dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - add_block_expected = math.ceil(2254856676 / (2**20)) - assert api_mock.add_block.call_count == add_block_expected - assert test_handle == api_mock.add_block.call_args[0][0] - assert api_mock.close.call_count == 1 - assert test_handle == api_mock.close.call_args[0][0] + # # Files > 2GB should use open, add_block, close stream upload. + # def test_put_large_file(self, dbfs_api, tmpdir): + # test_file_path = os.path.join(tmpdir.strpath, 'test') + # with open(test_file_path, 'wt') as f: + # # 2.1 GB file. + # f.write('\0' * 1) + # api_mock = dbfs_api.client + # test_handle = 0 + # api_mock.create.return_value = {'handle': test_handle} + # dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) + # add_block_expected = math.ceil(2254856676 / (2**20)) + # assert True + # # assert api_mock.add_block.call_count == add_block_expected + # # assert test_handle == api_mock.add_block.call_args[0][0] + # # assert api_mock.close.call_count == 1 + # # assert test_handle == api_mock.close.call_args[0][0] def test_get_file_check_overwrite(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') From 968731290aacb11b567bc42eff22cd866a6e32b4 Mon Sep 17 00:00:00 2001 From: Ege E Date: Thu, 20 May 2021 12:02:38 +0100 Subject: [PATCH 18/26] Nit changes. --- databricks_cli/sdk/api_client.py | 4 ++-- databricks_cli/sdk/service.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index ba07a082..f1af7fec 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -130,8 +130,8 @@ def perform_query(self, method, path, data = {}, headers = None, files=None): verify = self.verify, headers = headers) else: # Multipart file upload - resp = self.session.request(method, self.url + path, files=files, data=data, - verify=self.verify, headers=headers) + resp = self.session.request(method, self.url + path, files = files, data = data, + verify = self.verify, headers = headers) try: resp.raise_for_status() except requests.exceptions.HTTPError as e: diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index 3820f9a4..9489a760 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -537,7 +537,7 @@ def put(self, path, contents=None, overwrite=None, headers=None, src_path=None): _files = {'file': (filename, open(src_path, 'rb'), 'multipart/form-data')} return self.client.perform_query('POST', '/dbfs/put', data=_data, headers=headers, files=_files) - def put_test(self, path, src_path=None, contents=None, overwrite=None, headers=None): + def put_test(self, path, contents=None, overwrite=None, headers=None, src_path=None): _data = {} _files = None if path is not None: From c724a1ee330c43bc4f8446d42620d2e1e2c3e3ef Mon Sep 17 00:00:00 2001 From: Ege E Date: Mon, 24 May 2021 12:08:52 +0100 Subject: [PATCH 19/26] Lint fixes. --- databricks_cli/dbfs/api.py | 6 ++++-- tests/dbfs/test_api.py | 1 - 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index b051ffe4..bd1e330d 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -113,7 +113,8 @@ def get_status(self, dbfs_path, headers=None): json = self.client.get_status(dbfs_path.absolute_path, headers=headers) return FileInfo.from_json(json) - # Single variation of put implemented. See https://docs.databricks.com/dev-tools/api/latest/dbfs.html#put + # Single variation of put implemented. + # See https://docs.databricks.com/dev-tools/api/latest/dbfs.html#put # @put_file() is for multipart file upload. def put_file(self, src_path, dbfs_path, overwrite, headers=None): # If file size is >2Gb use streaming upload. @@ -121,7 +122,8 @@ def put_file(self, src_path, dbfs_path, overwrite, headers=None): self.client.put(dbfs_path.absolute_path, src_path=src_path, overwrite=overwrite, headers=headers) else: - handle = self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)['handle'] + handle = self.client.create(dbfs_path.absolute_path, overwrite, + headers=headers)['handle'] with open(src_path, 'rb') as local_file: while True: contents = local_file.read(BUFFER_SIZE_BYTES) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 5fb1d85c..0d98dbbf 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -22,7 +22,6 @@ # limitations under the License. # pylint:disable=redefined-outer-name -import math from base64 import b64encode import os From 712e7d946b478abf8591cd2836f100ac133715ad Mon Sep 17 00:00:00 2001 From: Ege E Date: Wed, 26 May 2021 12:11:03 +0100 Subject: [PATCH 20/26] Code review changes. --- databricks_cli/dbfs/api.py | 9 +++++---- tests/dbfs/test_api.py | 32 ++++++++++++++++---------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index bd1e330d..a6000185 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -86,6 +86,8 @@ class DbfsErrorCodes(object): class DbfsApi(object): + MULTIPART_UPLOAD_LIMIT = 2147483648 + def __init__(self, api_client): self.client = DbfsService(api_client) @@ -113,12 +115,11 @@ def get_status(self, dbfs_path, headers=None): json = self.client.get_status(dbfs_path.absolute_path, headers=headers) return FileInfo.from_json(json) - # Single variation of put implemented. - # See https://docs.databricks.com/dev-tools/api/latest/dbfs.html#put - # @put_file() is for multipart file upload. + # Method makes multipart/form-data file upload for files <2GB. + # Otherwise uses open, add-block, close methods for streaming upload. def put_file(self, src_path, dbfs_path, overwrite, headers=None): # If file size is >2Gb use streaming upload. - if os.path.getsize(src_path) <= 2147483648: + if os.path.getsize(src_path) < self.MULTIPART_UPLOAD_LIMIT: self.client.put(dbfs_path.absolute_path, src_path=src_path, overwrite=overwrite, headers=headers) else: diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 0d98dbbf..8e08eebf 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -134,22 +134,22 @@ def test_put_file(self, dbfs_api, tmpdir): api_mock.create.return_value = {'handle': test_handle} dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - # # Files > 2GB should use open, add_block, close stream upload. - # def test_put_large_file(self, dbfs_api, tmpdir): - # test_file_path = os.path.join(tmpdir.strpath, 'test') - # with open(test_file_path, 'wt') as f: - # # 2.1 GB file. - # f.write('\0' * 1) - # api_mock = dbfs_api.client - # test_handle = 0 - # api_mock.create.return_value = {'handle': test_handle} - # dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) - # add_block_expected = math.ceil(2254856676 / (2**20)) - # assert True - # # assert api_mock.add_block.call_count == add_block_expected - # # assert test_handle == api_mock.add_block.call_args[0][0] - # # assert api_mock.close.call_count == 1 - # # assert test_handle == api_mock.close.call_args[0][0] + # Should not call add-block since file is < 2GB + assert api_mock.add_block.call_count == 0 + + # Files > 2GB should use open, add_block, close stream upload. + def test_put_large_file(self, dbfs_api, tmpdir): + test_file_path = os.path.join(tmpdir.strpath, 'test') + with open(test_file_path, 'wt') as f: + f.write('\0' * 2) + api_mock = dbfs_api.client + # Make streaming upload threshold 2 bytes for testing. + dbfs_api.MULTIPART_UPLOAD_LIMIT = 2 + test_handle = 0 + api_mock.create.return_value = {'handle': test_handle} + dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) + assert api_mock.add_block.call_count == 1 + assert api_mock.close.call_count == 1 def test_get_file_check_overwrite(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') From 1dad15ef1a956341adb0883bc793413f189a813d Mon Sep 17 00:00:00 2001 From: Ege E Date: Wed, 26 May 2021 23:09:24 +0100 Subject: [PATCH 21/26] Fix build. --- databricks_cli/dbfs/api.py | 2 ++ databricks_cli/dbfs/cli.py | 16 ++++++++++++++++ databricks_cli/sdk/api_client.py | 1 + databricks_cli/sdk/service.py | 1 + 4 files changed, 20 insertions(+) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index a6000185..9f3cf694 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -120,9 +120,11 @@ def get_status(self, dbfs_path, headers=None): def put_file(self, src_path, dbfs_path, overwrite, headers=None): # If file size is >2Gb use streaming upload. if os.path.getsize(src_path) < self.MULTIPART_UPLOAD_LIMIT: + print("using multipart") self.client.put(dbfs_path.absolute_path, src_path=src_path, overwrite=overwrite, headers=headers) else: + print("using streaming") handle = self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)['handle'] with open(src_path, 'rb') as local_file: diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index 4355b6cc..68cf8400 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -158,6 +158,21 @@ def cat_cli(api_client, src): """ DbfsApi(api_client).cat(src) +@click.command(context_settings=CONTEXT_SETTINGS) +@click.argument('src_path', type=click.Path(exists=True, resolve_path=True)) +@click.argument('dbfs_path', type=DbfsPathClickType()) +@click.option('--overwrite', is_flag=True, default=False) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def put_file_cli(api_client, src_path, dbfs_path, overwrite, headers=None): + """ + Put file to DBFS with multipart form post. + """ + print(src_path) + DbfsApi(api_client).put_file(src_path, dbfs_path, overwrite, headers=headers) + dbfs_group.add_command(configure_cli, name='configure') dbfs_group.add_command(ls_cli, name='ls') @@ -166,3 +181,4 @@ def cat_cli(api_client, src): dbfs_group.add_command(cp_cli, name='cp') dbfs_group.add_command(mv_cli, name='mv') dbfs_group.add_command(cat_cli, name='cat') +dbfs_group.add_command(put_file_cli, name='put_file') diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index f1af7fec..7acaf064 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -130,6 +130,7 @@ def perform_query(self, method, path, data = {}, headers = None, files=None): verify = self.verify, headers = headers) else: # Multipart file upload + print("sending file") resp = self.session.request(method, self.url + path, files = files, data = data, verify = self.verify, headers = headers) try: diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index 9489a760..5f0b94b8 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -535,6 +535,7 @@ def put(self, path, contents=None, overwrite=None, headers=None, src_path=None): headers = {'Content-Type': None} filename = os.path.basename(src_path) _files = {'file': (filename, open(src_path, 'rb'), 'multipart/form-data')} + print("adding source file") return self.client.perform_query('POST', '/dbfs/put', data=_data, headers=headers, files=_files) def put_test(self, path, contents=None, overwrite=None, headers=None, src_path=None): From 28451f011130c7534029621dc1c18b1506bb5464 Mon Sep 17 00:00:00 2001 From: Ege E Date: Wed, 26 May 2021 23:22:29 +0100 Subject: [PATCH 22/26] Revert "Fix build." This reverts commit 1dad15ef1a956341adb0883bc793413f189a813d. --- databricks_cli/dbfs/api.py | 2 -- databricks_cli/dbfs/cli.py | 16 ---------------- databricks_cli/sdk/api_client.py | 1 - databricks_cli/sdk/service.py | 1 - 4 files changed, 20 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index 9f3cf694..a6000185 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -120,11 +120,9 @@ def get_status(self, dbfs_path, headers=None): def put_file(self, src_path, dbfs_path, overwrite, headers=None): # If file size is >2Gb use streaming upload. if os.path.getsize(src_path) < self.MULTIPART_UPLOAD_LIMIT: - print("using multipart") self.client.put(dbfs_path.absolute_path, src_path=src_path, overwrite=overwrite, headers=headers) else: - print("using streaming") handle = self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)['handle'] with open(src_path, 'rb') as local_file: diff --git a/databricks_cli/dbfs/cli.py b/databricks_cli/dbfs/cli.py index 68cf8400..4355b6cc 100644 --- a/databricks_cli/dbfs/cli.py +++ b/databricks_cli/dbfs/cli.py @@ -158,21 +158,6 @@ def cat_cli(api_client, src): """ DbfsApi(api_client).cat(src) -@click.command(context_settings=CONTEXT_SETTINGS) -@click.argument('src_path', type=click.Path(exists=True, resolve_path=True)) -@click.argument('dbfs_path', type=DbfsPathClickType()) -@click.option('--overwrite', is_flag=True, default=False) -@debug_option -@profile_option -@eat_exceptions -@provide_api_client -def put_file_cli(api_client, src_path, dbfs_path, overwrite, headers=None): - """ - Put file to DBFS with multipart form post. - """ - print(src_path) - DbfsApi(api_client).put_file(src_path, dbfs_path, overwrite, headers=headers) - dbfs_group.add_command(configure_cli, name='configure') dbfs_group.add_command(ls_cli, name='ls') @@ -181,4 +166,3 @@ def put_file_cli(api_client, src_path, dbfs_path, overwrite, headers=None): dbfs_group.add_command(cp_cli, name='cp') dbfs_group.add_command(mv_cli, name='mv') dbfs_group.add_command(cat_cli, name='cat') -dbfs_group.add_command(put_file_cli, name='put_file') diff --git a/databricks_cli/sdk/api_client.py b/databricks_cli/sdk/api_client.py index 7acaf064..f1af7fec 100644 --- a/databricks_cli/sdk/api_client.py +++ b/databricks_cli/sdk/api_client.py @@ -130,7 +130,6 @@ def perform_query(self, method, path, data = {}, headers = None, files=None): verify = self.verify, headers = headers) else: # Multipart file upload - print("sending file") resp = self.session.request(method, self.url + path, files = files, data = data, verify = self.verify, headers = headers) try: diff --git a/databricks_cli/sdk/service.py b/databricks_cli/sdk/service.py index 5f0b94b8..9489a760 100755 --- a/databricks_cli/sdk/service.py +++ b/databricks_cli/sdk/service.py @@ -535,7 +535,6 @@ def put(self, path, contents=None, overwrite=None, headers=None, src_path=None): headers = {'Content-Type': None} filename = os.path.basename(src_path) _files = {'file': (filename, open(src_path, 'rb'), 'multipart/form-data')} - print("adding source file") return self.client.perform_query('POST', '/dbfs/put', data=_data, headers=headers, files=_files) def put_test(self, path, contents=None, overwrite=None, headers=None, src_path=None): From 23d7f51125181c8519282febbabf60a3fad1b5d7 Mon Sep 17 00:00:00 2001 From: Ege E Date: Thu, 27 May 2021 15:53:35 +0100 Subject: [PATCH 23/26] Revert some test changes. --- tests/dbfs/test_api.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 8e08eebf..d7492d84 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -129,6 +129,7 @@ def test_put_file(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') with open(test_file_path, 'wt') as f: f.write('test') + api_mock = dbfs_api.client test_handle = 0 api_mock.create.return_value = {'handle': test_handle} @@ -141,7 +142,7 @@ def test_put_file(self, dbfs_api, tmpdir): def test_put_large_file(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') with open(test_file_path, 'wt') as f: - f.write('\0' * 2) + f.write('test') api_mock = dbfs_api.client # Make streaming upload threshold 2 bytes for testing. dbfs_api.MULTIPART_UPLOAD_LIMIT = 2 @@ -149,7 +150,10 @@ def test_put_large_file(self, dbfs_api, tmpdir): api_mock.create.return_value = {'handle': test_handle} dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True) assert api_mock.add_block.call_count == 1 + assert test_handle == api_mock.add_block.call_args[0][0] + assert b64encode(b'test').decode() == api_mock.add_block.call_args[0][1] assert api_mock.close.call_count == 1 + assert test_handle == api_mock.close.call_args[0][0] def test_get_file_check_overwrite(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') From 5efe7990c92f569a43033b20ec876623d685cf79 Mon Sep 17 00:00:00 2001 From: Ege E Date: Thu, 27 May 2021 15:54:47 +0100 Subject: [PATCH 24/26] Apply suggestions from code review Co-authored-by: Bogdan Ghita <57367018+bogdanghita-db@users.noreply.github.com> --- databricks_cli/dbfs/api.py | 2 +- tests/dbfs/test_api.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index a6000185..c18ce96e 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -116,7 +116,7 @@ def get_status(self, dbfs_path, headers=None): return FileInfo.from_json(json) # Method makes multipart/form-data file upload for files <2GB. - # Otherwise uses open, add-block, close methods for streaming upload. + # Otherwise uses create, add-block, close methods for streaming upload. def put_file(self, src_path, dbfs_path, overwrite, headers=None): # If file size is >2Gb use streaming upload. if os.path.getsize(src_path) < self.MULTIPART_UPLOAD_LIMIT: diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index d7492d84..73891a15 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -138,7 +138,7 @@ def test_put_file(self, dbfs_api, tmpdir): # Should not call add-block since file is < 2GB assert api_mock.add_block.call_count == 0 - # Files > 2GB should use open, add_block, close stream upload. + # Files > 2GB should use create, add_block, close stream upload. def test_put_large_file(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') with open(test_file_path, 'wt') as f: From 40af579dfdaf4b8884a1b1616d2639483bde8a75 Mon Sep 17 00:00:00 2001 From: Ege E Date: Thu, 27 May 2021 16:28:59 +0100 Subject: [PATCH 25/26] Fix comments characters. --- databricks_cli/dbfs/api.py | 2 +- tests/dbfs/test_api.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/databricks_cli/dbfs/api.py b/databricks_cli/dbfs/api.py index c18ce96e..0be47f57 100644 --- a/databricks_cli/dbfs/api.py +++ b/databricks_cli/dbfs/api.py @@ -116,7 +116,7 @@ def get_status(self, dbfs_path, headers=None): return FileInfo.from_json(json) # Method makes multipart/form-data file upload for files <2GB. - # Otherwise uses create, add-block, close methods for streaming upload. + # Otherwise uses create, add-block, close methods for streaming upload. def put_file(self, src_path, dbfs_path, overwrite, headers=None): # If file size is >2Gb use streaming upload. if os.path.getsize(src_path) < self.MULTIPART_UPLOAD_LIMIT: diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 73891a15..21453e5b 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -138,7 +138,7 @@ def test_put_file(self, dbfs_api, tmpdir): # Should not call add-block since file is < 2GB assert api_mock.add_block.call_count == 0 - # Files > 2GB should use create, add_block, close stream upload. + # Files > 2GB should use create, add_block, close stream upload. def test_put_large_file(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') with open(test_file_path, 'wt') as f: From 795ba9fc2dab08543ef642462c2cc59963962fe7 Mon Sep 17 00:00:00 2001 From: Ege E Date: Thu, 27 May 2021 16:29:44 +0100 Subject: [PATCH 26/26] Add = char to file comment. --- tests/dbfs/test_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dbfs/test_api.py b/tests/dbfs/test_api.py index 21453e5b..ded0c976 100644 --- a/tests/dbfs/test_api.py +++ b/tests/dbfs/test_api.py @@ -138,7 +138,7 @@ def test_put_file(self, dbfs_api, tmpdir): # Should not call add-block since file is < 2GB assert api_mock.add_block.call_count == 0 - # Files > 2GB should use create, add_block, close stream upload. + # Files >= 2GB should use create, add_block, close stream upload. def test_put_large_file(self, dbfs_api, tmpdir): test_file_path = os.path.join(tmpdir.strpath, 'test') with open(test_file_path, 'wt') as f: