From bc36f35214b30b95b406f88861d38056cac2f586 Mon Sep 17 00:00:00 2001
From: dpena
Date: Sun, 25 Oct 2015 02:29:08 -0700
Subject: [PATCH] Finished defragmentation algorithm, handled all edge cases.
 Writing tests.

---
 graphene/storage/base/general_store.py   |   2 +-
 graphene/storage/base/property.py        |  18 +-
 graphene/storage/defrag/defragmenter.py  | 241 ++++++++++++++++++++---
 graphene/storage/defrag/reference_map.py | 104 +++++-----
 4 files changed, 294 insertions(+), 71 deletions(-)

diff --git a/graphene/storage/base/general_store.py b/graphene/storage/base/general_store.py
index 3428ec9..0e35534 100644
--- a/graphene/storage/base/general_store.py
+++ b/graphene/storage/base/general_store.py
@@ -142,7 +142,7 @@ def read_from_index_packed_data(self, index):
         :param index: Index to get data from
         :type index: int
         :return: Raw data from file
-        :rtype: bytes
+        :rtype: str
         """
         if index == 0:
             raise ValueError("Item cannot be read from index 0")
diff --git a/graphene/storage/base/property.py b/graphene/storage/base/property.py
index 0431465..d9f93d0 100644
--- a/graphene/storage/base/property.py
+++ b/graphene/storage/base/property.py
@@ -122,7 +122,7 @@ def is_primitive(self):
         :return: True if primitive, False otherwise
         :rtype: bool
         """
-        return self.type.value >= 1 and self.type.value <= 7
+        return self.type_is_primitive(self.type)
 
     def is_string(self):
         """
@@ -131,7 +131,7 @@ def is_string(self):
         :return: True if string, False otherwise
         :rtype: bool
         """
-        return self.type.value == 8
+        return self.type_is_string(self.type)
 
     def is_array(self):
         """
@@ -140,4 +140,16 @@ def is_array(self):
         :return: True if array, False otherwise
         :rtype: bool
         """
-        return self.type.value >= 9
+        return self.type_is_array(self.type)
+
+    @staticmethod
+    def type_is_primitive(prop_type):
+        return 1 <= prop_type.value <= 7
+
+    @staticmethod
+    def type_is_string(prop_type):
+        return prop_type.value == 8
+
+    @staticmethod
+    def type_is_array(prop_type):
+        return prop_type.value >= 9
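The new static helpers put the PropertyType value ranges in one place: 1-7 are primitives, 8 is a string reference, and 9 and up are arrays. A minimal sketch of that contract, using a stand-in enum rather than the real PropertyType members:

    # Illustrative only: the real members live in graphene/storage/base/property.py;
    # the value ranges are what matter here.
    from enum import Enum

    class FakePropertyType(Enum):
        int = 1         # primitives occupy values 1-7
        bool = 7
        string = 8      # a single string reference
        intArray = 9    # arrays occupy values 9 and up

    def type_is_primitive(prop_type):
        return 1 <= prop_type.value <= 7

    def type_is_string(prop_type):
        return prop_type.value == 8

    def type_is_array(prop_type):
        return prop_type.value >= 9

    assert type_is_primitive(FakePropertyType.int)
    assert type_is_string(FakePropertyType.string)
    assert type_is_array(FakePropertyType.intArray)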
diff --git a/graphene/storage/defrag/defragmenter.py b/graphene/storage/defrag/defragmenter.py
index f78fb75..ab39e65 100644
--- a/graphene/storage/defrag/defragmenter.py
+++ b/graphene/storage/defrag/defragmenter.py
@@ -1,7 +1,14 @@
+from struct import Struct
+
+from graphene.storage.base.property import Property
+from reference_map import TypeReferenceMap, kArrayType, kStringType, kProperty,\
+    kPropertyTypeOffset, kPropertyPayloadOffset
 
 class Defragmenter:
 
-    def __init__(self, base_store, id_store):
+    REFERENCE_STRUCT_FORMAT_STR = "= I"
+
+    def __init__(self, base_store, id_store, referencing_stores):
         """
         Initialize a Defragmenter instance to use for defragmenting files
 
@@ -9,11 +16,24 @@
         :type base_store: GeneralStore
         :param id_store: Id store for the base store
         :type id_store: IdStore
+        :param referencing_stores: Stores that reference this base store
+        :type referencing_stores: list[GeneralStore]
         :return: Defragmenter instance for defragmenting base files
         :rtype: Defragmenter
         """
+        # Store to defragment
         self.baseStore = base_store
+        # ID store that corresponds to this base store
         self.idStore = id_store
+        # Reference map for the current store
+        self.referenceMap = TypeReferenceMap[base_store.STORAGE_TYPE.__name__]
+        # Base stores that reference this store
+        self.referencingStores = referencing_stores
+        # Structure for a reference type (int, 4 bytes)
+        self.referenceStruct = Struct(self.REFERENCE_STRUCT_FORMAT_STR)
+        # Whether self references should be updated for this file
+        self.shouldUpdateSelfRef = \
+            self.baseStore.STORAGE_TYPE.__name__ in self.referenceMap
 
     def defragment(self):
         """
         Defragment the file by moving the blocks at the end of the file
         into its empty blocks
 
         :return: Nothing
         :rtype: None
         """
         # Get total number of blocks (excluding 0)
-        total_blocks = self.baseStore.count()
+        total_num_blocks = self.baseStore.count()
         # Get IDs of empty blocks
         empty_blocks = sorted(self.idStore.get_all_ids())
         # Get IDs of full blocks
-        non_empty_blocks = self.non_empty_blocks(empty_blocks, total_blocks)
-        # Filter continuous IDs from these full IDs
-        full_non_cont_blocks = self.non_continuous_ids(non_empty_blocks)
+        full_blocks = self.full_blocks(empty_blocks, total_num_blocks)
+        # Split the full block IDs into continuous and non-continuous runs
+        full_cont_blocks, full_non_cont_blocks = \
+            self.non_continuous_ids(full_blocks)
         # Create table with the needed swaps
         swap_table = self.create_swap_table(full_non_cont_blocks, empty_blocks)
+        # Perform the needed swaps
         self.perform_swaps(swap_table)
+        # Trim the end of the defragmented file, full blocks have been shifted
+        self.baseStore.truncate_file(len(empty_blocks))
         # File should not have any remaining ids, clear id store
         self.idStore.clear()
-        # Fix the references in any of the files that reference this store and
-        # any self-references (if any)
-        self.fix_references()
+        # Fix self-references in this store (if any). The offsets that still
+        # need updating are in the full continuous blocks that were not
+        # swapped; they were never loaded into RAM, so update_references
+        # never saw them
+        if self.shouldUpdateSelfRef:
+            remaining_ref_range = xrange(1, max(full_cont_blocks or [0]) + 1)
+            self.fix_references(swap_table, self.baseStore,
+                                self.referenceMap, remaining_ref_range)
+        # Fix the references in any of the files that reference this store
+        self.fix_external_references(swap_table)
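Worked through on a toy file, the bookkeeping in defragment() behaves as follows. create_swap_table's body is unchanged by this patch and not shown, so build_swap_table below is an assumed stand-in that pairs each non-continuous full block, in order, with the next free slot; it sketches the expected behavior, not the store's actual implementation:

    def split_continuous(ids):
        # Same logic as non_continuous_ids: block IDs start at 1
        for idx, i in enumerate(sorted(ids)):
            if idx + 1 != i:
                return ids[:idx], ids[idx:]
        return ids, []

    def build_swap_table(non_cont, empty):
        # Move each trailing full block into the next unused slot
        first_free = min(empty)
        return dict(zip(non_cont, range(first_free, first_free + len(non_cont))))

    total = 9
    empty = [2, 5, 9]
    full = [i for i in range(1, total + 1) if i not in empty]  # [1,3,4,6,7,8]
    cont, non_cont = split_continuous(full)                    # [1], [3,4,6,7,8]
    swaps = build_swap_table(non_cont, empty)                  # {3:2, 4:3, 6:4, 7:5, 8:6}
    # After the swaps, blocks 1-6 are full, so the file is truncated by
    # len(empty) == 3 blocks and the ID store is cleared.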
     def perform_swaps(self, swap_table):
         """
         Perform the swaps given by the swap table
 
         :param swap_table: Dictionary of src->dest index maps for swaps needed
         :type swap_table: dict[int, int]
         :return: Nothing
         :rtype: None
         """
-        src_data = self.fetch_data(swap_table.keys())
+        src_data = self.fetch_data(swap_table)
         for src, dst in swap_table.items():
             self.baseStore.write_to_index_packed_data(dst, src_data[src])
 
-    def fetch_data(self, indexes):
+    def fetch_data(self, swap_table):
         """
         Fetch data at the given indexes from the store into RAM
 
-        :param indexes: Indexes to fetch the data for
-        :type indexes: list[int]
+        :param swap_table: Dictionary of src->dest index maps for swaps needed
+        :type swap_table: dict[int, int]
         :return: Dictionary with the index keys and fetched data values
         :rtype: dict[int, str]
         """
+        ref_map = self.referenceMap
+        store_type = self.baseStore.STORAGE_TYPE.__name__
         # TODO: split into parts to make this more RAM conservative
         data = {}
-        for idx in indexes:
-            data[idx] = self.baseStore.read_from_index_packed_data(idx)
+        for idx in swap_table:
+            packed_data = self.baseStore.read_from_index_packed_data(idx)
+            if self.shouldUpdateSelfRef:
+                packed_data = self.update_references(ref_map, packed_data,
+                                                     swap_table, store_type)
+            data[idx] = packed_data
         return data
 
-    def fix_references(self):
+    def update_references(self, ref_map, packed_data, swap_table, store_type):
+        """
+        Updates the packed data if any references within have changed
+
+        :param ref_map: Map of references for this packed data
+        :type ref_map: dict[str, list[int]]
+        :param packed_data: Packed data to update
+        :type packed_data: str
+        :param swap_table: Dictionary of src->dest index maps for swaps done
+        :type swap_table: dict[int, int]
+        :param store_type: STORAGE_TYPE.__name__ of the packed data's store
+        :type store_type: str
+        :return: Updated packed data
+        :rtype: str
+        """
+        # Type currently being defragmented
+        defrag_type = self.baseStore.STORAGE_TYPE.__name__
+        # Size of the reference struct (4 bytes)
+        ref_size = self.referenceStruct.size
+        assert defrag_type in ref_map, \
+            "Can't update defrag references if reference map has none"
+        # New packed data after updates
+        new_packed_data = ""
+        prev_ending = 0
+        for offset in ref_map[defrag_type]:
+            offset_val = self.value_at_offset(packed_data, offset)
+            # If the value at the current offset is a reference that needs
+            # to be updated, do so
+            if offset_val in swap_table:
+                new_packed_data += packed_data[prev_ending:offset] + \
+                    self.referenceStruct.pack(swap_table[offset_val])
+            else:
+                new_packed_data += packed_data[prev_ending:offset + ref_size]
+            prev_ending = offset + ref_size
+        # Append any remaining packed data
+        new_packed_data += packed_data[prev_ending:]
+        # Edge case: defragmenting a string type and updating a property store
+        if defrag_type == kStringType and store_type == kProperty:
+            new_packed_data = \
+                self.handle_prop_payload_reference(swap_table, new_packed_data)
+
+        return new_packed_data
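The splice in update_references only rewrites the 4-byte words named in the reference map and copies everything between them untouched. The same mechanics on a toy record (the offsets here are illustrative, not one of the real layouts):

    from struct import Struct

    ref = Struct("= I")                            # same format the patch uses
    record = b"\x01" + ref.pack(42) + ref.pack(7)  # inUse | value | next
    swap_table = {7: 4}                            # block 7 moved to block 4
    offset = 5                                     # byte offset of "next"

    old = ref.unpack(record[offset:offset + ref.size])[0]
    if old in swap_table:
        record = record[:offset] + ref.pack(swap_table[old]) + \
                 record[offset + ref.size:]
    assert ref.unpack(record[5:9])[0] == 4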
+    def fix_references(self, swap_table, base_store, ref_map, id_fix_range):
+        """
+        Fixes references for the given file offsets that may have changed.
+
+        :param swap_table: Dictionary of src->dest index maps for swaps done
+        :type swap_table: dict[int, int]
+        :param base_store: Store that needs references fixed
+        :type base_store: GeneralStore
+        :param ref_map: Reference map with the offsets for reference updates
+        :type ref_map: dict[str, list[int]]
+        :param id_fix_range: Range of IDs that might need their references fixed
+        :type id_fix_range: xrange
+        :return: Nothing
+        :rtype: None
+        """
+        # Type of the store whose references are being fixed
+        store_type = base_store.STORAGE_TYPE.__name__
+        # Edge case: a missing reference map entry is only allowed when an
+        # Array type is being defragmented, since a Property payload may or
+        # may not contain a reference to an Array. A String would otherwise
+        # be in the same situation, but a Property always references a String
+        # through its name field at offset 5, so only the String reference in
+        # a Property payload is optional
+        if self.baseStore.STORAGE_TYPE.__name__ not in ref_map:
+            assert self.baseStore.STORAGE_TYPE.__name__ == kArrayType, \
+                "Only arrays are allowed to have no reference map"
+            # Update the property payloads for the array type
+            self.handle_prop_payload_references(swap_table, base_store,
+                                                id_fix_range)
+            return
+
+        for block_idx in id_fix_range:
+            # Update references
+            packed_data = base_store.read_from_index_packed_data(block_idx)
+            new_packed_data = self.update_references(ref_map, packed_data,
+                                                     swap_table, store_type)
+            # Only re-write the data if it has changed
+            if packed_data != new_packed_data:
+                base_store.write_to_index_packed_data(block_idx,
+                                                      new_packed_data)
 
     def fix_external_references(self, swap_table):
         """
-        Fix any broken references, either within this file or from other files
+        Fix changed references in any files that reference this file
 
+        :param swap_table: Dictionary of src->dest index maps for swaps done
+        :type swap_table: dict[int, int]
         :return: Nothing
         :rtype: None
         """
-        pass
+        # TODO: handle property payload (string, array, or data type)
+        for store in self.referencingStores:
+            ref_map = TypeReferenceMap[store.STORAGE_TYPE.__name__]
+            id_fix_range = xrange(1, store.count() + 1)
+            self.fix_references(swap_table, store, ref_map, id_fix_range)
+
+    def handle_prop_payload_references(self, swap_table, base_store,
+                                       id_fix_range):
+        """
+        Updates the payload reference of each property block in the given
+        range if it points to a swapped block
+        (see handle_prop_payload_reference)
+
+        :return: Nothing
+        :rtype: None
+        """
+        for block_idx in id_fix_range:
+            # Update references if property type is a reference type
+            packed_data = base_store.read_from_index_packed_data(block_idx)
+            new_packed_data = \
+                self.handle_prop_payload_reference(swap_table, packed_data)
+            # Only rewrite the packed data to store if references were changed
+            if packed_data != new_packed_data:
+                base_store.write_to_index_packed_data(block_idx,
+                                                      new_packed_data)
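The payload word of a Property is only a block reference when its type field says string or array; for primitives it holds the value itself and must not be rewritten. A sketch of that check, assuming the value ranges from property.py and the record layout given by PropertyOffsetDescriptor:

    from struct import Struct

    ref = Struct("= I")

    def payload_is_reference(packed):
        # Type field lives at offset 1 (kPropertyTypeOffset); values 8 and
        # up are string/array types whose payload word is a block ID
        type_value = ref.unpack(packed[1:1 + ref.size])[0]
        return type_value >= 8

    # inUse | type=8 (string) | nameId=2 | prevPropId=0 | nextPropId=0 | payload=7
    prop = b"\x01" + b"".join(ref.pack(v) for v in (8, 2, 0, 0, 7))
    assert payload_is_reference(prop)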
+    def handle_prop_payload_reference(self, swap_table, packed_data):
+        """
+        Modifies the property packed data payload if it is a reference to an
+        array or string block that is in the swap table
+
+        :param swap_table: Swap table to check for the reference in
+        :type swap_table: dict[int, int]
+        :param packed_data: Property packed data to modify if it contains a
+                            reference in the swap table
+        :type packed_data: str
+        :return: Modified packed data, or the original if the property is not
+                 a reference type or the reference is not in the swap table
+        :rtype: str
+        """
+        old_ref = self.value_at_offset(packed_data, kPropertyPayloadOffset)
+        # TODO: reorder these checks so the least likely one comes first
+        # Don't update if a swap didn't happen for the payload reference
+        if old_ref not in swap_table:
+            return packed_data
+        # Property payload is not a reference type being defragmented
+        if not self.property_type_is_defrag_reference(packed_data):
+            return packed_data
+        # Swap the payload reference with the new reference
+        return packed_data[:kPropertyPayloadOffset] + \
+            self.referenceStruct.pack(swap_table[old_ref]) + \
+            packed_data[kPropertyPayloadOffset + self.referenceStruct.size:]
+
+    def value_at_offset(self, packed_data, offset):
+        """
+        Returns the integer value in the packed data at the given offset
+
+        :param packed_data: Packed data to get value from
+        :type packed_data: str
+        :param offset: Offset to look for value
+        :type offset: int
+        :return: Integer value at the given offset in the packed data
+        :rtype: int
+        """
+        packed_value = packed_data[offset:offset + self.referenceStruct.size]
+        return self.referenceStruct.unpack(packed_value)[0]
+
+    def property_type_is_defrag_reference(self, packed_data):
+        """
+        Returns whether the property type of the packed data is a reference
+        to the type that is currently being defragmented
+
+        :param packed_data: Packed data to perform check on
+        :type packed_data: str
+        :return: True if the ref. type is the type currently being defragmented
+        :rtype: bool
+        """
+        type_value = self.value_at_offset(packed_data, kPropertyTypeOffset)
+        property_type = Property.PropertyType(type_value)
+        if Property.type_is_array(property_type):
+            return self.baseStore.STORAGE_TYPE.__name__ == kArrayType
+        elif Property.type_is_string(property_type):
+            return self.baseStore.STORAGE_TYPE.__name__ == kStringType
+        else:
+            return False
 
     @staticmethod
-    def non_empty_blocks(empty_ids, total_blocks):
+    def full_blocks(empty_ids, total_blocks):
         """
         Return the IDs of full blocks with the given empty IDs and total
         number of blocks
@@ -102,17 +299,17 @@ def non_continuous_ids(ids):
         For example:
         Given: [1, 2, 3, 7, 8, 9]
         Filter out continuous: [1, 2, 3]
-        Returns: [7, 8, 9]
+        Returns: ([1, 2, 3], [7, 8, 9])
 
         :param ids: List of IDs to filter
         :type ids: list[int]
-        :return: List of non-continuous IDs
-        :rtype: list[int]
+        :return: Continuous IDs and non-continuous IDs
+        :rtype: (list[int], list[int])
         """
         for idx, i in enumerate(sorted(ids)):
-            if idx != i:
-                return ids[idx:]
-        return []
+            if idx + 1 != i:
+                return ids[:idx], ids[idx:]
+        return ids, []
 
     @staticmethod
     def create_swap_table(full_non_continuous_blocks, empty_blocks):
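The maps in reference_map.py, below, drive all of the offset arithmetic above: each offset descriptor pairs a byte offset with a field name and HeaderType, which is enough to derive a struct format for a whole record. A sketch under the sizes the comments state (bool and char one byte, int four), with a stand-in enum rather than the real HeaderType:

    from enum import Enum

    class HType(Enum):      # mirrors HeaderType; values here are arbitrary
        bool = 1
        int = 2

    SIZES = {"bool": "B", "int": "I"}   # 1-byte flag, 4-byte int

    def struct_format(descriptor):
        # Concatenate format chars in offset order; "=" gives standard sizes
        return "=" + "".join(SIZES[descriptor[o][1].name]
                             for o in sorted(descriptor))

    node_descriptor = {
        0: ("inUse", HType.bool),
        1: ("nextRelId", HType.int),
        5: ("nextPropId", HType.int),
        9: ("nodeType", HType.int),
    }
    assert struct_format(node_descriptor) == "=BIII"  # 13 bytes, as the offsets imply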
diff --git a/graphene/storage/defrag/reference_map.py b/graphene/storage/defrag/reference_map.py
index ca9e73c..d8480af 100644
--- a/graphene/storage/defrag/reference_map.py
+++ b/graphene/storage/defrag/reference_map.py
@@ -18,6 +18,7 @@
 kNode = Node.__name__
 kProperty = Property.__name__
 kRelationship = Relationship.__name__
+kPayload = (kStringType, kArrayType)
 
 
 class HeaderType(Enum):
@@ -27,9 +28,14 @@ class HeaderType(Enum):
     char = 3
     data = 4
 
+
+# --------------------------------- Constants --------------------------------- #
+
+kPropertyTypeOffset = 1
+kPropertyPayloadOffset = 17
+
 
 # ----------------------------- Offset Descriptor ---------------------------- #
 
-kArrayOffsetDescriptor = {
+ArrayOffsetDescriptor = {
     0: ("inUse", HeaderType.bool),
     1: ("type", HeaderType.char),
     2: ("previousBlock", HeaderType.int),
@@ -39,20 +45,20 @@ class HeaderType(Enum):
     18: ("data", HeaderType.data),
 }
 
-kGeneralTypeOffsetDescriptor = {
+GeneralTypeOffsetDescriptor = {
     0: ("inUse", HeaderType.bool),
     1: ("nameId", HeaderType.int),
     5: ("firstType", HeaderType.int),
 }
 
-kGeneralTypeTypeOffsetDescriptor = {
+GeneralTypeTypeOffsetDescriptor = {
     0: ("inUse", HeaderType.bool),
     1: ("typeName", HeaderType.int),
     5: ("propertyType", HeaderType.int),
     9: ("nextType", HeaderType.int),
 }
 
-kStringOffsetDescriptor = {
+StringOffsetDescriptor = {
     0: ("inUse", HeaderType.bool),
     1: ("previousBlock", HeaderType.int),
     5: ("length", HeaderType.int),
@@ -60,14 +66,14 @@ class HeaderType(Enum):
     13: ("data", HeaderType.data),
 }
 
-kNodeOffsetDescriptor = {
+NodeOffsetDescriptor = {
     0: ("inUse", HeaderType.bool),
     1: ("nextRelId", HeaderType.int),
     5: ("nextPropId", HeaderType.int),
     9: ("nodeType", HeaderType.int),
 }
 
-kPropertyOffsetDescriptor = {
+PropertyOffsetDescriptor = {
     0: ("inUse", HeaderType.bool),
     1: ("type", HeaderType.int),
     5: ("nameId", HeaderType.int),
@@ -76,7 +82,7 @@ class HeaderType(Enum):
     17: ("propBlockId", HeaderType.data),
 }
 
-kRelationshipOffsetDescriptor = {
+RelationshipOffsetDescriptor = {
     0: ("inUse_direction", HeaderType.bool),
     1: ("firstNode", HeaderType.int),
     5: ("secondNode", HeaderType.int),
@@ -88,6 +94,16 @@ class HeaderType(Enum):
     29: ("nextPropId", HeaderType.int),
 }
 
+OffsetDescriptors = {
+    kArrayType: ArrayOffsetDescriptor,
+    kGeneralType: GeneralTypeOffsetDescriptor,
+    kGeneralTypeType: GeneralTypeTypeOffsetDescriptor,
+    kStringType: StringOffsetDescriptor,
+    kNode: NodeOffsetDescriptor,
+    kProperty: PropertyOffsetDescriptor,
+    kRelationship: RelationshipOffsetDescriptor,
+}
+
 
 def offset_descriptor_for_class(c):
     """
@@ -98,26 +114,14 @@ def offset_descriptor_for_class(c):
     :return: Offset descriptor sorted by dictionary keys
-    :rtype: OrderedDict[str, (str, HeaderType)]
+    :rtype: OrderedDict[int, (str, HeaderType)]
     """
-    if c == Array:
-        return OrderedDict(sorted(kArrayOffsetDescriptor.items()))
-    elif c == GeneralType:
-        return OrderedDict(sorted(kGeneralTypeOffsetDescriptor.items()))
-    elif c == GeneralTypeType:
-        return OrderedDict(sorted(kGeneralTypeTypeOffsetDescriptor.items()))
-    elif c == String:
-        return OrderedDict(sorted(kStringOffsetDescriptor.items()))
-    elif c == Node:
-        return OrderedDict(sorted(kNodeOffsetDescriptor.items()))
-    elif c == Property:
-        return OrderedDict(sorted(kPropertyOffsetDescriptor.items()))
-    elif c == Relationship:
-        return OrderedDict(sorted(kRelationshipOffsetDescriptor.items()))
-    else:
+    try:
+        return OrderedDict(sorted(OffsetDescriptors[c.__name__].items()))
+    except KeyError:
         raise TypeError("Unrecognized class type")
 
@@ -126,43 +130,43 @@ def offset_descriptor_for_class(c):
 
 # ---------------------------- Offset -> Type Maps --------------------------- #
 
-kArrayOffsetReferenceMap = {
+ArrayOffsetReferenceMap = {
     0: None,           # inUse (bool, 1 byte)
     1: None,           # type (char, 1 byte)
     2: kArrayType,     # previousBlock (int, 4 bytes)
     14: kArrayType,    # nextBlock (int, 4 bytes)
 }
 
-kGeneralTypeOffsetReferenceMap = {
+GeneralTypeOffsetReferenceMap = {
     0: None,              # inUse (bool, 1 byte)
     1: kStringType,       # nameId (int, 4 bytes)
     5: kGeneralTypeType,  # firstType (int, 4 bytes)
 }
 
-kGeneralTypeTypeOffsetReferenceMap = {
+GeneralTypeTypeOffsetReferenceMap = {
     0: None,              # inUse (bool, 1 byte)
     1: kStringType,       # typeName (int, 4 bytes)
     5: None,              # propertyType (int, 4 bytes)
     9: kGeneralTypeType,  # nextType (int, 4 bytes)
 }
 
-kStringOffsetReferenceMap = {
+StringOffsetReferenceMap = {
     0: None,         # inUse (bool, 1 byte)
     1: kStringType,  # previousBlock (int, 4 bytes)
     5: None,         # length (int, 4 bytes)
     9: kStringType,  # nextBlock (int, 4 bytes)
 }
 
-kNodeOffsetReferenceMap = {
+NodeOffsetReferenceMap = {
     0: None,           # inUse (bool, 1 byte)
     1: kRelationship,  # nextRelId (int, 4 bytes)
     5: kProperty,      # nextPropId (int, 4 bytes)
     9: kGeneralType,   # nodeType (int, 4 bytes)
 }
 
-kPropertyOffsetReferenceMap = {
+PropertyOffsetReferenceMap = {
     0: None,         # inUse (bool, 1 byte)
     1: None,         # type (int, 4 bytes)
     5: kStringType,  # nameId (int, 4 bytes)
     9: kProperty,    # prevPropId (int, 4 bytes)
     13: kProperty,   # nextPropId (int, 4 bytes)
-    17: (kStringType, kArrayType)  # propBlockId (4 bytes: data, strId, arrayId)
+    17: kPayload,    # propBlockId (4 bytes: data, strId, or arrayId)
 }
 
-kRelationshipOffsetReferenceMap = {
+RelationshipOffsetReferenceMap = {
     0: None,   # inUse_direction (bool, 1 byte)
     1: kNode,  # firstNode (int, 4 bytes)
     5: kNode,  # secondNode (int, 4 bytes)
@@ -176,39 +180,49 @@ def offset_descriptor_for_class(c):
 
 # ---------------------------- Type -> Offset Maps --------------------------- #
 
-kArrayTypeReferenceMap = {
+ArrayTypeReferenceMap = {
     kArrayType: [2, 14]
 }
 
-kGeneralTypeReferenceMap = {
+GeneralTypeReferenceMap = {
     kStringType: [1],
     kGeneralTypeType: [5],
 }
 
-kGeneralTypeTypeTypeReferenceMap = {
+GeneralTypeTypeReferenceMap = {
     kStringType: [1],
     kGeneralTypeType: [9],
 }
 
-kStringTypeReferenceMap = {
+StringTypeReferenceMap = {
     kStringType: [1, 9],
 }
 
-kNodeTypeReferenceMap = {
+NodeTypeReferenceMap = {
     kRelationship: [1],
     kProperty: [5],
     kGeneralType: [9],
 }
 
-kPropertyTypeReferenceMap = {
+PropertyTypeReferenceMap = {
     kStringType: [5],
     kProperty: [9, 13],
-    (kStringType, kArrayType): [17],
+    kPayload: [kPropertyPayloadOffset],
 }
 
-kRelationshipTypeReferenceMap = {
+RelationshipTypeReferenceMap = {
     kNode: [1, 5],
     kGeneralType: [9],
     kRelationship: [13, 17, 21, 25],
     kProperty: [29],
 }
+
+TypeReferenceMap = {
+    kArrayType: ArrayTypeReferenceMap,
+    kGeneralType: GeneralTypeReferenceMap,
+    kGeneralTypeType: GeneralTypeTypeReferenceMap,
+    kStringType: StringTypeReferenceMap,
+    kNode: NodeTypeReferenceMap,
+    kProperty: PropertyTypeReferenceMap,
+    kRelationship: RelationshipTypeReferenceMap,
+}
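Taken together, a caller hands the Defragmenter a store, its ID store, and every store that holds references into it. A sketch of that wiring; defragment_node_store and its arguments are illustrative, not an API introduced by this patch:

    from graphene.storage.defrag.defragmenter import Defragmenter

    def defragment_node_store(node_store, node_id_store, referencing_stores):
        # referencing_stores: every store whose records hold node IDs,
        # e.g. the relationship store (firstNode/secondNode fields)
        defragmenter = Defragmenter(node_store, node_id_store,
                                    referencing_stores)
        # Moves trailing blocks into holes, truncates the file, clears the
        # ID store, then patches stale block IDs in the referencing stores
        defragmenter.defragment()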