Skip to content

Commit

Permalink
feat: implement merge
Browse files Browse the repository at this point in the history
  • Loading branch information
eiri committed Mar 27, 2024
1 parent 692ff3a commit 6eefc5d
Show file tree
Hide file tree
Showing 2 changed files with 322 additions and 38 deletions.
167 changes: 149 additions & 18 deletions src/py_bitcask/bitcask.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import io
import os
import shutil
import uuid
from dataclasses import dataclass
from functools import reduce
from io import BytesIO
from struct import pack, unpack
from typing import Any, Callable, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Union
from zlib import crc32

from uuid_extensions import uuid7
Expand All @@ -28,10 +29,32 @@ class KeyRec:
tstamp: Union[uuid.UUID, str, int, bytes]


@dataclass
class Hint:
"""
Represents a hint item in a Bitcask hint file.

Attributes:
- tstamp (uuid.UUID): The timestamp associated with the key record as uuid7.
- key_sz (int): The size of the key in bytes.
- value_sz (int): The size of the value in bytes.
- value_pos (int): The position of the value within the corresponding data file.
- key (bytes): The key associated with the data value.
"""

tstamp: Union[uuid.UUID, str, int, bytes]
key_sz: int
value_sz: int
value_pos: int
key: bytes


class Bitcask:
DEFAULT_THRESHOLD = 1024
HEADER_FORMAT = ">I16sLL"
header_size = 28 # struct.calcsize(HEADER_FORMAT)
HINT_FORMAT = ">16sLLL"
hint_size = 32

def __init__(self, threshold: Optional[int] = DEFAULT_THRESHOLD) -> None:
"""
Expand Down Expand Up @@ -94,20 +117,62 @@ def _open(self) -> None:
"""
if self.__dirname == ":memory":
return
hint_files = self._read_hints()
self._open_with_hints(hint_files)

def _open_with_hints(self, hint_files) -> None:
"""
This utility function opens data files associated with each hint file
and populates the key directory with key records extracted from
the hint files.

Parameters:
- hint_files (Dict[str, List[Hint]]): A dictionary where
keys are data file names in form of uuid7
and values are lists of Hint objects representing hint files.

Returns:
None
"""
for uid, hints in hint_files.items():
file_id = crc32(uid.encode("utf-8"))
file_name = os.path.join(self.__dirname, uid + ".db")
current = open(file_name, "rb")
self.__datadir[file_id] = current
for hint in hints:
self.__keydir[hint.key] = KeyRec(
file_id, hint.value_sz, hint.value_pos, hint.tstamp
)

def _read_hints(self) -> Optional[Dict[str, List[Hint]]]:
"""
Reads data or hint files from the data directory
and returns a dictionary of hint files.

Returns:
Optional[Dict[str, List[Hint]]]: A dictionary where
keys are data file names in form of uuid7
and values are lists of Hint objects representing hint files.

Returns None if DataDir been set to ":memory".
"""
if self.__dirname == ":memory":
return
hint_files = {}
seen = {}
deleted = {}
files = os.listdir(self.__dirname)
files.sort()
files.reverse()
deleted = {}
for file in files:
file_name = os.path.join(self.__dirname, file)
if (
os.path.isfile(file_name)
and os.path.getsize(file_name) >= self.header_size
):
uuid7str, _ = os.path.splitext(file)
uid = crc32(uuid7str.encode("utf-8"))
uid, _ = os.path.splitext(file)
# TODO: check if hint file is here and read it instead
current = open(file_name, "rb")
self.__datadir[uid] = current
while current.tell() < os.path.getsize(file_name):
data = current.read(self.header_size)
_, ts_bytes, key_sz, value_sz = unpack(
Expand All @@ -119,29 +184,29 @@ def _open(self) -> None:
if value_sz == 0:
deleted[key] = True
continue
if key not in self.__keydir and key not in deleted:
self.__keydir[key] = KeyRec(
uid,
value_sz,
value_pos,
tstamp,
)
if key not in seen and key not in deleted:
seen[key] = True
hint = Hint(tstamp, key_sz, value_sz, value_pos, key)
if uid not in hint_files:
hint_files[uid] = []
hint_files[uid].append(hint)
current.seek(value_sz, 1)
return hint_files

def _reactivate(self) -> None:
"""
Reactivates the storage by creating a new active storage file.
"""
uid = uuid7()
new_active = io.BytesIO()
uid = uuid7(as_type="str")
new_active = BytesIO()
if self.__dirname != ":memory":
if self.__active is not None:
prev_active = self.__datadir[self.__active]
prev_active.close()
self.__datadir[self.__active] = open(prev_active.name, "rb")
file_name = os.path.join(self.__dirname, str(uid) + ".db")
file_name = os.path.join(self.__dirname, uid + ".db")
new_active = open(file_name, "a+b")
self.__active = crc32(uid.bytes)
self.__active = crc32(uid.encode("utf-8"))
self.__datadir[self.__active] = new_active
self.__cur = 0

Expand Down Expand Up @@ -297,7 +362,68 @@ def __next__(self) -> bytes:
keyrec = next(self.__iter)
return self._get(keyrec)

def merge(self):
def merge(self) -> bool:
"""
Runs merge operation on data dir removing all obsolete
and deleted keys from immutable data files and creating
hint files for new merged data files.

Returns:
bool: True if the operation is successful.

Raises:
RuntimeError: If bitcask is of type :memory.
"""
if self.__dirname == ":memory":
raise RuntimeError("Notsupported operation for type :memory.")
# create bitcask instance for the latest values
merge_cask = Bitcask(self.threshold)
merge_dir = os.path.join(self.__dirname, "merge")
os.makedirs(merge_dir)
merge_cask.open(merge_dir)
# store all the latest keys from immutable files in merge bitcask
for key, keyrec in self.__keydir.items():
if keyrec.file_id != self.__active:
value = self._get(keyrec)
merge_cask.put(key, value)
# reset active file
merge_cask._reactivate()
# build and store hint fils for merged data files
hint_files = merge_cask._read_hints()
for uid, hints in hint_files.items():
hint_file_name = os.path.join(merge_dir, uid + ".hint")
hint_file = open(hint_file_name, "a+b")
for hint in hints:
head = pack(
self.HINT_FORMAT,
hint.tstamp.bytes,
hint.key_sz,
hint.value_sz,
hint.value_pos,
)
hint_file.write(head)
hint_file.write(hint.key)
hint_file.close()
merge_cask.close()
# move merged files in working dir, then delete merge dir
for file in os.listdir(merge_dir):
file_path = os.path.join(merge_dir, file)
if os.path.isfile(file_path):
shutil.move(file_path, os.path.join(self.__dirname, file))
shutil.rmtree(merge_dir)
# delete all old immutable keys and files
keydir = self.__keydir.copy()
for key, keyrec in keydir.items():
if keyrec.file_id != self.__active:
del self.__keydir[key]
datadir = self.__datadir.copy()
for file_id, file in datadir.items():
if file_id != self.__active:
file.close()
os.remove(os.path.join(self.__dirname, file.name))
del self.__datadir[file_id]
# open all new files and propagate keydir from hint_files
self._open_with_hints(hint_files)
return True

def sync(self) -> bool:
Expand All @@ -306,7 +432,12 @@ def sync(self) -> bool:

Returns:
bool: True if the operation is successful.

Raises:
RuntimeError: If bitcask is of type :memory.
"""
if self.__dirname == ":memory":
raise RuntimeError("Notsupported operation for type :memory.")
self.__datadir[self.__active].flush()
return True

Expand Down
Loading

0 comments on commit 6eefc5d

Please sign in to comment.