
Added support for Disk Spillable Compaction to prevent OOM issues#289

Merged
vinothchandar merged 1 commit into apache:master from n3nash:spillable_hashmap
Feb 27, 2018

Conversation

@n3nash
Contributor

@n3nash n3nash commented Dec 18, 2017

Some assumptions at the moment:

  1. Always write to LocalFS.
  2. Key in <Key,Value> is always a string.
  3. Estimation of record size is based only on GenericRecord to bytes (which removes the size of the schema).
  4. A magic number for the sizingFactor - just came up with a random value at the moment.
  5. More tests required to test 1) load and 2) updates on data written to disk.
  6. Need a way to pass the size of the map via config. Since we don't have the config during compaction (RTView), we'd probably need to store it in the hoodie.properties file, but hoodie.properties is immutable at the moment, so if clients want to tune it they won't be able to. Need a way to provide this knob. One solution is to use SparkEnv.get("spark.executor.memory") and then tune based on that.
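The spill decision these assumptions describe can be sketched roughly as follows. This is a minimal illustration, not the PR's actual implementation: the class and field names, the byte-size estimate, and the fixed threshold are all hypothetical; in the PR, spilled values are appended to a local file with only small per-key metadata retained in memory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keep entries in memory until the estimated payload
// size crosses a threshold, then divert further entries to disk.
class SpillDecisionSketch {
  static final long MAX_IN_MEMORY_BYTES = 1024; // stand-in for the config knob
  final Map<String, byte[]> inMemory = new HashMap<>();
  long estimatedBytes = 0;
  int spilledCount = 0;

  void put(String key, byte[] value) {
    long entrySize = key.length() + value.length; // crude size estimate
    if (estimatedBytes + entrySize <= MAX_IN_MEMORY_BYTES) {
      inMemory.put(key, value);
      estimatedBytes += entrySize;
    } else {
      spilledCount++; // a real implementation would append to a local file here
    }
  }
}
```

The accuracy of `entrySize` is exactly what assumptions 3 and 4 are about: the estimate drives when spilling starts.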

Contributor Author

Will address the indentation introduced in this diff.

@n3nash n3nash force-pushed the spillable_hashmap branch 2 times, most recently from c56657a to cada120 Compare December 22, 2017 21:52
@n3nash
Contributor Author

n3nash commented Jan 3, 2018

@vinothchandar please take a look when you get a chance.

@vinothchandar
Member

Will do.. @n3nash can you remove the "WIP" prefix from PRs which are ready for final review.

@vinothchandar
Member

I will try to get to the reviews before the Monday meeting next week.

@n3nash n3nash force-pushed the spillable_hashmap branch 2 times, most recently from 550e66a to 3d2dc25 Compare January 5, 2018 17:58
@n3nash n3nash changed the title (WIP) Added support for Disk Spillable Compaction to prevent OOM issues Added support for Disk Spillable Compaction to prevent OOM issues Jan 5, 2018
Member

rename : hoodie.compaction.spill.threshold ?

Contributor Author

done

Member

remove blank line

Contributor Author

done.

Member

rename to SpillableMapTestUtils

Contributor Author

done

Member

Let's pick this up from a jobConf property.. it can default to 0.75 * freeMemory(). Without that, it will fill up the heap and potentially OOM with fragmentation issues.

Contributor Author

Yeah agreed, I just put it there to start running tests. There are 3 properties we could use:

  1. https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapred/JobConf.html#getMemoryForMapTask()
  2. https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapred/JobConf.html#getMemoryForReduceTask()
  3. https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapred/JobConf.html#MAPRED_TASK_MAXVMEM_PROPERTY (DEPRECATED)

I think getMemoryForMapTask() makes the most sense, but it could be that a query to this table is started in a reduce job of a previous query (by default, reduce memory = map task memory).
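The fallback idea from this thread can be sketched as below. The 0.75 factor and the default-to-free-heap behavior come from the discussion; the `MemoryBudget`/`resolve` names are hypothetical, and a real version would obtain the configured value from JobConf (e.g. getMemoryForMapTask()) rather than take it as a parameter.

```java
// Hypothetical sketch of resolving the spillable-map memory budget:
// prefer an explicitly configured value, otherwise fall back to a
// fraction of the currently free heap.
class MemoryBudget {
  static final double DEFAULT_FRACTION = 0.75; // factor from the discussion

  static long resolve(long configuredBytes) {
    if (configuredBytes > 0) {
      return configuredBytes; // explicit config wins
    }
    return (long) (DEFAULT_FRACTION * Runtime.getRuntime().freeMemory());
  }
}
```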

Member

I think we should simplify this more by assuming that a HoodieAvroDataBlock can be held in memory.. i.e during write we batch and create multiple data blocks..

Contributor Author

@n3nash n3nash Jan 11, 2018

You mean just use the old implementation of Maps.newHashMap()? The problem is we only write 1 data block during a write batch, so the size of this can be fairly large, given the other map holds all compacted records.

Contributor Author

@vinothchandar here, I think you're referring to just using Maps.newHashMap instead of spillable? With the new log format changes and lazy reading of data, this should work.

Member

How expensive is this constructor? An alternative is to reset this every time and then update() as needed.

Contributor Author

@n3nash n3nash Jan 11, 2018

/**
 * Creates a new CRC32 object.
 */
public CRC32() {
}

That's all it does. No other member variables initialized or anything.
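For reference, the reset-and-reuse pattern the reviewer suggests looks like this with the standard java.util.zip.CRC32 API (the wrapper class name is hypothetical; either approach is cheap given the empty constructor shown above):

```java
import java.util.zip.CRC32;

// Reuse a single CRC32 instance across records via reset()/update(),
// instead of constructing a new one per record.
class ChecksumHelper {
  private final CRC32 crc = new CRC32();

  long checksum(byte[] bytes) {
    crc.reset();       // clear state left by the previous record
    crc.update(bytes);
    return crc.getValue();
  }
}
```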

Member

rename to computeValueSize()? since you're actually serializing here?

Contributor Author

done.

Member

fmt: byte[] bytes ?

Member

Seems like this method could be shared with some code in CompactedRecordScanner as well?

Contributor Author

@n3nash n3nash Jan 11, 2018

Yeah, that's why I added a TODO; will look into this now that we are cleaning it up.

Contributor Author

done

Member

do you always have to flush() ? it could be costly right?

Contributor Author

@n3nash n3nash Jan 11, 2018

At the moment, SizeAwareDataOutputStream uses DataOutputStream (which uses FileOutputStream), which doesn't buffer anything; it writes through on every write(..), so flush() is a no-op (the flush method of OutputStream does nothing). But we could consider using BufferedOutputStream and then flush only when a get(..) is called, because get(..) uses an input stream from the file, so data needs to be written to disk first. BufferedOutputStream doesn't provide APIs like writeLong()/writeInt() or readLong()/readInt(), though, so we'd have to implement them ourselves.
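One note on the read/write primitives: DataOutputStream can itself wrap a BufferedOutputStream, so writeLong()/writeInt() would not need to be re-implemented if buffering were adopted. A small sketch of that composition (the class and method names are hypothetical, and ByteArrayOutputStream stands in for the log file):

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// DataOutputStream layered over BufferedOutputStream: primitive writes are
// buffered, and flush() pushes the buffered bytes down to the underlying sink.
class BufferedWriteSketch {
  static byte[] writeEntry(long pos, int size) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(sink));
    out.writeLong(pos);  // buffered until flush()/close()
    out.writeInt(size);
    out.flush();         // drain the buffer into the sink
    return sink.toByteArray();
  }
}
```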

Member

To clarify, I am talking about fsync/flush at the Linux level (not particularly the Java APIs which could trigger them)..

Given the entire task is going to be retried if any failures happen, what I would suggest is to let Linux pdflush take care of flushing your appends to the log file completely. Using a BufferedOutputStream may be tricky if you need to do a get (like you mentioned). Most implementations keep a write buffer and decouple it from the flushing. Flushing on every get will get expensive as well and may not actually help with anything, just incurring more system calls.

what do you think about the options?

Contributor Author

I agree with you. It seems from this documentation: https://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush() that this will just incur an additional system call. My worry is that we perform get(..) after put(..), but I think the OS should manage that irrespective of this extra system call. Most implementations I've seen keep a BufferedOutputStream, but in this case we can just rely on the OS buffer and pdflush.

Member

does with or without flush() make a difference performance-wise? what's the action item here?

Contributor Author

So I ran a small test on a Hadoop box:

import java.io.File;
import java.io.FileOutputStream;

public class TestFlush {

  public static void main(String[] args) throws Exception {
    FileOutputStream fileOutputStream = new FileOutputStream(new File("/tmp/test1"));
    for (int i = 1; i < 1000; i = i * 10) {
      int count = 100000 * i;
      System.out.println("NumWrites => " + count);
      long startTime = System.currentTimeMillis();
      int j = 0;
      while (j < count) {
        fileOutputStream.write(1);
        j++;
      }
      System.out.println("WithoutFlush => " + (System.currentTimeMillis() - startTime));
      fileOutputStream = new FileOutputStream(new File("/tmp/test2"));
      startTime = System.currentTimeMillis();
      j = 0;
      while (j < count) {
        fileOutputStream.write(1);
        fileOutputStream.flush();
        j++;
      }
      System.out.println("WithFlush => " + (System.currentTimeMillis() - startTime));
    }
  }
}

NumWrites => 100000
WithoutFlush => 117
WithFlush => 117
NumWrites => 1000000
WithoutFlush => 1171
WithFlush => 1223
NumWrites => 10000000
WithoutFlush => 12182
WithFlush => 11930

With Larger bytes :
....
byte [] data = new byte[10000]
...
fileOutputStream.write(data);
....

NumWrites => 100000
WithoutFlush => 838
WithFlush => 848
NumWrites => 1000000
WithoutFlush => 17770
WithFlush => 14958

From the tests there doesn't seem to be much difference, since OutputStream's flush() does not really have any implementation.

Action item: Removed flush() since it's a no-op. Use BufferedOutputStream in the future to avoid flushing to disk all the time if we see write() performance degrade. At the moment, from running large jobs, I see write(..) takes < 1 ms.

@n3nash n3nash force-pushed the spillable_hashmap branch from 3d2dc25 to 5769029 Compare January 11, 2018 07:53
@vinothchandar
Member

let's continue this review after we work through the perf issues as well..

@n3nash n3nash force-pushed the spillable_hashmap branch from 5769029 to 257ec74 Compare February 3, 2018 21:36
@vinothchandar
Member

@n3nash can you please point out portions you want me to re-review?

Member

Should we invest in a RecordSampler class that will estimate sizes with probability p for each record? This way, it's kept more up to date, instead of just using the first record.

In that case, let's change the variable to metadataEntrySampleSize or something, to denote that this is not an accurate average.
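A sketch of what such a sampler might look like (all names here are hypothetical, not from the PR): with probability p it folds the current record's size into a running average, so the estimate tracks the stream instead of being pinned to the first record.

```java
import java.util.Random;

// Hypothetical RecordSampler sketch: probabilistically re-measure records
// and maintain an incremental mean of their sizes.
class RecordSizeSampler {
  private final double p;
  private final Random random;
  private double avgSize = 0;
  private long samples = 0;

  RecordSizeSampler(double p, long seed) {
    this.p = p;
    this.random = new Random(seed);
  }

  void maybeSample(long recordSizeBytes) {
    // Always take the first sample; afterwards sample with probability p.
    if (samples == 0 || random.nextDouble() < p) {
      samples++;
      avgSize += (recordSizeBytes - avgSize) / samples; // incremental mean
    }
  }

  double estimate() {
    return avgSize;
  }
}
```

With p well below 1, the serialization cost of measuring is paid only on a small fraction of records.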

Contributor Author

Yes, I plan to invest some time in the RecordSampler; I have no cycles right now. I'll rename it to what you mentioned, that seems clear, and then open a task to add a record sampler (which might be needed in other places too).

Member

again, we may benefit from generic size sampler class

Member

how much does this improve the runtime by, compared to the cost of sorting?

Contributor Author

Here are the numbers from sorting vs non-sorting :

----------------- With Sorting enabled---------------------------
18/02/20 20:21:05 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records 43588
18/02/20 20:21:05 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records in memory 17109
18/02/20 20:21:05 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records in disk 26479
18/02/20 20:21:05 INFO collection.LazyFileIterable: ExtraLogs:: Time Taken to sort 18
18/02/20 20:21:14 INFO compact.HoodieRealtimeTableCompactor: ExtraLogs::Time take to read from spillabel 8513

18/02/21 04:52:40 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records 187068
18/02/21 04:52:40 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records in memory 16341
18/02/21 04:52:40 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records in disk 170727
18/02/21 04:52:40 INFO collection.LazyFileIterable: ExtraLogs:: Time Taken to sort 146
18/02/21 04:53:45 INFO compact.HoodieRealtimeTableCompactor: ExtraLogs::Time take to read from spillabel 64863

----------------- Without Sorting ---------------------------
18/02/20 22:47:11 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records 170444
18/02/20 22:47:11 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records in memory 16341
18/02/20 22:47:11 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records in disk 154103
18/02/20 22:47:11 INFO collection.LazyFileIterable: ExtraLogs:: Time Taken to sort 0
18/02/20 22:48:26 INFO compact.HoodieRealtimeTableCompactor: ExtraLogs::Time take to read from spillabel 74631

18/02/21 19:04:38 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records 242087
18/02/21 19:04:38 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records in memory 17740
18/02/21 19:04:38 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Size in memory 2900694448
18/02/21 19:04:38 INFO log.HoodieCompactedLogRecordScanner: ExtraLogs:: Number of records in disk 224347
18/02/21 19:04:38 INFO collection.LazyFileIterable: ExtraLogs:: Time Taken to sort 0
18/02/21 19:06:01 INFO compact.HoodieRealtimeTableCompactor: ExtraLogs::Time take to read from spillabel 82955

It seems like sorting gives a minor benefit, but again this is controlled by the layout of the dataset on disk in both cases and how many back-and-forth disk seeks the non-sorting version actually does.
But I realized that this will not matter at the moment since we started to pass a Map<> to HoodieMergeHandle instead of an Iterator. So when we think of SortMerge we can spend more time on this, WDYT?

Member

sure. gains also depend on how IO-loaded the box really is, or if the seek was the bottleneck.. Even without sorting, it's only taking < 1 ms to read, so definitely not seeking every time.. and it seems like the page cache is kicking in..
This will come in handy on a loaded system anyway.. let's revisit down the line.. sure

Contributor Author

@vinothchandar here, should I make 0.75 a constant? Or can we just pick it up from jobConf and default to some value if not present?

Member

yes a job conf would be good..

Contributor Author

Yes, added a CONF but we probably need to document and expose this conf properly since there should be an easy way to set this for Presto/Hive queries.

Member

sg

Member

Can you expand on what you mean by other objects..

Contributor Author

Well, the other objects are just the disk-based metadata Map<String, Metadata>. I did some calculations: say for a 1GB parquet file there are about 400K entries spilled to disk (worst case), then 400K * (averageSizeOfKey = 36 bytes + metadataSize = 4 + 8 + 8 + sizeOfFilePath = max(100 bytes)) ≈ 60 MB. So it's not huge; we could technically get rid of the sizingFactor, but I kept it to have a way to be conservative.
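A quick back-of-envelope check of those numbers (the per-entry sizes are the author's stated assumptions, not measured values, and the class name is hypothetical):

```java
// Worst-case in-memory footprint of the disk-based metadata map,
// using the per-entry sizes assumed in the discussion above.
class MetadataFootprint {
  static long worstCaseBytes(long entries) {
    long averageKeyBytes = 36;       // assumed average key size
    long metadataBytes = 4 + 8 + 8;  // assumed fixed metadata fields per entry
    long filePathBytes = 100;        // assumed max file path length
    return entries * (averageKeyBytes + metadataBytes + filePathBytes);
  }
}
```

For 400K entries this gives 62,400,000 bytes, i.e. roughly 60 MB, consistent with the estimate above.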

Member

okay makes sense

Contributor Author

@vinothchandar here, I guess we can change this to maxMemorySizeInBytes since we will use only the in-memory Maps.newHashMap() below? The problem is we need to account for Maps.newHashMap() since we are still reading at the block level?

Member

we need to budget for the size of a data block, correct (after we split it into pieces). but we can remove the /2 here, sure

Contributor Author

sg.

Contributor Author

@n3nash n3nash Feb 12, 2018

@vinothchandar here, should we rename this since it can also be used in HoodieMergeHandle, or have a different config for the MergeHandle spillable map? I feel a different config with the same defaults makes sense, thoughts?

Member

you mean usage inside compaction vs cow/mergehandle? In that case, we are doing one of those cases, right?

Contributor Author

Yeah, we are doing one of those cases, but using the name DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES to set the size of the spillable map seems weird for COW?

Member

the answer would depend on whether the SpillableMap is ever used by the COW path.. I don't think that's true today.. So it's okay for now. We can add a new param when we actually change COW.. That's my take.

Contributor Author

same here

@n3nash
Contributor Author

n3nash commented Feb 12, 2018

@vinothchandar added comments, will rebase as well in some time. The rebase shouldn't change the code much since it's mostly in the Scanner code, and that code doesn't have many changes in this diff, so you should be good without the rebase too.


Member

could this happen outside in the caller of this method, so value remains an opaque byte[] here?

Contributor Author

The Map's types are <Key, HoodieRecord>



Contributor Author

Yeah changed it.

Member

why make a LinkedHashMap and throw it away? can't we scan just using the sorted entries?

Contributor Author

I was using the LinkedHashMap for something else and forgot to remove the code, done.





@vinothchandar
Member

I think this is in good shape to merge for an initial version.. can you file follow-up tickets for all the open items.. we can circle back on them..

@n3nash n3nash force-pushed the spillable_hashmap branch 3 times, most recently from af9c1a6 to 261b950 Compare February 23, 2018 05:34
@n3nash n3nash force-pushed the spillable_hashmap branch 3 times, most recently from 026ec73 to f3db019 Compare February 23, 2018 07:23
@n3nash
Contributor Author

n3nash commented Feb 23, 2018

Yes, let me file the tickets first and add some more test results tomorrow before we can merge this.

@vinothchandar vinothchandar merged commit 6fec965 into apache:master Feb 27, 2018