Skip to content

Commit

Permalink
reveiw comment fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Sep 18, 2018
1 parent 8310516 commit 0fa9545
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
Expand Up @@ -473,10 +473,11 @@ public IntermediateSortTempRow readRowFromMemoryWithNoSortFieldConvert(Object ba
* @param baseObject base object of the memory block
* @param address base address of the row
* @param outputStream output stream
* @param unsafeTotalLength
* @throws IOException if error occurs while writing to stream
*/
public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject,
long address, DataOutputStream outputStream, long unsafeRemainingLength)
public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject, long address,
DataOutputStream outputStream, long unsafeRemainingLength, long unsafeTotalLength)
throws IOException, MemoryException {
int size = 0;

Expand All @@ -490,7 +491,7 @@ public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObje
for (int idx = 0; idx < noDictSortDimCnt; idx++) {
short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
size += 2;
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, length);
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, length, unsafeTotalLength);
byte[] bytes = new byte[length];
CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
Expand All @@ -503,7 +504,7 @@ public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObje
// packed no-sort & measure
int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
size += 4;
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, len);
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, len, unsafeTotalLength);
byte[] noSortDimsAndMeasures = new byte[len];
CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
Expand All @@ -523,15 +524,16 @@ public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObje
* @param row raw row
* @param baseObject base object of the memory block
* @param address base address for the row
* @param unsafeTotalLength
* @return number of bytes written to memory
*/
public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, Object baseObject,
long address, ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream,
long unsafeRemainingLength) throws MemoryException, IOException {
long unsafeRemainingLength, long unsafeTotalLength) throws MemoryException, IOException {
int size = 0;
// write dict & sort
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4);
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4, unsafeTotalLength);
CarbonUnsafe.getUnsafe()
.putInt(baseObject, address + size, (int) row[this.dictSortDimIdx[idx]]);
size += 4;
Expand All @@ -540,7 +542,8 @@ public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, Obje
// write no-dict & sort
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 2 + bytes.length);
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 2 + bytes.length,
unsafeTotalLength);
CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
size += 2;
CarbonUnsafe.getUnsafe()
Expand All @@ -554,7 +557,7 @@ public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, Obje
packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream);
int packSize = reUsableByteArrayDataOutputStream.getSize();

validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4 + packSize);
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4 + packSize, unsafeTotalLength);
// write no-sort
CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize);
size += 4;
Expand All @@ -564,11 +567,13 @@ public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, Obje
return size;
}

private void validateUnsafeMemoryBlockSizeLimit(long unsafeRemainingLength, int requestedSize)
throws MemoryException {
if (unsafeRemainingLength <= requestedSize) {
private void validateUnsafeMemoryBlockSizeLimit(long unsafeRemainingLength, int requestedSize,
long unsafeTotalLength) throws MemoryException {
if (unsafeTotalLength <= requestedSize) {
throw new MemoryException(
"not enough unsafe memory for sort: increase the 'offheap.sort.chunk.size.inmb' ");
} else if (unsafeRemainingLength <= requestedSize) {
throw new MemoryException("cannot handle this row. create new page");
}
}

Expand Down
Expand Up @@ -86,7 +86,7 @@ private int addRow(Object[] row, long address,
throws MemoryException, IOException {
return sortStepRowHandler
.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row, dataBlock.getBaseObject(), address,
reUsableByteArrayDataOutputStream, dataBlock.size() - lastSize);
reUsableByteArrayDataOutputStream, dataBlock.size() - lastSize, dataBlock.size());
}

/**
Expand All @@ -112,7 +112,7 @@ public IntermediateSortTempRow getRow(long address) {
*/
public void writeRow(long address, DataOutputStream stream) throws IOException, MemoryException {
sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream(
dataBlock.getBaseObject(), address, stream, dataBlock.size() - lastSize);
dataBlock.getBaseObject(), address, stream, dataBlock.size() - lastSize, dataBlock.size());
}

public void freeMemory() {
Expand Down Expand Up @@ -162,4 +162,8 @@ public enum MemoryManagerType {
public void setReadConvertedNoSortField() {
this.convertNoSortFields = true;
}

public void makeCanAddFail() {
this.lastSize = (int) sizeToBeUsed;
}
}
Expand Up @@ -194,9 +194,7 @@ private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGrou
}
for (int i = 0; i < size; i++) {
try {
if (rowPage.canAdd()) {
bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
} else {
if (!rowPage.canAdd()) {
handlePreviousPage();
try {
rowPage = createUnsafeRowPage();
Expand All @@ -207,12 +205,19 @@ private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGrou
"exception occurred while trying to acquire a semaphore lock: " + ex.getMessage());
throw new CarbonSortKeyAndGroupByException(ex);
}
bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
}
bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
} catch (Exception e) {
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
throw new CarbonSortKeyAndGroupByException(e);
if (e.getMessage().contains("cannot handle this row. create new page"))
{
rowPage.makeCanAddFail();
// so that same rowBatch will be handled again in new page
i--;
} else {
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
throw new CarbonSortKeyAndGroupByException(e);
}
}
}
}
Expand All @@ -227,10 +232,7 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
try {
// if record holder list size is equal to sort buffer size then it will
// sort the list and then write current list data to file
if (rowPage.canAdd()) {
rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
} else {

if (!rowPage.canAdd()) {
handlePreviousPage();
try {
rowPage = createUnsafeRowPage();
Expand All @@ -240,12 +242,18 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
"exception occurred while trying to acquire a semaphore lock: " + ex.getMessage());
throw new CarbonSortKeyAndGroupByException(ex);
}
rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
}
rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
} catch (Exception e) {
LOGGER.error(
if (e.getMessage().contains("cannot handle this row. create new page"))
{
rowPage.makeCanAddFail();
addRow(row);
} else {
LOGGER.error(
"exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
throw new CarbonSortKeyAndGroupByException(e);
throw new CarbonSortKeyAndGroupByException(e);
}
}
}

Expand Down

0 comments on commit 0fa9545

Please sign in to comment.