Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries to handle connection reset errors for GZIPInputStream #16606

Closed

Conversation

Akshat-Jain
Copy link
Contributor

Description

This PR adds retries to handle connection reset errors during GZIPInputStream. We were running into the following error if the connection resets in the middle of ingesting a gz file, causing the ingestion task to fail:

java.lang.RuntimeException: java.lang.IllegalStateException: java.io.EOFException: Unexpected end of ZLIB input stream
	at org.apache.druid.java.util.common.Either.valueOrThrow(Either.java:95)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:259)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.run(FrameProcessorExecutor.java:138)
	at org.apache.druid.msq.exec.WorkerImpl$1$2.run(WorkerImpl.java:840)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at org.apache.druid.query.PrioritizedListenableFutureTask.run(PrioritizedExecutorService.java:259)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: java.io.EOFException: Unexpected end of ZLIB input stream
	at org.apache.druid.data.input.impl.FastLineIterator.readNextLine(FastLineIterator.java:124)
	at org.apache.druid.data.input.impl.FastLineIterator.hasNext(FastLineIterator.java:91)
	at org.apache.druid.data.input.TextReader$1.hasNext(TextReader.java:80)
	at org.apache.druid.data.input.IntermediateRowParsingReader$1.hasNext(IntermediateRowParsingReader.java:66)
	at org.apache.druid.java.util.common.parsers.CloseableIterator$2.findNextIteratorIfNecessary(CloseableIterator.java:72)
	at org.apache.druid.java.util.common.parsers.CloseableIterator$2.hasNext(CloseableIterator.java:93)
	at org.apache.druid.java.util.common.parsers.CloseableIterator$1.hasNext(CloseableIterator.java:42)
	at org.apache.druid.msq.input.external.ExternalSegment$1$1.hasNext(ExternalSegment.java:95)
	at org.apache.druid.java.util.common.guava.BaseSequence$1.next(BaseSequence.java:115)
	at org.apache.druid.segment.RowWalker.advance(RowWalker.java:75)
	at org.apache.druid.segment.RowBasedCursor.advanceUninterruptibly(RowBasedCursor.java:110)
	at org.apache.druid.segment.RowBasedCursor.advance(RowBasedCursor.java:103)
	at org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor.populateFrameWriterAndFlushIfNeeded(ScanQueryFrameProcessor.java:342)
	at org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor.populateFrameWriterAndFlushIfNeededWithExceptionHandling(ScanQueryFrameProcessor.java:309)
	at org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor.runWithSegment(ScanQueryFrameProcessor.java:248)
	at org.apache.druid.msq.querykit.BaseLeafFrameProcessor.runIncrementally(BaseLeafFrameProcessor.java:89)
	at org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor.runIncrementally(ScanQueryFrameProcessor.java:156)
	at org.apache.druid.frame.processor.FrameProcessors$1FrameProcessorWithBaggage.runIncrementally(FrameProcessors.java:75)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:230)
	... 8 more
Caused by: java.io.EOFException: Unexpected end of ZLIB input stream
	at java.base/java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:245)
	at java.base/java.util.zip.InflaterInputStream.read(InflaterInputStream.java:159)
	at java.base/java.util.zip.GZIPInputStream.read(GZIPInputStream.java:118)
	at org.apache.druid.data.input.BytesCountingInputEntity$BytesCountingInputStream.read(BytesCountingInputEntity.java:108)
	at java.base/java.io.FilterInputStream.read(FilterInputStream.java:107)
	at org.apache.druid.data.input.impl.FastLineIterator.readNextLine(FastLineIterator.java:120)
	... 26 more

This PR wraps the GZIPInputStream in a retry wrapper using a new class RetryingGZIPInputStream that handles such connection failures, and tries to continue the operation using retries.

Test plan

  1. I setup a simple http server using Python’s utility: python -m http.server 9333 - I had wikipedia.json.gz in the same directory for the server to serve.
  2. I triggered an MSQ ingestion using HTTP source as http://localhost:9333/wikipedia.json.gz
  3. I exited the Python process immediately after starting the ingestion.
  4. I verified that the task was failing at this point prior to this PR's changes. But with this PR's changes, retry mechanism kicks in.
  5. I re-ran the Python server in the middle of the ongoing retries.
  6. Validated that the next retry iteration was able to continue and take the task to successful completion.
  7. Verified that the number of rows ingested was correct. Also tried querying the data, works fine.
Key changed/added classes in this PR
  • RetryingGZIPInputStream: Class to facilitate retries over a GZIPInputStream.
  • RetryingInputStreamUtils: A utils class to extract common functionality between RetryingGZIPInputStream and RetryingInputStream.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@LakshSingla
Copy link
Contributor

This error doesn't seem limited to the GZIPInputStream. What will happen if you retry the experiment, but not with a .gz file? Do we still retry in that case?

@Akshat-Jain
Copy link
Contributor Author

@LakshSingla I haven't tried it out. But this particular error Unexpected end of ZLIB input stream is for gz file only. The problem was that we were wrapping the RetryingInputStream into a FilterInputStream and then creating a GZIPInputStream using the FilterInputStream (inside the gzipInputStream() method). But that doesn't happen for most of the other XyzInputStream in the way we are creating them, so they aren't the same.

public static InputStream decompress(final InputStream in, final String fileName) throws IOException
  {
    if (fileName.endsWith(Format.GZ.getSuffix())) {
      return gzipInputStream(in);
    } else if (fileName.endsWith(Format.BZ2.getSuffix())) {
      return new BZip2CompressorInputStream(in, true);
    } else if (fileName.endsWith(Format.XZ.getSuffix())) {
      return new XZCompressorInputStream(in, true);
    } else if (fileName.endsWith(Format.SNAPPY.getSuffix())) {
      return new FramedSnappyCompressorInputStream(in);
    .
    .
    .
}

@LakshSingla
Copy link
Contributor

But this particular error Unexpected end of ZLIB input stream is for gz file only

I don't think we need to solve for the error, but we are adding the missing retries. The error that the input stream throws when the server is disconnected is irrelevant as long as we retry it. Hence I was curious if the other streams retry normally if the server is disconnected.

. The problem was that we were wrapping the RetryingInputStream into a FilterInputStream and then creating a GZIPInputStream

Does it mean that when the server is disconnected, the inner input stream is working fine (and returning partial data), however the wrapping GZ stream is throwing because the data is incomplete? If so, why is the inner retrying stream not throwing an error and retrying on server disconnection?

But that doesn't happen for most of the other XyzInputStream in the way we are creating them, so they aren't the same.

Where are these other streams getting created?

@Akshat-Jain
Copy link
Contributor Author

Does it mean that when the server is disconnected, the inner input stream is working fine (and returning partial data), however the wrapping GZ stream is throwing because the data is incomplete? If so, why is the inner retrying stream not throwing an error and retrying on server disconnection?

No, the inner stream stops working. But since the inner stream isn't RetryingInputStream but rather a wrapper over RetryingInputStream, it loses the retries where we create a new input stream and start using that.

Previously: GZIPInputStream(FilterInputStream(RetryingInputStream))
Now: RetryingGZIPInputStream(GZIPInputStream(FilterInputStream(RetryingInputStream)))

Where are these other streams getting created?

In the code that I added above (return new BZip2CompressorInputStream(in, true);, return new XZCompressorInputStream(in, true);, etc). Link to the method in the code:

public static InputStream decompress(final InputStream in, final String fileName) throws IOException

@cryptoe
Copy link
Contributor

cryptoe commented Jun 27, 2024

Linking the root cause : openjdk/jdk#19909
I think we should create a druid only wrapper for this ?

@Akshat-Jain
Copy link
Contributor Author

@cryptoe

I think we should create a druid only wrapper for this ?

We don't use it directly, it gets used internally within some other JDK layer.

@Akshat-Jain
Copy link
Contributor Author

This will get fixed in the newer JDK versions. The JDK fix linked above by Karan will be part of JDK 24.

We can probably wrap a bunch of input stream layers with our custom version but it would be very hard to maintain. Same thing applies for S3 input source.

Since this is a transient issue, and the fix would come out of JDK 24, I'm closing this PR.

@Akshat-Jain Akshat-Jain closed this Jul 5, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants