POC: parallel file uploading and index creation #4212

Closed: wants to merge 4 commits
43 changes: 40 additions & 3 deletions codalab/worker/un_gzip_stream.py
@@ -6,7 +6,7 @@
 import zlib
 
 from collections import deque
-from io import BytesIO
+from io import BytesIO, SEEK_END, SEEK_SET, UnsupportedOperation
 from typing import Optional
 from zipfile import (  # type: ignore
     BadZipFile,
@@ -235,12 +235,17 @@ class BytesBuffer(BytesIO):
     """
     A class for a buffer of bytes. Unlike io.BytesIO(), this class
     keeps track of the buffer's size (in bytes).
+
+    If extra_buffer_size is passed in during initialization, this class
+    will hold on to extra_buffer_size bytes before the current position,
+    allowing for limited seeking backwards up to that point.
     """
 
-    def __init__(self):
-        self.__buf = deque()
+    def __init__(self, extra_buffer_size=0, deque_maxlen=None):
+        self.__buf = deque(maxlen=deque_maxlen)
         self.__size = 0
         self.__pos = 0
+        self.__extra_buf = BytesBuffer(deque_maxlen=extra_buffer_size) if extra_buffer_size > 0 else None
 
     def __len__(self):
         return self.__size
@@ -261,10 +266,42 @@ def read(self, size: Optional[int] = None):
             ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
             self.__buf.appendleft(remainder)
         ret = b''.join(ret_list)
+        if self.__extra_buf:
+            for i in ret:
+                # Write one byte at a time, for simplicity.
+                self.__extra_buf.write(bytes([i]))
         self.__size -= len(ret)
         self.__pos += len(ret)
         return ret
 
+    def seek(self, offset: int, whence=SEEK_SET):
+        # Only support absolute file positioning (SEEK_SET).
+        if whence != SEEK_SET:
+            if whence == SEEK_END:
+                # Silently fail for now.
+                return
+            raise UnsupportedOperation
+        if offset < self.tell():
+            # Seek backwards.
+            if not self.__extra_buf:
+                raise UnsupportedOperation
+            bytes_to_seek_back = self.tell() - offset
+            if bytes_to_seek_back > len(self.__extra_buf.__buf):
+                raise UnsupportedOperation(f"Invalid seek offset {offset}. Can't seek back {bytes_to_seek_back}, buf len is only {len(self.__extra_buf.__buf)}.")
+            for _ in range(0, bytes_to_seek_back):
+                element = self.__extra_buf.__buf.pop()
+                self.__buf.appendleft(element)
+                self.__extra_buf.__size -= 1
+                self.__pos -= 1
+                self.__size += 1
+            assert self.__pos == offset, (self.__pos, offset)
+        else:
+            # Seek forward.
+            self.read(offset - self.tell())
+
+    def seekable(self):
+        return self.__extra_buf is not None
+
     def flush(self):
         pass
 
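For illustration (not part of this diff), here is a minimal sketch of how the extended BytesBuffer behaves; the byte values are arbitrary:

from io import UnsupportedOperation

from codalab.worker.un_gzip_stream import BytesBuffer

buf = BytesBuffer(extra_buffer_size=4)  # remember up to 4 bytes behind the read position
buf.write(b"abcdef")
assert buf.read() == b"abcdef"  # position is now 6
buf.seek(3)  # seek 3 bytes back: within the 4-byte window
assert buf.read() == b"def"
try:
    buf.seek(0)  # seeking 6 bytes back exceeds the window
except UnsupportedOperation:
    print("seek window exceeded")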
55 changes: 55 additions & 0 deletions tests/unit/worker/un_gzip_stream_test.py
@@ -0,0 +1,55 @@
from io import UnsupportedOperation
import tempfile
import unittest

from codalab.worker.un_gzip_stream import BytesBuffer


class BytesBufferTest(unittest.TestCase):
    def test_read_basic(self):
        """Test basic operations: read(), tell(), write()."""
        for kwargs in [
            dict(),
            dict(extra_buffer_size=20),
        ]:
            b = BytesBuffer(**kwargs)
            b.write(b"hello")
            self.assertEqual(b.tell(), 0)
            self.assertEqual(b.read(), b"hello")
            b.write(b"h")
            b.write(b"e")
            self.assertEqual(b.read(1), b"h")
            self.assertEqual(b.read(1), b"e")
            self.assertEqual(b.read(1), b"")
            self.assertEqual(b.read(), b"")
            self.assertEqual(b.tell(), 7)
            b.write(b"he")
            self.assertEqual(b.read(1), b"h")
            b.write(b"llo")
            self.assertEqual(b.read(), b"ello")
            self.assertEqual(b.tell(), 12)

    def test_extra_buffer_seek(self):
        """Test initializing BytesBuffer with extra_buffer_size,
        allowing for limited seeking."""
        b = BytesBuffer(extra_buffer_size=20)
        b.write(b"1hello2hello")
        self.assertEqual(b.read(), b"1hello2hello")
        b.seek(0)
        self.assertEqual(b.read(), b"1hello2hello")
        b.seek(1)
        b.seek(2)
        b.seek(3)
        b.seek(2)
        self.assertEqual(b.read(), b"ello2hello")
        b.seek(6)
        self.assertEqual(b.read(), b"2hello")

    def test_extra_buffer_seek_size_exceeded(self):
        """Once extra_buffer_size is exceeded, seeking should no longer work."""
        b = BytesBuffer(extra_buffer_size=2)
        b.write(b"1hello2hello")
        self.assertEqual(b.read(), b"1hello2hello")
        b.seek(10)
        with self.assertRaises(UnsupportedOperation):
            b.seek(9)
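Assuming the repository's usual test layout (a hypothetical invocation, not from this PR), the new tests should run with: python -m pytest tests/unit/worker/un_gzip_stream_test.py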
150 changes: 150 additions & 0 deletions tst.py
@@ -0,0 +1,150 @@
from io import BytesIO, BufferedReader, UnsupportedOperation
import io
import os
import shutil
import tempfile
from threading import Lock, Thread

from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from typing import Any, Dict, Union, Tuple, IO, cast
from contextlib import closing

from codalab.common import UsageError, StorageType, urlopen_with_retry, parse_linked_bundle_url
from codalab.worker.file_util import OpenFile, tar_gzip_directory, GzipStream
from codalab.worker.bundle_state import State
from codalab.lib import file_util, path_util, zip_util
from codalab.objects.bundle import Bundle
from codalab.lib.zip_util import ARCHIVE_EXTS_DIR
from codalab.lib.print_util import FileTransferProgress
from codalab.worker.un_gzip_stream import BytesBuffer

import indexed_gzip
from ratarmountcore import SQLiteIndexedTar


file_path = 'mkdocs.yml'


class FileStream(BytesIO):
    NUM_READERS = 2
    EXTRA_BUFFER_SIZE = 1024

    def __init__(self, fileobj):
        self._bufs = [BytesBuffer(extra_buffer_size=self.EXTRA_BUFFER_SIZE) for _ in range(0, self.NUM_READERS)]
        self._pos = [0 for _ in range(0, self.NUM_READERS)]
        self._fileobj = fileobj
        self._lock = Lock()  # Lock to ensure one does not concurrently read self._fileobj / write to the buffers.

        class FileStreamReader(BytesIO):
            def __init__(s, index):
                s._index = index

            def read(s, num_bytes=None):
                return self.read(s._index, num_bytes)

            def seek(s, *args, **kwargs):
                return self.seek(s._index, *args, **kwargs)

            def tell(s):
                return self.tell(s._index)

        self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)]

    def read(self, index: int, num_bytes=None):
        """Read the specified number of bytes from the associated file.

        index: index that specifies which reader is reading.
        """
        with self._lock:
            while num_bytes is None or len(self._bufs[index]) < num_bytes:
                s = self._fileobj.read(num_bytes)
                if not s:
                    break
                for i in range(0, self.NUM_READERS):
                    self._bufs[i].write(s)
            if num_bytes is None:
                num_bytes = len(self._bufs[index])
            s = self._bufs[index].read(num_bytes)
            self._pos[index] += len(s)
            return s

    def seek(self, index: int, *args, **kwargs):
        return self._bufs[index].seek(*args, **kwargs)

    def tell(self, index: int):
        return self._bufs[index].tell()
    def close(self):
        self._fileobj.close()

def upload(file_path, bundle_path='azfs://devstoreaccount1/bundles/0x1234/contents.gz'):
    source_fileobj = open(file_path, 'rb')
    output_fileobj = GzipStream(source_fileobj)
    CHUNK_SIZE = 4 * 1024

    TEST_CONN_STR = (
        "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
        "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
        "BlobEndpoint=http://localhost:10000/devstoreaccount1;"
    )

    os.environ['AZURE_STORAGE_CONNECTION_STRING'] = TEST_CONN_STR

    # stream_file = tempfile.NamedTemporaryFile(suffix=".gz")
    stream_file = FileStream(output_fileobj)
    reader1 = stream_file.readers[0]
    reader2 = stream_file.readers[1]

    def upload_file():
        print("Upload file")
        bytes_uploaded = 0
        with FileSystems.create(
            bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
        ) as out:
            while True:
                to_send = reader1.read(CHUNK_SIZE)
                if not to_send:
                    break
                out.write(to_send)
                bytes_uploaded += len(to_send)

    def create_index():
        print("Create index")
        with tempfile.NamedTemporaryFile(suffix=".sqlite") as tmp_index_file:
            SQLiteIndexedTar(
                fileObject=reader2,
                tarFileName="contents",  # If saving a single file as a .gz archive, this file can be accessed by the "/contents" entry in the index.
                writeIndex=True,
                clearIndexCache=True,
                indexFilePath=tmp_index_file.name,
                isGnuIncremental=False,
            )

            bytes_uploaded = 0
            with FileSystems.create(
                parse_linked_bundle_url(bundle_path).index_path,
                compression_type=CompressionTypes.UNCOMPRESSED,
            ) as out_index_file, open(tmp_index_file.name, "rb") as tif:
                while True:
                    to_send = tif.read(CHUNK_SIZE)
                    if not to_send:
                        break
                    out_index_file.write(to_send)
                    bytes_uploaded += len(to_send)

    threads = [
        Thread(target=upload_file),
        Thread(target=create_index),
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    with OpenFile(bundle_path) as f:
        print(f.read())


upload(file_path)
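For reference, the fan-out pattern tst.py relies on can be sketched without the CodaLab, Beam, or ratarmount dependencies. The sketch below is illustrative only (the names TeeStream and consume are invented here): one source stream feeds N readers, each with its own buffer, so two consumers can each see the full byte stream while it is produced only once.

import io
from threading import Lock, Thread

class TeeStream:
    """Fan a single file object out to multiple independent readers."""

    def __init__(self, fileobj, num_readers=2):
        self._fileobj = fileobj
        self._bufs = [bytearray() for _ in range(num_readers)]
        self._lock = Lock()

    def read(self, index, num_bytes):
        # Whichever reader arrives first pulls from the source and
        # appends the new bytes to every reader's buffer.
        with self._lock:
            while len(self._bufs[index]) < num_bytes:
                chunk = self._fileobj.read(num_bytes)
                if not chunk:
                    break
                for buf in self._bufs:
                    buf.extend(chunk)
            out = bytes(self._bufs[index][:num_bytes])
            del self._bufs[index][:num_bytes]
            return out

source = TeeStream(io.BytesIO(b"x" * 10000))
results = [bytearray(), bytearray()]

def consume(i):
    while True:
        chunk = source.read(i, 1024)
        if not chunk:
            break
        results[i].extend(chunk)

threads = [Thread(target=consume, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert bytes(results[0]) == bytes(results[1]) == b"x" * 10000

As in the POC, nothing bounds a lagging reader's buffer, so memory grows with the gap between the fastest and slowest consumer; that is the trade-off this design makes to avoid writing the gzip stream to a temporary file first.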