Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions java/core/src/java/org/apache/orc/MemoryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public interface MemoryManager {

interface Callback {
/**
* The writer needs to check its memory usage
* The scale factor for the stripe size has changed and thus the
* writer should adjust its desired size accordingly.
* @param newScale the current scale factor for memory allocations
* @return true if the writer was over the limit
* @throws IOException if the writer fails while checking or flushing its memory
*/
boolean checkMemory(double newScale) throws IOException;
}
Expand All @@ -63,6 +63,21 @@ void addWriter(Path path, long requestedAllocation,
* Give the memory manager an opportunity for doing a memory check.
* @param rows number of rows added
* @throws IOException if an error occurs during the memory check
* @deprecated Use {@link MemoryManager#checkMemory} instead
*/
void addedRow(int rows) throws IOException;

/**
* As part of adding rows, the writer calls this method to determine
* if the scale factor has changed. If it has changed, the Callback will be
* called.
* @param previousAllocation the total allocation returned by the previous
*   call to this method (callers pass a negative value on the first call)
* @param writer the callback to notify if the scale factor has changed
* @return the current allocation
*/
default long checkMemory(long previousAllocation,
Callback writer) throws IOException {
addedRow(1024);
return previousAllocation;
}
}
15 changes: 5 additions & 10 deletions java/core/src/java/org/apache/orc/OrcFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -942,19 +942,14 @@ public static WriterOptions writerOptions(Properties tableProperties,
return new WriterOptions(tableProperties, conf);
}

private static ThreadLocal<MemoryManager> memoryManager = null;
private static MemoryManager memoryManager = null;

private static synchronized MemoryManager getStaticMemoryManager(
final Configuration conf) {
private static synchronized
MemoryManager getStaticMemoryManager(Configuration conf) {
if (memoryManager == null) {
memoryManager = new ThreadLocal<MemoryManager>() {
@Override
protected MemoryManager initialValue() {
return new MemoryManagerImpl(conf);
}
};
memoryManager = new MemoryManagerImpl(conf);
}
return memoryManager.get();
return memoryManager;
}

/**
Expand Down
122 changes: 30 additions & 92 deletions java/core/src/java/org/apache/orc/impl/MemoryManagerImpl.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,57 +20,35 @@

import org.apache.orc.MemoryManager;
import org.apache.orc.OrcConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicLong;

/**
* Implements a memory manager that keeps a global context of how many ORC
* writers there are and manages the memory between them. For use cases with
* dynamic partitions, it is easy to end up with many writers in the same task.
* By managing the size of each allocation, we try to cut down the size of each
* allocation and keep the task from running out of memory.
*
*
* This class is not thread safe, but is re-entrant - ensure creation and all
* invocations are triggered from the same thread.
*/
public class MemoryManagerImpl implements MemoryManager {

private static final Logger LOG = LoggerFactory.getLogger(MemoryManagerImpl.class);

/**
* How often should we check the memory sizes? Measured in rows added
* to all of the writers.
*/
final long ROWS_BETWEEN_CHECKS;
private final long totalMemoryPool;
private final Map<Path, WriterInfo> writerList =
new HashMap<Path, WriterInfo>();
private long totalAllocation = 0;
private double currentScale = 1;
private int rowsAddedSinceCheck = 0;
private final OwnedLock ownerLock = new OwnedLock();

@SuppressWarnings("serial")
private static class OwnedLock extends ReentrantLock {
public Thread getOwner() {
return super.getOwner();
}
}
private final Map<Path, WriterInfo> writerList = new HashMap<>();
private final AtomicLong totalAllocation = new AtomicLong(0);

private static class WriterInfo {
long allocation;
Callback callback;
WriterInfo(long allocation, Callback callback) {
WriterInfo(long allocation) {
this.allocation = allocation;
this.callback = callback;
}
}

Expand All @@ -80,26 +58,16 @@ private static class WriterInfo {
* pool.
*/
public MemoryManagerImpl(Configuration conf) {
double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
ROWS_BETWEEN_CHECKS = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf);
LOG.info(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() + "=" + ROWS_BETWEEN_CHECKS);
if(ROWS_BETWEEN_CHECKS < 1 || ROWS_BETWEEN_CHECKS > 10000) {
throw new IllegalArgumentException(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute() + "="
+ ROWS_BETWEEN_CHECKS + " is outside valid range [1,10000].");
}
totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
getHeapMemoryUsage().getMax() * maxLoad);
ownerLock.lock();
this(Math.round(ManagementFactory.getMemoryMXBean().
getHeapMemoryUsage().getMax() * OrcConf.MEMORY_POOL.getDouble(conf)));
}

/**
* Light weight thread-safety check for multi-threaded access patterns
* Create the memory manager
* @param poolSize the size of memory to use
*/
private void checkOwner() {
if (!ownerLock.isHeldByCurrentThread()) {
LOG.warn("Owner thread expected {}, got {}",
ownerLock.getOwner(), Thread.currentThread());
}
public MemoryManagerImpl(long poolSize) {
totalMemoryPool = poolSize;
}

/**
Expand All @@ -108,43 +76,32 @@ private void checkOwner() {
* @param path the file that is being written
* @param requestedAllocation the requested buffer size
*/
public void addWriter(Path path, long requestedAllocation,
public synchronized void addWriter(Path path, long requestedAllocation,
Callback callback) throws IOException {
checkOwner();
WriterInfo oldVal = writerList.get(path);
// this should always be null, but we handle the case where the memory
// manager wasn't told that a writer wasn't still in use and the task
// starts writing to the same path.
if (oldVal == null) {
oldVal = new WriterInfo(requestedAllocation, callback);
oldVal = new WriterInfo(requestedAllocation);
writerList.put(path, oldVal);
totalAllocation += requestedAllocation;
totalAllocation.addAndGet(requestedAllocation);
} else {
// handle a new writer that is writing to the same path
totalAllocation += requestedAllocation - oldVal.allocation;
totalAllocation.addAndGet(requestedAllocation - oldVal.allocation);
oldVal.allocation = requestedAllocation;
oldVal.callback = callback;
}
updateScale(true);
}

/**
* Remove the given writer from the pool.
* @param path the file that has been closed
*/
public void removeWriter(Path path) throws IOException {
checkOwner();
public synchronized void removeWriter(Path path) throws IOException {
WriterInfo val = writerList.get(path);
if (val != null) {
writerList.remove(path);
totalAllocation -= val.allocation;
if (writerList.isEmpty()) {
rowsAddedSinceCheck = 0;
}
updateScale(false);
}
if(writerList.isEmpty()) {
rowsAddedSinceCheck = 0;
totalAllocation.addAndGet(-val.allocation);
}
}

Expand All @@ -163,48 +120,29 @@ public long getTotalMemoryPool() {
* available for each writer.
*/
public double getAllocationScale() {
return currentScale;
long alloc = totalAllocation.get();
return alloc <= totalMemoryPool ? 1.0 : (double) totalMemoryPool / alloc;
}

/**
* Give the memory manager an opportunity for doing a memory check.
* @param rows number of rows added
* @throws IOException
*/
@Override
public void addedRow(int rows) throws IOException {
rowsAddedSinceCheck += rows;
if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
notifyWriters();
}
// PASS
}

/**
* Notify all of the writers that they should check their memory usage.
* @throws IOException
* Obsolete no-op method retained only because Hive extends this class.
* @deprecated remove this method once Hive no longer depends on it
*/
public void notifyWriters() throws IOException {
checkOwner();
LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
for(WriterInfo writer: writerList.values()) {
boolean flushed = writer.callback.checkMemory(currentScale);
if (LOG.isDebugEnabled() && flushed) {
LOG.debug("flushed " + writer.toString());
}
}
rowsAddedSinceCheck = 0;
// PASS
}

/**
* Update the currentScale based on the current allocation and pool size.
* This also updates the notificationTrigger.
* @param isAllocate is this an allocation?
*/
private void updateScale(boolean isAllocate) throws IOException {
if (totalAllocation <= getTotalMemoryPool()) {
currentScale = 1;
} else {
currentScale = (double) getTotalMemoryPool() / totalAllocation;
@Override
public long checkMemory(long previous, Callback writer) throws IOException {
long current = totalAllocation.get();
if (current != previous) {
writer.checkMemory(getAllocationScale());
}
return current;
}
}
45 changes: 30 additions & 15 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
private static final int MIN_ROW_INDEX_STRIDE = 1000;

private final Path path;
private long adjustedStripeSize;
private final long stripeSize;
private final int rowIndexStride;
private final TypeDescription schema;
private final PhysicalWriter physicalWriter;
Expand All @@ -107,6 +107,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
private final TreeWriter treeWriter;
private final boolean buildIndex;
private final MemoryManager memoryManager;
private long previousAllocation = -1;
private long memoryLimit;
private final long ROWS_PER_CHECK;
private long rowsSinceCheck = 0;
private final OrcFile.Version version;
private final Configuration conf;
private final OrcFile.WriterCallback callback;
Expand Down Expand Up @@ -178,7 +182,7 @@ public WriterImpl(FileSystem fs,
}
this.writeTimeZone = hasTimestamp(schema);
this.useUTCTimeZone = opts.getUseUTCTimestamp();
this.adjustedStripeSize = opts.getStripeSize();
this.stripeSize = opts.getStripeSize();
this.version = opts.getVersion();
this.encodingStrategy = opts.getEncodingStrategy();
this.compressionStrategy = opts.getCompressionStrategy();
Expand Down Expand Up @@ -212,9 +216,11 @@ public WriterImpl(FileSystem fs,
MIN_ROW_INDEX_STRIDE);
}
// ensure that we are able to handle callbacks before we register ourselves
memoryManager.addWriter(path, opts.getStripeSize(), this);
ROWS_PER_CHECK = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf);
memoryLimit = stripeSize;
memoryManager.addWriter(path, stripeSize, this);
LOG.info("ORC writer created for path: {} with stripeSize: {} options: {}",
path, adjustedStripeSize, unencryptedOptions);
path, stripeSize, unencryptedOptions);
}

//@VisibleForTesting
Expand All @@ -226,7 +232,7 @@ public static int getEstimatedBufferSize(long stripeSize, int numColumns,
// sizes.
int estBufferSize = (int) (stripeSize / (20L * numColumns));
estBufferSize = getClosestBufferSize(estBufferSize);
return estBufferSize > bs ? bs : estBufferSize;
return Math.min(estBufferSize, bs);
}

@Override
Expand Down Expand Up @@ -286,15 +292,22 @@ public static CompressionCodec createCodec(CompressionKind kind) {

@Override
public boolean checkMemory(double newScale) throws IOException {
long limit = Math.round(adjustedStripeSize * newScale);
long size = treeWriter.estimateMemory();
if (LOG.isDebugEnabled()) {
LOG.debug("ORC writer " + physicalWriter + " size = " + size +
" limit = " + limit);
}
if (size > limit) {
flushStripe();
return true;
memoryLimit = Math.round(stripeSize * newScale);
return checkMemory();
}

private boolean checkMemory() throws IOException {
if (rowsSinceCheck >= ROWS_PER_CHECK) {
rowsSinceCheck = 0;
long size = treeWriter.estimateMemory();
if (LOG.isDebugEnabled()) {
LOG.debug("ORC writer " + physicalWriter + " size = " + size +
" limit = " + memoryLimit);
}
if (size > memoryLimit) {
flushStripe();
return true;
}
}
return false;
}
Expand Down Expand Up @@ -676,7 +689,9 @@ public void addRowBatch(VectorizedRowBatch batch) throws IOException {
rowsInStripe += batch.size;
treeWriter.writeRootBatch(batch, 0, batch.size);
}
memoryManager.addedRow(batch.size);
rowsSinceCheck += batch.size;
previousAllocation = memoryManager.checkMemory(previousAllocation, this);
checkMemory();
} catch (Throwable t) {
try {
close();
Expand Down
Loading