Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast #29558

Closed
wants to merge 3 commits into from

Conversation

LantaoJin
Copy link
Contributor

@LantaoJin LantaoJin commented Aug 27, 2020

What changes were proposed in this pull request?

In TorrentBroadcast.scala

L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))

After the original value is saved successfully(TorrentBroadcast.scala: L133), but the following blockifyObject()(L137) or store piece(L147) steps are failed. There is no opportunity to release broadcast from memory.

This patch is to remove all pieces of the broadcast when failed to blockify or failed to store some pieces of a broadcast.

Why are the changes needed?

We use Spark thrift-server as a long-running service. A bad query submitted a heavy BroadcastNestLoopJoin operation and made driver full GC. We killed the bad query but we found the driver's memory usage was still high and full GCs were still frequent. By investigating with GC dump and log, we found the broadcast may memory leak.

2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure)
2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc):
116G->112G(170G), 184.9121920 secs]
[Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
1: 676531691 72035438432 [B
2: 676502528 32472121344 org.apache.spark.sql.catalyst.expressions.UnsafeRow
3: 99551 12018117568 [Ljava.lang.Object;
4: 26570 4349629040 [I
5: 6 3264536688 [Lorg.apache.spark.sql.catalyst.InternalRow;
6: 1708819 256299456 [C
7: 2338 179615208 [J
8: 1703669 54517408 java.lang.String
9: 103860 34896960 org.apache.spark.status.TaskDataWrapper
10: 177396 25545024 java.net.URI
...

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manually test. This UT is hard to write and the patch is straightforward.

@Ngone51
Copy link
Member

Ngone51 commented Aug 27, 2020

Why the application doesn't fail when the exception raised in TorrentBroadcast.writeBlocks()? Is it because of the thrift-server?

@SparkQA
Copy link

SparkQA commented Aug 27, 2020

Test build #127954 has finished for PR 29558 at commit f9314ae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Copy link
Contributor Author

LantaoJin commented Aug 28, 2020

Why the application doesn't fail when the exception raised in TorrentBroadcast.writeBlocks()? Is it because of the thrift-server?

Yes. I think in many cases, this exception won't fail the whole application. In our case, it happened in executing a query which included a BroadcastExchangeExec, it broadcasts a relation. It failed the query execution.

@LantaoJin
Copy link
Contributor Author

Ping @cloud-fan @dongjoon-hyun

@dongjoon-hyun
Copy link
Member

Thank you for pinging me, @LantaoJin . According to SPARK-32715 JIRA description, is this 3.1.0-only bug?

} catch {
case t: Throwable =>
logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
blockManager.removeBroadcast(id, tellMaster = true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? What happens if this is thrown at line 135, @LantaoJin ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can begin to Try..Catch from L137 if the L134 is atomic. But I think L157: blockManager.removeBroadcast(id, tellMaster = true) is no harmful even exception throws from L135. Please correct me if I understand incorrectly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it has the thread-safe issue since blockManager is an IsolatedRpcEndpoint?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @dongjoon-hyun - blockManager.putSingle for broadcastid should be outside try/catch. Rest look fine.

@LantaoJin
Copy link
Contributor Author

Thank you for pinging me, @LantaoJin . According to SPARK-32715 JIRA description, is this 3.1.0-only bug?

No. I just use the latest version. I will update that jira page.

Copy link
Member

@Ngone51 Ngone51 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a unit test?

} catch {
case t: Throwable =>
logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
blockManager.removeBroadcast(id, tellMaster = true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it has the thread-safe issue since blockManager is an IsolatedRpcEndpoint?

@Ngone51
Copy link
Member

Ngone51 commented Sep 3, 2020

cc @tgravescs @jiangxb1987

@LantaoJin
Copy link
Contributor Author

LantaoJin commented Sep 14, 2020

Would it has the thread-safe issue since blockManager is an IsolatedRpcEndpoint?

removeBroadcast() should be thread-safe

@SparkQA
Copy link

SparkQA commented Sep 14, 2020

Test build #128649 has finished for PR 29558 at commit b565c33.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Copy link
Contributor Author

retest this please

@dongjoon-hyun
Copy link
Member

Thank you for update, @LantaoJin .

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @LantaoJin and @mridulm and @Ngone51 .
Merged to master/3.0/2.4.

dongjoon-hyun pushed a commit that referenced this pull request Sep 15, 2020
…oadcast

### What changes were proposed in this pull request?
In TorrentBroadcast.scala
```scala
L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
```
After the original value is saved successfully(TorrentBroadcast.scala: L133), but the following `blockifyObject()`(L137) or store piece(L147) steps are failed. There is no opportunity to release broadcast from memory.

This patch is to remove all pieces of the broadcast when failed to blockify or failed to store some pieces of a broadcast.

### Why are the changes needed?
We use Spark thrift-server as a long-running service. A bad query submitted a heavy BroadcastNestLoopJoin operation and made driver full GC. We killed the bad query but we found the driver's memory usage was still high and full GCs were still frequent. By investigating with GC dump and log, we found the broadcast may memory leak.

> 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure)
2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc):
116G->112G(170G), 184.9121920 secs]
[Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
1: 676531691 72035438432 [B
2: 676502528 32472121344 org.apache.spark.sql.catalyst.expressions.UnsafeRow
3: 99551 12018117568 [Ljava.lang.Object;
4: 26570 4349629040 [I
5: 6 3264536688 [Lorg.apache.spark.sql.catalyst.InternalRow;
6: 1708819 256299456 [C
7: 2338 179615208 [J
8: 1703669 54517408 java.lang.String
9: 103860 34896960 org.apache.spark.status.TaskDataWrapper
10: 177396 25545024 java.net.URI
...

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually test. This UT is hard to write and the patch is straightforward.

Closes #29558 from LantaoJin/SPARK-32715.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 7a9b066)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Sep 15, 2020
…oadcast

### What changes were proposed in this pull request?
In TorrentBroadcast.scala
```scala
L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
```
After the original value is saved successfully(TorrentBroadcast.scala: L133), but the following `blockifyObject()`(L137) or store piece(L147) steps are failed. There is no opportunity to release broadcast from memory.

This patch is to remove all pieces of the broadcast when failed to blockify or failed to store some pieces of a broadcast.

### Why are the changes needed?
We use Spark thrift-server as a long-running service. A bad query submitted a heavy BroadcastNestLoopJoin operation and made driver full GC. We killed the bad query but we found the driver's memory usage was still high and full GCs were still frequent. By investigating with GC dump and log, we found the broadcast may memory leak.

> 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure)
2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc):
116G->112G(170G), 184.9121920 secs]
[Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
1: 676531691 72035438432 [B
2: 676502528 32472121344 org.apache.spark.sql.catalyst.expressions.UnsafeRow
3: 99551 12018117568 [Ljava.lang.Object;
4: 26570 4349629040 [I
5: 6 3264536688 [Lorg.apache.spark.sql.catalyst.InternalRow;
6: 1708819 256299456 [C
7: 2338 179615208 [J
8: 1703669 54517408 java.lang.String
9: 103860 34896960 org.apache.spark.status.TaskDataWrapper
10: 177396 25545024 java.net.URI
...

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually test. This UT is hard to write and the patch is straightforward.

Closes #29558 from LantaoJin/SPARK-32715.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 7a9b066)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@SparkQA
Copy link

SparkQA commented Sep 15, 2020

Test build #128684 has finished for PR 29558 at commit b565c33.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
…oadcast

### What changes were proposed in this pull request?
In TorrentBroadcast.scala
```scala
L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
```
After the original value is saved successfully(TorrentBroadcast.scala: L133), but the following `blockifyObject()`(L137) or store piece(L147) steps are failed. There is no opportunity to release broadcast from memory.

This patch is to remove all pieces of the broadcast when failed to blockify or failed to store some pieces of a broadcast.

### Why are the changes needed?
We use Spark thrift-server as a long-running service. A bad query submitted a heavy BroadcastNestLoopJoin operation and made driver full GC. We killed the bad query but we found the driver's memory usage was still high and full GCs were still frequent. By investigating with GC dump and log, we found the broadcast may memory leak.

> 2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure)
2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc):
116G->112G(170G), 184.9121920 secs]
[Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
1: 676531691 72035438432 [B
2: 676502528 32472121344 org.apache.spark.sql.catalyst.expressions.UnsafeRow
3: 99551 12018117568 [Ljava.lang.Object;
4: 26570 4349629040 [I
5: 6 3264536688 [Lorg.apache.spark.sql.catalyst.InternalRow;
6: 1708819 256299456 [C
7: 2338 179615208 [J
8: 1703669 54517408 java.lang.String
9: 103860 34896960 org.apache.spark.status.TaskDataWrapper
10: 177396 25545024 java.net.URI
...

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually test. This UT is hard to write and the patch is straightforward.

Closes apache#29558 from LantaoJin/SPARK-32715.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 7a9b066)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants