Skip to content

Commit

Permalink
[qob] In GCS, recreate the ReadChannel if a transient error occurs
Browse files Browse the repository at this point in the history
CHANGELOG: Fix #13356 and fix #13409. In QoB pipelines with 10K or more partitions, transient "Corrupted block detected" errors were common. This was caused by incorrect retry logic. That logic has been fixed.

I now assume we cannot reuse a ReadChannel after any exception occurs during read. We also do not
assume that the ReadChannel "atomically", in some sense, modifies the ByteBuffer. In particular, if
we encounter any error, we blow away the ByteBuffer and restart our read entirely.
  • Loading branch information
danking committed Sep 27, 2023
1 parent de009fd commit 64c4c6e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 46 deletions.
3 changes: 0 additions & 3 deletions hail/src/main/scala/is/hail/io/fs/FS.scala
Expand Up @@ -192,9 +192,6 @@ abstract class FSSeekableInputStream extends InputStream with Seekable {
} else {
bb.clear()
bb.limit(0)
if (bb.remaining() != 0) {
assert(false, bb.remaining().toString())
}
physicalSeek(newPos)
}
pos = newPos
Expand Down
72 changes: 29 additions & 43 deletions hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala
Expand Up @@ -8,8 +8,8 @@ import com.google.cloud.storage.Storage.{BlobGetOption, BlobListOption, BlobWrit
import com.google.cloud.storage.{Blob, BlobId, BlobInfo, Storage, StorageException, StorageOptions}
import com.google.cloud.{ReadChannel, WriteChannel}
import is.hail.io.fs.FSUtil.dropTrailingSlash
import is.hail.services.retryTransientErrors
import is.hail.utils.fatal
import is.hail.services.{retryTransientErrors, isTransientError}
import is.hail.utils._
import org.apache.log4j.Logger

import java.io.{ByteArrayInputStream, FileNotFoundException, IOException}
Expand Down Expand Up @@ -137,39 +137,27 @@ class GoogleStorageFS(
}

private[this] def retryIfRequesterPays[T, U](
exc: Exception,
message: String,
code: Int,
exc: Throwable,
makeRequest: Seq[U] => T,
makeUserProjectOption: String => U,
bucket: String
): T = {
if (message == null) {
throw exc
}

val probablyNeedsRequesterPays = message.equals("userProjectMissing") || (code == 400 && message.contains("requester pays"))
if (!probablyNeedsRequesterPays) {
if (isRequesterPaysException(exc)) {
makeRequest(requesterPaysOptions(bucket, makeUserProjectOption))
} else {
throw exc
}

makeRequest(requesterPaysOptions(bucket, makeUserProjectOption))
}

def retryIfRequesterPays[T, U](
exc: Throwable,
makeRequest: Seq[U] => T,
makeUserProjectOption: String => U,
bucket: String
): T = exc match {
def isRequesterPaysException(exc: Throwable): Boolean = exc match {
case exc: IOException if exc.getCause() != null =>
retryIfRequesterPays(exc.getCause(), makeRequest, makeUserProjectOption, bucket)
isRequesterPaysException(exc.getCause())
case exc: StorageException =>
retryIfRequesterPays(exc, exc.getMessage(), exc.getCode(), makeRequest, makeUserProjectOption, bucket)
exc.getMessage != null && (exc.getMessage.equals("userProjectMissing") || (exc.getCode == 400 && exc.getMessage.contains("requester pays")))
case exc: GoogleJsonResponseException =>
retryIfRequesterPays(exc, exc.getMessage(), exc.getStatusCode(), makeRequest, makeUserProjectOption, bucket)
exc.getMessage != null && (exc.getMessage.equals("userProjectMissing") || (exc.getStatusCode == 400 && exc.getMessage.contains("requester pays")))
case exc: Throwable =>
throw exc
false
}

private[this] def handleRequesterPays[T, U](
Expand Down Expand Up @@ -213,30 +201,28 @@ class GoogleStorageFS(

val is: SeekableInputStream = new FSSeekableInputStream {
private[this] var reader: ReadChannel = null

private[this] def retryingRead(): Int = {
retryTransientErrors(
{ reader.read(bb) },
reset = Some({ () => reader.seek(getPosition) })
)
}
private[this] var options: Option[Seq[BlobSourceOption]] = None

private[this] def readHandlingRequesterPays(bb: ByteBuffer): Int = {
if (reader != null) {
retryingRead()
} else {
handleRequesterPays(
{ (options: Seq[BlobSourceOption]) =>
reader = retryTransientErrors {
storage.reader(url.bucket, url.path, options:_*)
}
while (true) {
try {
if (reader == null) {
val opts = options.getOrElse(FastSeq())
reader = storage.reader(url.bucket, url.path, opts:_*)
reader.seek(getPosition)
retryingRead()
},
BlobSourceOption.userProject,
url.bucket
)
}
return reader.read(bb)
} catch {
case exc: Exception if isRequesterPaysException(exc) && options.isEmpty =>
reader = null
bb.clear()
options = Some(requesterPaysOptions(url.bucket, BlobSourceOption.userProject))
case exc: Exception if isTransientError(exc) =>
reader = null
bb.clear()
}
}
throw new RuntimeException("unreachable")
}

override def close(): Unit = {
Expand Down

0 comments on commit 64c4c6e

Please sign in to comment.