Improve notebook APIs: add timestamp, handle on-exists better. (#396)
- Adds a timestamp parameter to the notebook downloaders to provide
  time travel directly from the notebook API.
- Sets default values for upload functions to avoid needing to pass
  every single parameter.
- Handles overwrite-on-exists case better; will not fail if the array
  doesn't yet exist.
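For orientation, here is a rough usage sketch of the notebook API after this change. It assumes the module is importable as tiledb.cloud.notebook; the URI and array name are the placeholder examples used in the docstrings in the diff below.

import datetime

from tiledb.cloud import notebook

# Download a notebook as of a point in time ("time travel"); timestamp may be
# a datetime, epoch milliseconds, or omitted for the latest version.
notebook.download_notebook_to_file(
    "tiledb://TileDB-Inc/quickstart_dense",
    "./mycopy.ipynb",
    timestamp=datetime.datetime(2023, 4, 1, tzinfo=datetime.timezone.utc),
)

# Upload with the new defaults: namespace, storage path, and credentials now
# fall back to the logged-in user's profile when not passed explicitly.
notebook.upload_notebook_from_file(
    "./mycopy.ipynb",
    array_name="testing-upload",
)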
thetorpedodog committed May 1, 2023
1 parent 57903b9 commit f8f80df
Showing 3 changed files with 102 additions and 82 deletions.
7 changes: 6 additions & 1 deletion src/tiledb/cloud/_common/utils.py
@@ -1,10 +1,11 @@
import base64
import datetime
import functools
import logging
import sys
import threading
import urllib.parse
from typing import Any, Callable, Optional, TypeVar
from typing import Any, Callable, Optional, TypeVar, Union

import cloudpickle
import urllib3
@@ -107,3 +108,7 @@ def release_connection(resp: urllib3.HTTPResponse) -> None:
"""
resp.drain_conn()
resp.release_conn()


def datetime_to_msec(t: Union[datetime.datetime, int, None]) -> Optional[int]:
return int(t.timestamp() * 1000) if isinstance(t, datetime.datetime) else t
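The new helper normalizes "as of" timestamps: datetime values become milliseconds since the Unix epoch, while integers and None pass through unchanged. A small illustrative check, mirroring the unit tests added below:

import datetime

from tiledb.cloud._common import utils

# Timezone-aware datetime -> epoch milliseconds (sub-millisecond precision truncated).
dt = datetime.datetime(1970, 2, 3, microsecond=123999, tzinfo=datetime.timezone.utc)
assert utils.datetime_to_msec(dt) == 2851200123

# Integers (already milliseconds) and None are returned as-is.
assert utils.datetime_to_msec(1_682_699_696_789) == 1_682_699_696_789
assert utils.datetime_to_msec(None) is None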
150 changes: 71 additions & 79 deletions src/tiledb/cloud/notebook.py
@@ -3,10 +3,11 @@
is assumed to be encoded as UTF-8.
"""
import datetime
import enum
import posixpath
import time
from typing import Optional, Tuple
from typing import Optional, Tuple, Union

import numpy

@@ -15,6 +16,7 @@
from tiledb.cloud import client
from tiledb.cloud import rest_api
from tiledb.cloud import tiledb_cloud_error
from tiledb.cloud._common import utils
from tiledb.cloud.rest_api import ApiException as GenApiException

RESERVED_NAMESPACES = frozenset(["cloud", "owned", "public", "shared"])
@@ -50,7 +52,7 @@ def rename_notebook(
return api_instance.update_notebook_name(
namespace=namespace,
array=current_notebook_name,
notebook_metadata=rest_api.models.ArrayInfoUpdate(
notebook_metadata=rest_api.ArrayInfoUpdate(
name=notebook_name,
uri=tiledb_uri,
access_credentials_name=access_credentials_name,
@@ -64,71 +66,50 @@
def download_notebook_to_file(
tiledb_uri: str,
ipynb_file_name: str,
*,
timestamp: Union[int, datetime.datetime, None] = None,
) -> None:
"""
Downloads a notebook file from TileDB Cloud to local disk.
"""Downloads a notebook file from TileDB Cloud to local disk.
:param tiledb_uri: such as "tiledb://TileDB-Inc/quickstart_dense".
:param ipynb_file_name: path to save to, such as "./mycopy.ipynb". Must be
local; no S3 URI support at present.
local; no other VFS backends are currently supported.
:param timestamp: If set, the timestamp to download the notebook as of.
"""
ipynb_file_contents = download_notebook_contents(
tiledb_uri,
)
vfs = tiledb.VFS(tiledb.cloud.Ctx().config())
with tiledb.FileIO(vfs, ipynb_file_name, mode="wb") as fio:
ipynb_file_contents = download_notebook_contents(tiledb_uri, timestamp=timestamp)
vfs = tiledb.VFS(ctx=client.Ctx())
with vfs.open(ipynb_file_name, mode="wb") as fio:
fio.write(bytes(ipynb_file_contents, "utf-8"))


def download_notebook_contents(
tiledb_uri: str,
tiledb_uri: str, *, timestamp: Union[int, datetime.datetime, None] = None
) -> str:
"""
Downloads a notebook file from TileDB Cloud to contents as a string,
nominally in JSON format.
:param tiledb_uri: such as "tiledb://TileDB-Inc/quickstart_dense".
:param timestamp: If set, the timestamp to download the notebook as of.
:return: contents of the notebook file as a string, nominally in JSON format.
"""
ctx = tiledb.cloud.Ctx({})
with tiledb.open(tiledb_uri, "r", ctx=ctx) as arr:
ctx = client.Ctx()
with tiledb.open(
tiledb_uri, "r", ctx=ctx, timestamp=utils.datetime_to_msec(timestamp)
) as arr:
size = arr.meta["file_size"]
data = arr.query(attrs=["contents"])[slice(0, size)]
json = data["contents"].tobytes().decode("utf-8")
return json


# TODO: auto-increment/overwrite logic
# If the destination array name already exists -- e.g. uploading 'foo.ipynb' to
# 'testing-upload' -- there are three options:
# 1. Fail the upload with 'already exists' and require the user to supply a
# different path. No clobbering
# 2. Auto-increment the array name, e.g. from 'testing-upload' to 'testing-upload-1'
# and then 'testing-upload-2' the next time, and so on.
# 3. Overwrite
#
# Thoughts:
# * Option 3 isn't a safe default -- for those who want it it's fine but for
# those who don't it can be seen as unwelcome data loss.
# * Option 2 is a not-bad default -- there is no data loss, but some users
# might be left feeling 'Why are you creating all these versions? I just
# want to update one notebook, not have twenty copies."
# * Option 1 is a safe default -- there is no data loss and no profusion of
# copies. However, it is more frictional for the user, requiring them to
# make the decision.
#
# Implementation:
#
# * We could have a force-overwrite argument, optional, default False.
# * We could have a behavior-on-exist argument, of enum type, 3 cases, one
# for each of the options above.
#
# Status: As of this writing: we have implemented option 1, and we don't have
# an overwrite/update-in-place flag.
def upload_notebook_from_file(
ipynb_file_name: str,
namespace: str,
*,
namespace: Optional[str] = None,
array_name: str,
storage_path: Optional[str],
storage_credential_name: Optional[str],
storage_path: Optional[str] = None,
storage_credential_name: Optional[str] = None,
on_exists: OnExists = OnExists.FAIL,
) -> str:
"""
@@ -145,26 +126,27 @@ def upload_notebook_from_file(
:return: TileDB array name, such as "tiledb://janedoe/testing-upload".
"""

vfs = tiledb.VFS(tiledb.cloud.Ctx().config())
with tiledb.FileIO(vfs, ipynb_file_name, mode="rb") as fio:
vfs = tiledb.VFS(ctx=client.Ctx())
with vfs.open(ipynb_file_name, mode="rb") as fio:
ipynb_file_contents = fio.read()

return upload_notebook_contents(
str(ipynb_file_contents, "utf-8"),
storage_path,
array_name,
namespace,
storage_credential_name,
on_exists,
namespace=namespace,
array_name=array_name,
storage_path=storage_path,
storage_credential_name=storage_credential_name,
on_exists=on_exists,
)


def upload_notebook_contents(
ipynb_file_contents: str,
storage_path: Optional[str],
*,
namespace: Optional[str] = None,
array_name: str,
namespace: str,
storage_credential_name: Optional[str],
storage_path: Optional[str] = None,
storage_credential_name: Optional[str] = None,
on_exists: OnExists,
) -> str:
"""
@@ -181,37 +163,47 @@ def upload_notebook_contents(
:return: TileDB array name, such as "tiledb://janedoe/testing-upload".
"""

if storage_credential_name is None:
storage_credential_name = (
tiledb.cloud.user_profile().default_s3_path_credentials_name
)
if storage_path is None:
storage_path = tiledb.cloud.user_profile().default_s3_path
storage_credential_name = storage_credential_name or (
client.default_user().default_s3_path_credentials_name
)
storage_path = storage_path or client.default_user().default_s3_path
namespace = namespace or client.default_user().username

if storage_credential_name is None:
if not storage_credential_name:
raise tiledb_cloud_error.TileDBCloudError(
f"No storage credentials found in account. Please add them there, or pass them in explicitly here."
) from e
if storage_path is None:
"No storage credentials found in account."
" Please add them there, or pass them in explicitly here."
)
if not storage_path:
raise tiledb_cloud_error.TileDBCloudError(
f"No storage path found in account. Please add it there, or pass it in explicitly here."
) from e
"No storage path found in account."
" Please add it there, or pass it in explicitly here."
)
if not namespace:
raise tiledb_cloud_error.TileDBCloudError("Namespace to store notebook unset.")

ctx = tiledb.cloud.Ctx(
{"rest.creation_access_credentials_name": storage_credential_name}
)
ctx = client.Ctx({"rest.creation_access_credentials_name": storage_credential_name})

if on_exists is OnExists.FAIL:
tiledb_uri = f"tiledb://{namespace}/{array_name}"
try:
arr = tiledb.open(tiledb_uri, ctx=ctx)
except tiledb.TileDBError:
already_exists = False
else:
arr.close()
already_exists = True
if already_exists:
if on_exists is OnExists.FAIL:
raise tiledb_cloud_error.TileDBCloudError(
f"Array {tiledb_uri!r} already exists"
)
else:
tiledb_uri, array_name = _create_notebook_array(
storage_path,
array_name,
namespace,
ctx,
)
else:
# The array should already exist
tiledb_uri = f"tiledb://{posixpath.join(namespace, array_name)}"

_write_notebook_to_array(tiledb_uri, ipynb_file_contents, ctx)

return tiledb_uri
@@ -258,14 +250,13 @@ def _create_notebook_array(
tries = 1 + retries # 1st + rest
while True:
try:
tiledb_uri, array_name = _create_notebook_array_retry_helper(
return _create_notebook_array_retry_helper(
storage_path,
array_name,
namespace,
dom,
ctx,
)
return (tiledb_uri, array_name)
except tiledb.TileDBError as e:
if "Error while listing with prefix" in str(e):
# It is possible to land here if user sets wrong default S3
@@ -275,7 +266,8 @@
) from e
if "Cannot create array" in str(e) and "already exists" in str(e):
raise tiledb_cloud_error.TileDBCloudError(
f"Error creating file: {array_name!r} already exists in namespace {namespace!r}."
f"Error creating file: {array_name!r} already exists"
f" in namespace {namespace!r}."
)
# Retry other TileDB errors
tries -= 1
@@ -289,7 +281,7 @@ def _create_notebook_array_retry_helper(
namespace: str,
dom: tiledb.Domain,
ctx: tiledb.Ctx,
) -> Tuple[bool, str, str]:
) -> Tuple[str, str]:
"""
See _create_notebook_array -- exists only for retry logic.
:return: tuple of succeeded, tiledb_uri, and array_name
@@ -323,7 +315,7 @@ def _create_notebook_array_retry_helper(

array.update_file_properties(
uri=tiledb_uri,
file_type=tiledb.cloud.rest_api.models.FileType.NOTEBOOK,
file_type=rest_api.FileType.NOTEBOOK,
# If file_properties is empty, don't send anything at all.
file_properties=file_properties or None,
)
@@ -356,5 +348,5 @@ def _write_notebook_to_array(
with tiledb.open(tiledb_uri, mode="w", ctx=ctx) as arr:
arr[0 : len(contents_as_array)] = {"contents": contents_as_array}
arr.meta["file_size"] = len(contents_as_array)
arr.meta["type"] = file_type = tiledb.cloud.rest_api.models.FileType.NOTEBOOK
arr.meta["type"] = rest_api.FileType.NOTEBOOK
arr.meta["format"] = "json"
27 changes: 25 additions & 2 deletions tests/common/test_utils.py
@@ -1,12 +1,15 @@
import base64
import datetime
import pickle
import unittest

import pytz # Test-only dependency.

from tiledb.cloud._common import utils


class PickleTest(unittest.TestCase):
def test_roundtrip(self):
class UtilsTest(unittest.TestCase):
def test_pickle_roundtrip(self):
cases = (
None,
("a", 1),
@@ -18,6 +21,26 @@ def test_roundtrip(self):
unpickled = _b64_unpickle(pickled)
self.assertEqual(unpickled, c)

def test_datetime_to_msec(self):
cases = [
(0, 0),
(
datetime.datetime(
1970, 2, 3, microsecond=123999, tzinfo=datetime.timezone.utc
),
2851200123,
),
(
pytz.timezone("America/New_York").localize(
datetime.datetime(2023, 4, 28, 12, 34, 56, 789012)
),
1682699696789,
),
]
for inval, expected in cases:
with self.subTest(inval):
self.assertEqual(utils.datetime_to_msec(inval), expected)


def _b64_unpickle(x):
raw = base64.b64decode(x)