Memory Usage Improvement Study

mtuncer edited this page Dec 24, 2014 · 12 revisions

Objective

Improve the memory usage of cstore_fdw based on the findings outlined in the Memory Usage Improvement Document.

This document will contain the discussion and rationale behind upcoming modifications.

Preliminary Work

A test bed was created with the following setup:

  • Single node citusdb (master only)
  • Amazon customer review data (1999) with 1172645 rows and 12 columns
  • cstore_fdw with stripe size = 150000 and block size = 10000
  • 2 cstore tables: customer_reviews and customer_reviews_compressed
  • data size on disk: 67MB (compressed), 264MB (uncompressed)
  • uncompressed stripe size is approximately 33MB on disk.

Measurements

A macro that calls mstats() is used to log process memory usage. It is defined as follows:

#define CSTORE_LOGMEM(msg) ( \
    {struct mstats ms = mstats(); \
    ereport(WARNING, (errmsg(msg " bytes used : %d", (int)(ms.bytes_used))));\
    })

Data is inserted into both tables from a CSV file using the copy command. Traces are placed inside LoadFilteredStripeData() to log memory usage before and after each stripe load. The following query is executed so that all stripes and all of their column values are read:

copy (select * from customer_reviews) to '/dev/null';

The average additional memory used per stripe load is 65MB for the uncompressed case and 67MB for the compressed case. This includes the metadata allocated for stripe/block skipping and the exists array for each row/column.

Read optimization

Description

The LoadFilteredStripeData() function reads all of a stripe's data into memory and uncompresses it if necessary. The proposed change is to leave the data in its serialized format and deserialize it only when needed. The savings will be more visible with compressed data.

In the test case, memory for 150 000 x 12 = 1 800 000 Datums is allocated in addition to the deserialized data. This takes 14.4MB of memory by itself. If we deserialize/uncompress only one block at a time, we could reduce this usage to 10 000 x 12 x 8 = 960KB (about 7% of the original overhead, a saving of about 13.4MB). There could be some additional memory usage due to extra metadata that needs to be kept.
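The arithmetic above can be checked with a small helper. This is only an illustrative sketch: DatumArrayBytes() is a hypothetical name, not part of cstore_fdw, and the 8-byte Datum size is an assumption that holds on 64-bit builds.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Bytes needed for the Datum arrays that cover rowCount rows of
 * columnCount columns. datumSize is passed in explicitly; 8 bytes is
 * assumed for a 64-bit build. Hypothetical helper, for illustration only.
 */
static uint64_t
DatumArrayBytes(uint64_t rowCount, uint64_t columnCount, uint64_t datumSize)
{
	assert(datumSize > 0);

	return rowCount * columnCount * datumSize;
}
```

With the test bed values, DatumArrayBytes(150000, 12, 8) gives 14 400 000 bytes for a whole stripe, while DatumArrayBytes(10000, 12, 8) gives 960 000 bytes for a single block of every column.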

After the relevant changes were made to the code, the following results were observed.

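| Type         | Memory usage (before) | Memory usage (after) | Difference |
|--------------|-----------------------|----------------------|------------|
| uncompressed | 65MB                  | 51MB                 | -14MB      |
| compressed   | 67MB                  | 20MB                 | -47MB      |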

This result is in line with expectations. We should be able to reduce memory usage in the uncompressed case a little more by optimizing the memory used by the exists data structure and other metadata.

Implementation

Stripe data is organized in StripeData->ColumnData->ColumnBlockData order.

typedef struct ColumnBlockData
{
	bool *existsArray;
	Datum *valueArray;
} ColumnBlockData;

typedef struct ColumnData
{
	ColumnBlockData **blockDataArray;
} ColumnData;

typedef struct StripeData
{
	uint32 columnCount;
	uint32 rowCount;
	ColumnData **columnDataArray;
} StripeData;

The LoadFilteredStripeData() function calls LoadColumnData(), which in turn loads each block of a column, uncompressing and deserializing the data as Datum[]. We removed this functionality and kept the serialized data in a new field, ColumnBlockData->rawData; ColumnBlockData->valueArray is left NULL. Uncompression/deserialization is deferred to a later stage.

A function SetActiveBlock(ColumnData* columnData, uint64 blockIndex) is introduced to make sure that the currently needed data is deserialized before it is accessed inside ReadStripeNextRow(). The new function checks whether the requested block is the same as the previously deserialized block; if it is, it returns. Otherwise, it frees the old ColumnBlockData->valueArray, sets it to NULL, and decompresses/deserializes the requested block data from ColumnBlockData->rawData into ColumnBlockData->valueArray.

The compressionType attribute is needed to uncompress the raw data, and four additional attributes (rowCount, typeByValue, typeLength, typeAlign) are needed to deserialize it. These fields are introduced into ColumnBlockData for now. An index variable is also introduced into the ColumnData struct to keep the index of the currently deserialized block.

After the changes above, the data structures look like this:

typedef struct ColumnBlockData
{
   bool *existsArray;
   Datum *valueArray;
   StringInfo rawData;
   CompressionType valueCompressionType;
   uint32 rowCount;
   bool typeByValue;
   int typeLength;
   char typeAlign;
} ColumnBlockData;

typedef struct ColumnData
{
   ColumnBlockData **blockDataArray;
} ColumnData;

typedef struct StripeData
{
   uint32 columnCount;
   uint32 rowCount;
   ColumnData **columnDataArray;
} StripeData;
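The SetActiveBlock() behaviour described earlier can be sketched roughly as follows. This is a simplified stand-in rather than the actual cstore_fdw code: it compiles without PostgreSQL headers, Datum is a local typedef, the field name activeBlockIndex is an assumption (the text only says an index variable is added to ColumnData), and decompression and real Datum deserialization are replaced by a plain memcpy of fixed-width values.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uintptr_t Datum;        /* stand-in for PostgreSQL's Datum */

typedef struct ColumnBlockData
{
	Datum *valueArray;          /* NULL while the block is not active */
	const char *rawData;        /* serialized values, kept for the whole stripe */
	uint32_t rowCount;
} ColumnBlockData;

typedef struct ColumnData
{
	ColumnBlockData **blockDataArray;
	int64_t activeBlockIndex;   /* assumed name; -1 means no block is active */
} ColumnData;

/* Make sure the requested block is the one that is deserialized. */
static void
SetActiveBlock(ColumnData *columnData, uint64_t blockIndex)
{
	ColumnBlockData *blockData = NULL;
	size_t valueBytes = 0;

	/* nothing to do if the requested block is already deserialized */
	if (columnData->activeBlockIndex == (int64_t) blockIndex)
	{
		return;
	}

	/* release the deserialized values of the previously active block */
	if (columnData->activeBlockIndex >= 0)
	{
		ColumnBlockData *oldBlockData =
			columnData->blockDataArray[columnData->activeBlockIndex];

		free(oldBlockData->valueArray);
		oldBlockData->valueArray = NULL;
	}

	/* "deserialize" the requested block; real code would also decompress */
	blockData = columnData->blockDataArray[blockIndex];
	valueBytes = blockData->rowCount * sizeof(Datum);
	blockData->valueArray = malloc(valueBytes);
	assert(blockData->valueArray != NULL);
	memcpy(blockData->valueArray, blockData->rawData, valueBytes);

	columnData->activeBlockIndex = (int64_t) blockIndex;
}
```

In this sketch at most one valueArray per column is allocated at any time, which is where the reduction from a whole stripe of Datums to a single block of Datums comes from.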

The ColumnBlockData structure is polluted with additional state data. The attributes typeByValue, typeLength, and typeAlign could be retrieved from the TupleDescriptor, which is available in both TableReadState and TableWriteState. The attributes rowCount and compressionType are available in the ColumnBlockSkipNode structure, which is part of StripeSkipList. This data is available via TableWriteState but not via TableReadState.

We could also have two versions of ColumnBlockData, one for serialized and one for deserialized data. We could have DeserializedColumnBlockData as a part of TableReadState (and of TableWriteState for writing). ColumnBlockData would then contain only the serialized versions of the value (and exists) data.

typedef struct DeserializedColumnBlockData
{
   bool *existsArray;
   Datum *valueArray;
   StringInfo uncompressedDataBuffer;
} DeserializedColumnBlockData;


typedef struct ColumnBlockData
{
   StringInfo serializedExistsArray;
   StringInfo serializedValueArray;
   CompressionType valueCompressionType;
   uint32 rowCount;
} ColumnBlockData;

typedef struct ColumnData
{
   ColumnBlockData **blockDataArray;
} ColumnData;

typedef struct StripeData
{
   uint32 columnCount;
   uint32 rowCount;
   ColumnData **columnDataArray;
   DeserializedColumnBlockData **deserializedColumnBlockData;
   int32  deserializedBlockIndex;
} StripeData;

The currently accessed block is deserialized and accessible inside the StripeData structure. StripeData also contains the block index, to keep track of which block is deserialized.

The memory for DeserializedColumnBlockData->existsArray and DeserializedColumnBlockData->valueArray is allocated once, when the stripe is loaded; all block reloads reuse the same memory for those arrays. This avoids allocating big memory chunks (80KB for the exists array, 80KB for the value array) at each block reload.

Note that there is an additional member, uncompressedDataBuffer, in the DeserializedColumnBlockData struct. It is needed to keep track of the buffer of uncompressed data, since the Datums inside valueArray contain references to the original data. At this time it needs to be freed/reallocated during block reloads. A further improvement would be to reuse the previously allocated memory and grow it if needed. That would require some changes in the existing decompression code.
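The "reuse and grow if needed" idea could look something like the sketch below. ReusableBuffer and ReserveBuffer() are hypothetical names used only for illustration; the actual change would have to be made inside the existing decompression code, which is not shown here.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical buffer that is kept across block reloads. */
typedef struct ReusableBuffer
{
	char *data;
	size_t capacity;
} ReusableBuffer;

/*
 * Make sure the buffer can hold at least neededSize bytes. The buffer
 * grows when it is too small and is never shrunk, so reloading a block
 * that fits into the existing buffer causes no allocation at all.
 * Returns the buffer, or NULL if growing it failed (the old buffer is
 * then left untouched).
 */
static char *
ReserveBuffer(ReusableBuffer *buffer, size_t neededSize)
{
	assert(neededSize > 0);

	if (neededSize > buffer->capacity)
	{
		char *grown = realloc(buffer->data, neededSize);
		if (grown == NULL)
		{
			return NULL;
		}

		buffer->data = grown;
		buffer->capacity = neededSize;
	}

	return buffer->data;
}
```

Because growing may move the buffer, any Datum that still points into the old buffer becomes invalid. That is acceptable in this scheme only because valueArray is rebuilt on every block reload.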

Code Changes

A few changes are needed:

  • Remove the uncompress/deserialization parts from the LoadColumnData() function and keep the data as it was read from disk. The function will also need to record the compression type and the number of rows in the data block.
  • Add a function to decompress/deserialize the data of the target block. This function needs to be called inside the CStoreReadNextRow() function just before ReadStripeNextRow(), to make sure the target block data is available for access.

Write Optimizations

Current State

The current write operation works as follows:

  • table header/footer structures are created/initialized at CStoreBeginWrite()
  • CStoreWriteRow() is called for inserting a row into stripe
  • it checks if a stripe exists to write into
    • creates an empty stripe if needed (1)
  • computes block index and block row index from current rowCount
  • sets attribute values for each column data at target block (2)
  • updates block min/max values for each column data
  • if stripe row count reaches row count limit
    • stripe is flushed to disk (3)

Large chunks of memory are allocated in the following places; the numbers match the markers in the list above.

  1. CreateEmptyStripeData() allocates the exists and value data buffers for a fully loaded stripe. In the case study this is 150 000 x 12 x (8 + 8) = 28.8MB, excluding the auxiliary memory needed to keep some extra pointers.
  2. Memory is allocated for each Datum whose type is passed by reference.
  3. FlushStripe() function
    • serializes the value and exists arrays on a per column/block basis. The whole serialized stripe data is kept in memory.
    • compresses column/block data. The buffers used for compression are as big as the original data buffers.

Therefore, for each stripe we allocate 28.8MB for the exists and value arrays, memory equal to the data size of the stripe for the values themselves, memory equal to the data size for the serialization buffer, and memory equal to the data size for the compression buffer (the serialized data buffer is freed in the compressed case). The total memory used by a write operation is 2 x stripe data size + 28.8MB + metadata memory. Note that item 2 also involves an excessive number of memory allocations, one for each by-reference Datum.

Proposal

As with the read optimizations, we do not need to keep all stripe data in both deserialized and serialized form. We could make some savings by keeping a single copy.

The proposed changes are:

  1. Use the same data structures as in the read optimizations.
  2. Keep only serialized/compressed data in memory for the whole stripe.
  3. Keep data in deserialized form for only one block of each column.
  4. Serialize/compress data values once a column data block is full.
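The steps above can be sketched for a single column as follows. Everything here is a simplification made for illustration: ColumnWriteBuffer, WriteValue() and SerializeActiveBlock() are hypothetical names, values are fixed-width int64_t instead of Datum, the block holds 4 rows instead of 10000, and compression is left out.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define BLOCK_ROW_COUNT 4   /* tiny for illustration; the test bed uses 10000 */

/* Hypothetical per-column write state. */
typedef struct ColumnWriteBuffer
{
	int64_t activeValues[BLOCK_ROW_COUNT];  /* the only deserialized block */
	uint32_t activeRowCount;
	char *serializedData;                   /* completed blocks of the stripe */
	size_t serializedSize;
	uint32_t serializedBlockCount;
} ColumnWriteBuffer;

/* Append the active block to the serialized data and empty it. */
static void
SerializeActiveBlock(ColumnWriteBuffer *column)
{
	size_t blockBytes = column->activeRowCount * sizeof(int64_t);
	char *grown = NULL;

	if (blockBytes == 0)
	{
		return;
	}

	grown = realloc(column->serializedData, column->serializedSize + blockBytes);
	assert(grown != NULL);

	memcpy(grown + column->serializedSize, column->activeValues, blockBytes);
	column->serializedData = grown;
	column->serializedSize += blockBytes;
	column->serializedBlockCount++;
	column->activeRowCount = 0;
}

/* Add one value; serialize the block as soon as it becomes full. */
static void
WriteValue(ColumnWriteBuffer *column, int64_t value)
{
	column->activeValues[column->activeRowCount] = value;
	column->activeRowCount++;

	if (column->activeRowCount == BLOCK_ROW_COUNT)
	{
		SerializeActiveBlock(column);
	}
}
```

At flush time only the last, partially filled block still needs a call to SerializeActiveBlock(); all earlier blocks are already in serialized form.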

Savings

Currently a single stripe takes 2 x actual data + 28.8MB + some overhead in memory. After the changes, memory will be needed for

  • serialized/compressed content
  • only one block of deserialized content.

The memory requirement of a stripe may go down to 1.5x the stripe data size for the uncompressed case, and probably to even less than the data size for the compressed case, as opposed to almost 3x in the current state.

In the test case, the improvements should save about 60MB for the uncompressed case and about 80MB for the compressed case.

Impact

  1. CreateEmptyStripeData() will be changed to create only the containers for ColumnBlockData pointers for each column/block (code deletion), and memory will be allocated for a DeserializedColumnBlockData for each column.
  2. A block-full check will be made on insertion of each row. When a block is full, it will be serialized/compressed and stored in memory; the existing data in the DeserializedColumnBlockData structure may need to be cleared.
  3. FlushStripe() will be changed to serialize/flush only the last active block (code removal).

Measurement

The same memory usage macro is used to get process memory usage. Memory usage is recorded at two points: (1) the memory used to store deserialized data and related metadata, and (2) the memory used during the flush operation (serialization/compression).

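| Type                 | Memory usage (before) | Memory usage (after) | Difference |
|----------------------|-----------------------|----------------------|------------|
| uncompressed (1)     | 84MB                  | *MB                  | *MB        |
| uncompressed (2)     | 95MB                  | *MB                  | *MB        |
| uncompressed (Total) | 179MB                 | 105MB                | -74MB      |
| compressed (1)       | 84MB                  | *MB                  | *MB        |
| compressed (2)       | 40MB                  | *MB                  | *MB        |
| compressed (Total)   | 124MB                 | 45MB                 | -79MB      |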

Note that since we no longer keep the whole stripe in deserialized form, we could only measure total memory usage after the changes. The results are in line with expectations.

Further Improvements

While we have achieved a good amount of savings, 105MB of memory usage still leaves room for improvement. We can obtain additional savings by dropping valueArray and serializing data directly as individual rows are added. This saves us from storing column values in deserialized form. It would also reduce the number of allocations drastically: DatumCopy() would never be called, and no memory would be allocated for value storage.
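A minimal sketch of what serializing values directly, without an intermediate valueArray, might look like is given below. SerializedColumn and AppendValue() are hypothetical names, the growth policy (start at 64 bytes and double) is an arbitrary choice for the example, and the alignment and length-header handling that real Datum serialization needs is omitted.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical growable buffer holding one column's serialized values. */
typedef struct SerializedColumn
{
	char *data;
	size_t length;
	size_t capacity;
} SerializedColumn;

/*
 * Copy the bytes of one value straight into the serialized buffer.
 * No per-value allocation is made; the buffer itself is reallocated
 * only when it runs out of space, doubling in size each time.
 */
static void
AppendValue(SerializedColumn *column, const void *value, size_t valueLength)
{
	size_t neededLength = column->length + valueLength;

	if (valueLength == 0)
	{
		return;
	}

	if (neededLength > column->capacity)
	{
		size_t newCapacity = (column->capacity == 0) ? 64 : column->capacity;
		char *grown = NULL;

		while (newCapacity < neededLength)
		{
			newCapacity *= 2;
		}

		grown = realloc(column->data, newCapacity);
		assert(grown != NULL);

		column->data = grown;
		column->capacity = newCapacity;
	}

	memcpy(column->data + column->length, value, valueLength);
	column->length = neededLength;
}
```

With doubling growth the number of allocations grows with the logarithm of the data size instead of with the number of by-reference values, which is the kind of reduction the paragraph above is after.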