Merge pull request #910 from activeloopai/feature/2.0/integrate_cache
[2.0] Integrate cache with Dataset API
imshashank committed Jun 4, 2021
2 parents 48d34da + 3e93cef commit 9783e0e
Showing 13 changed files with 174 additions and 66 deletions.
4 changes: 2 additions & 2 deletions conftest.py
@@ -150,8 +150,8 @@ def _get_s3_provider(request):
return _get_storage_provider(request, S3)


def _get_dataset(provider: StorageProvider):
return Dataset(provider=provider)
def _get_dataset(storage: StorageProvider):
return Dataset(storage=storage)


@pytest.fixture
74 changes: 51 additions & 23 deletions hub/api/dataset.py
@@ -12,26 +12,29 @@

from hub.core.typing import StorageProvider
from hub.util.index import Index
from hub.util.path import provider_from_path
from hub.util.exceptions import (
InvalidKeyTypeError,
TensorAlreadyExistsError,
TensorDoesNotExistError,
UnsupportedTensorTypeError,
)
from hub.util.path import provider_from_path
from hub.util.cache_chain import generate_chain
from hub.util.path import storage_provider_from_path
from hub.constants import DEFAULT_MEMORY_CACHE_SIZE, DEFAULT_LOCAL_CACHE_SIZE, MB

# Used to distinguish between attributes and items (tensors)
DATASET_RESERVED_ATTRIBUTES = ["path", "mode", "index", "provider", "tensors"]
DATASET_RESERVED_ATTRIBUTES = ["path", "mode", "index", "storage", "tensors"]


class Dataset:
def __init__(
self,
path: str = "",
mode: str = "a",
provider: Optional[StorageProvider] = None,
index: Union[int, slice, Index] = None,
memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE,
local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE,
storage: Optional[StorageProvider] = None,
):
"""Initialize a new or existing dataset.
@@ -46,32 +49,38 @@ def __init__(
mode (str): Mode in which the dataset is opened.
Supported modes include ("r", "w", "a") plus an optional "+" suffix.
Defaults to "a".
provider (StorageProvider, optional): The storage provider used to access
the data stored by this dataset.
index: The Index object restricting the view of this dataset's tensors.
Can be an int, slice, or (used internally) an Index object.
memory_cache_size (int): The size of the memory cache to be used in MB.
local_cache_size (int): The size of the local filesystem cache to be used in MB.
storage (StorageProvider, optional): The storage provider used to access
the data stored by this dataset. If this is specified, the path given is ignored.
Raises:
ValueError: If an existing local path is given, it must be a directory.
UserWarning: Both a path and provider should not be given.
UserWarning: Both path and storage should not be given.
"""
self.mode = mode
self.index = Index(index)

if provider is not None and path:
if storage is not None and path:
warnings.warn(
"Dataset should not be constructed with both provider and path."
"Dataset should not be constructed with both storage and path. Ignoring path and using storage."
)
self.provider = provider or provider_from_path(path)

base_storage = storage or storage_provider_from_path(path)
memory_cache_size_bytes = memory_cache_size * MB
local_cache_size_bytes = local_cache_size * MB
self.storage = generate_chain(
base_storage, memory_cache_size_bytes, local_cache_size_bytes, path
)
self.tensors: Dict[str, Tensor] = {}

if dataset_exists(self.provider):
ds_meta = read_dataset_meta(self.provider)
if dataset_exists(self.storage):
ds_meta = read_dataset_meta(self.storage)
for tensor_name in ds_meta["tensors"]:
self.tensors[tensor_name] = Tensor(tensor_name, self.provider)
self.tensors[tensor_name] = Tensor(tensor_name, self.storage)
else:
write_dataset_meta(self.provider, {"tensors": []})
write_dataset_meta(self.storage, {"tensors": []})

def __len__(self):
"""Return the greatest length of tensors"""
@@ -85,28 +94,25 @@ def __getitem__(self, item: Union[str, int, slice, Index]):
return self.tensors[item][self.index]
elif isinstance(item, (int, slice, Index)):
new_index = self.index[Index(item)]
return Dataset(mode=self.mode, provider=self.provider, index=new_index)
return Dataset(mode=self.mode, storage=self.storage, index=new_index)
else:
raise InvalidKeyTypeError(item)

def __setitem__(self, item: Union[slice, str], value):
if isinstance(item, str):
tensor_key = item

if tensor_exists(tensor_key, self.provider):
if tensor_exists(tensor_key, self.storage):
raise TensorAlreadyExistsError(tensor_key)

if isinstance(value, np.ndarray):
tensor_meta = tensor_meta_from_array(value, batched=True)

ds_meta = read_dataset_meta(self.provider)
ds_meta = read_dataset_meta(self.storage)
ds_meta["tensors"].append(tensor_key)
write_dataset_meta(self.provider, ds_meta)

tensor = Tensor(tensor_key, self.provider, tensor_meta=tensor_meta)
write_dataset_meta(self.storage, ds_meta)
tensor = Tensor(tensor_key, self.storage, tensor_meta=tensor_meta)
self.tensors[tensor_key] = tensor
tensor.append(value, batched=True)

return tensor
else:
raise UnsupportedTensorTypeError(item)
@@ -126,6 +132,28 @@ def __iter__(self):
for i in range(len(self)):
yield self[i]

def flush(self):
"""Necessary operation after writes if caches are being used.
Writes all the dirty data from the cache layers (if any) to the underlying storage.
Here, dirty data means data that has been changed or assigned but hasn't yet been written to the underlying storage.
"""
self.storage.flush()

def clear_cache(self):
"""Flushes (see Dataset.flush documentation) the contents of the cache layers (if any) and then deletes contents of all the layers of it.
This doesn't delete data from the actual storage.
This is useful if you have multiple datasets with memory caches open, taking up too much RAM.
Also useful when local cache is no longer needed for certain datasets and is taking up storage space.
"""
if hasattr(self.storage, "clear_cache"):
self.storage.clear_cache()

def delete(self):
"""Deletes the entire dataset from the cache layers (if any) and the underlying storage.
This is an IRREVERSIBLE operation. Data once deleted cannot be recovered.
"""
self.storage.clear()

@staticmethod
def from_path(path: str):
"""Create a local hub dataset from unstructured data.
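
Taken together, the dataset.py changes wrap the base storage provider in a cache chain and expose explicit cache management on the Dataset itself. A minimal usage sketch, not part of the diff: the import path is assumed from this PR's file layout, and the local path is hypothetical.

import numpy as np

from hub.api.dataset import Dataset  # import path assumed from hub/api/dataset.py

# Cache sizes are passed in MB; they are multiplied by the MB constant internally.
ds = Dataset("./cache_demo_ds", memory_cache_size=256, local_cache_size=512)

# Assigning a numpy array creates a tensor; writes land in the cache layers first.
ds.image = np.ones((4, 28, 28))

ds.flush()        # write dirty chunks from the cache layers to the underlying storage
ds.clear_cache()  # flush, then drop cached copies (frees RAM/disk, keeps the data)
ds.delete()       # irreversibly remove the dataset from the caches and the storage
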
24 changes: 11 additions & 13 deletions hub/api/tensor.py
@@ -20,7 +20,7 @@ class Tensor:
def __init__(
self,
key: str,
provider: StorageProvider,
storage: StorageProvider,
tensor_meta: dict = None,
index: Union[int, slice, Index] = None,
):
@@ -32,7 +32,7 @@ def __init__(
Args:
key (str): The internal identifier for this tensor.
provider (StorageProvider): The storage provider for the parent dataset.
storage (StorageProvider): The storage provider for the parent dataset.
tensor_meta (dict): For internal use only. If a tensor with `key` doesn't exist, a new tensor is created with this meta.
index: The Index object restricting the view of this tensor.
Can be an int, slice, or (used internally) an Index object.
@@ -41,35 +41,33 @@
TensorDoesNotExistError: If no tensor with `key` exists and a `tensor_meta` was not provided.
"""
self.key = key
self.provider = provider
self.storage = storage
self.index = Index(index)

if tensor_exists(self.key, self.provider):
if tensor_exists(self.key, self.storage):
if tensor_meta is not None:
warnings.warn(
"Tensor should not be constructed with tensor_meta if a tensor already exists. Ignoring incoming tensor_meta. Key: {}".format(
self.key
)
)

else:
if tensor_meta is None:
raise TensorDoesNotExistError(self.key)

create_tensor(self.key, self.provider, tensor_meta)
create_tensor(self.key, self.storage, tensor_meta)

def append(self, array: np.ndarray, batched: bool):
# TODO: split into `append`/`extend`
add_samples_to_tensor(
array,
self.key,
storage=self.provider,
storage=self.storage,
batched=batched,
)

@property
def meta(self):
return read_tensor_meta(self.key, self.provider)
return read_tensor_meta(self.key, self.storage)

@property
def shape(self):
@@ -81,7 +79,7 @@ def __len__(self):
return self.meta["length"]

def __getitem__(self, item: Union[int, slice, Index]):
return Tensor(self.key, self.provider, index=self.index[item])
return Tensor(self.key, self.storage, index=self.index[item])

def __setitem__(self, item: Union[int, slice], value: np.ndarray):
sliced_self = self[item]
@@ -90,13 +88,13 @@ def __setitem__(self, item: Union[int, slice], value: np.ndarray):
"Assignment to Tensor subsections not currently supported!"
)
else:
if tensor_exists(self.key, self.provider):
if tensor_exists(self.key, self.storage):
raise TensorAlreadyExistsError(self.key)

add_samples_to_tensor(
array=value,
key=self.key,
storage=self.provider,
storage=self.storage,
batched=True,
)

@@ -110,4 +108,4 @@ def numpy(self):
Returns:
A numpy array containing the data represented by this tensor.
"""
return read_samples_from_tensor(self.key, self.provider, self.index)
return read_samples_from_tensor(self.key, self.storage, self.index)
23 changes: 19 additions & 4 deletions hub/api/tests/test_api.py
@@ -7,24 +7,39 @@
from hub.core.tests.common import parametrize_all_dataset_storages


def test_persist_local(local_storage):
def test_persist_local_flush(local_storage):
if local_storage is None:
pytest.skip()

ds = Dataset(local_storage.root)
ds = Dataset(local_storage.root, local_cache_size=512)
ds.image = np.ones((4, 4096, 4096))
ds.flush()
ds_new = Dataset(local_storage.root)
assert len(ds_new) == 4
assert ds_new.image.shape == (4096, 4096)
np.testing.assert_array_equal(ds_new.image.numpy(), np.ones((4, 4096, 4096)))
ds.delete()


def test_persist_local_clear_cache(local_storage):
if local_storage is None:
pytest.skip()

ds = Dataset(local_storage.root, local_cache_size=512)
ds.image = np.ones((4, 4096, 4096))
ds.clear_cache()
ds_new = Dataset(local_storage.root)
assert len(ds_new) == 4
assert ds_new.image.shape == (4096, 4096)
np.testing.assert_array_equal(ds_new.image.numpy(), np.ones((4, 4096, 4096)))
ds.delete()


@parametrize_all_dataset_storages
def test_populate_dataset(ds):
assert read_dataset_meta(ds.provider) == {"tensors": []}
assert read_dataset_meta(ds.storage) == {"tensors": []}
ds.image = np.ones((4, 28, 28))
assert read_dataset_meta(ds.provider) == {"tensors": ["image"]}
assert read_dataset_meta(ds.storage) == {"tensors": ["image"]}
assert len(ds) == 4


4 changes: 4 additions & 0 deletions hub/constants.py
@@ -10,6 +10,10 @@
MIN_FIRST_CACHE_SIZE = 32 * MB
MIN_SECOND_CACHE_SIZE = 160 * MB

# Plain MB values (no MB multiplication applied); the Dataset API takes cache sizes in MB
DEFAULT_MEMORY_CACHE_SIZE = 256
DEFAULT_LOCAL_CACHE_SIZE = 0

CHUNKS_FOLDER = "chunks"
DATASET_META_FILENAME = "dataset_meta.json"
TENSOR_META_FILENAME = "tensor_meta.json"
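
Unlike MIN_FIRST_CACHE_SIZE and MIN_SECOND_CACHE_SIZE above, which are already byte counts, the two new defaults are plain MB values. A small sketch of the conversion the Dataset constructor performs, mirroring the dataset.py hunk above rather than introducing new API:

from hub.constants import DEFAULT_MEMORY_CACHE_SIZE, DEFAULT_LOCAL_CACHE_SIZE, MB

memory_cache_size_bytes = DEFAULT_MEMORY_CACHE_SIZE * MB  # 256 MB expressed in bytes
local_cache_size_bytes = DEFAULT_LOCAL_CACHE_SIZE * MB    # 0 by default, so presumably no local cache layer
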
3 changes: 1 addition & 2 deletions hub/core/storage/local.py
@@ -155,13 +155,12 @@ def _check_is_file(self, path: str):
DirectoryAtPathException: If a directory is found at the path.
"""
full_path = os.path.join(self.root, path)
full_path = os.path.expanduser(full_path)
if os.path.isdir(full_path):
raise DirectoryAtPathException
return full_path

def clear(self):
"""Deletes ALL data on the local machine (under self.root). Exercise caution!"""

# much faster than mapper.clear()
if os.path.exists(self.root):
shutil.rmtree(self.root)
24 changes: 23 additions & 1 deletion hub/core/storage/lru_cache.py
@@ -41,7 +41,6 @@ def flush(self):
for key in self.dirty_keys:
self.next_storage[key] = self.cache_storage[key]
self.dirty_keys.clear()

self.next_storage.flush()

def __getitem__(self, path: str):
@@ -107,6 +106,29 @@ def __delitem__(self, path: str):
if not deleted_from_cache:
raise

def clear_cache(self):
"""Flushes the content of the cache and and then deletes contents of all the layers of it.
This doesn't delete data from the actual storage.
"""
self.flush()
self.cache_used = 0
self.lru_sizes.clear()
self.dirty_keys.clear()
self.cache_storage.clear()

if hasattr(self.next_storage, "clear_cache"):
self.next_storage.clear_cache()

def clear(self):
"""Deletes ALL the data from all the layers of the cache and the actual storage.
This is an IRREVERSIBLE operation. Data once deleted cannot be recovered.
"""
self.cache_used = 0
self.lru_sizes.clear()
self.dirty_keys.clear()
self.cache_storage.clear()
self.next_storage.clear()

def __len__(self):
"""Returns the number of files present in the cache and the underlying storage.
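
The new clear_cache and clear methods give each LRU layer the cache-management surface the Dataset relies on. A rough sketch of how a chain built by generate_chain behaves; it assumes the outermost layer is an LRUCache when a memory cache is requested (the exact layering lives in hub/util/cache_chain.py, not shown on this page), the import paths are inferred from the file layout, and the key and 256 MB size are illustrative.

from hub.constants import MB
from hub.core.storage.memory import MemoryProvider
from hub.util.cache_chain import generate_chain

base = MemoryProvider()  # stands in for a local or S3 provider

# Same positional call shape as Dataset.__init__: base storage, memory cache bytes,
# local cache bytes, and the dataset path (empty here, as when storage is passed directly).
storage = generate_chain(base, 256 * MB, 0, "")

storage["chunks/c0"] = b"some bytes"  # lands in the in-memory LRU layer first
storage.flush()                       # dirty keys are written through to `base`
storage.clear_cache()                 # drops cached copies only; `base` keeps the data
storage.clear()                       # wipes the cache layers AND the base storage

Note that Dataset.clear_cache guards the call with hasattr, so a chain without any cache layer simply skips this step.
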
2 changes: 1 addition & 1 deletion hub/core/storage/memory.py
@@ -4,7 +4,7 @@
class MemoryProvider(StorageProvider):
"""Provider class for using the memory."""

def __init__(self, root):
def __init__(self, root=""):
self.dict = {}

def __getitem__(
2 changes: 0 additions & 2 deletions hub/core/storage/s3.py
@@ -179,8 +179,6 @@ def __iter__(self):

def clear(self):
"""Deletes ALL data on the s3 bucket (under self.root). Exercise caution!"""

# much faster than mapper.clear()
if self.resource is not None:
bucket = self.resource.Bucket(self.bucket)
bucket.objects.filter(Prefix=self.path).delete()
4 changes: 2 additions & 2 deletions hub/core/storage/tests/test_benchmark_storage_provider.py
@@ -1,6 +1,6 @@
import pytest

from hub.constants import GB
from hub.constants import MB
from hub.tests.common_benchmark import (
parametrize_benchmark_chunk_sizes,
BENCHMARK_CHUNK_SIZES,
@@ -9,7 +9,7 @@
from hub.core.storage.tests.test_storage_provider import KEY # type: ignore


SIMULATED_DATA_SIZES = [1 * GB]
SIMULATED_DATA_SIZES = [128 * MB]

# calculate the number of chunks needed for each entry in `SIMULATED_DATA_SIZES`
NUM_CHUNKS = []