
[2.0] writing/reading fixed-shape arrays to chunks #842

Merged
merged 99 commits
May 13, 2021
Changes from 80 commits
Commits
99 commits
57c0981
method skeleton
verbose-void May 5, 2021
6ec9e66
write chunks for array to memory provider (basic)
verbose-void May 5, 2021
379fa8e
start implementing chunk writing callstack
verbose-void May 5, 2021
e0bd95b
broke up `write` function into function calls
verbose-void May 5, 2021
b86ad9d
save index map with pickle to storage provider
verbose-void May 5, 2021
92bc409
very simple reading & writing to/from chunks
verbose-void May 5, 2021
49bb3fc
new write.py (not done) with better structure
verbose-void May 6, 2021
4364b26
write index map inside `write_array`
verbose-void May 6, 2021
c4c3a2a
rename functions more precisely (chunk_and_write...)
verbose-void May 6, 2021
812b6f1
add some batched tests
verbose-void May 6, 2021
9d5a138
started batched writing
verbose-void May 6, 2021
2e2cf19
require >= 1 cache & don't compress incomplete chunks (also mark them)
verbose-void May 6, 2021
96e5fc3
remove `no_cache` tests
verbose-void May 6, 2021
4e4cc9f
read from cache before storage & check for _incomplete chunks
verbose-void May 6, 2021
4369cde
add `normalize_and_batchify` util with tests
verbose-void May 6, 2021
fa427a5
use `normalize_and_batchify_shape` & update params
verbose-void May 6, 2021
1f40bcb
use "bytes_left_in_last_chunk` instead of `last_chunk_num_bytes`
verbose-void May 7, 2021
730120d
move cache/storage functions to `storage_chain.py`
verbose-void May 7, 2021
8d95e4a
some comments for @abhinav
verbose-void May 7, 2021
32a5aa9
removed all caching code
verbose-void May 7, 2021
eeb8040
make dummy compressors (chunk-wise/sample-wise)
verbose-void May 7, 2021
ac91aec
merged write_bytes & write_array to get logic working
verbose-void May 7, 2021
8b4b12b
works for single-chunk
verbose-void May 7, 2021
0d58bc4
move getting meta to another function
verbose-void May 7, 2021
aec1602
move index_map operations to index_map.py
verbose-void May 7, 2021
85d3e5a
moved tests into separate files (fixed/dynamic) & implement them fully
verbose-void May 8, 2021
9c79d86
remove redundant tests & change var names to be shorter
verbose-void May 8, 2021
57c6919
TODO
verbose-void May 8, 2021
151b3cf
add assertion message for failure case
verbose-void May 8, 2021
2162fd1
Merge branch 'release/2.0' of github.com:activeloopai/Hub into featur…
verbose-void May 8, 2021
ac8df10
add dtypes for random arrays
verbose-void May 8, 2021
fe1dbcd
mapper yield from mapper (instead of .items())
verbose-void May 9, 2021
fcf1e43
use actual memoryprovider
verbose-void May 9, 2021
24a841f
add compression / storage provider class in vars
verbose-void May 9, 2021
bcb2fed
remove unused imports
verbose-void May 9, 2021
cb39ef0
uncommented 5 batch param
verbose-void May 9, 2021
abae085
create storage provider in util
verbose-void May 9, 2021
5775c80
formatted
verbose-void May 9, 2021
32f2dc3
Merge branch 'release/2.0' of github.com:activeloopai/Hub into featur…
verbose-void May 9, 2021
a36a199
clear storage before test (in case last one failed)
verbose-void May 9, 2021
7d3ecc5
start adding multi-batch support
verbose-void May 10, 2021
2055644
moved meta validation & updating to functions within meta.py
verbose-void May 10, 2021
fda38d1
removed write_impl, reduce test shape sizes
verbose-void May 10, 2021
8234870
rewriting chunk array method & deferring more logic to
verbose-void May 10, 2021
733603a
reduce scope (no appending) & limit tests to this scope
verbose-void May 10, 2021
b5971a7
remove dead code
verbose-void May 10, 2021
6ba4f27
TODOs & remove dead code
verbose-void May 10, 2021
1ab17e2
add write_array docstring, create some general functions & typing
verbose-void May 10, 2021
07bb887
keep track of min/max shape for tensors & validate meta in tests
verbose-void May 10, 2021
8f63e92
`read_array` & accept slice as argument
verbose-void May 10, 2021
bc686b7
allow slice to be None for reading & return all samples
verbose-void May 10, 2021
7d19b7b
remove generator of decompressed chunks
verbose-void May 10, 2021
7841a0c
fixed typing issues
verbose-void May 10, 2021
b1116c2
default value isn't None, but is slice (for read index)
verbose-void May 10, 2021
fa16cc7
moved tests/util -> tests/common
verbose-void May 10, 2021
dbe62cb
get random chunk name
verbose-void May 10, 2021
c0f3111
rename generator -> chunker (also methods) & add join_chunks tests
verbose-void May 10, 2021
2922f3c
read_array returns in docstring & added join_chunks docstring
verbose-void May 10, 2021
36e6f7d
returns docstring
verbose-void May 10, 2021
831f3ea
docstrings, variable names, & removed index/meta modules
verbose-void May 11, 2021
065f98b
removed references to deleted modules
verbose-void May 11, 2021
fceaa5d
validate meta
verbose-void May 11, 2021
c0ba66d
remove unused import
verbose-void May 11, 2021
d33f311
update read docstring
verbose-void May 11, 2021
fa1b8c1
validate index_map incomplete chunks
verbose-void May 11, 2021
8d88fc4
add tests for validating actual chunk_sizes & incomplete_chunks
verbose-void May 11, 2021
1365527
fixed typing
verbose-void May 11, 2021
11cff64
move constants, delete unused tests, fixed test case
verbose-void May 11, 2021
f20683f
change the way chunk_sizes are being asserted
verbose-void May 11, 2021
20d4acf
extend last chunk if it is incomplete
verbose-void May 11, 2021
81d81a8
added join_chunks extra test case & pass in join_chunks
verbose-void May 11, 2021
ee24503
remove print
verbose-void May 11, 2021
4fdf386
raise ValueError instead of assert False
verbose-void May 11, 2021
1c972c5
fixed mypy types for last_chunk/chunk_name
verbose-void May 11, 2021
2208911
update docstrings
verbose-void May 11, 2021
8399fb2
Merge branch 'release/2.0' of github.com:activeloopai/Hub into featur…
verbose-void May 12, 2021
1968f9d
write pytest-benchmarks for 1GB write/read
verbose-void May 12, 2021
46b8b2e
moved get_random_array to util & write tests, also write/read bench
verbose-void May 12, 2021
5da1e1f
skip benchmarks when running `pytest .`
verbose-void May 12, 2021
2ca88f9
use --benchmark-enable in circleci (override setup.cfg)
verbose-void May 12, 2021
84c5f2e
print array GBs
verbose-void May 12, 2021
56c7080
add static typing for tests/utils
verbose-void May 12, 2021
d61e78a
benchmark 16MB only
verbose-void May 12, 2021
d95fe38
separate benchmark runs from testing runs
verbose-void May 12, 2021
74ad008
broke `write_array` into 2 functions & added docstrings/minor TODOs
verbose-void May 12, 2021
bf3cecb
expose core to user
verbose-void May 13, 2021
3910d7c
remove core exposure
verbose-void May 13, 2021
c80f0b7
Merge branch 'release/2.0' of github.com:activeloopai/Hub into featur…
verbose-void May 13, 2021
1db82de
fixed list failure when key empty
verbose-void May 13, 2021
015be2d
run tests with s3 provider & clear only memory providers
verbose-void May 13, 2021
23ec487
fixed benchmark error
verbose-void May 13, 2021
54eaac7
ignore pytest_cases for mypy & add static types
verbose-void May 13, 2021
eca8355
don't clear memory after writing for benchmarks
verbose-void May 13, 2021
36b1de7
don't clear after read benchmark
verbose-void May 13, 2021
07e0e68
don't use extend, use b"".join(...)
verbose-void May 13, 2021
0d0a40a
move get_random_array -> tests/common.py
verbose-void May 13, 2021
a2765c4
remove &
verbose-void May 13, 2021
cf833dd
added args for _get_last_chunk docstring
verbose-void May 13, 2021
02c87b6
add TODO
verbose-void May 13, 2021
4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -156,15 +156,15 @@ commands:
command: |
$Env:GOOGLE_APPLICATION_CREDENTIALS = $Env:CI_GCS_PATH
setx /m GOOGLE_APPLICATION_CREDENTIALS "$Env:GOOGLE_APPLICATION_CREDENTIALS"
python3 -m pytest --cov-report=xml --cov=./ --benchmark-autosave
python3 -m pytest --cov-report=xml --cov=./ --benchmark-enable --benchmark-autosave
- when:
condition: << parameters.unix-like >>
steps:
- run:
name: "Running tests - Unix"
command: |
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.secrets/gcs.json
python3 -m pytest --cov-report=xml --cov=./ --benchmark-autosave
python3 -m pytest --cov-report=xml --cov=./ --benchmark-enable --benchmark-autosave
codecov-upload:
steps:
- codecov/upload:
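The CI change above appends `--benchmark-enable` so that benchmarks run in CircleCI even though they are skipped locally (per the "skip benchmarks when running `pytest .`" commit). A sketch of the config pairing this implies — the exact `setup.cfg` contents are an assumption, not shown in this diff:

```ini
# setup.cfg (assumed): disable pytest-benchmark by default for local runs
[tool:pytest]
addopts = --benchmark-disable

# CI invocation overrides the default:
#   python3 -m pytest --cov-report=xml --cov=./ --benchmark-enable --benchmark-autosave
```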
3 changes: 3 additions & 0 deletions hub/constants.py
@@ -1 +1,4 @@
BYTE_PADDING = b"\0"
CHUNKS_FOLDER = "chunks"
META_FILENAME = "meta.json"
INDEX_MAP_FILENAME = "index_map.json"
3 changes: 3 additions & 0 deletions hub/core/chunk_engine/__init__.py
@@ -0,0 +1,3 @@
from .chunker import generate_chunks, join_chunks
from .write import write_array
from .read import read_array
99 changes: 99 additions & 0 deletions hub/core/chunk_engine/chunker.py
@@ -0,0 +1,99 @@
import numpy as np
from typing import Generator, Optional, List

from hub.util.exceptions import ChunkSizeTooSmallError


def generate_chunks(
content_bytes: bytes,
chunk_size: int,
bytes_left_in_last_chunk: int = 0,
) -> Generator[bytes, None, None]:
"""Generator function that chunks bytes.

Chunking is the process of breaking up the input `content_bytes` into a sequence of smaller bytes objects called "chunks".
Each chunk has a size of at most `chunk_size` bytes.

Example:
content_bytes = b"1234567890123"
chunk_size = 4
yields:
b"1234"
b"5678"
b"9012"
b"3"

Args:
content_bytes (bytes): Bytes object with the data to be chunked.
chunk_size (int): Each individual chunk will be assigned this many bytes maximum.
bytes_left_in_last_chunk (int): If chunks were created previously, this should be set to
`chunk_size - len(last_chunk)`. This way, the generator's first output will contain
exactly enough bytes to fill that last chunk up to `chunk_size`.

Yields:
bytes: Chunk of the `content_bytes`. Will have length on the interval [1, `chunk_size`].

Raises:
ChunkSizeTooSmallError: If `chunk_size` <= 0
ValueError: If `bytes_left_in_last_chunk` < 0
"""

# validate inputs
if chunk_size <= 0:
raise ChunkSizeTooSmallError()
if bytes_left_in_last_chunk < 0:
raise ValueError("Bytes left in last chunk must be >= 0.")
if len(content_bytes) <= 0:
return

# yield the remainder of the last chunk (as specified by `bytes_left_in_last_chunk`)
total_bytes_yielded = 0
if bytes_left_in_last_chunk > 0:
chunk = content_bytes[:bytes_left_in_last_chunk]
yield chunk
total_bytes_yielded += bytes_left_in_last_chunk

# yield all new chunks
while total_bytes_yielded < len(content_bytes):
end = total_bytes_yielded + chunk_size
chunk = content_bytes[total_bytes_yielded:end]

yield chunk
total_bytes_yielded += len(chunk)


def join_chunks(chunks: List[bytes], start_byte: int, end_byte: int) -> bytes:
"""Given a list of bytes that represent sequential chunks, join them into one bytes object.
For more on chunking, see the `generate_chunks` function.

Example:
chunks = [b"123", b"456", b"789"]
start_byte = 1
end_byte = 2
returns:
b"2345678"

Args:
chunks (list[bytes]): Sequential list of bytes objects that represent chunks.
start_byte (int): The first chunk in the sequence will ignore the bytes before `start_byte`. If 0, all bytes are included.
end_byte (int): The last chunk in the sequence will ignore the bytes at and after `end_byte`. If `end_byte` equals the chunk's length, all bytes are included.

Notes:
Bytes are indexed using: chunk[start_byte:end_byte]. That is why `chunk[end_byte]` will not be included in `chunk[start_byte:end_byte]`.
If `len(chunks) == 1`, `start_byte`:`end_byte` will be applied to the same chunk (the first & last one).

Returns:
bytes: The chunks joined as one bytes object.
"""

joined_bytearray = bytearray()
for i, chunk in enumerate(chunks):
actual_start_byte, actual_end_byte = 0, len(chunk)

if i == 0:
actual_start_byte = start_byte
if i == len(chunks) - 1:
actual_end_byte = end_byte

joined_bytearray.extend(chunk[actual_start_byte:actual_end_byte])
return bytes(joined_bytearray)
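Taken together, the two helpers in this file round-trip: bytes chunked by `generate_chunks` can be rejoined losslessly by `join_chunks`. A minimal sketch — a standalone re-implementation mirroring the chunker logic above, not an import of the PR's module:

```python
from typing import Generator, List


def generate_chunks(
    content_bytes: bytes, chunk_size: int, bytes_left_in_last_chunk: int = 0
) -> Generator[bytes, None, None]:
    """Standalone mirror of the PR's `generate_chunks` (assumed same semantics)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be > 0")
    if bytes_left_in_last_chunk < 0:
        raise ValueError("bytes_left_in_last_chunk must be >= 0")
    if not content_bytes:
        return
    total_bytes_yielded = 0
    if bytes_left_in_last_chunk > 0:
        # first, emit enough bytes to top off the previous (incomplete) chunk
        yield content_bytes[:bytes_left_in_last_chunk]
        total_bytes_yielded += bytes_left_in_last_chunk
    while total_bytes_yielded < len(content_bytes):
        chunk = content_bytes[total_bytes_yielded : total_bytes_yielded + chunk_size]
        yield chunk
        total_bytes_yielded += len(chunk)


def join_chunks(chunks: List[bytes], start_byte: int, end_byte: int) -> bytes:
    """Standalone mirror of the PR's `join_chunks`."""
    joined = bytearray()
    for i, chunk in enumerate(chunks):
        s = start_byte if i == 0 else 0
        e = end_byte if i == len(chunks) - 1 else len(chunk)
        joined.extend(chunk[s:e])
    return bytes(joined)


chunks = list(generate_chunks(b"1234567890123", chunk_size=4))
print(chunks)  # [b'1234', b'5678', b'9012', b'3']
# joining with start_byte=0 and end_byte=len(last chunk) restores the payload
assert join_chunks(chunks, 0, len(chunks[-1])) == b"1234567890123"
```

This matches the docstring examples: the final chunk may be shorter than `chunk_size`, and the `start_byte`/`end_byte` trimming only applies to the first and last chunks respectively.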
64 changes: 0 additions & 64 deletions hub/core/chunk_engine/generator.py

This file was deleted.

50 changes: 50 additions & 0 deletions hub/core/chunk_engine/read.py
@@ -0,0 +1,50 @@
import os
import pickle
import numpy as np

from .chunker import join_chunks
from .util import get_meta_key, get_index_map_key

from hub.core.typing import Provider


def read_array(
key: str,
storage: Provider,
array_slice: slice = slice(None),
) -> np.ndarray:
"""Read & join chunks into an array from storage.

Args:
key (str): Key for where the chunks, index_map, & meta are located in `storage` relative to its root.
storage (Provider): Provider for reading the chunks, index_map, & meta.
array_slice (slice): Slice that represents which samples to read. Defaults to a slice representing all samples.

Returns:
np.ndarray: Array containing the sample(s) in the `array_slice` slice.
"""

# TODO: don't use pickle
meta = pickle.loads(storage[get_meta_key(key)])
index_map = pickle.loads(storage[get_index_map_key(key)])

samples = []
for index_entry in index_map[array_slice]:
chunks = []
for chunk_name in index_entry["chunk_names"]:
chunk_key = os.path.join(key, "chunks", chunk_name)
chunk = storage[chunk_key]

chunks.append(chunk)

combined_bytes = join_chunks(
chunks,
index_entry["start_byte"],
index_entry["end_byte"],
)

out_array = np.frombuffer(combined_bytes, dtype=meta["dtype"])
samples.append(out_array.reshape(index_entry["shape"]))

return np.array(samples)
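The read path above is driven entirely by the index_map: each entry names the chunks holding one sample plus the byte range to keep. A sketch of that flow with plain dicts standing in for the storage provider — `join_chunks` is a mirror of the PR's helper, and the `entry`/`storage` names and key layout are illustrative, not the library's API:

```python
import numpy as np


def join_chunks(chunks, start_byte, end_byte):
    # same joining rule the read path relies on (mirror, not an import)
    joined = bytearray()
    for i, chunk in enumerate(chunks):
        s = start_byte if i == 0 else 0
        e = end_byte if i == len(chunks) - 1 else len(chunk)
        joined.extend(chunk[s:e])
    return bytes(joined)


# one sample: 3 int64 values = 24 bytes, stored across two chunks of <= 16 bytes
sample = np.arange(3, dtype="int64")
raw = sample.tobytes()
storage = {"chunks/c0": raw[:16], "chunks/c1": raw[16:]}  # stand-in for a Provider

# hypothetical index_map entry, shaped like the ones `read_array` consumes
entry = {"chunk_names": ["c0", "c1"], "start_byte": 0, "end_byte": 8, "shape": (3,)}

chunks = [storage["chunks/" + name] for name in entry["chunk_names"]]
combined = join_chunks(chunks, entry["start_byte"], entry["end_byte"])
out = np.frombuffer(combined, dtype="int64").reshape(entry["shape"])
print(out)  # [0 1 2]
```

Note how `shape` and `dtype` are metadata, not part of the chunk bytes — the chunks are opaque byte runs until the index_map and meta reassemble them.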
152 changes: 152 additions & 0 deletions hub/core/chunk_engine/tests/common.py
@@ -0,0 +1,152 @@
import numpy as np
import pickle

from hub.core.chunk_engine import write_array, read_array
from hub.core.chunk_engine.util import (
normalize_and_batchify_shape,
get_meta_key,
get_index_map_key,
get_chunk_key,
get_random_array,
)
from hub.core.storage import MappedProvider
from hub.core.typing import Provider

from typing import List, Tuple


TENSOR_KEY = "TEST_TENSOR"


STORAGE_PROVIDERS = (
MappedProvider(),
) # TODO: replace MappedProvider with MemoryProvider


CHUNK_SIZES = (
128,
4096,
16000000, # 16MB
)


DTYPES = (
"uint8",
"int64",
"float64",
"bool",
)


def get_min_shape(batch: np.ndarray) -> Tuple:
return tuple(np.minimum.reduce([sample.shape for sample in batch]))


def get_max_shape(batch: np.ndarray) -> Tuple:
return tuple(np.maximum.reduce([sample.shape for sample in batch]))


def assert_meta_is_valid(meta: dict, expected_meta: dict):
for k, v in expected_meta.items():
assert k in meta
assert v == meta[k]


def assert_chunk_sizes(key: str, index_map: List, chunk_size: int, storage: Provider):
incomplete_chunk_names = set()
complete_chunk_count = 0
total_chunks = 0
for i, entry in enumerate(index_map):
for j, chunk_name in enumerate(entry["chunk_names"]):
chunk_key = get_chunk_key(key, chunk_name)
chunk_length = len(storage[chunk_key])

# exceeding chunk_size is never acceptable
assert (
chunk_length <= chunk_size
), 'Chunk "%s" exceeded chunk_size=%i (got %i) @ [%i, %i].' % (
chunk_name,
chunk_size,
chunk_length,
i,
j,
)

if chunk_length < chunk_size:
incomplete_chunk_names.add(chunk_name)
if chunk_length == chunk_size:
complete_chunk_count += 1

total_chunks += 1

incomplete_chunk_count = len(incomplete_chunk_names)
assert (
incomplete_chunk_count <= 1
), "Incomplete chunk count should never exceed 1. Incomplete count: %i. Complete count: %i. Total: %i.\nIncomplete chunk names: %s" % (
incomplete_chunk_count,
complete_chunk_count,
total_chunks,
str(incomplete_chunk_names),
)


def run_engine_test(arrays, storage, batched, chunk_size):
storage.clear()

for i, a_in in enumerate(arrays):
write_array(
a_in,
TENSOR_KEY,
chunk_size,
storage,
batched=batched,
)

index_map_key = get_index_map_key(TENSOR_KEY)
index_map = pickle.loads(storage[index_map_key])

assert_chunk_sizes(TENSOR_KEY, index_map, chunk_size, storage)

# `write_array` implicitly normalizes/batchifies shape
a_in = normalize_and_batchify_shape(a_in, batched=batched)

a_out = read_array(TENSOR_KEY, storage)

meta_key = get_meta_key(TENSOR_KEY)
assert meta_key in storage, "Meta was not found."
meta = pickle.loads(storage[meta_key])

assert_meta_is_valid(
meta,
{
"chunk_size": chunk_size,
"length": a_in.shape[0],
"dtype": a_in.dtype.name,
"min_shape": get_min_shape(a_in),
"max_shape": get_max_shape(a_in),
},
)

assert np.array_equal(a_in, a_out), "Array not equal @ batch_index=%i." % i

storage.clear()


def benchmark_write(arrays, chunk_size, storage, batched, clear_after_write=True):
storage.clear()

for a_in in arrays:
write_array(
a_in,
TENSOR_KEY,
chunk_size,
storage,
batched=batched,
)

if clear_after_write:
storage.clear()


def benchmark_read(storage):
read_array(TENSOR_KEY, storage)
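The `get_min_shape`/`get_max_shape` helpers in this test file compute the per-dimension extremes across a batch, which the meta tracks (`min_shape`/`max_shape`) to describe dynamically-shaped tensors. A small usage sketch — the list-of-arrays batch here is illustrative:

```python
import numpy as np


def get_min_shape(batch):
    # per-dimension minimum across all sample shapes in the batch
    return tuple(np.minimum.reduce([sample.shape for sample in batch]))


def get_max_shape(batch):
    # per-dimension maximum across all sample shapes in the batch
    return tuple(np.maximum.reduce([sample.shape for sample in batch]))


batch = [np.zeros((2, 5)), np.zeros((3, 4))]
min_s = get_min_shape(batch)  # elementwise min of (2, 5) and (3, 4) -> equals (2, 4)
max_s = get_max_shape(batch)  # elementwise max -> equals (3, 5)
```

When every sample has the same shape (the fixed-shape case this PR targets), `min_shape == max_shape`.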