Skip to content

Commit

Permalink
[CARBONDATA-2885] Broadcast Issue and Small file distribution Issue
Browse files Browse the repository at this point in the history
Issue :-

For an external table, the CarbonRelation sizeInBytes is wrong (always 0). Because of this, join queries are selected for broadcast even
when the actual table size is > 10MB (the default broadcast threshold). This makes some join queries fail (tables that should use sort-merge join
go for broadcast join instead because of the wrong size calculation).

If Merge_small_file task distribution is enabled, join queries fail (TPCH):
Carbon opens many carbon files but they are never closed.

Root Cause :-
1. The current relation size calculation is based on the tablestatus file, but since an
external table does not have a tablestatus file, zero was always returned.
2. If Merge_small_file task distribution is enabled, Carbon opens many carbon files but they are never closed.
Solution :-

If the table is an external table, calculate its size from the table path.
Close the carbon files once the scan is finished.

This closes #2658
  • Loading branch information
BJangir authored and kumarvishal09 committed Aug 27, 2018
1 parent f81543e commit 1fb1f19
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ void updateDataBlockIterator() {

private DataBlockIterator getDataBlockIterator() {
if (blockExecutionInfos.size() > 0) {
try {
fileReader.finish();
} catch (IOException e) {
throw new RuntimeException(e);
}
BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
blockExecutionInfos.remove(executionInfo);
return new DataBlockIterator(executionInfo, fileReader, batchSize, queryStatisticsModel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,39 +156,48 @@ case class CarbonRelation(
private var sizeInBytesLocalValue = 0L

def sizeInBytes: Long = {
val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
carbonTable.getAbsoluteTableIdentifier)
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
if (new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
.getValidAndInvalidSegments.getValidSegments.isEmpty) {
sizeInBytesLocalValue = 0L
} else {
val tablePath = carbonTable.getTablePath
val fileType = FileFactory.getFileType(tablePath)
if (FileFactory.isFileExist(tablePath, fileType)) {
// get the valid segments
val segments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
.getValidAndInvalidSegments.getValidSegments.asScala
var size = 0L
// for each segment calculate the size
segments.foreach {validSeg =>
// for older store
if (null != validSeg.getLoadMetadataDetails.getDataSize &&
null != validSeg.getLoadMetadataDetails.getIndexSize) {
size = size + validSeg.getLoadMetadataDetails.getDataSize.toLong +
validSeg.getLoadMetadataDetails.getIndexSize.toLong
} else {
size = size + FileFactory.getDirectorySize(
CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))
if (carbonTable.isExternalTable) {
val tablePath = carbonTable.getTablePath
val fileType = FileFactory.getFileType(tablePath)
if (FileFactory.isFileExist(tablePath, fileType)) {
sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
}
} else {
val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
carbonTable.getAbsoluteTableIdentifier)
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
if (new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
.getValidAndInvalidSegments.getValidSegments.isEmpty) {
sizeInBytesLocalValue = 0L
} else {
val tablePath = carbonTable.getTablePath
val fileType = FileFactory.getFileType(tablePath)
if (FileFactory.isFileExist(tablePath, fileType)) {
// get the valid segments
val segments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
.getValidAndInvalidSegments.getValidSegments.asScala
var size = 0L
// for each segment calculate the size
segments.foreach { validSeg =>
// for older store
if (null != validSeg.getLoadMetadataDetails.getDataSize &&
null != validSeg.getLoadMetadataDetails.getIndexSize) {
size = size + validSeg.getLoadMetadataDetails.getDataSize.toLong +
validSeg.getLoadMetadataDetails.getIndexSize.toLong
} else {
size = size + FileFactory.getDirectorySize(
CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))
}
}
// update the new table status time
tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
// update the new size
sizeInBytesLocalValue = size
}
// update the new table status time
tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
// update the new size
sizeInBytesLocalValue = size
}
}
}

sizeInBytesLocalValue
}

Expand Down

0 comments on commit 1fb1f19

Please sign in to comment.