-
Notifications
You must be signed in to change notification settings - Fork 703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CARBONDATA-3265] Fixed memory leak in Range Sort #3095
Conversation
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2398/ |
Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10657/ |
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2624/ |
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo) | ||
carbonSessionInfo.getSessionParams.getAddedProps.asScala.map { | ||
f => CarbonProperties.getInstance().addProperty(f._1, f._2) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems that two pieces of code are the same, better to use a method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qiuchenjian Done
TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll()) | ||
val carbonSessionInfo: CarbonSessionInfo = { | ||
var info = ThreadLocalSessionInfo.getCarbonSessionInfo | ||
if (info == null || null == info.getSessionParams) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if info != null ,no need to generate a new object
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This piece of code is not required, so I have removed it.
please modify the title to use range column instead of Range Partition |
@@ -127,9 +130,32 @@ object DataLoadProcessBuilderOnSpark { | |||
} | |||
|
|||
// 4. Write | |||
sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => | |||
sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => { | |||
TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all the changes related to ThreadLocalSessionInfo are not needed. we are already creating ThreadLocalSessionInfo in executor side if null.
Check org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark#writeFunc for reference
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kunal642 Okay..I have removed these changes
@@ -221,9 +247,32 @@ object DataLoadProcessBuilderOnSpark { | |||
.map(_._2) | |||
|
|||
// 4. Sort and Write data | |||
sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => | |||
sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => { | |||
TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2425/ |
Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10683/ |
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2652/ |
LGTM |
In range sort, unsafe memory was not getting cleared in case of task failure. So, added a fix for memory leak. This closes #3095
In range sort, unsafe memory was not getting cleared in case of task failure. So, added a fix for memory leak. This closes #3095
In range sort, unsafe memory was not getting cleared in case of task failure. So, added a fix for memory leak. This closes apache#3095
In range sort, unsafe memory was not getting cleared in case of task failure . So, added a fix for memory leak.
Be sure to do all of the following checklist to help us incorporate
your contribution quickly and easily:
Any interfaces changed?
Any backward compatibility impacted?
Document update required?
Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.