Adding back transforms for parallel dataset uploads #1086

Merged: 47 commits merged on Jul 31, 2021
Changes from 32 commits
Commits (47)
9fb5068
restore old transform code
AbhinavTuli Jul 13, 2021
c17a365
better checks on ds_out
AbhinavTuli Jul 13, 2021
f386183
Merge remote-tracking branch 'origin/main' into update/2.0/transforms
AbhinavTuli Jul 16, 2021
2373cf7
Merge remote-tracking branch 'origin/main' into update/2.0/transforms
AbhinavTuli Jul 21, 2021
d8ae83e
updated transform code
AbhinavTuli Jul 22, 2021
4e58950
Merge remote-tracking branch 'origin/main' into update/2.0/transforms
AbhinavTuli Jul 22, 2021
e670495
threaded transforms work, processed fails
AbhinavTuli Jul 26, 2021
d6b36e4
multiprocessing fixed
AbhinavTuli Jul 26, 2021
d8c0c1f
removed logs on dataset slcing
AbhinavTuli Jul 26, 2021
2819456
moved chunk engine initialization into store_shard
AbhinavTuli Jul 26, 2021
3245098
cache TODO
AbhinavTuli Jul 26, 2021
8fb90d3
added shardwise cache
AbhinavTuli Jul 26, 2021
9108e12
added TODO for complexity
AbhinavTuli Jul 26, 2021
5b28118
new transform api works, sample_out left
AbhinavTuli Jul 26, 2021
92b2e2c
removed commented out code
AbhinavTuli Jul 26, 2021
9d25165
update docstring
AbhinavTuli Jul 26, 2021
69d8f8c
samples_out syntax works
AbhinavTuli Jul 27, 2021
3c3e21e
removes old test
AbhinavTuli Jul 27, 2021
3abfa96
fix pickling
AbhinavTuli Jul 27, 2021
3a68296
removed commented out code, removed memcopies
AbhinavTuli Jul 27, 2021
51f97ba
renamed hub.parallel to hub.compute
AbhinavTuli Jul 27, 2021
11e2805
fixes transform issues with hub.read
AbhinavTuli Jul 28, 2021
0e936aa
change from numpy->numpy_compressed in transforms
AbhinavTuli Jul 28, 2021
22be630
Merge remote-tracking branch 'origin/main' into update/2.0/transforms
AbhinavTuli Jul 29, 2021
f9714b7
fix transforms after merge
AbhinavTuli Jul 29, 2021
f7b85fc
lint fixes
AbhinavTuli Jul 29, 2021
b3f9a53
lint fixes
AbhinavTuli Jul 29, 2021
d0ab673
refactors, workers changed to num_workers
AbhinavTuli Jul 29, 2021
378ca00
refactoring transform code
AbhinavTuli Jul 29, 2021
cb8ec3b
more refactors
AbhinavTuli Jul 29, 2021
00cce12
docstrings updated
AbhinavTuli Jul 29, 2021
a65d813
lint fix
AbhinavTuli Jul 29, 2021
4f8abb2
comments and nitpicks
AbhinavTuli Jul 30, 2021
98b1b38
added serial provider
AbhinavTuli Jul 30, 2021
0032e30
renames log_loading to verbose
AbhinavTuli Jul 30, 2021
36a1c2f
fixed serial provider
AbhinavTuli Jul 30, 2021
5389fda
updating exceptions
AbhinavTuli Jul 30, 2021
25c7dd0
mem_cache->meta_cache in chunk_engine
AbhinavTuli Jul 30, 2021
2db2edc
update hub compose exception
AbhinavTuli Jul 30, 2021
6ee893a
rename TransformDatasetShard->TransformDataset, TransformDatasetTenso…
AbhinavTuli Jul 30, 2021
faefc28
improved exception
AbhinavTuli Jul 30, 2021
42e4de7
paremetrized test
AbhinavTuli Jul 30, 2021
66e4f6c
lint fix
AbhinavTuli Jul 30, 2021
390649b
added test for hub like
AbhinavTuli Jul 30, 2021
ac346c7
shifted util code into encoder.py
AbhinavTuli Jul 30, 2021
b473153
lint fix encoder
AbhinavTuli Jul 30, 2021
0b3e255
simplified chunk_id merge
AbhinavTuli Jul 30, 2021
13 changes: 12 additions & 1 deletion hub/__init__.py
@@ -12,12 +12,23 @@

from .api.dataset import dataset
from .api.read import read
from .core.transform import compute, compose
from .util.bugout_reporter import hub_reporter

load = dataset.load
empty = dataset.empty
like = dataset.like
__all__ = ["dataset", "read", "__version__", "load", "empty", "like"]

__all__ = [
"dataset",
"read",
"__version__",
"load",
"empty",
"compute",
"compose",
"like",
]

__version__ = "2.0.2"
__encoded_version__ = np.array(__version__)
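The hunk above re-exports the new transform entry points at the package root, so the decorator API can be reached as `hub.compute` and `hub.compose`. A minimal sketch of the single-function usage this enables; the tensor name, dataset paths and the exact `eval` arguments are illustrative assumptions, not part of this diff:

import numpy as np
import hub

@hub.compute
def flip(sample_in, samples_out):
    # Append one transformed sample to the output dataset's "image" tensor.
    samples_out.image.append(np.flip(sample_in.image.numpy(), axis=0))

ds_in = hub.dataset("./source_ds")     # assumed to already contain an "image" tensor
ds_out = hub.dataset("./flipped_ds")
ds_out.create_tensor("image")

# Evaluate the transform over ds_in in parallel and write the results to ds_out.
flip().eval(ds_in, ds_out, num_workers=2)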
30 changes: 22 additions & 8 deletions hub/core/chunk_engine.py
@@ -45,7 +45,11 @@ def is_uniform_sequence(samples):

class ChunkEngine:
def __init__(
self, key: str, cache: LRUCache, max_chunk_size: int = DEFAULT_MAX_CHUNK_SIZE
self,
key: str,
cache: LRUCache,
max_chunk_size: int = DEFAULT_MAX_CHUNK_SIZE,
memory_cache: LRUCache = None,
):
"""Handles creating `Chunk`s and filling them with incoming samples.

@@ -98,6 +102,7 @@ def __init__(
key (str): Tensor key.
cache (LRUCache): Cache for which chunks and the metadata are stored.
max_chunk_size (int): Chunks generated by this instance will never exceed this size. Defaults to DEFAULT_MAX_CHUNK_SIZE.
memory_cache (LRUCache): Cache used for storing non chunk data such as tensor meta and chunk id encoder during transforms in memory.

Raises:
ValueError: If invalid max chunk size.
@@ -113,6 +118,7 @@

# only the last chunk may be less than this
self.min_chunk_size = self.max_chunk_size // 2
self.mem_cache = memory_cache

@property
def chunk_id_encoder(self) -> ChunkIdEncoder:
@@ -126,6 +132,7 @@ def chunk_id_encoder(self) -> ChunkIdEncoder:
ChunkIdEncoder: The chunk ID encoder handles the mapping between sample indices
and their corresponding chunks.
"""
cache = self.mem_cache or self.cache

key = get_chunk_id_encoder_key(self.key)
if not self.chunk_id_encoder_exists:
Expand All @@ -137,15 +144,21 @@ def chunk_id_encoder(self) -> ChunkIdEncoder:
)

enc = ChunkIdEncoder()
self.cache[key] = enc
cache[key] = enc
return enc

enc = self.cache.get_cachable(key, ChunkIdEncoder)
enc = cache.get_cachable(key, ChunkIdEncoder)
return enc

@property
def chunk_id_encoder_exists(self) -> bool:
return get_chunk_id_encoder_key(self.key) in self.cache
cache = self.mem_cache or self.cache
try:
key = get_chunk_id_encoder_key(self.key)
cache[key]
return True
except KeyError:
return False

@property
def num_chunks(self) -> int:
@@ -174,8 +187,9 @@ def last_chunk_key(self) -> str:

@property
def tensor_meta(self):
cache = self.mem_cache or self.cache
tensor_meta_key = get_tensor_meta_key(self.key)
return self.cache.get_cachable(tensor_meta_key, TensorMeta)
return cache.get_cachable(tensor_meta_key, TensorMeta)

def _append_bytes(self, buffer: memoryview, shape: Tuple[int], dtype: np.dtype):
"""Treat `buffer` as a single sample and place them into `Chunk`s. This function implements the algorithm for
@@ -209,19 +223,19 @@ def _synchronize_cache(self):

# TODO implement tests for cache size compute
# TODO: optimize this by storing all of these keys in the chunk engine's state (posixpath.joins are pretty slow)

cache = self.mem_cache or self.cache
# synchronize last chunk
last_chunk_key = self.last_chunk_key
last_chunk = self.last_chunk
self.cache.update_used_cache_for_path(last_chunk_key, last_chunk.nbytes) # type: ignore

# synchronize tensor meta
tensor_meta_key = get_tensor_meta_key(self.key)
self.cache[tensor_meta_key] = self.tensor_meta
cache[tensor_meta_key] = self.tensor_meta

# synchronize chunk ID encoder
chunk_id_key = get_chunk_id_encoder_key(self.key)
self.cache[chunk_id_key] = self.chunk_id_encoder
cache[chunk_id_key] = self.chunk_id_encoder

def _try_appending_to_last_chunk(
self, buffer: memoryview, shape: Tuple[int]
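Across the hunks above the engine now resolves its metadata store with `cache = self.mem_cache or self.cache`: when a transform worker supplies an in-memory `memory_cache`, the tensor meta and chunk ID encoder are read and written there instead of the storage-backed cache, and only merged back later. A toy sketch of that fallback pattern; the classes below are illustrative stand-ins, not Hub's real `LRUCache` or `ChunkEngine`:

from typing import Optional

class Cache(dict):
    """Toy stand-in for hub's LRUCache; always truthy, so the `or` fallback
    below depends only on whether a cache object was supplied at all."""
    def __bool__(self) -> bool:
        return True

class MetaStore:
    """Illustrates the `cache = self.mem_cache or self.cache` pattern."""
    def __init__(self, cache: Cache, mem_cache: Optional[Cache] = None):
        self.cache = cache          # storage-backed cache (normal operation)
        self.mem_cache = mem_cache  # in-memory cache passed in by transform workers

    def _meta_cache(self) -> Cache:
        return self.mem_cache or self.cache

    def write_meta(self, key: str, value: bytes) -> None:
        self._meta_cache()[key] = value

    def meta_exists(self, key: str) -> bool:
        try:
            self._meta_cache()[key]
            return True
        except KeyError:
            return False

worker_store = MetaStore(cache=Cache(), mem_cache=Cache())  # metadata stays in memory
normal_store = MetaStore(cache=Cache())                     # metadata goes to the main cache
assert worker_store._meta_cache() is worker_store.mem_cache
assert normal_store._meta_cache() is normal_store.cache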
8 changes: 7 additions & 1 deletion hub/core/dataset.py
@@ -38,6 +38,7 @@ def __init__(
read_only: bool = False,
public: Optional[bool] = True,
token: Optional[str] = None,
log_loading: bool = True,
):
"""Initializes a new or existing dataset.

Expand All @@ -48,6 +49,7 @@ def __init__(
Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
public (bool, optional): Applied only if storage is Hub cloud storage and a new Dataset is being created. Defines if the dataset will have public access.
token (str, optional): Activeloop token, used for fetching credentials for Hub datasets. This is optional, tokens are normally autogenerated.
log_loading (bool): Logs the loading of the dataset. Defaults to True.

Raises:
ValueError: If an existing local path is given, it must be a directory.
@@ -65,6 +67,7 @@
self.tensors: Dict[str, Tensor] = {}
self._token = token
self.public = public
self.log_loading = log_loading

self._set_derived_attributes()

@@ -104,6 +107,7 @@ def __getstate__(self) -> Dict[str, Any]:
"public": self.public,
"storage": self.storage,
"_token": self.token,
"log_loading": self.log_loading,
}

def __setstate__(self, state: Dict[str, Any]):
@@ -133,6 +137,7 @@ def __getitem__(
index=self.index[item],
read_only=self.read_only,
token=self._token,
log_loading=False,
)
else:
raise InvalidKeyTypeError(item)
@@ -252,7 +257,8 @@ def _load_meta(self):
meta_key = get_dataset_meta_key()

if dataset_exists(self.storage):
logger.info(f"{self.path} loaded successfully.")
if self.log_loading:
logger.info(f"{self.path} loaded successfully.")
self.meta = self.storage.get_cachable(meta_key, DatasetMeta)

for tensor_name in self.meta.tensors:
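The `log_loading` flag added above is carried through `__getstate__` and `__getitem__`, so the "loaded successfully" message is printed when a dataset is opened but not again every time the dataset is sliced (a later commit in this PR renames the flag to `verbose`; the snapshot shown here, from the first 32 commits, still uses `log_loading`). A small usage sketch with an illustrative local path:

import hub

ds = hub.dataset("./my_ds")   # logs "./my_ds loaded successfully." once
view = ds[0:100]              # the sliced dataset is built with log_loading=False, so nothing is re-logged
sample = view[5]              # further indexing stays silent as well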
2 changes: 1 addition & 1 deletion hub/core/sample.py
@@ -92,7 +92,7 @@ def compressed_bytes(self, compression: str) -> bytes:
bytes: Bytes for the compressed sample. Contains all metadata required to decompress within these bytes.
"""

compression = compression.lower()
# compression = compression.lower()

if compression is None:
return self.uncompressed_bytes()
2 changes: 1 addition & 1 deletion hub/core/storage/cachable.py
@@ -21,7 +21,7 @@ def nbytes(self):
raise NotImplementedError

def __getstate__(self) -> Dict[str, Any]:
raise NotImplementedError
return self.__dict__

def __setstate__(self, state: Dict[str, Any]):
self.__dict__.update(state)
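Returning `self.__dict__` from `Cachable.__getstate__` (instead of raising `NotImplementedError`) makes cachable metadata objects such as `TensorMeta` picklable, which the multiprocessing-based transforms rely on when state is shipped between workers. A toy round-trip showing why; the class below is an illustrative stand-in, not Hub's `Cachable`:

import pickle
from typing import Any, Dict

class ToyCachable:
    def __init__(self, dtype: str = "int64", length: int = 0):
        self.dtype = dtype
        self.length = length

    def __getstate__(self) -> Dict[str, Any]:
        # Mirrors the change above: expose the instance dict so pickle can serialize it.
        return self.__dict__

    def __setstate__(self, state: Dict[str, Any]):
        self.__dict__.update(state)

meta = ToyCachable("float32", 10)
restored = pickle.loads(pickle.dumps(meta))  # would raise if __getstate__ raised NotImplementedError
assert (restored.dtype, restored.length) == ("float32", 10)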
1 change: 1 addition & 0 deletions hub/core/transform/__init__.py
@@ -0,0 +1 @@
from hub.core.transform.transform import compute, compose
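With `compute` and `compose` exported from this subpackage (and re-exported at the top level), several decorated functions can be chained into one pipeline. A sketch of what that might look like; the function bodies, tensor names, dataset paths and `eval` arguments are assumptions for illustration:

import hub

@hub.compute
def add_offset(sample_in, samples_out, offset):
    samples_out.values.append(sample_in.values.numpy() + offset)

@hub.compute
def square(sample_in, samples_out):
    samples_out.values.append(sample_in.values.numpy() ** 2)

ds_in = hub.dataset("./in_ds")    # assumed to already contain a "values" tensor
ds_out = hub.dataset("./out_ds")
ds_out.create_tensor("values")

# Chain both functions and run them with several workers.
pipeline = hub.compose([add_offset(offset=1), square()])
pipeline.eval(ds_in, ds_out, num_workers=4)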