BatchHelper can hang due to underlying network issues #695

Closed
kristopherkane opened this issue Jan 25, 2022 · 3 comments · Fixed by #687

Comments

@kristopherkane

kristopherkane commented Jan 25, 2022

We recently observed Apache Spark 3.1 jobs hang on write using connector 2.2.4 on Dataproc 2.0. Here is a thread dump of a 'stuck' executor that will hang forever:

Thread ID	Thread Name	Thread State	Thread Locks
32	Executor task launch worker for task 81.0 in stage 34.0 (TID 2163)	WAITING	Lock(java.util.concurrent.locks.ReentrantLock$NonfairSync@1551696668}), Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1731738234})
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
java.util.concurrent.FutureTask.get(FutureTask.java:191)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFromFuture(GoogleCloudStorageFileSystem.java:892)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.awaitRequestsCompletion(BatchHelper.java:266)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.flushIfPossible(BatchHelper.java:206)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.flush(BatchHelper.java:237)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfos(GoogleCloudStorageImpl.java:1891)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.checkNoFilesConflictingWithDirs(GoogleCloudStorageFileSystem.java:1224)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:277)
com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:78)
com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:70)
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:613)
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1126)
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1106)
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:995)
org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:84)
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$1253/490190335.apply(Unknown Source)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:131)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$462/119145942.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

Here is the BatchHelper thread of the same dump:

67	gcsfs-batch-helper-0	RUNNABLE	Lock(java.util.concurrent.ThreadPoolExecutor$Worker@332153874}), Monitor(java.lang.Object@1934048116}), Monitor(sun.net.www.protocol.https.HttpsClient@751456677})
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
org.conscrypt.ConscryptEngineSocket$SSLInputStream.readFromSocket(ConscryptEngineSocket.java:920)
org.conscrypt.ConscryptEngineSocket$SSLInputStream.processDataFromSocket(ConscryptEngineSocket.java:884)
org.conscrypt.ConscryptEngineSocket$SSLInputStream.access$100(ConscryptEngineSocket.java:706)
org.conscrypt.ConscryptEngineSocket.doHandshake(ConscryptEngineSocket.java:230)
org.conscrypt.ConscryptEngineSocket.startHandshake(ConscryptEngineSocket.java:209) => holding Monitor(java.lang.Object@1934048116})
org.conscrypt.ConscryptEngineSocket.waitForHandshake(ConscryptEngineSocket.java:547)
org.conscrypt.ConscryptEngineSocket.getOutputStream(ConscryptEngineSocket.java:290)
sun.net.www.http.HttpClient.openServer(HttpClient.java:465)
sun.net.www.http.HttpClient.openServer(HttpClient.java:558) => holding Monitor(sun.net.www.protocol.https.HttpsClient@751456677})
sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:264)
sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:367)
sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:203)
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:189)
sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:151)
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.execute(BatchHelper.java:177)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.lambda$queue$0(BatchHelper.java:164)
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper$$Lambda$1330/894668572.call(Unknown Source)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

The FutureTask.get() is waiting on the BatchHelper's list of HttpRequest futures queued by GoogleCloudStorageImpl.getItemInfos (GoogleCloudStorageImpl.java:1891).

GoogleCloudStorageImpl uses RetryHttpInitializer, which does not set a write timeout, and the default HttpRequest write timeout is infinite.
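
For illustration, here is a minimal sketch (not the connector's actual code) of an HttpRequestInitializer that would set such a timeout; the class name and timeout values are placeholders, and it only assumes the google-http-client setters on HttpRequest:

import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;

// Hypothetical initializer showing where a write timeout would be applied.
// RetryHttpInitializer does not do this today, so requests keep the library's
// default write timeout, which is infinite.
class WriteTimeoutInitializer implements HttpRequestInitializer {
  private final int writeTimeoutMillis;

  WriteTimeoutInitializer(int writeTimeoutMillis) {
    this.writeTimeoutMillis = writeTimeoutMillis;
  }

  @Override
  public void initialize(HttpRequest request) {
    request.setConnectTimeout(20_000); // placeholder values
    request.setReadTimeout(20_000);
    request.setWriteTimeout(writeTimeoutMillis); // the knob this issue asks about
  }
}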

I added a rough simulation of the hang in com.google.api.client.http.HttpRequest, and the resulting BatchHelper stack trace looks the same:

"main@1" prio=5 tid=0x1 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at sun.misc.Unsafe.park(Unsafe.java:-1)
	  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
	  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	  at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFromFuture(GoogleCloudStorageFileSystem.java:892)
	  at com.google.cloud.hadoop.gcsio.BatchHelper.awaitRequestsCompletion(BatchHelper.java:266)
	  at com.google.cloud.hadoop.gcsio.BatchHelper.flushIfPossible(BatchHelper.java:206)
	  at com.google.cloud.hadoop.gcsio.BatchHelper.flush(BatchHelper.java:237)
	  at com.google.cloud.hadoop.gcsio.BatchHelperTest.lockTest(BatchHelperTest.java:272)
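
(The simulation itself isn't reproduced here. As an illustrative alternative, a test could produce the same indefinite wait without patching HttpRequest by using a mock transport whose requests never complete; the class below is hypothetical and only assumes google-http-client's testing classes.)

import com.google.api.client.http.LowLevelHttpRequest;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

// Hypothetical transport whose requests never complete, standing in for the
// stalled TLS handshake seen in the gcsfs-batch-helper-0 thread above.
class HangingHttpTransport extends MockHttpTransport {
  private final CountDownLatch never = new CountDownLatch(1);

  @Override
  public LowLevelHttpRequest buildRequest(String method, String url) {
    return new MockLowLevelHttpRequest(url) {
      @Override
      public LowLevelHttpResponse execute() throws IOException {
        try {
          never.await(); // block forever, like a socket read that never returns
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException(e);
        }
        return super.execute(); // unreachable
      }
    };
  }
}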

Does it make sense to add a configurable write timeout to RetryHttpInitializer, or would the operations then not be idempotent? I admit that the write timeout didn't help in my artificial test, so I'm raising the question early in case it can be ruled out as an option.

@medb
Contributor

medb commented Jan 25, 2022

The write timeout works only for PUT and POST requests and is implemented using a per-write thread/executor, which could have negative performance implications; that's why we didn't make use of it.
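
As a rough illustration of that cost (this is a generic sketch, not the library's implementation), enforcing a write timeout typically means running the body write on a helper thread so the caller can give up after the deadline:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Generic per-write timeout sketch: the write runs on its own thread, which is
// the extra overhead referred to above.
final class TimedWriter {
  static void writeWithTimeout(OutputStream out, byte[] body, long timeoutMillis)
      throws IOException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> write = executor.submit(() -> {
      out.write(body);
      return null;
    });
    try {
      write.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      write.cancel(true);
      throw new IOException("write timed out after " + timeoutMillis + " ms", e);
    } catch (ExecutionException | InterruptedException e) {
      throw new IOException(e);
    } finally {
      executor.shutdownNow();
    }
  }
}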

I think the more systemic approach in #687 would be a preferable solution to this issue.

@kristopherkane
Author

Shall I close this or keep it linked to #687?

@medb
Contributor

medb commented Jan 25, 2022

Let's keep it open and linked, as it has useful context for the issue that we are trying to fix. Thanks for the write-up!
