Skip to content

Commit

Permalink
data output stream
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Sep 17, 2018
1 parent f6f4b4b commit 66f8d2b
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 65 deletions.
Expand Up @@ -201,8 +201,7 @@ public static MemoryBlock allocateMemoryWithRetry(long taskId, long size)
if (baseBlock == null) {
INSTANCE.printCurrentMemoryUsage();
throw new MemoryException(
"Not enough memory. total size:" + INSTANCE.totalMemory + " used : " + INSTANCE.memoryUsed
+ " requested: " + size + " .please increase carbon.unsafe.working.memory.in.mb");
"Not enough memory. please increase carbon.unsafe.working.memory.in.mb");
}
return baseBlock;
}
Expand Down
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.util;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

/**
* wrapper around DataOutputStream. Which clears the buffer for reuse
*/
public final class ReUsableByteArrayDataOutputStream extends DataOutputStream {

private ByteArrayOutputStream outputStream;

public ReUsableByteArrayDataOutputStream(ByteArrayOutputStream outputStream) {
super(outputStream);
this.outputStream = outputStream;
}

public void resetByteArrayOutputStream() {
outputStream.reset();
}

public int getByteArrayOutputStreamSize() {
return outputStream.size();
}

public byte[] getByteArray() {
return outputStream.toByteArray();
}

}
Expand Up @@ -24,15 +24,14 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;

Expand Down Expand Up @@ -350,11 +349,12 @@ public void writeIntermediateSortTempRowToOutputStream(IntermediateSortTempRow s
*
* @param row raw row
* @param outputStream output stream
* @param rowBuffer array backend buffer
* @param reUsableByteArrayDataOutputStream DataOutputStream backend by ByteArrayOutputStream
* @throws IOException if error occurs while writing to stream
*/
public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row,
DataOutputStream outputStream, ByteBuffer rowBuffer) throws IOException {
DataOutputStream outputStream,
ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) throws IOException {
// write dict & sort
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]);
Expand All @@ -368,14 +368,13 @@ public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row,
}

// pack no-sort
rowBuffer.clear();
packNoSortFieldsToBytes(row, rowBuffer);
rowBuffer.flip();
int packSize = rowBuffer.limit();
reUsableByteArrayDataOutputStream.resetByteArrayOutputStream();
packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream);
int packSize = reUsableByteArrayDataOutputStream.getByteArrayOutputStreamSize();

// write no-sort
outputStream.writeInt(packSize);
outputStream.write(rowBuffer.array(), 0, packSize);
outputStream.write(reUsableByteArrayDataOutputStream.getByteArray(), 0, packSize);
}

/**
Expand Down Expand Up @@ -524,12 +523,11 @@ public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObje
* @param row raw row
* @param baseObject base object of the memory block
* @param address base address for the row
* @param rowBuffer array backend buffer
* @return number of bytes written to memory
*/
public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row,
Object baseObject, long address, ByteBuffer rowBuffer, long unsafeRemainingLength)
throws MemoryException {
public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, Object baseObject,
long address, ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream,
long unsafeRemainingLength) throws MemoryException, IOException {
int size = 0;
// write dict & sort
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
Expand All @@ -552,18 +550,16 @@ public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row,
}

// convert pack no-sort
rowBuffer.clear();
packNoSortFieldsToBytes(row, rowBuffer);
rowBuffer.flip();
int packSize = rowBuffer.limit();
reUsableByteArrayDataOutputStream.resetByteArrayOutputStream();
packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream);
int packSize = reUsableByteArrayDataOutputStream.getByteArrayOutputStreamSize();

validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4 + packSize);
// write no-sort
CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize);
size += 4;
CarbonUnsafe.getUnsafe()
.copyMemory(rowBuffer.array(), CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
packSize);
CarbonUnsafe.getUnsafe().copyMemory(reUsableByteArrayDataOutputStream.getByteArray(),
CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, packSize);
size += packSize;
return size;
}
Expand All @@ -580,30 +576,32 @@ private void validateUnsafeMemoryBlockSizeLimit(long unsafeRemainingLength, int
* Pack to no-sort fields to byte array
*
* @param row raw row
* @param rowBuffer byte array backend buffer
* @param @param reUsableByteArrayDataOutputStream
* DataOutputStream backend by ByteArrayOutputStream
*/
private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) {
private void packNoSortFieldsToBytes(Object[] row,
ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) throws IOException {
// convert dict & no-sort
for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
rowBuffer.putInt((int) row[this.dictNoSortDimIdx[idx]]);
reUsableByteArrayDataOutputStream.writeInt((int) row[this.dictNoSortDimIdx[idx]]);
}
// convert no-dict & no-sort
for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
rowBuffer.putShort((short) bytes.length);
rowBuffer.put(bytes);
reUsableByteArrayDataOutputStream.writeShort((short) bytes.length);
reUsableByteArrayDataOutputStream.write(bytes);
}
// convert varchar dims
for (int idx = 0; idx < this.varcharDimCnt; idx++) {
byte[] bytes = (byte[]) row[this.varcharDimIdx[idx]];
rowBuffer.putInt(bytes.length);
rowBuffer.put(bytes);
reUsableByteArrayDataOutputStream.writeInt(bytes.length);
reUsableByteArrayDataOutputStream.write(bytes);
}
// convert complex dims
for (int idx = 0; idx < this.complexDimCnt; idx++) {
byte[] bytes = (byte[]) row[this.complexDimIdx[idx]];
rowBuffer.putInt(bytes.length);
rowBuffer.put(bytes);
reUsableByteArrayDataOutputStream.writeInt(bytes.length);
reUsableByteArrayDataOutputStream.write(bytes);
}

// convert measure
Expand All @@ -613,28 +611,28 @@ private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) {
tmpValue = row[this.measureIdx[idx]];
tmpDataType = this.dataTypes[idx];
if (null == tmpValue) {
rowBuffer.put((byte) 0);
reUsableByteArrayDataOutputStream.write((byte) 0);
continue;
}
rowBuffer.put((byte) 1);
reUsableByteArrayDataOutputStream.write((byte) 1);
if (DataTypes.BOOLEAN == tmpDataType) {
if ((boolean) tmpValue) {
rowBuffer.put((byte) 1);
reUsableByteArrayDataOutputStream.write((byte) 1);
} else {
rowBuffer.put((byte) 0);
reUsableByteArrayDataOutputStream.write((byte) 0);
}
} else if (DataTypes.SHORT == tmpDataType) {
rowBuffer.putShort((Short) tmpValue);
reUsableByteArrayDataOutputStream.writeShort((Short) tmpValue);
} else if (DataTypes.INT == tmpDataType) {
rowBuffer.putInt((Integer) tmpValue);
reUsableByteArrayDataOutputStream.writeInt((Integer) tmpValue);
} else if (DataTypes.LONG == tmpDataType) {
rowBuffer.putLong((Long) tmpValue);
reUsableByteArrayDataOutputStream.writeLong((Long) tmpValue);
} else if (DataTypes.DOUBLE == tmpDataType) {
rowBuffer.putDouble((Double) tmpValue);
reUsableByteArrayDataOutputStream.writeDouble((Double) tmpValue);
} else if (DataTypes.isDecimal(tmpDataType)) {
byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
rowBuffer.putShort((short) decimalBytes.length);
rowBuffer.put(decimalBytes);
reUsableByteArrayDataOutputStream.writeShort((short) decimalBytes.length);
reUsableByteArrayDataOutputStream.write(decimalBytes);
} else {
throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
}
Expand Down
Expand Up @@ -19,13 +19,13 @@

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.carbondata.core.memory.IntPointerBuffer;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
Expand Down Expand Up @@ -65,8 +65,10 @@ public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBloc
this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
}

public int addRow(Object[] row, ByteBuffer rowBuffer) throws MemoryException {
int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer);
public int addRow(Object[] row,
ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream)
throws MemoryException, IOException {
int size = addRow(row, dataBlock.getBaseOffset() + lastSize, reUsableByteArrayDataOutputStream);
buffer.set(lastSize);
lastSize = lastSize + size;
return size;
Expand All @@ -79,9 +81,12 @@ public int addRow(Object[] row, ByteBuffer rowBuffer) throws MemoryException {
* @param address
* @return
*/
private int addRow(Object[] row, long address, ByteBuffer rowBuffer) throws MemoryException {
return sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row,
dataBlock.getBaseObject(), address, rowBuffer, dataBlock.size() - lastSize);
private int addRow(Object[] row, long address,
ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream)
throws MemoryException, IOException {
return sortStepRowHandler
.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row, dataBlock.getBaseObject(), address,
reUsableByteArrayDataOutputStream, dataBlock.size() - lastSize);
}

/**
Expand Down
Expand Up @@ -21,7 +21,6 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -41,6 +40,7 @@
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims;
Expand All @@ -53,6 +53,7 @@
import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;


public class UnsafeSortDataRows {
/**
* LOGGER
Expand All @@ -73,7 +74,7 @@ public class UnsafeSortDataRows {

private SortParameters parameters;
private TableFieldStat tableFieldStat;
private ThreadLocal<ByteBuffer> rowBuffer;
private ThreadLocal<ReUsableByteArrayDataOutputStream> reUsableByteArrayDataOutputStream;
private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;

private UnsafeCarbonRowPage rowPage;
Expand All @@ -100,11 +101,10 @@ public UnsafeSortDataRows(SortParameters parameters,
UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
this.parameters = parameters;
this.tableFieldStat = new TableFieldStat(parameters);
this.rowBuffer = new ThreadLocal<ByteBuffer>() {
@Override protected ByteBuffer initialValue() {
this.reUsableByteArrayDataOutputStream = new ThreadLocal<ReUsableByteArrayDataOutputStream>() {
@Override protected ReUsableByteArrayDataOutputStream initialValue() {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
// DataOutputStream dataOutputStream = new DataOutputStream(byteStream);
return ByteBuffer.wrap(byteStream.toByteArray());
return new ReUsableByteArrayDataOutputStream(byteStream);
}
};
this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
Expand Down Expand Up @@ -197,7 +197,7 @@ private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGrou
for (int i = 0; i < size; i++) {
try {
if (rowPage.canAdd()) {
bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
} else {
handlePreviousPage();
try {
Expand All @@ -209,7 +209,7 @@ private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGrou
"exception occurred while trying to acquire a semaphore lock: " + ex.getMessage());
throw new CarbonSortKeyAndGroupByException(ex);
}
bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
}
} catch (Exception e) {
LOGGER.error(
Expand All @@ -230,7 +230,7 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
// if record holder list size is equal to sort buffer size then it will
// sort the list and then write current list data to file
if (rowPage.canAdd()) {
rowPage.addRow(row, rowBuffer.get());
rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
} else {

handlePreviousPage();
Expand All @@ -242,10 +242,10 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
"exception occurred while trying to acquire a semaphore lock: " + ex.getMessage());
throw new CarbonSortKeyAndGroupByException(ex);
}
rowPage.addRow(row, rowBuffer.get());
rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
}
} catch (Exception e) {
LOGGER.error(
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
throw new CarbonSortKeyAndGroupByException(e);
}
Expand Down

0 comments on commit 66f8d2b

Please sign in to comment.