Skip to content

Object Chunking and Garbage Collection

UENISHI Kota edited this page Feb 16, 2016 · 33 revisions

Note:

The large file material here should be valid for Riak CS 1.0, and the GC implementation described is currently under development, to be released in Riak CS 1.1.

Goals

The simple goal is the support objects up to 5TB in size, with the same API and behavioral characteristics as S3. S3 offers eventually consistent access to objects, with no concurrency control other than last-write-wins. This does mean that there might be one "version" of a file being read on one machine, while another is accepting a write for the same file, and yet another is deleting the file (but who's view for the delete is yet another version). All of this has to happen while also garbage collecting overwritten and failed-uploaded files. To accomplish this, we make heavy use of sibling resolution and Riak's eventual consistency properties.

Implementation Overview

Riak CS splits files up into small blocks, and stores each block as a separate Riak object. Files less than the size of one block will simply be stored as a single block. There is a separate manifest object that is used to record metadata about the file, such as the key, MD5, and how many blocks have been written so far. Manifests and individual blocks are stored in different buckets. A user bucket called "foo", will, for example, correspond to two buckets in Riak, one for the manifests, and one for the blocks.

There is a manifest for each PUT operation on a particular key, each with a different UUID. Because of things like garbage collection and concurrent PUTs, the object stored at the manifest {Bucket, Key} is a collection of manifests (an orddict), not just a single manifest. Furthermore, {allow_mult, true} is set on this bucket, so there may be several siblings, each which are a collection of manifests. Depending on the operation, a piece of code may examine all of the manifests, or only a particular one. The erlang record definition for manifests is well commented here.

Individual blocks are immutable, as they include the UUID as part of their key. They can however, be deleted.

Manifests can be in one of four different states:

  • writing: A manifest is in this state when it is first created, and as the blocks are being written to Riak.

  • active: A manifest is in this state once all of the blocks have been written to Riak. This is the only state that a manifest can be in and serve GET requests.

  • pending_delete: When a user deletes a file or a file is overwritten, the manifest is first put in the pending_delete state, before the move to the GC bucket is successful. Once the manifest has been successfully moved, it will go into the scheduled_delete state.

  • scheduled_delete: A manifest is in this state when its information has been written to the riak-cs-gc bucket thus scheduling the file blocks for deletion. The manifest stays around for as long as leeway_seconds, and is pruned lazily.

When two manifests with the same UUID are in conflict (because they came from manifest-collection siblings), they are resolved with this code.

Operations

  • GET:

    1. Retrieve manifests
    2. Resolve siblings
    3. Select all active manifests, and choose the one with the most recent write_start_time (a property of the manifest). If there are no active manifests, return 404.

    Note, we specifically don't write the resolved manifests back to Riak, as this can create a situation where to actors both resolve siblings, therefore creating siblings again. This follows the general rule-of-thumb, don't make a write to Riak just to resolve siblings.

  • PUT:

    1. Create a new manifest (with fresh UUID) in the writing state. Once all of the blocks have been written, change the state to active.
    2. Follow the same steps as in DELETE, to delete any manifests that this overwrites with the following exception: Manifests found in the writing state are not marked as pending_delete unless their last_block_written_time is leeway_seconds in the past.

    Note, each time the manifest is written back to Riak, we resolve siblings, and write back with the correct vector clock.

  • PUT Copy: actually a combination of GET and PUT

    1. Spawn a PUT fsm and a GET fsm
    2. Deliver block data fetched from GET fsm, to PUT fsm
    3. On every block check the socket by reading 0 bytes to detect disconnected clients and to avoid unnecessary copy
  • DELETE:

    1. Retrieve manifests
    2. Resolve siblings
    3. Select all active and writing manifests, and mark them as pending_delete.
    4. "Move" these manifests to the GC bucket, or delete blocks on the fly if the object size is below active_delete_threshold. See Active-Deletion-of-Small-Objects for details.
    5. Mark them as scheduled_delete

Note: at any of these paths, before retrieving manifests there is a phase of

  1. Authentication. Fetching user in moss.users whose key defined in Authorization header in HTTP request.
  2. Owner identification. Fetching bucket record in moss.buckets from bucket name, which has owner's access key in its contents. Fetching owner's record at moss.users if it is not same as the accessing user.
  3. Authorization. Extracting ACLs and Policies defined in bucket record metadata and evaluating all of them with accessor, owner and other information.

After all of these passed, get/put/delete code may run and otherwise the whole request fails.

There are three types of processes involved in get/put/delete. They're responsibilities are detailed below.

Coordinator FSM

There is a coordinator FSM for each of get/put/delete. This FSM is responsible for keeping track of which blocks remain to be retrieved/written/deleted by a group of gen_servers, and for communicating back to the original caller. It's goals are similar to the Riak get/put/delete FSMs. This FSM only concerns itself with the "active manifest", and should not have knowledge of siblings or manifests with other UUIDs. The coordinator FSM is also responsible for launching the other two types of processes.

Manifest FSM

The manifest FSM is responsible for get/put/delete'ing manifests from Riak (or coordinating a gen_server to do this), and for keeping track and acting on manifest siblings and "other" UUIDs. It does this so that the coordinator FSM doesn't have to concern itself with these things. This FSM will do things like spawn GC when it comes across manifests that haven't been deleted or whose deletion processes appear to have died, and for making sure that things written back to Riak always include the correct vector clock

Blocks gen_server

The blocks gen_server is a simple wrapper around riakc_pb_socket that has knowledge of how to reply to the coordinator FSM and how to turn block identifiers into Riak BKeys. This may be multiple processes to fetch blocks concurrenlty (configured by (fetch|put|delete)_concurrency). The fallback logic and error handling are compilicated, regarding quorum, replication fallback and timeouts; this subsection try to demonstrate it.

GET blocks: As blocks are all write-once (as well as delete-once) and immutable, there is few need to perform n=all on get request. In most normal cases without failure the factor of N is just unnecessary load to network throughput in a large object storage use case. Thus CS fetches blocks with n=one get option, with sloppy quorum disabled. It first tries n=one local get, if it fails in notfound, then it second tries to fetch with n=all. Even it does not work then it tries to perform proxy_get if it is enterprise edition. There are several retry logic, but this is repeated for all blocks in an object. Sibling resolutions are nasty, see riak_cs_block_server:resolve_block_object/2. After sibling resolution the fetched block is delivered to GET fsm.

PUT blocks: Blocks are put to Riak with n=all, pr=1 to keep it visible from reads.

DELETE blocks: This might be performed by GC-spawned delete fsm, or put fsm where active_delete_threshold is enabled. To prevent block leak (block leak is an inconsistent state where no reference from any manifest to the block is lost), HEAD operation to know the vector clock is performed with n=all, pr=quorum. If nothing is found there it is treated as deleted. First trial to delete is with n=all, pr=all, pw=all. If it fails second trial is with n=all and all other options are default of Riak. Operators must be sure that few handoffs are running or waiting (the actual number depends upon N), to prevent such block leaks.

Multipart Upload Support

Multipart uploads: There is one additional detail here. There is effectively a new manifest state (call it a "substate") during the transition from writing state to the active state. It is possible for an upload ID to have 5 parts (call them 1-5), but the completed object would only use parts 1, 4, and 5. In such a case, we must somehow garbage collect parts 2 and 3.

In the case when we must garbage collect some parts as we move to the active state, there is an additional property added to the ?MANIFEST.props property list. The properties are:

  • multipart. The value for the multipart key is a ?MULTIPART_MANIFEST record. This record contains all of the extra metadata required for multipart upload support.

  • multipart_clean. When this property is present, the GC daemon has been notified of all parts that require garbage collection. The presence (or absence) of this property defines a substate-like difference in the active state.

If the "upload finished" command uses all parts, then the manifest will be written with both state=active and also the multipart_clean property present in the ?MANIFEST.props property list.

If the "upload finished" command does not use all parts, then the manifest will first be written with state=active but the multipart_clean property will be absent. After the GC has been successfully scheduled, the multipart_clean property will be added to the props list, and the manifest will be updated.

When an object is fetched, and it is state=active but does not have the multipart_clean property, then GC for the unused parts is rescheduled.

Design of the multipart upload support

In the deep mists of time, there were a couple of designs discussed. They are listed at https://github.com/basho/riak_cs/issues/300 ... they were "Reid's Strawman" and "Scott's Strawman".

In the end, Scott mostly implemented Reid's proposal.

The ?MANIFEST record remains unchanged: the #lfs_manifest_v3 record was not changed. All multipart information is stored in the ?MANIFEST.props property list. The roles of the multipart and multipart_clean properies are explained above.

A ?MULTIPART_MANIFEST record contains the basic extra metadata for multipart uploads. Also, there are three lists of parts, each with a member in the ?MULTIPART_MANIFEST record.

  • ?MULTIPART_MANIFEST.parts -- Info about all parts.
  • ?MULTIPART_MANIFEST.done_parts -- All parts that have finished successfullly.
  • ?MULTIPART_MANIFEST.cleanup_parts -- All unused parts, after an "upload finished" operation

The blocks for each part are stored using the same scheme as a single-part file:

  • Each part has a UUID
  • The data for the part is divided into app.config lfs_block_size size blocks/chunks (default = 1 MByte).
  • Each block/chunk has a {UUID, BlockId} name, where BlockId is an integer >= 0.

It was then a small change to make the riak_cs_block_server module use the {UUID, BlockId} naming scheme everywhere.

Garbage Collection

Overview

First, a reminder that for a given named object, multiple internal versions of that object may be stored in the system at one time. Each version of the object is accessible by an object manifest that includes a UUID identifying that particular version. There is at most one active manifest for a named object at any one time and the active version is the only one that is externally available to a Riak CS user.

Garbage collection of an object version involves several different actions. These actions can be divided into synchronous actions that occur while the user is waiting for notification of successful command completion and asynchronous actions that are not directly tied to user actions and occur in the background. These two action groups are described in more detail in the following sections.

Synchronous GC Actions

There are two direct actions a user may take to initiate the garbage collection of an object version: overwriting the object with a new version or deleting the object.

When an object version is overwritten a new object manifest is written with the state set to active and this new version becomes what is available to the user, but in the case of a delete the object is no longer externally available.

Also, as part of the overwrite or delete action, a set of eligible manifest versions are determined and the state of each eligible manifest is changed to pending_delete and the delete_marked_time field is set to a time value representing the current time.

The method for compiling the list of eligible manifests is dependent on the operation.

For object overwrites, the previously active manifest version is selected along with any manifest versions that are in the writing state where the last_block_written_time field (or the write_start_time if last_block_written_time is undefined) of the manifest represents a time value greater than leeway_seconds seconds ago. If a manifest version remains in the writing state for greater than leeway_seconds seconds, it is assumed that that manifest version represents a failed upload attempt and therefore it is acceptable to reap any object blocks that may have been written. Manifest versions in the writing state whose last_block_written_time has not exceeded the leeway_seconds threshold are not deemed eligible because they could represent an object version that is still in the progress of writing its blocks.

Object deletes are more straightforward. Since no object is externally available to the user after the delete operation, then any manifest versions in the active or writing state are eligible to be cleaned up. There is no concern about reaping the object version that is currently being written to become the next active version.

Once the states of the eligible manifests have been updated to pending_delete the manifest information for any pending_delete manifest versions are collected into a CRDT set and the set is written as a value to the riak-cs-gc bucket keyed by a time value representing the current epoch time plus the leeway interval (i.e. the leeway_seconds configuration option). If that write is successful then the state for each manifest in the set is updated to scheduled_delete. This indicates that the blocks of the object have been scheduled for deletion by the garbage collection daemon and avoids other manifest resolution processes for the object from scheduling unnecessary deletions.

Once the manifest enters the scheduled_delete state it remains as a tombstone for a minimum of leeway_seconds.

After these actions have been attempted, the synchronous portion of the garbage collection process is concluded and a response is return to the user who issued the request.

Garbage Collection Daemon

The asynchronous portion of the garbage collection process is orchestrated by the garbage collection daemon ( riak_cs_gc_manager ) that wakes up at specific intervals and spawns a batch process ( riak_cs_gc_batch ) process that has multiple workers underneath. It checks the riak-cs-gc bucket for any scheduled entries that are eligible for reaping, and assigns keys to each worker. Whan a worker gets GC bucket keys assigned by batch process, it processes block deletion and manifest reaping.

 +----------+            +--------+
 |gc_manager+--(spawn)-->|gc_batch| ..(fetch keys by 2i query)
 +----------+            +---+----+
       \                /    |  +-----------+
        <-report finish/     +--+ gc_worker | ..(process keys assiend by batch)
                             |  +-----------+
                             |  +-----------+
                             +--+ gc_worker |
                             |  +-----------+
                             .....
                             |  +-----------+
                             +--+ gc_worker |
                                +-----------+

The enters a running state and begins to delete the object blocks associated with the eligible keys and continues until all keys have been processed. The duration of this running state varies depending on the number of keys involved and the size of the objects they represent. The daemon checks for messages after processing each object so that the work interval may be manually interrupted if needed.

Deletion eligibility is determined using the key values in the riak-cs-gc bucket which are time values. If the current time minus leeway period according to the daemon is later than the time represented by a key, the blocks for the object manifest stored at that key are eligible for deletion and the daemon attempts to delete them.

The batch gathers the eligible keys for deletion by performing a secondary index range query on the $key index with a lower bound of time 0 and an upper bound of the current time minus leeway period. This allows the batch to collect all the keys that are eligible for deletion and have some way of accounting for clock skew.

Once the object blocks represented by a key in the riak-cs-gc bucket have all been deleted, the key is deleted from the riak-cs-gc bucket.

Manifest Updates

Manifest versions are retrieved and updated by the riak_cs_manifest_fsm module with very few exceptions. This module encapsulates the logic needed to retrieve the manifests, resolve any conflicts due to siblings, and write updated manifest versions back to Riak.

Object Block Reaping

The actual deletion of the blocks of an object is managed by the riak_cs_delete_fsm module. It starts up a number of delete workers (based on the configured delete concurrency) and passes off object block information to those workers who in turn carry out the actual delete operation for that block. The delete workers are instances of the riak_cs_block_server module.

Once a worker deletes a block it notifies the delete fsm and waits for notification about another block to delete. Once all blocks of an object are deleted then the delete fsm starts an instance of the manifest fsm to handle deleting the manifest version from the object manifest data structure and if there are no remaining manifest versions to delete the entire object manifest data structure. The goal of this final step is to avoid the cost of scanning through empty manifest keys that could linger indefinitely.

Trade-offs

  1. An slow reader may have blocks GC'd as it is reading an object if the read exceeds the leeway interval.
  2. There is some reliance on system clocks and this could lead to object blocks being deleted earlier or later than their intended eligibility window dictates due to clock skew.
  3. A network partition (or machine failure) lasting longer than leeway_seconds could cause a manifest to "come back to life" and appear active, it would then continually serve requests whose blocks could not be found.
You can’t perform that action at this time.