From 557174ec88956406a9c5d84b8a788e05cdadde1a Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 5 Jan 2015 00:48:33 +0900 Subject: [PATCH 1/4] TAJO-1271: Improve memory usage in HashShuffleFileWriteExec. --- .../java/org/apache/tajo/conf/TajoConf.java | 3 +- .../physical/HashShuffleFileWriteExec.java | 79 +++++++++++++------ .../org/apache/tajo/worker/TajoWorker.java | 4 + .../tajo/worker/TaskAttemptContext.java | 1 + .../planner/physical/TestPhysicalPlanner.java | 2 + .../org/apache/tajo/storage/RowStoreUtil.java | 2 + .../org/apache/tajo/tuple/RowBlockReader.java | 2 + .../tuple/offheap/OffHeapRowBlockReader.java | 5 ++ .../src/main/resources/storage-default.xml | 4 +- .../src/test/resources/storage-default.xml | 4 +- .../tajo/storage/HashShuffleAppender.java | 25 ++++-- .../storage/HashShuffleAppenderManager.java | 58 +++++++++++++- .../java/org/apache/tajo/storage/RawFile.java | 13 ++- .../src/test/resources/storage-default.xml | 4 +- 14 files changed, 155 insertions(+), 51 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index ab11ddddef..3bfc9f190a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -202,9 +202,10 @@ public static enum ConfVars implements ConfigKey { SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192), SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120), SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20), - SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 10000), + SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 128 * 1048576), //128MB SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volumn-mb", 30), HASH_SHUFFLE_PARENT_DIRS("tajo.hash.shuffle.parent.dirs.count", 10), + SHUFFLE_HASH_PARALLEL_EXECUTION_NUM_PER_DISK("tajo.shuffle.hash.parallel-execution.num-per-disk", 1), // Storage Configuration -------------------------------------------------- ROWFILE_SYNC_INTERVAL("rowfile.sync.interval", 100), diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 3c4949f2a3..1587e68413 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.CatalogUtil; @@ -29,14 +30,18 @@ import org.apache.tajo.plan.logical.ShuffleFileWriteNode; import org.apache.tajo.storage.HashShuffleAppender; import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.RowWriter; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; /** * HashShuffleFileWriteExec is a physical executor to store intermediate data into a number of @@ -52,7 +57,8 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { private final int numShuffleOutputs; private final int [] shuffleKeyIds; private HashShuffleAppenderManager hashShuffleAppenderManager; - private int numHashShuffleBufferTuples; + private int maxBufferSize; + private int initialRowBufferSize; public HashShuffleFileWriteExec(TaskAttemptContext context, final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException { @@ -74,7 +80,8 @@ public HashShuffleFileWriteExec(TaskAttemptContext context, } this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs); this.hashShuffleAppenderManager = context.getHashShuffleAppenderManager(); - this.numHashShuffleBufferTuples = context.getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_BUFFER_SIZE); + this.maxBufferSize = context.getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_BUFFER_SIZE); + this.initialRowBufferSize = Math.max(maxBufferSize / numShuffleOutputs, 64 * StorageUnit.KB); } @Override @@ -92,52 +99,65 @@ private HashShuffleAppender getAppender(int partId) throws IOException { return appender; } -// Map partitionStats = new HashMap(); - Map> partitionTuples = new HashMap>(); + Map partitionTuples = new HashMap(); long writtenBytes = 0L; + long usedMem = 0; @Override public Tuple next() throws IOException { try { Tuple tuple; int partId; - int tupleCount = 0; long numRows = 0; while ((tuple = child.next()) != null) { - tupleCount++; numRows++; partId = partitioner.getPartition(tuple); - List partitionTupleList = partitionTuples.get(partId); - if (partitionTupleList == null) { - partitionTupleList = new ArrayList(1000); - partitionTuples.put(partId, partitionTupleList); + OffHeapRowBlock rowBlock = partitionTuples.get(partId); + if (rowBlock == null) { + rowBlock = new OffHeapRowBlock(outSchema, initialRowBufferSize); + partitionTuples.put(partId, rowBlock); } - try { - partitionTupleList.add(tuple.clone()); - } catch (CloneNotSupportedException e) { - } - if (tupleCount >= numHashShuffleBufferTuples) { - for (Map.Entry> entry : partitionTuples.entrySet()) { + + RowWriter writer = rowBlock.getWriter(); + long prevUsedMem = rowBlock.usedMem(); + RowStoreUtil.convert(tuple, writer); + usedMem += (rowBlock.usedMem() - prevUsedMem); + + if (usedMem >= maxBufferSize) { + List> resultList = Lists.newArrayList(); + for (Map.Entry entry : partitionTuples.entrySet()) { int appendPartId = entry.getKey(); HashShuffleAppender appender = getAppender(appendPartId); - int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue()); - writtenBytes += appendedSize; - entry.getValue().clear(); + + resultList.add(hashShuffleAppenderManager. + writePartitions(appender, context.getTaskId(), entry.getValue(), false)); + + } + + for (Future future : resultList) { + writtenBytes += future.get(); } - tupleCount = 0; + usedMem = 0; } } // processing remained tuples - for (Map.Entry> entry : partitionTuples.entrySet()) { + List> resultList = Lists.newArrayList(); + for (Map.Entry entry : partitionTuples.entrySet()) { int appendPartId = entry.getKey(); HashShuffleAppender appender = getAppender(appendPartId); - int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue()); - writtenBytes += appendedSize; - entry.getValue().clear(); + + resultList.add(hashShuffleAppenderManager. + writePartitions(appender, context.getTaskId(), entry.getValue(), true)); + + } + + for (Future future : resultList) { + writtenBytes += future.get(); } + usedMem = 0; TableStats aggregated = (TableStats)child.getInputStats().clone(); aggregated.setNumBytes(writtenBytes); aggregated.setNumRows(numRows); @@ -146,7 +166,7 @@ public Tuple next() throws IOException { partitionTuples.clear(); return null; - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); throw new IOException(e); } @@ -165,6 +185,13 @@ public void close() throws IOException{ appenderMap = null; } + if(partitionTuples != null && partitionTuples.size() > 0){ + for (OffHeapRowBlock rowBlock : partitionTuples.values()) { + rowBlock.release(); + } + partitionTuples.clear(); + } + partitioner = null; plan = null; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 4d96529fe5..e28444d8cc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -374,6 +374,10 @@ public void serviceStop() throws Exception { return; } + if (hashShuffleAppenderManager != null) { + hashShuffleAppenderManager.shutdown(); + } + if(webServer != null) { try { webServer.stop(); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 3092c47050..3d6f43f38f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -119,6 +119,7 @@ public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager(); } else { try { + // for testing ? this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf()); } catch (IOException e) { LOG.error(e.getMessage(), e); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 6db76aeeb9..f2476a6572 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -665,6 +665,7 @@ public final void testPartitionedStorePlan() throws IOException, PlanningExcepti exec.next(); exec.close(); ctx.getHashShuffleAppenderManager().close(ebId); + ctx.getHashShuffleAppenderManager().shutdown(); String executionBlockBaseDir = queryId.toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir); @@ -799,6 +800,7 @@ public final void testPartitionedStorePlanWithEmptyGroupingSet() exec.next(); exec.close(); ctx.getHashShuffleAppenderManager().close(ebId); + ctx.getHashShuffleAppenderManager().shutdown(); String executionBlockBaseDir = queryId.toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir); diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 24b6280cf2..87ad8fefd3 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -356,6 +356,8 @@ public static void convert(Tuple tuple, RowWriter writer) { case FLOAT8: writer.putFloat8(tuple.getFloat8(i)); break; + + case CHAR: case TEXT: writer.putText(tuple.getBytes(i)); break; diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java index be734e1e29..8894f9a535 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java @@ -30,4 +30,6 @@ public interface RowBlockReader { public boolean next(T tuple); public void reset(); + + public int rows(); } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java index 4a9313f1b0..a6dcd74016 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java @@ -60,4 +60,9 @@ public void reset() { curPosForRead = 0; curRowIdxForRead = 0; } + + @Override + public int rows() { + return rowBlock.rows(); + } } diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index abea9de1b0..9ae24b4a61 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -214,7 +214,7 @@ tajo.storage.raw.io.write-buffer.bytes - 131072 - 128KB write buffer + 65536 + 64KB write buffer diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index 712f66428f..03d4a5b7cc 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -180,7 +180,7 @@ tajo.storage.raw.io.write-buffer.bytes - 131072 - 128KB write buffer + 65536 + 64KB write buffer diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java index 4c772c9c90..697951ea01 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java @@ -23,6 +23,8 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.tuple.offheap.ZeroCopyTuple; import org.apache.tajo.util.Pair; import java.io.IOException; @@ -38,6 +40,7 @@ public class HashShuffleAppender implements Appender { private FileAppender appender; private AtomicBoolean closed = new AtomicBoolean(false); private int partId; + private int volumeId; private TableStats tableStats; @@ -59,11 +62,14 @@ public class HashShuffleAppender implements Appender { private ExecutionBlockId ebId; - public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) { + private final ZeroCopyTuple zeroCopyTuple = new ZeroCopyTuple(); + + public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender, int volumeId) { this.ebId = ebId; this.partId = partId; this.appender = appender; this.pageSize = pageSize; + this.volumeId = volumeId; } @Override @@ -77,25 +83,26 @@ public void init() throws IOException { * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition. * After writing if a current page exceeds pageSize, pageOffset will be added. * @param taskId - * @param tuples + * @param rowBlockReader * @return written bytes * @throws java.io.IOException */ - public int addTuples(TaskAttemptId taskId, List tuples) throws IOException { + public int writeRowBlock(TaskAttemptId taskId, RowBlockReader rowBlockReader) throws IOException { synchronized(appender) { if (closed.get()) { return 0; } long currentPos = appender.getOffset(); - for (Tuple eachTuple: tuples) { - appender.addTuple(eachTuple); + while (rowBlockReader.next(zeroCopyTuple)){ + appender.addTuple(zeroCopyTuple); } + long posAfterWritten = appender.getOffset(); int writtenBytes = (int)(posAfterWritten - currentPos); - int nextRowNum = rowNumInPage + tuples.size(); + int nextRowNum = rowNumInPage + rowBlockReader.rows(); List>> taskIndexes = taskTupleIndexes.get(taskId); if (taskIndexes == null) { taskIndexes = new ArrayList>>(); @@ -110,7 +117,7 @@ public int addTuples(TaskAttemptId taskId, List tuples) throws IOExceptio rowNumInPage = 0; } - totalRows += tuples.size(); + totalRows += rowBlockReader.rows(); return writtenBytes; } } @@ -206,4 +213,8 @@ public List>> getMergedTupleIndexes() { public void taskFinished(TaskAttemptId taskId) { taskTupleIndexes.remove(taskId); } + + public int getVolumeId() { + return volumeId; + } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index 74190bc873..f0834f4338 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -18,6 +18,9 @@ package org.apache.tajo.storage; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; @@ -30,6 +33,7 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; import org.apache.tajo.util.Pair; import java.io.IOException; @@ -37,13 +41,15 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; public class HashShuffleAppenderManager { private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class); - private Map> appenderMap = - new ConcurrentHashMap>(); + private ConcurrentMap> appenderMap = Maps.newConcurrentMap(); + private ConcurrentMap executors = Maps.newConcurrentMap(); // parallel writing + private List temporalPaths = Lists.newArrayList(); + private TajoConf systemConf; private FileSystem defaultFS; private FileSystem localFS; @@ -60,6 +66,26 @@ public HashShuffleAppenderManager(TajoConf systemConf) throws IOException { defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); localFS = FileSystem.getLocal(systemConf); pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024; + + Iterable allLocalPath = lDirAllocator.getAllLocalPathsToRead(".", systemConf); + int parallelExecution = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_PARALLEL_EXECUTION_NUM_PER_DISK); + for (Path path : allLocalPath) { + temporalPaths.add(localFS.makeQualified(path).toString()); + //concurrency control of root path + executors.put(temporalPaths.size() - 1, Executors.newFixedThreadPool(parallelExecution)); + } + } + + protected int getVolumeId(Path path) { + int i = 0; + for (String rootPath : temporalPaths) { + if (path.toString().startsWith(rootPath)) { + break; + } + i++; + } + Preconditions.checkPositionIndex(i, temporalPaths.size() - 1); + return i; } public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId, @@ -92,7 +118,8 @@ public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, partitionAppenderMeta = new PartitionAppenderMeta(); partitionAppenderMeta.partId = partId; partitionAppenderMeta.dataFile = dataFile; - partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender); + partitionAppenderMeta.appender = + new HashShuffleAppender(ebId, partId, pageSize, appender, getVolumeId(dataFile)); partitionAppenderMeta.appender.init(); partitionAppenderMap.put(partId, partitionAppenderMeta); @@ -168,6 +195,29 @@ public void finalizeTask(TaskAttemptId taskId) { } } + /** + * Asynchronously write partitions. + */ + public Future writePartitions(final HashShuffleAppender appender, final TaskAttemptId taskId, + final OffHeapRowBlock rowBlock, final boolean releaseBlock) { + ExecutorService executor = executors.get(appender.getVolumeId()); + return executor.submit(new Callable() { + @Override + public Integer call() throws Exception { + int bytes = appender.writeRowBlock(taskId, rowBlock.getReader()); + rowBlock.clear(); + if (releaseBlock) rowBlock.release(); + return bytes; + } + }); + } + + public void shutdown() { + for (ExecutorService service : executors.values()) { + service.shutdown(); + } + } + public static class HashShuffleIntermediate { private int partId; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 5213ba0502..2be312c9eb 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -48,7 +48,8 @@ public class RawFile { private static final Log LOG = LogFactory.getLog(RawFile.class); public static final String READ_BUFFER_SIZE = "tajo.storage.raw.io.read-buffer.bytes"; public static final String WRITE_BUFFER_SIZE = "tajo.storage.raw.io.write-buffer.bytes"; - public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB; + public static final int DEFAULT_READ_BUFFER_SIZE = 128 * StorageUnit.KB; + public static final int DEFAULT_WRITE_BUFFER_SIZE = 64 * StorageUnit.KB; public static class RawFileScanner extends FileScanner implements SeekableScanner { private FileChannel channel; @@ -95,7 +96,7 @@ public void init() throws IOException { + ", fragment length :" + fragment.getLength()); } - buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); + buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_READ_BUFFER_SIZE)); buffer = buf.nioBuffer(0, buf.capacity()); columnTypes = new DataType[schema.size()]; @@ -341,9 +342,7 @@ public Tuple next() throws IOException { } case INET4 : - byte [] ipv4Bytes = new byte[4]; - buffer.get(ipv4Bytes); - tuple.put(i, DatumFactory.createInet4(ipv4Bytes)); + tuple.put(i, DatumFactory.createInet4(buffer.getInt())); break; case DATE: { @@ -494,7 +493,7 @@ public void init() throws IOException { columnTypes[i] = schema.getColumn(i).getDataType(); } - buf = BufferPool.directBuffer(conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); + buf = BufferPool.directBuffer(conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_WRITE_BUFFER_SIZE)); buffer = buf.nioBuffer(0, buf.capacity()); // comput the number of bytes, representing the null flags @@ -718,7 +717,7 @@ public void addTuple(Tuple t) throws IOException { } case INET4 : - buffer.put(t.getBytes(i)); + buffer.putInt(t.getInt4(i)); break; default: diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml index adddf667a1..0b25c4af4d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -194,7 +194,7 @@ tajo.storage.raw.io.write-buffer.bytes - 131072 - 128KB write buffer + 65536 + 64KB write buffer From aa1d218040c5233ace675b2b6ed1241af1965df1 Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 5 Jan 2015 03:48:29 +0900 Subject: [PATCH 2/4] cleanup sync codes --- .../java/org/apache/tajo/worker/ExecutionBlockContext.java | 5 ++--- .../src/main/java/org/apache/tajo/storage/BufferPool.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index dd3ee687cc..564cc66855 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -271,7 +271,7 @@ public TajoWorker.WorkerContext getWorkerContext(){ return manager.getWorkerContext(); } - protected ClientSocketChannelFactory getShuffleChannelFactory(){ + protected synchronized ClientSocketChannelFactory getShuffleChannelFactory(){ if(channelFactory == null) { int workerNum = getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM); channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum); @@ -283,9 +283,8 @@ public Timer getRPCTimer() { return manager.getRPCTimer(); } - protected void releaseShuffleChannelFactory(){ + protected synchronized void releaseShuffleChannelFactory(){ if(channelFactory != null) { - channelFactory.shutdown(); channelFactory.releaseExternalResources(); channelFactory = null; } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java index 85c79fa17e..e4f9072cc0 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -44,7 +44,7 @@ public static long maxDirectMemory() { } - public synchronized static ByteBuf directBuffer(int size) { + public static ByteBuf directBuffer(int size) { return allocator.directBuffer(size); } From 937f1a8f23e86c3dcea002692a5bf3f1d25257f6 Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 5 Jan 2015 15:55:49 +0900 Subject: [PATCH 3/4] cleanup slow codes --- .../org/apache/tajo/tuple/offheap/UnSafeTuple.java | 10 ++-------- .../apache/tajo/storage/text/DelimitedTextFile.java | 5 +++-- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java index b742e6d8e8..8d9b69d151 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java @@ -160,7 +160,7 @@ public Datum get(int fieldId) { case FLOAT8: return DatumFactory.createFloat8(getFloat8(fieldId)); case TEXT: - return DatumFactory.createText(getText(fieldId)); + return DatumFactory.createText(getBytes(fieldId)); case TIMESTAMP: return DatumFactory.createTimestamp(getInt8(fieldId)); case DATE: @@ -241,13 +241,7 @@ public double getFloat8(int fieldId) { @Override public String getText(int fieldId) { - long pos = getFieldAddr(fieldId); - int len = UNSAFE.getInt(pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return new String(bytes); + return new String(getBytes(fieldId), Charset.forName("UTF-8")); } public IntervalDatum getInterval(int fieldId) { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 15db4c335a..d8d4d4d793 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -344,6 +344,8 @@ public TextLineSerDe getLineSerde() { @Override public float getProgress() { + if(reader == null) return 0.0f; + try { if (!reader.isReadable()) { return 1.0f; @@ -356,7 +358,7 @@ public float getProgress() { long remainingBytes = Math.max(endOffset - filePos, 0); return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes)); } - } catch (IOException e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); return 0.0f; } @@ -448,7 +450,6 @@ public void close() throws IOException { } } finally { IOUtils.cleanup(LOG, reader); - reader = null; } } From fc82244349037f7eb98078dc65ea771ade48002a Mon Sep 17 00:00:00 2001 From: jhkim Date: Fri, 9 Jan 2015 14:06:39 +0900 Subject: [PATCH 4/4] fix divide by zero --- .../engine/planner/physical/HashShuffleFileWriteExec.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 1587e68413..1803ec5b15 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -81,7 +81,11 @@ public HashShuffleFileWriteExec(TaskAttemptContext context, this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs); this.hashShuffleAppenderManager = context.getHashShuffleAppenderManager(); this.maxBufferSize = context.getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_BUFFER_SIZE); - this.initialRowBufferSize = Math.max(maxBufferSize / numShuffleOutputs, 64 * StorageUnit.KB); + if(numShuffleOutputs > 0){ + this.initialRowBufferSize = Math.max(maxBufferSize / numShuffleOutputs, 64 * StorageUnit.KB); + } else { + this.initialRowBufferSize = 64 * StorageUnit.KB; + } } @Override