Commit
Merge pull request #842 from activeloopai/feature/2.0/chunk-engine
[2.0] writing/reading fixed-shape arrays to chunks
verbose-void committed May 13, 2021
2 parents 65cab57 + 02c87b6 commit 80074a4
Showing 17 changed files with 868 additions and 96 deletions.
15 changes: 13 additions & 2 deletions .circleci/config.yml
@@ -156,15 +156,26 @@ commands:
command: |
$Env:GOOGLE_APPLICATION_CREDENTIALS = $Env:CI_GCS_PATH
setx /m GOOGLE_APPLICATION_CREDENTIALS "$Env:GOOGLE_APPLICATION_CREDENTIALS"
python3 -m pytest --cov-report=xml --cov=./ --benchmark-autosave
python3 -m pytest --cov-report=xml --cov=./ --benchmark-skip
- run:
name: "Running benchmarks - Windows"
command: |
$Env:GOOGLE_APPLICATION_CREDENTIALS = $Env:CI_GCS_PATH
setx /m GOOGLE_APPLICATION_CREDENTIALS "$Env:GOOGLE_APPLICATION_CREDENTIALS"
python3 -m pytest --cov-report=xml --cov=./ --benchmark-only --benchmark-autosave
- when:
condition: << parameters.unix-like >>
steps:
- run:
name: "Running tests - Unix"
command: |
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.secrets/gcs.json
python3 -m pytest --cov-report=xml --cov=./ --benchmark-autosave
python3 -m pytest --cov-report=xml --cov=./ --benchmark-skip
- run:
name: "Running benchmarks - Unix"
command: |
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.secrets/gcs.json
python3 -m pytest --cov-report=xml --cov=./ --benchmark-only --benchmark-autosave
codecov-upload:
steps:
- codecov/upload:
3 changes: 3 additions & 0 deletions hub/constants.py
@@ -1 +1,4 @@
BYTE_PADDING = b"\0"
CHUNKS_FOLDER = "chunks"
META_FILENAME = "meta.json"
INDEX_MAP_FILENAME = "index_map.json"
3 changes: 3 additions & 0 deletions hub/core/chunk_engine/__init__.py
@@ -0,0 +1,3 @@
from .chunker import generate_chunks, join_chunks
from .write import write_array
from .read import read_array
100 changes: 100 additions & 0 deletions hub/core/chunk_engine/chunker.py
@@ -0,0 +1,100 @@
import numpy as np
from typing import Generator, Optional, List

from hub.util.exceptions import ChunkSizeTooSmallError


def generate_chunks(
content_bytes: bytes,
chunk_size: int,
bytes_left_in_last_chunk: int = 0,
) -> Generator[bytes, None, None]:
"""Generator function that chunks bytes.
Chunking is the process of breaking up the input `content_bytes` into a sequence of smaller bytes objects called "chunks".
The size of each chunk is <= `chunk_size`.
Example:
content_bytes = b"1234567890123"
chunk_size = 4
yields:
b"1234"
b"5678"
b"9012"
b"3"
Args:
content_bytes (bytes): Bytes object with the data to be chunked.
chunk_size (int): Each individual chunk will be assigned this many bytes maximum.
bytes_left_in_last_chunk (int): If chunks were already created, `bytes_left_in_last_chunk`
should be set to `chunk_size - len(last_chunk)`. This way the generator's
first output will be exactly enough bytes to fill that last chunk up to `chunk_size`.
Yields:
bytes: Chunk of `content_bytes`. Will have length on the interval [1, `chunk_size`].
Raises:
ChunkSizeTooSmallError: If `chunk_size` <= 0
ValueError: If `bytes_left_in_last_chunk` < 0
"""

# validate inputs
if chunk_size <= 0:
raise ChunkSizeTooSmallError()
if bytes_left_in_last_chunk < 0:
# TODO: move error message to separate file
raise ValueError("Bytes left in last chunk must be >= 0.")
if len(content_bytes) <= 0:
return

# yield enough bytes to top off the previous chunk (its remaining capacity is `bytes_left_in_last_chunk`)
total_bytes_yielded = 0
if bytes_left_in_last_chunk > 0:
chunk = content_bytes[:bytes_left_in_last_chunk]
yield chunk
total_bytes_yielded += bytes_left_in_last_chunk

# yield all new chunks
while total_bytes_yielded < len(content_bytes):
end = total_bytes_yielded + chunk_size
chunk = content_bytes[total_bytes_yielded:end]

yield chunk
total_bytes_yielded += len(chunk)


def join_chunks(chunks: List[bytes], start_byte: int, end_byte: int) -> bytes:
"""Given a list of bytes that represent sequential chunks, join them into one bytes object.
For more on chunking, see the `generate_chunks` method.
Example:
chunks = [b"123", b"456", b"789"]
start_byte = 1
end_byte = 2
returns:
b"2345678"
Args:
chunks (list[bytes]): Sequential list of bytes objects that represent chunks.
start_byte (int): The first chunk in the sequence will ignore the bytes before `start_byte`. If 0, all bytes are included.
end_byte (int): The last chunk in the sequence will ignore the bytes at and after `end_byte`. If None, all bytes are included.
Notes:
Bytes are indexed using: chunk[start_byte:end_byte]. That is why `chunk[end_byte]` will not be included in `chunk[start_byte:end_byte]`.
If `len(chunks) == 1`, `start_byte`:`end_byte` will be applied to the same chunk (the first and last one).
Returns:
bytes: The chunks joined as one bytes object.
"""

indexed_chunks = []
for i, chunk in enumerate(chunks):
actual_start_byte, actual_end_byte = 0, len(chunk)

if i <= 0:
actual_start_byte = start_byte
if i >= len(chunks) - 1:
actual_end_byte = end_byte

indexed_chunks.append(chunk[actual_start_byte:actual_end_byte])
return b"".join(indexed_chunks)
64 changes: 0 additions & 64 deletions hub/core/chunk_engine/generator.py

This file was deleted.

51 changes: 51 additions & 0 deletions hub/core/chunk_engine/read.py
@@ -0,0 +1,51 @@
import os
import pickle
import numpy as np

from .chunker import join_chunks
from .util import get_meta_key, get_index_map_key

from hub.core.typing import Provider
from typing import Callable, List, Union


def read_array(
key: str,
storage: Provider,
array_slice: slice = slice(None),
) -> np.ndarray:
"""Read and join chunks into an array from storage.
Args:
key (str): Key for where the chunks, index_map, and meta are located in `storage` relative to its root.
storage (Provider): Provider for reading the chunks, index_map, and meta.
array_slice (slice): Slice that represents which samples to read. Default = slice representing all samples.
Returns:
np.ndarray: Array containing the sample(s) in the `array_slice` slice.
"""

# TODO: don't use pickle
meta = pickle.loads(storage[get_meta_key(key)])
index_map = pickle.loads(storage[get_index_map_key(key)])

# TODO: read samples in parallel
samples = []
for index_entry in index_map[array_slice]:
chunks = []
for chunk_name in index_entry["chunk_names"]:
chunk_key = os.path.join(key, "chunks", chunk_name)
chunk = storage[chunk_key]

chunks.append(chunk)

combined_bytes = join_chunks(
chunks,
index_entry["start_byte"],
index_entry["end_byte"],
)

out_array = np.frombuffer(combined_bytes, dtype=meta["dtype"])
samples.append(out_array.reshape(index_entry["shape"]))

return np.array(samples)
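To make the `index_map` bookkeeping above concrete, here is a standalone sketch of the per-sample reconstruction that the loop in `read_array` performs. The entry values are illustrative; real entries come from the write path, which is not part of this excerpt:

import numpy as np
from hub.core.chunk_engine import join_chunks

# hypothetical index_map entry: one (2, 3) float64 sample spanning two chunks
index_entry = {
    "chunk_names": ["chunk_0", "chunk_1"],
    "start_byte": 0,
    "end_byte": 16,  # only the first 16 bytes of the last chunk belong to this sample
    "shape": (2, 3),
}

sample = np.arange(6, dtype=np.float64).reshape(2, 3)
raw = sample.tobytes()          # 48 bytes total
chunks = [raw[:32], raw[32:]]   # pretend the chunk size is 32 bytes

combined = join_chunks(chunks, index_entry["start_byte"], index_entry["end_byte"])
restored = np.frombuffer(combined, dtype=np.float64).reshape(index_entry["shape"])
assert np.array_equal(restored, sample)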
