Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
823e844
Bump version to 0.4.3.dev0
stormwindy Feb 26, 2021
dd9c750
Add draft changes.
stormwindy May 5, 2021
df78ca1
Add working prototype with two interfaces.
stormwindy May 7, 2021
6b1c654
Merge branch 'master' of github.com:databricks/databricks-cli into db…
stormwindy May 7, 2021
ff1c4f1
Update put_test.
stormwindy May 7, 2021
d396ec9
Remove interfaces.
stormwindy May 7, 2021
24da3ad
Remove asserts in put test for open, add_block, close.
stormwindy May 10, 2021
270d6d1
Remove user-facing add_command additions. Nit changes.
stormwindy May 10, 2021
46dd267
Remove extra empty line.
stormwindy May 10, 2021
c6216da
Draft changes for autogen
stormwindy May 11, 2021
52437ac
Paste auto-generated put(...) method to service.py. Fix api_client.py…
stormwindy May 12, 2021
08f451b
Add fall-back to put method if files are larger than 2gb.
stormwindy May 13, 2021
09d61d2
Update put_file tests.
stormwindy May 17, 2021
947389e
Edit test assert for large file upload.
stormwindy May 17, 2021
dcd51a7
Fix expected add_block counts.
stormwindy May 18, 2021
1abe4ff
Fix typo in power operator.
stormwindy May 19, 2021
a62e262
Edit test endpoint.
stormwindy May 19, 2021
de5cbb7
Remove large file test because of failure.
stormwindy May 20, 2021
9687312
Nit changes.
stormwindy May 20, 2021
c724a1e
Lint fixes.
stormwindy May 24, 2021
712e7d9
Code review changes.
stormwindy May 26, 2021
1dad15e
Fix build.
stormwindy May 26, 2021
28451f0
Revert "Fix build."
stormwindy May 26, 2021
7d3fe8e
Merge branch 'master' of github.com:databricks/databricks-cli into db…
stormwindy May 26, 2021
23d7f51
Revert some test changes.
stormwindy May 27, 2021
5efe799
Apply suggestions from code review
May 27, 2021
40af579
Fix comments characters.
stormwindy May 27, 2021
795ba9f
Add = char to file comment.
stormwindy May 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions databricks_cli/dbfs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class DbfsErrorCodes(object):


class DbfsApi(object):
MULTIPART_UPLOAD_LIMIT = 2147483648

def __init__(self, api_client):
self.client = DbfsService(api_client)

Expand Down Expand Up @@ -113,16 +115,24 @@ def get_status(self, dbfs_path, headers=None):
json = self.client.get_status(dbfs_path.absolute_path, headers=headers)
return FileInfo.from_json(json)

# Uploads a local file to DBFS.
#
# Files smaller than MULTIPART_UPLOAD_LIMIT (2 GiB) are sent in a single
# multipart/form-data request via the /dbfs/put endpoint; larger files fall
# back to the streaming protocol (create -> add_block* -> close), since the
# multipart endpoint rejects bodies of 2 GiB or more.
def put_file(self, src_path, dbfs_path, overwrite, headers=None):
    """Upload the local file at ``src_path`` to ``dbfs_path``.

    :param src_path: path of the local file to upload.
    :param dbfs_path: destination path object; its ``absolute_path`` is sent
        to the service.
    :param overwrite: whether to overwrite an existing file at the destination.
    :param headers: optional extra HTTP headers forwarded to the service.
    """
    if os.path.getsize(src_path) < self.MULTIPART_UPLOAD_LIMIT:
        # Small file: one-shot multipart/form-data upload.
        self.client.put(dbfs_path.absolute_path, src_path=src_path,
                        overwrite=overwrite, headers=headers)
    else:
        # Large file: stream the contents up in BUFFER_SIZE_BYTES chunks.
        handle = self.client.create(dbfs_path.absolute_path, overwrite,
                                    headers=headers)['handle']
        with open(src_path, 'rb') as local_file:
            while True:
                contents = local_file.read(BUFFER_SIZE_BYTES)
                if not contents:
                    break
                # add_block expects base64-encoded text, not raw bytes.
                self.client.add_block(handle, b64encode(contents).decode(),
                                      headers=headers)
        self.client.close(handle, headers=headers)

def get_file(self, dbfs_path, dst_path, overwrite, headers=None):
if os.path.exists(dst_path) and not overwrite:
Expand Down
11 changes: 8 additions & 3 deletions databricks_cli/sdk/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def close(self):

# helper functions starting here

def perform_query(self, method, path, data = {}, headers = None):
def perform_query(self, method, path, data = {}, headers = None, files=None):
"""set up connection and perform query"""
if headers is None:
headers = self.default_headers
Expand All @@ -125,8 +125,13 @@ def perform_query(self, method, path, data = {}, headers = None):
resp = self.session.request(method, self.url + path, params = translated_data,
verify = self.verify, headers = headers)
else:
resp = self.session.request(method, self.url + path, data = json.dumps(data),
verify = self.verify, headers = headers)
if files is None:
resp = self.session.request(method, self.url + path, data = json.dumps(data),
verify = self.verify, headers = headers)
else:
# Multipart file upload
resp = self.session.request(method, self.url + path, files = files, data = data,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see for this case we're passing data directly instead of json.dumps(data) like we do above. I'm just curious to know if this is what's expected for multi-part upload. Is data actually used in this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With json.dumps it used to fail creating the correct request. I saw somewhere the solution to the error was to pass the data object directly and request library handles it itself. I will try to find the thread about it.

verify = self.verify, headers = headers)
try:
resp.raise_for_status()
except requests.exceptions.HTTPError as e:
Expand Down
21 changes: 17 additions & 4 deletions databricks_cli/sdk/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os


class JobsService(object):
def __init__(self, client):
self.client = client
Expand Down Expand Up @@ -519,25 +522,35 @@ def list_test(self, path, headers=None):
_data['path'] = path
return self.client.perform_query('GET', '/dbfs-testing/list', data=_data, headers=headers)

def put(self, path, contents=None, overwrite=None, headers=None, src_path=None):
    """Invoke the ``POST /dbfs/put`` endpoint.

    :param path: destination DBFS path.
    :param contents: optional base64-encoded file contents sent inline in the
        request body.
    :param overwrite: optional flag to overwrite an existing file.
    :param headers: optional HTTP headers; note these are replaced when
        ``src_path`` is used (see below).
    :param src_path: optional local file to send as a multipart/form-data
        upload instead of inline ``contents``.
    """
    _data = {}
    _files = None
    if path is not None:
        _data['path'] = path
    if contents is not None:
        _data['contents'] = contents
    if overwrite is not None:
        _data['overwrite'] = overwrite
    if src_path is not None:
        # Setting Content-Type to None lets requests generate the
        # 'multipart/form-data; boundary=...' header itself; specifying it
        # manually would omit the required boundary parameter.
        headers = {'Content-Type': None}
        filename = os.path.basename(src_path)
        # Use a context manager so the file descriptor is closed after the
        # request completes (the original left it open — a resource leak).
        with open(src_path, 'rb') as local_file:
            _files = {'file': (filename, local_file, 'multipart/form-data')}
            return self.client.perform_query('POST', '/dbfs/put', data=_data,
                                             headers=headers, files=_files)
    return self.client.perform_query('POST', '/dbfs/put', data=_data,
                                     headers=headers, files=_files)

def put_test(self, path, contents=None, overwrite=None, headers=None, src_path=None):
    """Testing-endpoint twin of :meth:`put`.

    :param path: destination DBFS path.
    :param contents: optional base64-encoded file contents sent inline.
    :param overwrite: optional flag to overwrite an existing file.
    :param headers: optional HTTP headers; replaced when ``src_path`` is used.
    :param src_path: optional local file to send as a multipart upload.
    """
    _data = {}
    _files = None
    if path is not None:
        _data['path'] = path
    if contents is not None:
        _data['contents'] = contents
    if overwrite is not None:
        _data['overwrite'] = overwrite
    if src_path is not None:
        # None forces requests to build the multipart Content-Type (with
        # boundary) itself; see put() for details.
        headers = {'Content-Type': None}
        filename = os.path.basename(src_path)
        # Close the file once the request completes instead of leaking the
        # descriptor.
        with open(src_path, 'rb') as local_file:
            _files = {'file': (filename, local_file, 'multipart/form-data')}
            # NOTE(review): despite the method name this posts to '/dbfs/put',
            # not '/dbfs-testing/put' — confirm this is intentional.
            return self.client.perform_query('POST', '/dbfs/put', data=_data,
                                             headers=headers, files=_files)
    return self.client.perform_query('POST', '/dbfs/put', data=_data,
                                     headers=headers, files=_files)

def mkdirs(self, path, headers=None):
_data = {}
Expand Down
14 changes: 14 additions & 0 deletions tests/dbfs/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,20 @@ def test_put_file(self, dbfs_api, tmpdir):
api_mock.create.return_value = {'handle': test_handle}
dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a test for both ways of doing a put? with contents and with a file?


# Should not call add-block since file is < 2GB
assert api_mock.add_block.call_count == 0

# Files >= MULTIPART_UPLOAD_LIMIT must use the streaming upload protocol
# (create / add_block / close) instead of the one-shot multipart endpoint.
def test_put_large_file(self, dbfs_api, tmpdir):
    test_file_path = os.path.join(tmpdir.strpath, 'test')
    with open(test_file_path, 'wt') as f:
        f.write('test')
    api_mock = dbfs_api.client
    # Lower the streaming-upload threshold to 2 bytes so the 4-byte file
    # above exercises the large-file path without a real 2 GiB fixture.
    dbfs_api.MULTIPART_UPLOAD_LIMIT = 2
    test_handle = 0
    api_mock.create.return_value = {'handle': test_handle}
    dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True)
    # The multipart endpoint must not be hit for large files.
    assert api_mock.put.call_count == 0
    # A 4-byte file with the default buffer size fits in a single block.
    assert api_mock.add_block.call_count == 1
    assert test_handle == api_mock.add_block.call_args[0][0]
    assert b64encode(b'test').decode() == api_mock.add_block.call_args[0][1]
Expand Down