Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TarReaderStage only pull when needed, #2714 #2715

Merged
merged 1 commit into from Aug 10, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -42,7 +42,7 @@ private[file] class TarReaderStage
if (buffer.length >= TarArchiveEntry.headerLength) {
readFile(buffer)
} else {
if (!hasBeenPulled(flowIn)) pull(flowIn)
tryPullIfNeeded()
setHandlers(flowIn, flowOut, new CollectHeader(buffer))
}
}
Expand All @@ -51,7 +51,7 @@ private[file] class TarReaderStage
def pushSource(metadata: TarArchiveMetadata, buffer: ByteString): Unit = {
if (buffer.length >= metadata.size) {
val (emit, remain) = buffer.splitAt(metadata.size.toInt)
log.debug(s"emitting completed source for $metadata")
log.debug("emitting completed source for [{}]", metadata)
push(flowOut, metadata -> Source.single(emit))
readTrailer(metadata, remain, subSource = None)
} else setHandlers(flowIn, flowOut, new CollectFile(metadata, buffer))
Expand Down Expand Up @@ -117,13 +117,18 @@ private[file] class TarReaderStage
case WarnTermination =>
log.warning(
"The tar content source was not subscribed to within {}, it must be subscribed to to progress tar file reading.",
timeout
timeout.toCoarsest
)
case NoopTermination =>
}
}
}

private def tryPullIfNeeded(): Unit = {
if (!hasBeenPulled(flowIn))
tryPull(flowIn)
}

/**
* Don't react on downstream pulls until we have something to push.
*/
Expand All @@ -137,7 +142,7 @@ private[file] class TarReaderStage
*/
private trait ExpectDownstreamPull extends OutHandler {
final override def onPull(): Unit = {
pull(flowIn)
tryPullIfNeeded()
setHandler(flowOut, IgnoreDownstreamPull)
}
}
Expand All @@ -152,7 +157,9 @@ private[file] class TarReaderStage
buffer ++= grab(flowIn)
if (buffer.length >= TarArchiveEntry.headerLength) {
readFile(buffer)
} else pull(flowIn)
} else {
tryPullIfNeeded()
}
}

override def onUpstreamFinish(): Unit = {
Expand All @@ -173,7 +180,6 @@ private[file] class TarReaderStage
extends InHandler
with IgnoreDownstreamPull {
private var emitted: Long = 0
private var flowInPulled = false

private val subSource: FileOutSubSource = {
val sub = new FileOutSubSource()
Expand All @@ -185,9 +191,8 @@ private[file] class TarReaderStage
subPush(buffer)
buffer = ByteString.empty
if (isClosed(flowIn)) onUpstreamFinish()
} else if (!flowInPulled) {
flowInPulled = true
pull(flowIn)
} else {
tryPullIfNeeded()
}
}
})
Expand All @@ -196,7 +201,7 @@ private[file] class TarReaderStage
sub
}

log.debug(s"emitting source for $metadata")
log.debug("emitting source for [{}]", metadata)
push(flowOut, metadata -> Source.fromGraph(subSource.source))
setHandler(flowOut, IgnoreDownstreamPull)

Expand All @@ -213,7 +218,6 @@ private[file] class TarReaderStage
}

override def onPush(): Unit = {
flowInPulled = false
subPush(grab(flowIn))
}

Expand Down Expand Up @@ -246,10 +250,13 @@ private[file] class TarReaderStage
subSource.foreach { src =>
src.complete()
setHandler(flowOut, ExpectDownstreamPull)
if (isAvailable(flowOut)) pull(flowIn)
if (isAvailable(flowOut))
tryPullIfNeeded()
}
readHeader(buffer.drop(trailerLength))
} else pull(flowIn)
} else {
tryPullIfNeeded()
}
}

override def onUpstreamFinish(): Unit = {
Expand All @@ -270,10 +277,10 @@ private[file] class TarReaderStage

override def onPush(): Unit = {
grab(flowIn)
pull(flowIn)
tryPullIfNeeded()
}

tryPull(flowIn)
tryPullIfNeeded()
}

}
Expand Down