Finished defragmentation algorithm, handled all edge cases.
Writing tests.
dpena committed Oct 25, 2015
1 parent c48ce42 commit bc36f35
Showing 4 changed files with 294 additions and 71 deletions.
2 changes: 1 addition & 1 deletion graphene/storage/base/general_store.py
@@ -142,7 +142,7 @@ def read_from_index_packed_data(self, index):
:param index: Index to get data from
:type index: int
:return: Raw data from file
:rtype: bytes
:rtype: str
"""
if index == 0:
raise ValueError("Item cannot be read from index 0")
18 changes: 15 additions & 3 deletions graphene/storage/base/property.py
@@ -122,7 +122,7 @@ def is_primitive(self):
:return: True if primitive, False otherwise
:rtype: bool
"""
return self.type.value >= 1 and self.type.value <= 7
return self.type_is_primitive(self.type)

def is_string(self):
"""
@@ -131,7 +131,7 @@ def is_string(self):
:return: True if string, False otherwise
:rtype: bool
"""
return self.type.value == 8
return self.type_is_string(self.type)

def is_array(self):
"""
@@ -140,4 +140,16 @@ def is_array(self):
:return: True if array, False otherwise
:rtype: bool
"""
return self.type.value >= 9
return self.type_is_array(self.type)

@staticmethod
def type_is_primitive(type):
return 1 <= type.value <= 7

@staticmethod
def type_is_string(type):
return type.value == 8

@staticmethod
def type_is_array(type):
return type.value >= 9
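
The new static helpers let callers classify a raw PropertyType without a Property instance; the value ranges (1-7 primitive, 8 string, 9 and up array) come straight from the old inline checks. Below is a minimal standalone sketch of the same classification, using a stand-in enum since Property.PropertyType's members aren't shown in this diff:

# Sketch only: the real Property.PropertyType enum is not in this diff,
# so a stand-in with the same value ranges is assumed.
from enum import Enum

class PropertyType(Enum):
    int = 1        # 1-7: primitives
    bool = 7
    string = 8     # 8: string
    intArray = 9   # >= 9: arrays

def type_is_primitive(property_type):
    return 1 <= property_type.value <= 7

def type_is_string(property_type):
    return property_type.value == 8

def type_is_array(property_type):
    return property_type.value >= 9

assert type_is_primitive(PropertyType.int)
assert type_is_string(PropertyType.string)
assert type_is_array(PropertyType.intArray)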
241 changes: 219 additions & 22 deletions graphene/storage/defrag/defragmenter.py
@@ -1,19 +1,39 @@
from struct import Struct

from graphene.storage.base.property import Property
from reference_map import TypeReferenceMap, kArrayType, kStringType, kProperty,\
kPropertyTypeOffset, kPropertyPayloadOffset


class Defragmenter:
def __init__(self, base_store, id_store):
REFERENCE_STRUCT_FORMAT_STR = "= I"

def __init__(self, base_store, id_store, referencing_stores):
"""
Initialize a Defragmenter instance to use for defragmenting files
:param base_store: Base store to perform swaps on
:type base_store: GeneralStore
:param id_store: Id store for the base store
:type id_store: IdStore
:param referencing_stores: Stores that reference this base store
:type referencing_stores: list[GeneralStore]
:return: Defragmenter instance for defragmenting base files
:rtype: Defragmenter
"""
# Store to defragment
self.baseStore = base_store
# ID store that corresponds to this base store
self.idStore = id_store
# Reference map for the current store
self.referenceMap = TypeReferenceMap[base_store.STORAGE_TYPE.__name__]
# Base stores that reference this store
self.referencingStores = referencing_stores
# Struct for a reference type (4-byte unsigned int)
self.referenceStruct = Struct(self.REFERENCE_STRUCT_FORMAT_STR)
# Whether self references should be updated for this file
self.shouldUpdateSelfRef = \
self.baseStore.STORAGE_TYPE.__name__ in self.referenceMap
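
TypeReferenceMap drives everything that follows: indexed first by a store's STORAGE_TYPE name, it yields a map from referenced type names to the byte offsets of the 4-byte reference fields inside that store's records. reference_map.py itself is not part of this diff, so the shape below is inferred from usage; every concrete name and offset except the Property-name-at-offset-5 case (mentioned in a docstring further down) is illustrative only:

# Illustrative sketch of reference_map.py's exports; the real offsets and
# store names may differ. A store whose map contains its own type name
# has self-references (shouldUpdateSelfRef above).
kStringType = "String"
kArrayType = "Array"
kProperty = "Property"
kPropertyTypeOffset = 1      # made-up offset of a property's type tag
kPropertyPayloadOffset = 9   # made-up offset of a property's payload

TypeReferenceMap = {
    # Hypothetical: a relationship record pointing at other relationships
    # (self-references) and at its first property.
    "Relationship": {"Relationship": [9, 13], kProperty: [17]},
    # A property references its name string at offset 5 (per the
    # fix_references docstring below).
    kProperty: {kStringType: [5]},
    # Arrays carry no reference map of their own.
    kArrayType: {},
}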

def defragment(self):
"""
@@ -23,22 +23,33 @@ def defragment(self):
:rtype: None
"""
# Get total number of blocks (excluding 0)
total_blocks = self.baseStore.count()
total_num_blocks = self.baseStore.count()
# Get IDs of empty blocks
empty_blocks = sorted(self.idStore.get_all_ids())
# Get IDs of full blocks
non_empty_blocks = self.non_empty_blocks(empty_blocks, total_blocks)
# Filter continuous IDs from these full IDs
full_non_cont_blocks = self.non_continuous_ids(non_empty_blocks)
full_blocks = self.full_blocks(empty_blocks, total_num_blocks)
# Split the full IDs into continuous and non-continuous IDs
full_cont_blocks, full_non_cont_blocks = \
self.non_continuous_ids(full_blocks)
# Create table with the needed swaps
swap_table = self.create_swap_table(full_non_cont_blocks, empty_blocks)

# Perform the needed swaps
self.perform_swaps(swap_table)
# Trim the end of the defragmented file, full blocks have been shifted
self.baseStore.truncate_file(len(empty_blocks))
# File should not have any remaining ids, clear id store
self.idStore.clear()
# Fix the references in any of the files that reference this store and
# any self-references (if any)
self.fix_references()
# Fix self references in this store (if any)
if self.shouldUpdateSelfRef and full_cont_blocks:
# The remaining self-reference offsets that were not updated are in the
# full continuous blocks that were not swapped. These references weren't
# fixed since they weren't loaded into RAM.
remaining_ref_range = xrange(1, max(full_cont_blocks) + 1)
self.fix_references(swap_table, self.baseStore,
self.referenceMap, remaining_ref_range)
# Fix the references in any of the files that reference this store
self.fix_external_references(swap_table)
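
End to end, defragment() packs all live blocks to the front of the file and trims the tail. The bodies of full_blocks and create_swap_table are collapsed in this view, so the versions below are assumptions kept consistent with the calls above; in particular, pairing the non-continuous full blocks with the lowest free IDs is what makes truncate_file(len(empty_blocks)) correct:

# Toy walk-through of the bookkeeping in defragment(). full_blocks and
# create_swap_table are not fully shown in this diff; these stand-ins
# match their docstrings and call sites.
def full_blocks(empty_ids, total_blocks):
    empty = set(empty_ids)
    return [i for i in range(1, total_blocks + 1) if i not in empty]

def non_continuous_ids(ids):
    ids = sorted(ids)
    for idx, i in enumerate(ids):
        if i != idx + 1:
            return ids[:idx], ids[idx:]
    return ids, []

empty_blocks = sorted([5, 4, 6])                 # free IDs from the ID store
full = full_blocks(empty_blocks, 9)              # [1, 2, 3, 7, 8, 9]
cont, non_cont = non_continuous_ids(full)        # ([1, 2, 3], [7, 8, 9])
swap_table = dict(zip(non_cont, empty_blocks))   # {7: 4, 8: 5, 9: 6}
# After the swaps, blocks 1-6 are live, so the file can be truncated by
# len(empty_blocks) = 3 trailing blocks and the ID store cleared.
assert swap_table == {7: 4, 8: 5, 9: 6}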

def perform_swaps(self, swap_table):
"""
@@ -49,36 +49,202 @@ def perform_swaps(self, swap_table):
:return: Nothing
:rtype: None
"""
src_data = self.fetch_data(swap_table.keys())
src_data = self.fetch_data(swap_table)
for src, dst in swap_table.items():
self.baseStore.write_to_index_packed_data(dst, src_data[src])

def fetch_data(self, indexes):
def fetch_data(self, swap_table):
"""
Fetch data at the given indexes from the store into RAM
:param indexes: Indexes to fetch the data for
:type indexes: list[int]
:param swap_table: Dictionary of src->dest index maps for swaps needed
:type swap_table: dict[int, int]
:return: Dictionary with the index keys and fetched data values
:rtype: dict[int, str]
"""
ref_map = self.referenceMap
# TODO: split into parts to make this more RAM conservative
data = {}
for idx in indexes:
data[idx] = self.baseStore.read_from_index_packed_data(idx)
for idx in swap_table.keys():
packed_data = self.baseStore.read_from_index_packed_data(idx)
if self.shouldUpdateSelfRef:
store_type = self.baseStore.STORAGE_TYPE.__name__
packed_data = self.update_references(ref_map, packed_data,
swap_table, store_type)
data[idx] = packed_data
return data

def fix_references(self):
def update_references(self, ref_map, packed_data, swap_table, store_type):
"""
Updates the packed data if any references within have changed
:param ref_map: Map of references for this packed data
:type ref_map: dict[str, list[int]]
:param packed_data: Packed data to update
:type packed_data: str
:param swap_table: Dictionary of src->dest index maps for swaps done
:type swap_table: dict[int, int]
:param store_type: STORAGE_TYPE.__name__ of the packed data's store
:type store_type: str
:return: Updated packed data
:rtype: str
"""
# Type currently being defragmented
defrag_type = self.baseStore.STORAGE_TYPE.__name__
# Size of the reference struct (4 bytes)
ref_size = self.referenceStruct.size
assert defrag_type in ref_map, \
"Can't update defrag references if reference map has none"
# New packed data after updates
new_packed_data = ""
prev_ending = 0
for offset in ref_map[defrag_type]:
offset_val = self.value_at_offset(packed_data, offset)
# If the value at the current offset is a reference that needs
# to be updated, do so.
if offset_val in swap_table:
new_packed_data += packed_data[prev_ending:offset] + \
self.referenceStruct.pack(swap_table[offset_val])
else:
new_packed_data += packed_data[prev_ending:offset + ref_size]
prev_ending = offset + ref_size
# Append any remaining packed data
new_packed_data += packed_data[prev_ending:]
# Edge case: defragmenting a string type and updating a property store
if defrag_type == kStringType and store_type == kProperty:
new_packed_data = \
self.handle_prop_payload_reference(swap_table, new_packed_data)

return new_packed_data
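
Stripped of the store classes, update_references is a byte splice: unpack the 4-byte value at each known reference offset, and repack it only if the swap table says that block moved. A self-contained sketch of the same mechanics (the record layout and offsets here are illustrative):

# Minimal sketch of the reference splice; offsets are made up.
from struct import Struct

ref_struct = Struct("= I")   # 4-byte native unsigned int, as above

def splice_refs(packed_data, offsets, swap_table):
    out, prev = b"", 0
    for off in offsets:
        old = ref_struct.unpack(packed_data[off:off + ref_struct.size])[0]
        if old in swap_table:
            # Reference points at a moved block: rewrite it
            out += packed_data[prev:off] + ref_struct.pack(swap_table[old])
        else:
            # Keep these bytes untouched
            out += packed_data[prev:off + ref_struct.size]
        prev = off + ref_struct.size
    return out + packed_data[prev:]

record = b"\x01" + ref_struct.pack(7) + b"\xff\xff"   # one ref at offset 1
updated = splice_refs(record, [1], {7: 4})
assert ref_struct.unpack(updated[1:5])[0] == 4
assert updated[0:1] == b"\x01" and updated[5:] == b"\xff\xff"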

def fix_references(self, swap_table, base_store, ref_map, id_fix_range):
"""
Fixes references for the given file offsets that may have changed.
:param swap_table: Dictionary of src->dest index maps for swaps done
:type swap_table: dict[int, int]
:param base_store: Store that needs references fixed
:type base_store: GeneralStore
:param ref_map: Reference map with the offsets for reference updates
:type ref_map: dict[str, list[int]]
:param id_fix_range: Range of IDs that might need their references fixed
:type id_fix_range: xrange
:return: Nothing
:rtype: None
"""
# Type of the store whose references are being fixed
store_type = base_store.STORAGE_TYPE.__name__
# Edge Case: The only time not having a reference map is allowed is when
# an Array type is being defragmented. In this case the Property payload
# may or may not contain a reference to an Array. This would also
# be the case for a String type, but a Property has a reference to
# its name at offset 5, and may or may not have a reference to a String
# in its payload.
if not ref_map:
assert self.baseStore.STORAGE_TYPE.__name__ == kArrayType, \
"Only arrays are allowed to have no reference map"
# Update the property payloads for the array type
self.handle_prop_payload_references(swap_table, base_store,
id_fix_range)
return

for block_idx in id_fix_range:
# Update references
packed_data = base_store.read_from_index_packed_data(block_idx)
new_packed_data = self.update_references(ref_map, packed_data,
swap_table, store_type)
# Only re-write the data if it has changed
if packed_data != new_packed_data:
base_store.write_to_index_packed_data(block_idx, new_packed_data)

def fix_external_references(self, swap_table):
"""
Fix any broken references, either within this file or from other files
Fix changed references in any files that reference this file
:param swap_table: Dictionary of src->dest index maps for swaps done
:type swap_table: dict[int, int]
:return: Nothing
:rtype: None
"""
pass
# TODO: handle property payload (string, array, or data type)
for store in self.referencingStores:
ref_map = TypeReferenceMap[store.STORAGE_TYPE.__name__]
id_fix_range = xrange(1, store.count() + 1)
self.fix_references(swap_table, store, ref_map, id_fix_range)

def handle_prop_payload_references(self, swap_table, base_store,
id_fix_range):
"""
Fixes property payload references for the given range of block IDs
:param swap_table: Dictionary of src->dest index maps for swaps done
:type swap_table: dict[int, int]
:param base_store: Property store whose payload references need fixing
:type base_store: GeneralStore
:param id_fix_range: Range of IDs that might need their references fixed
:type id_fix_range: xrange
:return: Nothing
:rtype: None
"""
for block_idx in id_fix_range:
# Update references if property type is a reference type
packed_data = base_store.read_from_index_packed_data(block_idx)
new_packed_data = \
self.handle_prop_payload_reference(swap_table, packed_data)
# Only rewrite the packed data to store if references were changed
if packed_data != new_packed_data:
base_store.write_to_index_packed_data(block_idx,
new_packed_data)

def handle_prop_payload_reference(self, swap_table, packed_data):
"""
Modifies the property packed data payload if it is a reference type
to an array or string type that is in the swap table.
:param swap_table: Swap table to check for reference in
:type swap_table: dict[int, int]
:param packed_data: Property packed data to modify if it contains a
reference in the swap table
:type packed_data: str
:return: Modified packed data, or original if not reference type or if
the reference is not in the swap table
:rtype: str
"""
old_ref = self.value_at_offset(packed_data, kPropertyPayloadOffset)
# TODO: reorder these checks so the least likely one is evaluated first
# Don't update if a swap didn't happen for the payload reference
if old_ref not in swap_table:
return packed_data
# Property payload is not a reference type being defragmented
if not self.property_type_is_defrag_reference(packed_data):
return packed_data
# Swap the payload reference with the new reference
return packed_data[:kPropertyPayloadOffset] + \
self.referenceStruct.pack(swap_table[old_ref]) + \
packed_data[kPropertyPayloadOffset + self.referenceStruct.size:]

def value_at_offset(self, packed_data, offset):
"""
Returns the integer value in the packed data at the given offset
:param packed_data: Packed data to get value from
:type packed_data: str
:param offset: Offset to look for value
:type offset: int
:return: Integer value at the given offset in the packed data
:rtype: int
"""
packed_value = packed_data[offset:offset+self.referenceStruct.size]
return self.referenceStruct.unpack(packed_value)[0]

def property_type_is_defrag_reference(self, packed_data):
"""
Returns whether the property type of the packed data is a reference
to the type that is currently being defragmented.
:param packed_data: Packed data to perform check on
:type packed_data: str
:return: True if the ref. type is the type currently being defragmented
:rtype: bool
"""
type_value = self.value_at_offset(packed_data, kPropertyTypeOffset)
property_type = Property.PropertyType(type_value)
if Property.type_is_array(property_type):
return self.baseStore.STORAGE_TYPE.__name__ == kArrayType
elif Property.type_is_string(property_type):
return self.baseStore.STORAGE_TYPE.__name__ == kStringType
else:
return False
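
Together with handle_prop_payload_reference above, this check gates the payload splice: the payload's 4 bytes are rewritten only when the property's type tag marks the payload as a reference into the store being defragmented and that reference actually moved. A compact sketch with made-up offsets (the real kPropertyTypeOffset and kPropertyPayloadOffset live in reference_map.py, which is not in this diff):

# Sketch of the payload gate; offsets and the type tag encoding are
# simplified stand-ins (type value 8 marks a string, per property.py).
from struct import Struct

ref_struct = Struct("= I")
K_TYPE_OFF, K_PAYLOAD_OFF = 1, 5   # made-up offsets

def fix_payload(packed, swap_table, defragging_strings):
    type_value = ref_struct.unpack(packed[K_TYPE_OFF:K_TYPE_OFF + 4])[0]
    payload = ref_struct.unpack(packed[K_PAYLOAD_OFF:K_PAYLOAD_OFF + 4])[0]
    is_defrag_ref = defragging_strings and type_value == 8
    if not is_defrag_ref or payload not in swap_table:
        return packed
    return (packed[:K_PAYLOAD_OFF] +
            ref_struct.pack(swap_table[payload]) +
            packed[K_PAYLOAD_OFF + 4:])

prop = b"\x00" + ref_struct.pack(8) + ref_struct.pack(7)  # string prop, ref 7
assert fix_payload(prop, {7: 4}, True)[K_PAYLOAD_OFF:] == ref_struct.pack(4)
assert fix_payload(prop, {7: 4}, False) == prop   # not defragmenting strings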

@staticmethod
def non_empty_blocks(empty_ids, total_blocks):
def full_blocks(empty_ids, total_blocks):
"""
Return the IDs of full blocks with the given empty IDs and total
number of blocks
@@ -102,17 +102,17 @@ def non_continuous_ids(ids):
For example:
Given: [1, 2, 3, 7, 8, 9]
Filter out continuous: [1, 2, 3]
Returns: [7, 8, 9]
Returns: ([1, 2, 3], [7, 8, 9])
:param ids: List of IDs to filter
:type ids: list[int]
:return: Continuous and non-continuous IDs as a tuple of two lists
:rtype: list[int]
:rtype: (list[int], list[int])
"""
for idx, i in enumerate(sorted(ids)):
if i != idx + 1:
return ids[idx:]
return []
return ids[:idx], ids[idx:]
return ids, []
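
A quick usage check against the docstring example, assuming the surrounding module is importable (IDs start at 1, which is why element idx is expected to hold idx + 1):

# Usage check mirroring the docstring example.
cont, non_cont = Defragmenter.non_continuous_ids([1, 2, 3, 7, 8, 9])
assert (cont, non_cont) == ([1, 2, 3], [7, 8, 9])
cont, non_cont = Defragmenter.non_continuous_ids([1, 2, 3])
assert (cont, non_cont) == ([1, 2, 3], [])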

@staticmethod
def create_swap_table(full_non_continuous_blocks, empty_blocks):