SIP 3

kimballa edited this page Sep 14, 2010 · 7 revisions
SIP             3
Title           File format for large object (LOB) storage
Author          Aaron Kimball (aaron at cloudera dot com)
Created         May 5, 2010
Status          Accepted
Discussion      http://github.com/cloudera/sqoop/issues/issue/11
Implementation  kimballa/sqoop@c385de1d

Abstract

This is a proposal for a file format for the storage of large objects. This describes the file format itself, its reader/writer API, and how it would be integrated with the primary record storage.

Problem statement

Large objects (columns of type CLOB and BLOB) can be very large; often larger than can be reasonably materialized completely in-memory. The main mechanisms Hadoop makes available to access records depend on them being fully materialized in memory, even if their contents are fully or partially ignored. Large objects in databases are often stored indirectly; a record locator is used to manipulate the record, but its contents are accessed lazily (e.g., through an InputStream) when desired.

Sqoop has recently added a mechanism that allows indirect storage of large objects, but lacks the ability to efficiently store these large objects in consolidated files.

This proposal outlines:

  • A file format for storage of large records. Individual records are stored as uninterpreted byte or character streams, and are accessed lazily. Iterating through the records is an inexpensive operation; users can skip records without needing to deserialize them.
  • The API through which these files are accessed. Large object files can be manipulated directly (e.g., as a collection of large objects) by Java programs operating on individual files, or through MapReduce programs.
  • The manner of integration between these large object storage files and the regular record storage.

Specification

File format requirements

  • Must support very large objects (at least several gigabytes in length).
  • The exact lengths of the objects are not known ahead of time.
  • Users must be able to partially read an object then efficiently transition to the next object without reading the entire object they opened.
  • Should support compression. The objects are assumed to be very large, so a per-record compression system is acceptable.
  • Should support splitting for use as an InputFormat to MapReduce.
  • Individual records must be addressable by byte offset or another marker easily computed at write time.

Anti-requirements

  • Data can be restricted to uninterpreted byte and character streams. Further typing is unnecessary.
  • Does not require (key, value) pairs; values alone are sufficient.

Data model

A LobFile is an unordered collection of values with integer record id keys. Values are character arrays or byte arrays, with an arbitrary length. This length may be several gigabytes. Individual values are not expected to be fully materializable in memory at a point in time. Users will lazily consume data from values. Zero-length values are allowed.

An arbitrary (but assumed small) amount of user-specified metadata may be included in the file. Some metadata elements are well-defined and are used as parameters to the specific encoding of values in the file. Other elements are left to the user to define and interpret. The metadata is assumed to be fully materializable.

Format specification

The LobFile format includes an arbitrary number of variable-length records. The start of each record is demarcated by a RecordStartMark, a 16-byte string that is unique on a per-file basis. Following the RecordStartMark is an integer. A non-negative value indicates that the record is a user data record (the value is its entryId); a negative value is a record type id that identifies one of several internal record formats.

The lengths of the internal records are usually encoded following the record type id. The lengths of the user’s data records are stored in an index at the end of the file.

The LobFile format is as follows:

LobFile ::= LobHeader LobRecord* LobIndex Finale

LobHeader ::= "LOB" Integer(versionNum) RecordStartMark MetaBlock
RecordStartMark ::= Byte[16]
MetaBlock ::= Integer(numEntries) MetaEntry*
MetaEntry ::= Utf8String(key) BytesWritable(val)

LobRecord ::= RecordStartMark Long(entryId) Long(claimedLen) Byte[*](data)

LobIndex ::= IndexSegment* IndexTable
IndexSegment ::= RecordStartMark Long(-1) Long(segmentLen)
    Long[entriesPerSegment](recordLen)

IndexTable ::= RecordStartMark Long(-3)
    Int(tableCount) IndexTableEntry*
IndexTableEntry ::= Long(segmentOffset) Long(firstIndexId)
    Long(firstIndexOffset) Long(lastIndexOffset)

Finale ::= RecordStartMark Long(-2) Long(indexStart)

Listing of LobFile format expressed as a context-free grammar.

Data serialization

Values in the grammar above with type Utf8String are UTF-8 encoded length-prefixed strings. In Java, this is performed using the Hadoop Text.writeString(DataOutput, String) method, which writes the length of the string (in bytes) as a VInt followed by the UTF-8 encoded byte array itself.

Integer values are signed 32-bit quantities written with VInt encoding. In Java, this is performed using the Hadoop WritableUtils.writeVInt(DataOutput, int) method. For values -120 <= i <= 127, the actual value is directly encoded as one byte. For other values of i, the first byte indicates whether the integer is positive or negative, and the number of bytes that follow. If the first byte value v is between -121 and -124, the following integer is positive, and the number of bytes that follow is -(v+120). If the first byte value v is between -125 and -128, the following integer is negative, and the number of bytes that follow is -(v+124). Bytes are stored in high-non-zero-byte-first order. These ranges follow the writeVInt Javadoc; in the Hadoop implementation, writeVInt delegates to writeVLong, so the bytes actually written follow the Long rules given in the next paragraph.

Long values are signed 64-bit quantities written with VLong encoding. In Java, this is performed using the Hadoop WritableUtils.writeVLong(DataOutput, long) method. For -112 <= i <= 127, only one byte is used, holding the actual value. For other values of i, the first byte indicates whether the long is positive or negative, and the number of bytes that follow. If the first byte value v is between -113 and -120, the following long is positive, and the number of bytes that follow is -(v+112). If the first byte value v is between -121 and -128, the following long is negative, and the number of bytes that follow is -(v+120). Bytes are stored in high-non-zero-byte-first order.
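The Long encoding rules above can be illustrated with a small stand-alone sketch. This is not the Hadoop source: the class name VLongSketch and its methods are hypothetical, and the handling of negative values (storing the one's complement of the value after the marker byte) is an assumption about how WritableUtils.writeVLong behaves that the text above does not spell out.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Hypothetical sketch of the variable-length long encoding described above. */
final class VLongSketch {

    /** Encodes one long: a single byte for -112..127, else a marker byte plus 1-8 payload bytes. */
    static byte[] encode(long i) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (i >= -112 && i <= 127) {
            out.write((int) i);              // small values are stored directly
            return out.toByteArray();
        }
        int marker = -112;                   // base marker for positive values
        if (i < 0) {
            i = ~i;                          // ASSUMPTION: negatives are stored as one's complement
            marker = -120;                   // base marker for negative values
        }
        int len = 0;                         // number of non-zero payload bytes
        for (long tmp = i; tmp != 0; tmp >>>= 8) {
            len++;
        }
        out.write(marker - len);             // first byte encodes sign and payload length
        for (int idx = len - 1; idx >= 0; idx--) {
            out.write((int) (i >>> (8 * idx)) & 0xFF);  // high non-zero byte first
        }
        return out.toByteArray();
    }

    /** Decodes a value produced by encode(). */
    static long decode(byte[] buf) {
        byte first = buf[0];
        if (first >= -112) {
            return first;                    // single-byte form
        }
        boolean negative = first < -120;
        int len = negative ? -(first + 120) : -(first + 112);
        long v = 0;
        for (int k = 1; k <= len; k++) {
            v = (v << 8) | (buf[k] & 0xFF);
        }
        return negative ? ~v : v;
    }

    public static void main(String[] args) {
        for (long v : new long[] {127L, 128L, -113L}) {
            System.out.println(v + " -> " + Arrays.toString(encode(v)));
        }
    }
}
```

Under these assumptions, 127 occupies one byte, while 128 becomes the marker byte -113 followed by the single payload byte 0x80.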

The MetaEntry array is a mapping from strings (length-prefixed UTF-8 encoded, as above) to BytesWritable values, which are length-prefixed byte arrays. These byte arrays are written using instances of the org.apache.hadoop.io.BytesWritable serializable class. This class writes the length prefix as a two's-complement 32-bit value in big-endian order (see the DataOutput Javadoc for the specific formula), followed by the byte array itself.
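As a rough illustration of the byte layout of a single MetaEntry, the following sketch builds one by hand rather than through the Hadoop classes. The class name MetaEntrySketch is hypothetical, and the sketch only handles keys whose UTF-8 length is at most 127 bytes, so that the VInt length prefix fits in a single byte.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the on-disk layout of one MetaEntry. */
final class MetaEntrySketch {

    /** Returns the key (VInt length + UTF-8 bytes) followed by the value (int length + bytes). */
    static byte[] encode(String key, byte[] val) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        if (keyBytes.length > 127) {
            // Longer keys need a multi-byte VInt prefix, which this sketch does not implement.
            throw new IllegalArgumentException("key too long for this sketch");
        }
        ByteBuffer buf = ByteBuffer.allocate(1 + keyBytes.length + 4 + val.length);
        buf.put((byte) keyBytes.length);   // VInt length prefix: 0..127 fits in one byte
        buf.put(keyBytes);                 // UTF-8 encoded key
        buf.putInt(val.length);            // 32-bit length; ByteBuffer is big-endian by default
        buf.put(val);                      // raw value bytes
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] entry = encode("EntryEncoding", "CLOB".getBytes(StandardCharsets.UTF_8));
        System.out.println("encoded length = " + entry.length);
    }
}
```

For the key "EntryEncoding" (13 bytes) and the value "CLOB" (4 bytes), the entry occupies 1 + 13 + 4 + 4 = 22 bytes.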

Header

The header defines the basic properties of the file.

The versionNum must currently be 0. Different values of versionNum imply other formats related to this one but not yet defined.

The RecordStartMark is a randomly-chosen array of 16 bytes. It should be different on a per-file basis. It appears once in the header to define the RecordStartMark for the file, and then once before each actual data record. This allows clients to seek to the beginning of an arbitrary record.

The MetaBlock contains a set of arbitrary key-value pairs. Some of these key-value pairs are well-defined:

  • “EntryEncoding” — should be the UTF-8 encoded string “CLOB” or “BLOB”. BLOB is assumed if missing.
  • “CompressionCodec” — if present, should be the UTF-8 encoded name of the codec to use to decompress each entry. The compressor is reset between each record (they are encoded independently). Only the data byte array is encoded.
  • “EntriesPerSegment” — should be the VInt-encoded number of length values in a given IndexSegment. All IndexSegments (except the final IndexSegment in a file) should contain exactly this many entries. This metadata entry is required.

Files with an EntryEncoding of “CLOB” should provide a character-based access mechanism (e.g., java.io.Reader) to records, but may be used in a byte-based fashion (e.g., java.io.InputStream).

Data records

Following the header are the user’s data records. These records have sequentially increasing ‘entryId’ fields; the first record has entryId=0. These entryIds refer to the offset into the LobIndex.

The claimedLen field for a record refers to the length to advertise to consumers of a record. It does not strictly specify the amount of data held in the file. For character-based records, it may refer to the length of the record in characters, not bytes.

Following the per-record header information of entryId and claimedLen is the data byte array itself. This may be compressed on a per-record basis according to the CompressionCodec specified in the MetaBlock.

The LobRecords are variable length. Their lengths may not be known ahead of time due to the use of compression. Their true in-file lengths are recorded in the LobIndex.

Index records

The LobIndex is written to the end of the file. It contains an arbitrary number of IndexSegment records. Each IndexSegment begins with the VLong-encoded value -1 (to distinguish it from a LobRecord), and contains an array of record lengths. The LobIndex is a complete index of all lengths, and they run sequentially. For example, the first IndexSegment may contain the lengths of records 0..4095; the next IndexSegment (usually immediately adjacent in the file, but not technically required) then contains the lengths of records 4096..8191.

The segmentLen field in an IndexSegment captures the number of bytes required to write the recordLen array.

Each IndexSegment then provides an array of recordLen values, each of which is the true in-file length of an entire LobRecord. This includes the RecordStartMark, the lengths of the VLong-encoded entryId and claimedLen fields, and the true compressed length of the data byte array. An entry can be expediently retrieved by index using this mechanism.

Following the IndexSegment array is the IndexTable record. This is a higher-order index used for seeking. IndexSegments may be read lazily out of the file as the reader requires their data. The IndexTable is always held completely in memory. The IndexTable begins with a RecordStartMark and a record type id of -3. It then encodes its own length.

The IndexTable is an array of table entries. tableCount is the number of entries in the IndexTableEntry array.

Each IndexTableEntry represents one IndexSegment. It contains the offset of the IndexSegment in the file, the first entryId whose length is present in the segment, and the first and last offsets of records indexed by the segment. This way, when seeking to an arbitrary offset in the file, one can scan through the IndexTable to find the correct IndexSegment, then seek directly to the correct IndexSegment, read a relatively small amount of data and determine the length of and absolute offset to the first record following the user’s requested seek target.
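The lookup described above can be sketched as follows. This is a minimal illustration rather than the reference implementation: the names IndexLookupSketch, Entry, findSegment and locate are hypothetical, and the sketch assumes that firstIndexOffset and lastIndexOffset are the starting offsets of the first and last records covered by a segment, and that those records are laid out back-to-back in the file.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of resolving a seek target through the IndexTable. */
final class IndexLookupSketch {

    /** One IndexTableEntry, carrying the four fields named in the grammar. */
    static final class Entry {
        final long segmentOffset;
        final long firstIndexId;
        final long firstIndexOffset;
        final long lastIndexOffset;

        Entry(long segmentOffset, long firstIndexId, long firstIndexOffset, long lastIndexOffset) {
            this.segmentOffset = segmentOffset;
            this.firstIndexId = firstIndexId;
            this.firstIndexOffset = firstIndexOffset;
            this.lastIndexOffset = lastIndexOffset;
        }
    }

    /** Returns the position of the first entry indexing a record at or after target, or -1. */
    static int findSegment(List<Entry> table, long target) {
        for (int i = 0; i < table.size(); i++) {
            if (table.get(i).lastIndexOffset >= target) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Walks one segment's recordLen array and returns {entryId, offset, length}
     * for the first record starting at or after target, or null if none does.
     */
    static long[] locate(Entry e, long[] recordLens, long target) {
        long offset = e.firstIndexOffset;    // ASSUMPTION: records are contiguous from here
        for (int k = 0; k < recordLens.length; k++) {
            if (offset >= target) {
                return new long[] {e.firstIndexId + k, offset, recordLens[k]};
            }
            offset += recordLens[k];
        }
        return null;
    }

    public static void main(String[] args) {
        Entry seg0 = new Entry(1000, 0, 50, 170);   // records start at 50, 90, 170
        Entry seg1 = new Entry(1100, 3, 200, 260);  // records start at 200, 260
        List<Entry> table = Arrays.asList(seg0, seg1);
        int idx = findSegment(table, 100);
        long[] hit = locate(table.get(idx), new long[] {40, 80, 30}, 100);
        System.out.println("segment " + idx + " -> " + Arrays.toString(hit));
    }
}
```

In a real reader, the recordLen array passed to locate() would be read lazily from the file at the entry's segmentOffset; here it is supplied directly to keep the sketch self-contained.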

Finale

The Finale is always at the very end of the file. It contains the RecordStartMark, the record type id -2, and the offset in the file of the start of the IndexTable.

Recovery semantics

The biggest vulnerability of these files is that the index is written to the end. Thus, an interrupted writer may close the file before writing the index data. Nevertheless, the data is still recoverable.

Since the records are all demarcated by RecordStartMarks, the data can be extracted from a truncated file by scanning forward and reading the data sequentially.

The IndexSegments are complete indices of all the lengths of all the records. So by scanning forward through the IndexSegments, the IndexTable can be rebuilt if truncated.

Since the IndexSegments and IndexTable are led by their record type ids of -1 and -3, which cannot be confused for regular entryIds, the locations of these records can be found by linear scanning, in case the finale was not written to the file.
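The linear scan that this recovery procedure relies on can be sketched as below. The class and method names (MarkScanSketch, findMarks) are hypothetical, and the sketch searches an in-memory byte array for simplicity; an actual recovery tool would stream through the file, and would read the VLong following each hit to tell data records (non-negative entryId) from internal records (-1, -2, -3).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: find every occurrence of the RecordStartMark in a buffer. */
final class MarkScanSketch {

    /** Returns the offsets in data at which the given mark begins. */
    static List<Integer> findMarks(byte[] data, byte[] mark) {
        List<Integer> hits = new ArrayList<>();
        for (int i = 0; i + mark.length <= data.length; i++) {
            int j = 0;
            while (j < mark.length && data[i + j] == mark[j]) {
                j++;
            }
            if (j == mark.length) {
                hits.add(i);
                i += mark.length - 1;   // resume scanning just past this mark
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        byte[] mark = new byte[16];
        for (int k = 0; k < mark.length; k++) {
            mark[k] = (byte) (k + 1);
        }
        byte[] data = new byte[38];                  // filler bytes stand in for record contents
        java.util.Arrays.fill(data, (byte) 99);
        System.arraycopy(mark, 0, data, 3, 16);      // first mark at offset 3
        System.arraycopy(mark, 0, data, 21, 16);     // second mark at offset 21
        System.out.println(findMarks(data, mark));
    }
}
```

Note that the first hit in a real LobFile is the copy of the RecordStartMark stored in the header, which defines the mark rather than starting a record.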

Compression

LobFiles should support a variety of compression codecs. To support different codecs in a language-neutral manner, the LobFile defines support for a number of codecs, specified by strings. Each codec name is bound to a concrete implementation in Java.

A user specifies that a file is compressed with a particular codec by specifying the "CompressionCodec" key in the MetaBlock. The value associated with this key must be a string identifying the codec name.

The following table describes the codecs that may be used:

name      Java implementation class
none      (none)
deflate   org.apache.hadoop.io.compress.DefaultCodec
lzo       com.hadoop.compression.lzo.LzoCodec

If the "CompressionCodec" key is not specified, then it is assumed that it has the value "none"; this implies that user data is written as an uninterpreted byte array directly to the file.

The "deflate" codec uses the deflate algorithm (specified in RFC 1951) and can be implemented using the zlib library. Hadoop’s DefaultCodec will use zlib if native bindings are installed, or a compatible Java implementation otherwise.
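To illustrate per-record compression with a compressor that is reset between records, here is a small sketch using the JDK's java.util.zip classes instead of the Hadoop codec. The class name PerRecordDeflateSketch is hypothetical. It is an assumption that the zlib-wrapped stream produced by java.util.zip.Deflater matches what DefaultCodec writes, and the sketch buffers each record in memory, which an implementation handling multi-gigabyte objects would avoid by streaming.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical sketch: each record is compressed independently with one reused Deflater. */
final class PerRecordDeflateSketch {
    private final Deflater deflater = new Deflater();

    /** Compresses one record; reset() discards all state left over from the previous record. */
    byte[] compress(byte[] record) {
        deflater.reset();
        deflater.setInput(record);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    /** Decompresses a single record without needing any other record's bytes. */
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                if (n == 0 && !inflater.finished()) {
                    throw new IllegalStateException("truncated stream or preset dictionary required");
                }
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
    }

    /** Releases the native resources held by the Deflater. */
    void close() {
        deflater.end();
    }

    public static void main(String[] args) {
        PerRecordDeflateSketch codec = new PerRecordDeflateSketch();
        byte[] first = codec.compress("record one, record one".getBytes(StandardCharsets.UTF_8));
        byte[] second = codec.compress("record two".getBytes(StandardCharsets.UTF_8));
        codec.close();
        // The second record decodes on its own, without the first.
        System.out.println(new String(decompress(second), StandardCharsets.UTF_8));
        System.out.println(first.length + " compressed bytes for the first record");
    }
}
```

Because every record is a complete compressed stream, a reader can skip to any record and decompress it without having read the records before it.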

The "lzo" compression codec can be found in the external GPL hadoop lzo compression library at http://code.google.com/p/hadoop-gpl-compression/ or http://github.com/kevinweil/hadoop-lzo. It must be installed separately.

File API

Writing LobFiles

LobFiles are created through a call to a method LobFile.create(), which allows the actual writer class to be selected dynamically. This will return an instance of type LobFile.Writer with the following public API:

  /**
   * Class that writes out a LobFile. Instantiate via LobFile.create().
   */
  public static abstract class Writer implements Closeable {

    /**
     * If this Writer is writing to a physical LobFile, then this returns
     * the file path it is writing to. Otherwise it returns null.
     * @return the fully-qualified path being written to by this writer.
     */
    public abstract Path getPath();

    /**
     * Finishes writing the LobFile and closes underlying handles.
     */
    public abstract void close() throws IOException;

    /**
     * Terminates the current record and writes any trailing zero-padding
     * required by the specified record size.
     * This is implicitly called between consecutive writeBlobRecord() /
     * writeClobRecord() calls.
     */
    public abstract void finishRecord() throws IOException;

    /**
     * Declares a new BLOB record to be written to the file.
     * @param len the "claimed" number of bytes that will be present in this
     * record. The actual record may contain more or fewer bytes than len.
     */
    public abstract OutputStream writeBlobRecord(long len) throws IOException;

    /**
     * Declares a new CLOB record to be written to the file.
     * @param len the "claimed" number of characters that will be written
     * to this record. The actual number of characters may differ.
     */
    public abstract java.io.Writer writeClobRecord(long len)
        throws IOException;

    /**
     * Report the current position in the output file
     * @return the number of bytes written through this Writer.
     */
    public abstract long tell() throws IOException;
  }

Listing of LobFile.Writer API

Reading LobFiles

The LobFile.open() method will allow a user to read a LobFile. This will inspect the header of the file and determine the appropriate sub-format (as specified by the versionNum field of the file) and return an appropriate instance of type LobFile.Reader whose API follows:

  /**
   * Class that can read a LobFile. Create with LobFile.open().
   */
  public static abstract class Reader implements Closeable {
    /**
     * If this Reader is reading from a physical LobFile, then this returns
     * the file path it is reading from. Otherwise it returns null.
     * @return the fully-qualified path being read by this reader.
     */
    public abstract Path getPath();

    /**
     * Report the current position in the file
     * @return the current offset from the start of the file in bytes.
     */
    public abstract long tell() throws IOException;

    /**
     * Move the file pointer to the first available full record beginning at
     * position 'pos', relative to the start of the file.  After calling
     * seek(), you will need to call next() to move to the record itself.
     * @param pos the position to seek to or past.
     */
    public abstract void seek(long pos) throws IOException;

    /**
     * Advances to the next record in the file.
     * @return true if another record exists, or false if the
     * end of the file has been reached.
     */
    public abstract boolean next() throws IOException;

    /**
     * @return true if we have aligned the Reader (through a call to next())
     * onto a record.
     */
    public abstract boolean isRecordAvailable();

    /**
     * Reports the length of the record to the user.
     * If next() has not been called, or seek() has been called without
     * a subsequent call to next(), or next() returned false, the return
     * value of this method is undefined.
     * @return the 'claimedLen' field of the current record. For
     * character-based records, this is often in characters, not bytes.
     * Records may have more bytes associated with them than are reported
     * by this method, but never fewer.
     */
    public abstract long getRecordLen();

    /**
     * Return the byte offset at which the current record starts.
     * If next() has not been called, or seek() has been called without
     * a subsequent call to next(), or next() returned false, the return
     * value of this method is undefined.
     * @return the byte offset of the beginning of the current record.
     */
    public abstract long getRecordOffset();

    /**
     * Return the entryId of the current record to the user.
     * If next() has not been called, or seek() has been called without
     * a subsequent call to next(), or next() returned false, the return
     * value of this method is undefined.
     * @return the 'entryId' field of the current record.
     */
    public abstract long getRecordId();

    /**
     * @return an InputStream allowing the user to read the next binary
     * record from the file.
     */
    public abstract InputStream readBlobRecord() throws IOException;

    /**
     * @return a java.io.Reader allowing the user to read the next character
     * record from the file.
     */
    public abstract java.io.Reader readClobRecord() throws IOException;

    /**
     * Closes the reader.
     */
    public abstract void close() throws IOException;

    /**
     * @return true if the Reader.close() method has been called.
     */
    public abstract boolean isClosed();
  }

Listing of the LobFile.Reader API

This API allows users to seek to arbitrary records by their position in the file (retrieved by calling Writer.tell() immediately before calling writeBlobRecord() or writeClobRecord()).
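The calling convention (seek() to a position, then next() to align onto a record before querying it) can be illustrated with an in-memory stand-in. Nothing here is part of the proposed API: RecordCursor is a hypothetical subset of the Reader methods with IOException omitted, and ArrayCursor is a toy implementation backed by arrays of record offsets and claimed lengths.

```java
/** Hypothetical illustration of the seek()/next() protocol, using an in-memory stand-in. */
final class ReaderProtocolSketch {

    /** Hypothetical subset of the Reader methods listed above (IOException omitted). */
    interface RecordCursor {
        void seek(long pos);
        boolean next();
        boolean isRecordAvailable();
        long getRecordOffset();
        long getRecordLen();
    }

    /** Toy cursor over parallel arrays of record start offsets and claimed lengths. */
    static final class ArrayCursor implements RecordCursor {
        private final long[] offsets;
        private final long[] lens;
        private int upcoming = 0;   // record that the next call to next() will align onto
        private int current = -1;   // -1 while the cursor is not aligned on a record

        ArrayCursor(long[] offsets, long[] lens) {
            this.offsets = offsets;
            this.lens = lens;
        }

        @Override public void seek(long pos) {
            current = -1;           // seek() alone does not align onto a record
            upcoming = 0;
            while (upcoming < offsets.length && offsets[upcoming] < pos) {
                upcoming++;
            }
        }

        @Override public boolean next() {
            if (upcoming >= offsets.length) {
                current = -1;
                return false;
            }
            current = upcoming++;
            return true;
        }

        @Override public boolean isRecordAvailable() { return current >= 0; }
        @Override public long getRecordOffset() { return offsets[current]; }
        @Override public long getRecordLen() { return lens[current]; }
    }

    /** Sums the claimed lengths of all records that begin at or after pos. */
    static long claimedLengthFrom(RecordCursor reader, long pos) {
        reader.seek(pos);
        long total = 0;
        while (reader.next()) {
            total += reader.getRecordLen();
        }
        return total;
    }

    public static void main(String[] args) {
        RecordCursor cursor = new ArrayCursor(new long[] {50, 90, 170}, new long[] {10, 20, 30});
        System.out.println(claimedLengthFrom(cursor, 90));
    }
}
```

With records starting at offsets 50, 90 and 170, seeking to 90 and iterating visits the last two records; seeking to 91 would skip ahead to the record at 170.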

Not yet implemented is efficient access by entryId, although this also should be possible given the current format specification.

Integration with primary record storage

Records are ordinarily stored in fully-materialized form either as delimited text or in a binary form using SequenceFiles.

Large objects that are actually small (e.g., a couple of megabytes; this value is configurable through --inline-lob-limit) are stored inline with the main record storage. Large objects that cross this threshold are written to a LobFile that is opened in tandem with the main file being written to by the import process. The large object will be referenced in the main record storage by a string-based record locator which provides the location of the large object itself.

This locator is a string with the form “externalLob(lf,filename,offset,len)”. The string externalLob indicates that this is an externally-stored object. The "lf" parameter notes that the large object is stored in a LobFile (other formats, e.g., SequenceFile, may be used in the future). Following this are the filename where the records are stored, the offset in the file where the record begins, and the length of the record. These are used to open the LobFile (through the LobFile.Reader API), seek to the start of the record, and read the data back to the user. The claimed length of the object is provided here so that the file need not be opened to determine the length of the object. The filename in the locator may be a relative path, in which case it is relative to the directory holding the file for the materialized primary record storage. Storing relative paths allows primary record storage files to be relocated along with their external large object storage.
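A locator of this form could be parsed along the following lines. This is only a sketch: the class LobLocatorSketch and its field names are hypothetical, and the choice to split on the first comma and the last two commas (so that a filename containing commas still parses) is an assumption, since the text above does not say how such filenames are handled.

```java
/** Hypothetical parser for locators of the form externalLob(lf,filename,offset,len). */
final class LobLocatorSketch {
    final String format;     // storage format, e.g. "lf" for a LobFile
    final String filename;   // possibly relative to the primary record storage directory
    final long offset;       // byte offset of the record within the file
    final long length;       // claimed length of the object

    private LobLocatorSketch(String format, String filename, long offset, long length) {
        this.format = format;
        this.filename = filename;
        this.offset = offset;
        this.length = length;
    }

    /** Parses a locator string, throwing IllegalArgumentException if it is malformed. */
    static LobLocatorSketch parse(String locator) {
        String prefix = "externalLob(";
        if (!locator.startsWith(prefix) || !locator.endsWith(")")) {
            throw new IllegalArgumentException("not an external LOB locator: " + locator);
        }
        String body = locator.substring(prefix.length(), locator.length() - 1);
        int firstComma = body.indexOf(',');
        int lastComma = body.lastIndexOf(',');
        int middleComma = body.lastIndexOf(',', lastComma - 1);
        if (firstComma < 0 || middleComma <= firstComma) {
            throw new IllegalArgumentException("expected four fields: " + locator);
        }
        // NumberFormatException (a subclass of IllegalArgumentException) signals bad numbers.
        return new LobLocatorSketch(
            body.substring(0, firstComma),
            body.substring(firstComma + 1, middleComma),
            Long.parseLong(body.substring(middleComma + 1, lastComma)),
            Long.parseLong(body.substring(lastComma + 1)));
    }

    public static void main(String[] args) {
        LobLocatorSketch loc = parse("externalLob(lf,lobs/part-0.lob,1024,4096)");
        System.out.println(loc.format + " " + loc.filename + " " + loc.offset + " " + loc.length);
    }
}
```

The parsed offset is what a caller would pass to Reader.seek() before calling next(), and the parsed length lets the caller report the object's size without opening the file.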

In either case, objects are accessed through a LobRef which encapsulates the record. The BlobRef type provides an InputStream-based accessor to the record data. The ClobRef type provides a Reader-based accessor. Regardless of whether the object was stored inline with the rest of the record, or externally, the nature of the underlying storage is abstracted from the user.

The reference classes will lazily open the underlying storage if called upon to do so by the user. Open file handles are cached in the current MapReduce process; opening a second large object stored in the same underlying file does not incur additional file-opening overhead (unless multiple LobRef instances open the same file concurrently). This ensures that records within the same file can be iterated over efficiently. If all records in the same file are accessed by the user sequentially, the user will see sequential performance (as seeks are only necessary every few thousand records to retrieve more of the LobIndex). When a seek is necessary to align on the position of another record, the Reader will heuristically determine whether to seek directly to the new location, or to read and consume the bytes between the current location and the target, to ensure that streaming performance is utilized where available.

Compatibility Issues

A large object storage mechanism has not yet been released, so there are no backwards compatibility issues to speak of.

Test Plan

The reference implementation provides unit tests which access large objects through the LobFile API as well as through BlobRef and ClobRef, integrating these files with regular record storage.

Furthermore, a longer-running “stress test” writes several large files which contain records of multiple gigabytes each. Operations such as seeking, iterating, and reading these records are then performed to ensure proper operation.

Discussion

Please provide feedback and comments at http://github.com/cloudera/sqoop/issues/issue/11