
Parallelizing parquet write and spark's external read operation. #294

Merged: 1 commit, Mar 15, 2018

Conversation

ovj (Contributor) commented Jan 3, 2018

For large data inserts we have noticed that the time spent reading records from Spark's external sorter is comparable to the time spent writing records to parquet. We want to reduce the overall write time by parallelizing the read and write operations.

As part of this PR we are parallelizing the operations below:

  • reading records from Spark's external record reader and pre-computing the insert value (this saves the writer thread's time; the benefit depends heavily on the complexity of the schema).
  • writing records to the parquet file.

With these changes we are able to reduce our final parquet write stage runtime from ~1.1 h to ~19 min.
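The shape of the change can be sketched as a bounded producer-consumer pipeline (a simplified, hypothetical sketch: `PipelinedWrite` and the `Integer` records are illustrative stand-ins, not the actual Hudi `BufferedIterator`):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch of the producer-consumer split: one thread drains the
// source iterator into a bounded queue while the caller's thread consumes,
// so reading and writing overlap instead of alternating.
public class PipelinedWrite {

    public static List<Integer> copyPipelined(Iterator<Integer> source) throws Exception {
        BlockingQueue<Optional<Integer>> buffer = new LinkedBlockingQueue<>(128);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Producer: reads records (and could pre-compute the insert value)
        // off the writer's critical path.
        Future<?> producer = pool.submit(() -> {
            try {
                while (source.hasNext()) {
                    buffer.put(Optional.of(source.next()));
                }
                buffer.put(Optional.empty()); // end-of-stream marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Consumer: stands in for the parquet writer; here we just collect.
        List<Integer> written = new ArrayList<>();
        Optional<Integer> next;
        while ((next = buffer.take()).isPresent()) {
            written.add(next.get());
        }
        producer.get(); // surface any producer failure
        pool.shutdown();
        return written;
    }
}
```

The bounded queue is what keeps memory in check; the real PR additionally resizes that bound from sampled record sizes.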

@vinothchandar, @n3nash, @jianxu please take a look at it.

FYI @esmioley

ovj (Contributor Author) commented Jan 11, 2018

Spoke offline with Vinoth; summarizing our discussion here. We want to see if we can do the same for MergeHandle. Here are my initial thoughts.
MergeHandle performs the two operations below:

  • Reading records from Spark and adding them to "keyToNewRecords". There is no benefit in parallelizing this operation.
  • Reading records from the old parquet file and writing them to the new parquet file. There is some room to optimize here, depending on how much total time is spent reading from the old parquet file versus writing to the new one. I will measure the timings and report back. If needed, I will create another PR to handle the MergeHandle use case.

@@ -46,6 +46,8 @@
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
private static final String INSERT_WRITE_BUFFER_LIMIT = "hoodie.insert.write.buffer.limit";
Member:

I would just call this hoodie.write.buffer.limit since we intend to eventually add this to MergeHandle as well. Please rename the variables everywhere accordingly.

Member:

actually hoodie.write.buffer.limit.bytes

Contributor Author:

done. Changed it to "hoodie.write.buffer.limit.bytes".

public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRecord<K>> implements Iterator<T> {

private static Logger logger = LogManager.getLogger(BufferedIterator.class);
private static final int RECORD_SAMPLING_RATE = 64;
Member:

1-line docs on what these constants are.

Member:

and comments in general for methods where necessary

Contributor Author:

let me add comments.

@@ -121,6 +121,10 @@
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
Member:

hoodie-common cannot depend on spark.. this is also pulled in by hoodie-hadoop-mr.. Please remove this dependency.

Contributor Author:

Removed it. Earlier we had created HoodieSparkTaskContext to pass TaskContext information to the newly launched thread (since TaskContext.get() is thread-local). We have since found another way to set the TaskContext that does not need any of these changes: "TaskContext$.MODULE$.setTaskContext(taskContext);"
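The underlying problem is generic: a value published through a ThreadLocal (like Spark's TaskContext.get()) is invisible on a freshly launched thread until someone re-sets it there. A minimal, hypothetical illustration (ContextHandoff and the String context are stand-ins; Spark's real fix is the TaskContext$.MODULE$.setTaskContext call quoted above):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic sketch of the thread-local hand-off: the worker thread must
// explicitly re-set the context before any code on that thread reads it.
public class ContextHandoff {

    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    public static String runOnWorker(String context) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(() -> {
                CONTEXT.set(context); // analogous to TaskContext$.MODULE$.setTaskContext(tc)
                return CONTEXT.get(); // downstream code on this thread now sees the context
            }).get();
        } finally {
            pool.shutdown();
        }
    }
}
```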

import org.apache.spark.TaskContext;
import scala.Serializable;

public class HoodieSparkTaskContext implements Serializable {
Member:

don't understand the motivation for this class.. https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/TaskContext.html seems serializable, and all this is doing is picking four variables out of the TaskContext class. Can we remove this?

Member:

Also like I mentioned this class should not reside inside hoodie-common anyways

Contributor Author:

Removed this class; setting it via "TaskContext$.MODULE$.setTaskContext(taskContext);"

* every {@link #RECORD_SAMPLING_RATE}th record and adjusts number of records in buffer accordingly. This is done to
* ensure that we don't OOM.
*/
public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRecord<K>> implements Iterator<T> {
Member:

Can you add a unit test for BufferedIterator?

Contributor Author:

let me add one.

private final AtomicBoolean isDone = new AtomicBoolean(false);
private T nextRecord;

private final AtomicLong readCounter = new AtomicLong(0);
Member:

why the intermittent newlines between variables? Don't see any obvious grouping of variables either. Can we fix this?

Contributor Author:

sure let me fix it.

@Override
protected List<WriteStatus> computeNext() {
List<WriteStatus> statuses = new ArrayList<>();
final HoodieSparkTaskContext hoodieSparkTaskContext = HoodieSparkTaskContext.createNewHoodieSparkTaskContext();
Member:

At least 1 comment every 10 lines is a good rule of thumb IMO.


private final AtomicLong samplingRecordCounter = new AtomicLong(-1);

public BufferedIterator(final Iterator<T> iterator, HoodieWriteConfig config) {
Member:

pass in just the value for the buffer limit instead of the whole HoodieWriteConfig object? A class named BufferedIterator having HoodieWriteConfig passed in seems like something to avoid IMO.

Contributor Author:

Along with bufferLimit I also need the schema; will pass both in, then.


this.schema = HoodieIOHandle.createHoodieWriteSchema(config);
}

private void adjustBufferSize(final T record) throws InterruptedException {
Member:

rename -> adjustBufferSizeIfNeeded

}

private void readNextRecord() {
if (readCounter.incrementAndGet() % 1024 == 0) {
Member:

pull 1024 into a constant above?

}
}

public void startRecordReader() {
Member:

please rename to something like startBuffering(), current name makes it sound like you are starting another thread

Contributor Author:

makes sense. done.


@Override
public T next() {
if (this.nextRecord == null && !this.isDone.get()) {
Member:

can you just call hasNext() here, instead of repeating code?

Contributor Author:

Sounds good.

private final long bufferMemoryLimit;
private final Schema schema;

private long avgRecordSizeOfSampledRecords = 0;
Member:

rename: avgSampleSizeBytes

Contributor Author:

done.

private final Schema schema;

private long avgRecordSizeOfSampledRecords = 0;
private long numOfSampledRecords = 0;
Member:

rename: numSamples

Contributor Author:

done.


while (true) {
try {
throwExceptionIfFailed();
newRecord = buffer.poll(5, TimeUnit.SECONDS);
Member:

pull 5 into a constant above.

/**
* We will be using record size to determine how many records we should cache and will change permits accordingly.
*/
private final Semaphore rateLimiter = new Semaphore(1);
Member:

use of a Semaphore just to track the buffer size seems like overkill to me.. Can we just use an AtomicLong named currentBufferSize?

Contributor Author (ovj, Mar 12, 2018):

It is used for throttling too. AtomicLong here will not be sufficient.


private void insertRecord(T t) throws Exception {
adjustBufferSize(t);
rateLimiter.acquire();
Member:

are the permits record-based or byte-based? In adjustBufferSize, we seem to be acquiring/releasing as many permits as the buffer needs to shrink/grow in bytes.. but here, in insertRecord as well as in readNextRecord, each permit seems to imply a record?

Side note: if we are just sampling, what value do we use to reduce the buffer size by once we pick an item off?

Contributor Author:

Permits are based on record count (not on bytes). In "adjustBufferSizeIfNeeded" we try to acquire/release multiple permits based on whether the new limit has increased or decreased.
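The permit accounting described above can be sketched as follows (an illustrative sketch under stated assumptions, not the actual Hudi code; class and method names here are hypothetical): the semaphore holds one permit per buffered record, and each time a sampled average record size arrives, the permitted record count is recomputed from the byte budget and the semaphore is grown or shrunk by the difference.

```java
import java.util.concurrent.Semaphore;

// One permit == one buffered record. The number of permitted records is
// derived from a byte budget divided by the sampled average record size.
public class AdaptiveBuffer {

    private final long memoryLimitBytes;
    private final Semaphore rateLimiter;
    private int currentPermitTarget;

    public AdaptiveBuffer(long memoryLimitBytes, int initialRecords) {
        this.memoryLimitBytes = memoryLimitBytes;
        this.currentPermitTarget = initialRecords;
        this.rateLimiter = new Semaphore(initialRecords);
    }

    // Called every RECORD_SAMPLING_RATE-th record with a fresh size estimate.
    public void adjustBufferSizeIfNeeded(long avgRecordSizeBytes) {
        int newTarget = (int) Math.max(1, memoryLimitBytes / avgRecordSizeBytes);
        if (newTarget > currentPermitTarget) {
            // Grow: allow more records into the buffer.
            rateLimiter.release(newTarget - currentPermitTarget);
        } else if (newTarget < currentPermitTarget) {
            // Shrink: reclaim permits; blocks until readers drain enough records.
            rateLimiter.acquireUninterruptibly(currentPermitTarget - newTarget);
        }
        currentPermitTarget = newTarget;
    }

    public int availableRecordSlots() {
        return rateLimiter.availablePermits();
    }
}
```

Producers would acquire one permit per inserted record and consumers release one per record read, which is why an AtomicLong alone cannot provide the blocking/throttling behavior.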

private long numOfSampledRecords = 0;
private final Iterator<T> internalIterator;

private final AtomicReference<Exception> hasFailed = new AtomicReference(null);
Member:

you can simply use volatile if your intention is just to publish the exception from the other thread here quickly.

Contributor Author:

volatile may not work here. My intention is to capture the root cause of the failure (the other exception will most likely be an InterruptedException).
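The first-failure-wins behavior being described can be shown in a few lines (FailureTracker is a hypothetical stand-in for the fields in the PR): compareAndSet only stores the first exception, so a later, secondary InterruptedException cannot overwrite the root cause, whereas with a plain volatile field the last write would win.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of first-failure capture: only the first markAsFailed call stores
// its exception; subsequent calls are no-ops.
public class FailureTracker {

    private final AtomicReference<Exception> hasFailed = new AtomicReference<>(null);

    public void markAsFailed(Exception e) {
        hasFailed.compareAndSet(null, e); // only the first caller wins
    }

    public Exception rootCause() {
        return hasFailed.get();
    }
}
```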

} catch (Exception e) {
logger.error("error writing hoodie records", e);
if (writeError.compareAndSet(null, e)) {
bufferedIterator.markAsFailed(e);
Member:

tbh not a big fan of hand-rolled threads and thread-to-thread error notification.. Can we just use an ExecutorService, and an https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html if needed, to fire off two futures and cancel one if the other fails?

The need for the semaphore here should go away too..

this.rateLimiter.release(RECORD_CACHING_LIMIT + 1);
}

private static final class BufferedIteratorRecord<T> {
Member:

why do we need this class? Can't we just use Optional above?

Contributor Author:

makes sense. done.

}
}

private static final class BufferedInsertPayload implements HoodieRecordPayload {
Member:

same here.. can we avoid this class by narrowing/adjusting the generic type definition in the class?

Contributor Author:

Sure. I wanted this to offload computation to the reader thread. Let me introduce another API in HoodieRecord: prepareInsertValue(). Let me know what you think.
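The offloading idea could look roughly like this (a hypothetical sketch: PrecomputedRecord and the String payload stand in for HoodieRecord and its Avro conversion, and prepareInsertValue is the proposed, not-yet-existing API):

```java
// The record exposes a prepareInsertValue() step so the reader thread can
// perform the (potentially schema-dependent, expensive) conversion before
// the writer thread ever touches the record.
public class PrecomputedRecord {

    private final String raw;
    private String prepared; // cached result of the expensive conversion

    public PrecomputedRecord(String raw) {
        this.raw = raw;
    }

    // Called on the reader thread while buffering.
    public void prepareInsertValue() {
        this.prepared = raw.toUpperCase(); // stand-in for the real conversion
    }

    // Called on the writer thread; returns the precomputed value if present.
    public String getInsertValue() {
        return prepared != null ? prepared : raw.toUpperCase();
    }
}
```

The fallback in getInsertValue keeps the record usable even if buffering never ran the preparation step.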

ovj (Contributor Author) commented Mar 14, 2018

Thanks @vinothchandar. Addressed all your comments. Please take another look at it. I have verified it internally.

@vinothchandar vinothchandar merged commit c5b4cb1 into apache:master Mar 15, 2018
vinishjail97 pushed a commit to vinishjail97/hudi that referenced this pull request Dec 15, 2023
…te and read for avro (apache#8764) (apache#294)

Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>