Deadlock eventually occurs in JdkHttpClient for certain requests #200

Closed
Billzabob opened this issue Apr 1, 2020 · 44 comments · Fixed by #260

Comments

@Billzabob

I am currently sharing the Java 11 HTTP client (java.net.http.HttpClient) to create both the WebSocket client and the HTTP client. I make a single request with the HTTP client, then connect to a WebSocket and send messages periodically. Eventually (always around 400 seconds later), everything hangs and CPU usage is locked at 100%.

This doesn't occur if:

  • I create a separate client and WebSocket client using .simple[IO]
  • I don't make the initial request with the client
  • I request other URLs with the client

Since it doesn't happen with every URL, I'm assuming it has something to do with the endpoint I'm hitting. It gives the following response:

Response(
  status=200,
  headers=Headers(
    cf-cache-status: DYNAMIC,
    cf-ray: 57d4a1eb4eaaa062-SLC,
    connection: keep-alive,
    content-type: application/json,
    date: Wed, 01 Apr 2020 19:14:20 GMT,
    expect-ct: max-age=604800, report-uri="https://report-uri.cloudflare.com/cdn-cgi/beacon/expect-ct",
    server: cloudflare,
    set-cookie: <REDACTED>,
    set-cookie: <REDACTED>,
    strict-transport-security: max-age=31536000; includeSubDomains,
    transfer-encoding: chunked,
    via: 1.1 google,
    x-envoy-upstream-service-time: 26,
    x-ratelimit-bucket: 41f9cd5d28af77da04563bcb1d67fdfd,
    x-ratelimit-limit: 2,
    x-ratelimit-remaining: 1,
    x-ratelimit-reset: 1585768466,
    x-ratelimit-reset-after: 5
  )
)

{
  "url": "wss://gateway.discord.gg",
  "shards": 1,
  "session_start_limit": {
    "total": 1000,
    "remaining": 992,
    "reset_after": 23572680
  }
}

I tried to put together a minimized example of what I'm seeing here:

import cats.effect._
import cats.implicits._
import fs2.Stream
import java.net.http.HttpClient
import org.http4s.client.jdkhttpclient._
import org.http4s.client.jdkhttpclient.WSFrame._
import org.http4s._
import org.http4s.headers._
import org.http4s.implicits._
import scala.concurrent.duration._

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    IO(HttpClient.newHttpClient).map(client => (JdkHttpClient[IO](client), JdkWSClient[IO](client))).flatMap {
      case (client, wsClient) =>
        client.expect[String](Request[IO](uri = uri"https://discordapp.com/api/gateway/bot", headers = Headers.of(headers))) >> connection(wsClient)
          .compile
          .drain
          .as(ExitCode.Success)
    }
  }

  // Removed since it's private info
  val token: String = ???

  val headers =
    Authorization(Credentials.Token("Bot".ci, token))

  def connection(wsClient: WSClient[IO]) =
    Stream
      .resource(wsClient.connectHighLevel(WSRequest(uri"wss://echo.websocket.org")))
      .flatMap { connection =>
        printResponses(connection).concurrently(
          Stream
            .awakeEvery[IO](1.seconds)
            .evalMap(time => connection.send(Text(s"${time.toSeconds} seconds")))
        )
      }

  def printResponses(connection: WSConnectionHighLevel[IO]): Stream[IO, Unit] = {
    connection.receiveStream
      .collect {
        case Text(data, _) => data
      }
      .evalMap(s => IO(println(s)))
  }

}

This prints out:

1 seconds
2 seconds
3 seconds
4 seconds
5 seconds
6 seconds
7 seconds
8 seconds
...
393 seconds
394 seconds
395 seconds
396 seconds
397 seconds

And then it hangs and CPU usage is stuck at 100%.

@Billzabob
Author

I was able to narrow this down even further. It seems to just be related to the client. The following code also hangs and pegs CPU at 100% after about 460 seconds:

import cats.effect._
import cats.implicits._
import fs2.Stream
import org.http4s._
import org.http4s.client.jdkhttpclient._
import org.http4s.headers._
import org.http4s.implicits._
import scala.concurrent.duration._

object Main2 extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    JdkHttpClient.simple[IO].flatMap { client =>
      Stream
        .awakeEvery[IO](5.seconds)
        .evalTap(_ => client.expect[String](Request[IO](uri = uri"https://discordapp.com/api/gateway/bot", headers = Headers.of(headers))))
        .evalTap(time => IO(println(s"${time.toSeconds} seconds")))
        .compile
        .drain
        .as(ExitCode.Success)
    }
  }

  val token = ???

  val headers =
    Authorization(Credentials.Token("Bot".ci, token))

}

@Billzabob changed the title from "Deadlock occurs when sharing JDK client JdkHttpClient and JdkWSClient" to "Deadlock eventually occurs in JdkHttpClient for certain requests" on Apr 1, 2020

@ChristopherDavenport
Member

Since this is associated in some way with the JDK, which JDK is being used?

@Billzabob
Author

Billzabob commented Apr 1, 2020

@ChristopherDavenport I'm using JDK 11
Specifically:

openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

@Billzabob
Author

Any idea what it could be about this particular response that causes this? I tried testing this against a few other URLs and wasn't able to reproduce the issue.

@amesgen
Member

amesgen commented Apr 1, 2020

How do I create such a token? I created a discord "application", added a bot and got a token, but I get 401 unauthorized.

@amesgen
Member

amesgen commented Apr 1, 2020

I can confirm that the increased CPU usage does not happen on e.g. https://discordapp.com (without api/gateway/bot).

Can you test whether this happens with #191? It is the only thing I can think of right now.

@Billzabob
Author

I'm not sure why you're getting a 401. It's been a while since I created my bot but I think that's all I did. I just ran it using java.net.http.HttpClient directly and it never froze. I also am running it with #191 right now and so far it's made it further than before so maybe that's the problem. I'll leave it running and let you know if it ever hangs 👍

@Billzabob
Author

It's hung again :( After running for 770 seconds it stopped and once again I'm seeing 100% CPU usage.

@amesgen
Member

amesgen commented Apr 1, 2020

Ah, I thought I had to change "Bot".ci, now it works. Now staring at CPU usage...

@amesgen
Member

amesgen commented Apr 1, 2020

It has been running for 1300 seconds and I noticed nothing suspicious about CPU or memory usage. I am on master, not on #191.

My Java version:

openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment (build 11.0.6+10)
OpenJDK 64-Bit Server VM (build 11.0.6+10, mixed mode)

Can you test it with this version?

@amesgen
Member

amesgen commented Apr 1, 2020

Maybe JDK-8221395 is the reason?

@Billzabob
Author

I was hopeful. I tried it with that JDK but it's still freezing up for me :( This time it did at 430 seconds.

@amesgen
Member

amesgen commented Apr 1, 2020

Can you turn on logging?
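
One way to turn on logging for the JDK client is through system properties. The sketch below is an assumption about which switches are useful here, not something stated in this thread: `jdk.httpclient.HttpClient.log` selects the client's log categories, and `jdk.internal.httpclient.debug` is an internal, unsupported flag that enables verbose debug output. Both are read when the client classes initialize, so they should be set before the first `HttpClient` is created (or passed as `-D` options on the command line). The class and method names are hypothetical.

```java
import java.net.http.HttpClient;

class HttpClientLogging {
  // Assumption: these property names are JDK implementation details,
  // not a stable public API, and may differ between JDK releases.
  static void enable() {
    System.setProperty("jdk.internal.httpclient.debug", "true");
    System.setProperty("jdk.httpclient.HttpClient.log", "errors,requests,headers,ssl");
  }

  public static void main(String[] args) {
    // Set the properties first, then create the client.
    enable();
    HttpClient client = HttpClient.newHttpClient();
    System.out.println("client created, preferred version: " + client.version());
  }
}
```

The equivalent command-line form would be `java -Djdk.internal.httpclient.debug=true ...`, which avoids any ordering concerns inside the program.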

@amesgen
Member

amesgen commented Apr 2, 2020

Uh, now it froze for me, after 4010 seconds (accidentally cancelled the first run after ~1800 seconds).

@Billzabob
Author

Wow, yours took way longer to freeze than any of my runs. Glad I'm not the only one able to reproduce the bug though. I won't be able to mess with this again until tomorrow, but I'll try running it with logging then.

@amesgen
Member

amesgen commented Apr 2, 2020

After ~8000 seconds, I got another freeze, and the following lines were spammed again and again:

DEBUG: [HttpClient-1-Worker-0] [8175s 935ms] SSL Writer(SocketTube(5)) handshaking
DEBUG: [HttpClient-1-Worker-0] [8175s 935ms] SSL Writer(SocketTube(5)) wrapping 0 bytes
DEBUG: [HttpClient-1-Worker-0] [8175s 935ms] SSL Writer(SocketTube(5)) SSLResult: Status = OK HandshakeStatus = NEED_WRAP
bytesConsumed = 0 bytesProduced = 0
DEBUG: [HttpClient-1-Worker-0] [8175s 935ms] SSL Writer(SocketTube(5)) OK => produced: 0 bytes into 0, not wrapped: 0
DEBUG: [HttpClient-1-Worker-0] [8175s 935ms] SSL Writer(SocketTube(5)) wrapBuffer returned Status = OK HandshakeStatus = NEED_WRAP
bytesConsumed = 0 bytesProduced = 0

I will try to look into this tomorrow.

Maybe similar to http4s/http4s#2192?
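
The repeated log lines above all report the same result: `Status = OK`, `HandshakeStatus = NEED_WRAP`, and zero bytes consumed and produced. A writer that keeps retrying on such a result never makes progress, which would be consistent with the 100% CPU usage. As a minimal sketch of how that pattern could be recognized (the class and method names are hypothetical and this is not how the JDK client is implemented), one can inspect an `SSLEngineResult`:

```java
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;

class WrapProgress {
  // True when a wrap() call reported success but neither consumed nor produced
  // any bytes while the engine still asks for another wrap.
  static boolean isStalledWrap(SSLEngineResult r) {
    return r.getStatus() == Status.OK
        && r.getHandshakeStatus() == HandshakeStatus.NEED_WRAP
        && r.bytesConsumed() == 0
        && r.bytesProduced() == 0;
  }

  public static void main(String[] args) {
    // Same shape as the result printed in the log lines.
    SSLEngineResult stalled =
        new SSLEngineResult(Status.OK, HandshakeStatus.NEED_WRAP, 0, 0);
    System.out.println(isStalledWrap(stalled)); // prints true
  }
}
```

A single result like this is not necessarily a bug; it only becomes a busy loop when the same result is returned over and over without any other state change.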

@rossabaker
Member

Looks a little like http4s/blaze#359.

@amesgen
Member

amesgen commented Apr 2, 2020

Indeed, the Discord API endpoint supports TLS 1.3.

@Billzabob can you test with TLS 1.3 disabled? Like this:

for {
  sslParams <- IO {
    val ssl = javax.net.ssl.SSLContext.getDefault()
    val params = ssl.getDefaultSSLParameters()
    params.setProtocols(Array("TLSv1.2"))
    params
  }
  javaclient <- IO(java.net.http.HttpClient.newBuilder().sslParameters(sslParams).build())
  client = JdkHttpClient[IO](javaclient)
  _ <- Stream
    .awakeEvery[IO](5.seconds)
    .evalTap(_ =>
      client.expect[String](
        Request[IO](
          uri = uri"https://discordapp.com/api/gateway/bot",
          headers = Headers.of(headers)
        )
      )
    )
    .evalTap(time => IO(println(s"${time.toSeconds} seconds")))
    .compile
    .drain
} yield ExitCode.Success

And also with Java 14?

@Billzabob
Author

So far it's been running for 1800 seconds with the SSL change so maybe that's it! I'll leave it running for a while to make sure and then I'll try Java 14.

@Billzabob
Author

Running with Java 14 (and without the custom SSL parameters), it has also made it much further; it's currently at 2400 seconds. I guess this doesn't need a change in this repo since it's a JDK bug. Sorry for taking up your time, and thanks a bunch for helping me track it down.

@amesgen
Member

amesgen commented Apr 2, 2020

No problem! We will add it to the documentation.

@amesgen
Member

amesgen commented Apr 2, 2020

@Billzabob If you have the time: Can you try to reproduce the deadlock in "pure Java"?

import java.net.*;
import java.net.http.*;
import java.util.*;

class Main {
  public static void main(String[] args) throws Exception {
    System.out.println("starting");
    var http = HttpClient.newHttpClient();
    var token = args[0];
    var req = HttpRequest.newBuilder()
                  .uri(URI.create("https://discordapp.com/api/gateway/bot"))
                  .header("Authorization", "Bot " + token)
                  .build();
    int s = 0;
    while (true) {
      Thread.sleep(5000);
      s += 5;
      var res = http.send(req, HttpResponse.BodyHandlers.ofString());
      System.out.println("status code " + res.statusCode());
      System.out.println(s);
    }
  }
}

Run with javac Main.java && java Main "token".

@Billzabob
Author

Bad news, I'm currently at 9755 seconds without hanging... I'm using your pure Java example on JDK 11.0.6.

@Billzabob
Author

I guess it must be something to do with the way the library uses the Java HttpClient that exposes the bug.

@amesgen
Member

amesgen commented Apr 2, 2020

Yeah, I am at 17930 seconds. Yay, more debugging!

@Billzabob
Author

Did some more messing around and put this together to try and mimic what your client does without actually using it:

import cats.effect._
import cats.implicits._
import fs2._
import fs2.interop.reactivestreams._
import java.net.http._
import java.nio.ByteBuffer
import java.util.concurrent.Flow
import org.http4s._
import org.http4s.headers._
import org.http4s.implicits._
import org.reactivestreams.FlowAdapters
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import java.net.URI
import java.net.http.HttpResponse.BodyHandlers
import java.util.concurrent._
import java.util.function.BiFunction

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    IO(HttpClient.newHttpClient()).flatMap { client =>
      Stream
        .awakeEvery[IO](5.seconds)
        .evalTap(_ => makeRequest(client).flatMap(a => IO(println(a.status))))
        .evalTap(time => IO(println(s"${time.toSeconds} seconds")))
        .compile
        .drain
    }.as(ExitCode.Success)
  }

  val token = ???

  val headers =
    Authorization(Credentials.Token("Bot".ci, token))

  def makeRequest(client: HttpClient): IO[Response[IO]] = {
    val request = HttpRequest.newBuilder().uri(URI.create("https://discordapp.com/api/gateway/bot")).header("Authorization", s"Bot $token").build()
    val response = IO(client.sendAsync(request, BodyHandlers.ofPublisher()))
    fromCompletableFuture(response).flatMap(convertResponse)
  }

  def convertResponse(res: HttpResponse[Flow.Publisher[java.util.List[ByteBuffer]]]): IO[Response[IO]] =
    IO.fromEither(Status.fromInt(res.statusCode)).map { status =>
      Response(
        status = status,
        headers = Headers(res.headers.map.asScala.flatMap {
          case (k, vs) => vs.asScala.map(Header(k, _))
        }.toList),
        httpVersion = res.version match {
          case HttpClient.Version.HTTP_1_1 => HttpVersion.`HTTP/1.1`
          case HttpClient.Version.HTTP_2 => HttpVersion.`HTTP/2.0`
        },
        body = FlowAdapters
          .toPublisher(res.body)
          .toStream[IO]
          .flatMap(bs =>
            Stream.fromIterator[IO](bs.asScala.map(Chunk.byteBuffer).iterator).flatMap(Stream.chunk)
          )
      )
    }

  def fromCompletableFuture[A](fcf: IO[CompletableFuture[A]]): IO[A] =
    fcf.flatMap { cf =>
      IO.cancelable { cb =>
        cf.handle[Unit](new BiFunction[A, Throwable, Unit] {
          override def apply(result: A, err: Throwable): Unit = err match {
            case null => cb(Right(result))
            case _: CancellationException => ()
            case ex: CompletionException if ex.getCause ne null => cb(Left(ex.getCause))
            case ex => cb(Left(ex))
          }
        })
        IO(cf.cancel(true)).void
      }
    }
}

I've run it twice now and both times it fails with this exception after ~1200 seconds:

java.io.IOException: too many concurrent streams
        at java.net.http/jdk.internal.net.http.Http2Connection.reserveStream(Http2Connection.java:440)
        at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:103)
        at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:89)
        at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:299)
        at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:431)
        at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:336)
        at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:328)
        at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:346)
        at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:292)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

Think this is related?

@amesgen
Member

amesgen commented Apr 3, 2020

I think that is a different issue: the body of the response is not consumed, so the connection is kept open. I should probably fix that.
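
To illustrate what "consuming the body" means for a publisher-based response: the Javadoc for `BodySubscribers.ofPublisher` says the publisher must be subscribed to once, and the subscriber must either request all bytes until completion or cancel its subscription, otherwise the resources of the exchange are not released. A minimal sketch of the draining variant follows, assuming a hypothetical helper named `drain`; a `SubmissionPublisher` stands in for the real response body so the example runs without network access.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class DrainBody {
  // Subscribes to a body publisher, requests every chunk, and completes with
  // the number of bytes seen once the publisher signals onComplete.
  static CompletableFuture<Long> drain(Flow.Publisher<List<ByteBuffer>> body) {
    CompletableFuture<Long> done = new CompletableFuture<>();
    body.subscribe(new Flow.Subscriber<List<ByteBuffer>>() {
      private long bytes = 0;

      @Override public void onSubscribe(Flow.Subscription s) {
        s.request(Long.MAX_VALUE); // effectively unbounded demand
      }

      @Override public void onNext(List<ByteBuffer> chunk) {
        for (ByteBuffer b : chunk) bytes += b.remaining();
      }

      @Override public void onError(Throwable t) { done.completeExceptionally(t); }

      @Override public void onComplete() { done.complete(bytes); }
    });
    return done;
  }

  public static void main(String[] args) throws Exception {
    // Stand-in publisher (an assumption for the demo, not the JDK HTTP client).
    try (SubmissionPublisher<List<ByteBuffer>> pub = new SubmissionPublisher<>()) {
      CompletableFuture<Long> total = drain(pub);
      pub.submit(List.of(ByteBuffer.wrap(new byte[] {1, 2, 3})));
      pub.close();
      System.out.println("drained bytes: " + total.get());
    }
  }
}
```

With a real response of type `HttpResponse<Flow.Publisher<List<ByteBuffer>>>`, the same helper would be called as `drain(res.body())`. When the body is not needed at all, `BodyHandlers.discarding()` avoids the publisher entirely.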

@amesgen
Member

amesgen commented Apr 3, 2020

I can't reproduce your error above, but I think this should fix it:

Diff
diff --git a/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkHttpClient.scala b/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkHttpClient.scala
index 9f0f93f..14b1471 100644
--- a/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkHttpClient.scala
+++ b/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkHttpClient.scala
@@ -1,10 +1,5 @@
 package org.http4s.client.jdkhttpclient
 
-import cats.ApplicativeError
-import cats.effect._
-import cats.implicits._
-import fs2.interop.reactivestreams._
-import fs2.{Chunk, Stream}
 import java.net.URI
 import java.net.http.HttpRequest.BodyPublishers
 import java.net.http.HttpResponse.BodyHandlers
@@ -12,6 +7,13 @@ import java.net.http.{HttpClient, HttpRequest, HttpResponse}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.Flow
+
+import cats.ApplicativeError
+import cats.effect._
+import cats.implicits._
+import fs2.concurrent.SignallingRef
+import fs2.interop.reactivestreams._
+import fs2.{Chunk, Stream}
 import org.http4s.client.Client
 import org.http4s.client.jdkhttpclient.compat.CollectionConverters._
 import org.http4s.internal.fromCompletionStage
@@ -58,34 +60,43 @@ object JdkHttpClient {
         (if (headers.isEmpty) rb else rb.headers(headers: _*)).build
       }
 
-    def convertResponse(res: HttpResponse[Flow.Publisher[util.List[ByteBuffer]]]): F[Response[F]] =
-      F.fromEither(Status.fromInt(res.statusCode)).map { status =>
-        Response(
-          status = status,
-          headers = Headers(res.headers.map.asScala.flatMap {
-            case (k, vs) => vs.asScala.map(Header(k, _))
-          }.toList),
-          httpVersion = res.version match {
-            case HttpClient.Version.HTTP_1_1 => HttpVersion.`HTTP/1.1`
-            case HttpClient.Version.HTTP_2 => HttpVersion.`HTTP/2.0`
-          },
-          body = FlowAdapters
-            .toPublisher(res.body)
-            .toStream[F]
-            .flatMap(bs =>
-              Stream.fromIterator(bs.asScala.map(Chunk.byteBuffer).iterator).flatMap(Stream.chunk)
-            )
-        )
-      }
+    def convertResponse(
+        res: HttpResponse[Flow.Publisher[util.List[ByteBuffer]]]
+    ): Resource[F, Response[F]] =
+      Resource(
+        (F.fromEither(Status.fromInt(res.statusCode)), SignallingRef[F, Boolean](false)).mapN {
+          case (status, signal) =>
+            Response(
+              status = status,
+              headers = Headers(res.headers.map.asScala.flatMap {
+                case (k, vs) => vs.asScala.map(Header(k, _))
+              }.toList),
+              httpVersion = res.version match {
+                case HttpClient.Version.HTTP_1_1 => HttpVersion.`HTTP/1.1`
+                case HttpClient.Version.HTTP_2 => HttpVersion.`HTTP/2.0`
+              },
+              body = FlowAdapters
+                .toPublisher(res.body)
+                .toStream[F]
+                .flatMap(bs =>
+                  Stream
+                    .fromIterator(bs.asScala.map(Chunk.byteBuffer).iterator)
+                    .flatMap(Stream.chunk)
+                    .interruptWhen(signal)
+                )
+            ) -> signal.set(true)
+        }
+      )
 
     Client[F] { req =>
-      Resource.liftF(
-        convertRequest(req)
-          .flatMap(r =>
-            fromCompletionStage(F.delay(jdkHttpClient.sendAsync(r, BodyHandlers.ofPublisher)))
-          )
-          .flatMap(convertResponse)
-      )
+      Resource
+        .liftF(
+          convertRequest(req)
+            .flatMap(r =>
+              fromCompletionStage(F.delay(jdkHttpClient.sendAsync(r, BodyHandlers.ofPublisher)))
+            )
+        )
+        .flatMap(convertResponse)
     }
   }

@amesgen
Member

amesgen commented Apr 3, 2020

I corrected the diff, sry.

@amesgen
Member

amesgen commented Apr 3, 2020

Randomly, I got the freezes much faster than before, which made debugging much easier. I think this issue (the freeze issue, not the too many concurrent streams issue, which I can't reproduce) is somehow related to fs2-reactive-streams, because if I use BodyHandlers.ofByteArray instead of BodyHandlers.ofPublisher, I get no freezes locally.

Branch: https://github.com/amesgen/http4s-jdk-http-client/tree/weird-deadlock-stuff

EDIT: false alarm

@Billzabob
Author

Billzabob commented Apr 3, 2020

That makes sense and your fix makes sense, but I still got the same too many concurrent streams error after running it again with that change somehow.

I'll try with that branch and see if that fixes the problem for me.

@Billzabob
Author

That branch froze up for me too :( 430 seconds in

@amesgen
Member

amesgen commented Apr 3, 2020

I also got a freeze after 380 seconds, just after I cancelled the previous run which was 2000 seconds in (and did not freeze)...

So fs2-reactive-streams is not the culprit...

@amesgen
Member

amesgen commented Apr 3, 2020

There really is not much code left in comparison to the (non-freezing) plain Java version, I will try to create a minimal example tomorrow.

@amesgen
Member

amesgen commented Apr 3, 2020

Ok, "good" news: Both the freeze and the too many concurrent streams issue are not http4s-related.


Freezes

Code (HTTP 1.1)
import java.net.URI
import java.net.http.HttpClient.Version
import java.net.http.HttpRequest.BodyPublishers
import java.net.http.HttpResponse.BodyHandlers
import java.net.http.{HttpClient, HttpRequest}
import java.time.Instant
import java.util.concurrent.{CancellationException, CompletionException, CompletionStage}

import cats.effect._
import cats.effect.implicits._
import cats.implicits._
import fs2.Stream

import scala.concurrent.duration._

object Main extends IOApp {

  override def run(args: List[String]): IO[ExitCode] =
    for {
      client <- IO(HttpClient.newHttpClient())
      runRequest = fromCompletionStage(
        IO(
          client.sendAsync(
            HttpRequest
              .newBuilder()
              .uri(URI.create("https://discordapp.com/api/gateway/bot"))
              .method("GET", BodyPublishers.noBody())
              .header("Authorization", s"Bot $token")
              .version(Version.HTTP_1_1)
              .build(),
            BodyHandlers.ofString()
          )
        )
      )
      _ <- Stream
        .awakeEvery[IO](5.seconds)
        .evalTap(_ => runRequest.flatMap(s => IO(println(s))))
        .evalTap(time => IO(println(s"[${Instant.now()}] ${time.toSeconds} seconds")))
        .compile
        .drain
    } yield ExitCode.Success

  def fromCompletionStage[F[_], CF[x] <: CompletionStage[x], A](
      fcs: F[CF[A]]
  )(implicit F: Concurrent[F], CS: ContextShift[F]): F[A] =
    fcs.flatMap { cs =>
      F.async[A] { cb =>
          cs.handle[Unit] { (result, err) =>
            err match {
              case null => cb(Right(result))
              case _: CancellationException => ()
              case ex: CompletionException if ex.getCause ne null => cb(Left(ex.getCause))
              case ex => cb(Left(ex))
            }
          }
          ()
        }
        .guarantee(CS.shift)
    }

  val token = "foo"
}

Also without fs2, only cats-effect as a dependency:

Code without fs2 (HTTP 1.1)
import java.net.URI
import java.net.http.HttpClient.Version
import java.net.http.HttpRequest.BodyPublishers
import java.net.http.HttpResponse.BodyHandlers
import java.net.http.{HttpClient, HttpRequest}
import java.time.{Instant, Duration => JDuration}
import java.util.concurrent.{CancellationException, CompletionException, CompletionStage}

import cats.effect._
import cats.effect.implicits._
import cats.implicits._

import scala.concurrent.duration._

object Main extends IOApp {

  override def run(args: List[String]): IO[ExitCode] =
    for {
      client <- IO(HttpClient.newHttpClient())
      runRequest = fromCompletionStage(
        IO(
          client.sendAsync(
            HttpRequest
              .newBuilder()
              .uri(URI.create("https://discordapp.com/api/gateway/bot"))
              .method("GET", BodyPublishers.noBody())
              .header("Authorization", s"Bot $token")
              .version(Version.HTTP_1_1)
              .build(),
            BodyHandlers.ofString()
          )
        )
      )
      start <- IO(Instant.now())
      ec <- (runRequest <* IO.sleep(5.seconds)).flatMap(s =>
        IO {
          println(s.statusCode)
          val now = Instant.now()
          println(s"[$now] ${JDuration.between(start, now).toSeconds} seconds")
        }
      ).foreverM
    } yield ec

  def fromCompletionStage[F[_], CF[x] <: CompletionStage[x], A](
      fcs: F[CF[A]]
  )(implicit F: Concurrent[F], CS: ContextShift[F]): F[A] =
    fcs.flatMap { cs =>
      F.async[A] { cb =>
          cs.handle[Unit] { (result, err) =>
            err match {
              case null => cb(Right(result))
              case _: CancellationException => ()
              case ex: CompletionException if ex.getCause ne null => cb(Left(ex.getCause))
              case ex => cb(Left(ex))
            }
          }
          ()
        }
        .guarantee(CS.shift)
    }

  val token = "foo"

}

If I use HTTP 2 instead of HTTP 1.1 in the second example, I can reproduce the too many concurrent streams issue. Correction to my earlier claim that this makes the issue definitely unrelated to #207: sorry, I was forgetting what I was testing. I had also changed the body handler from BodyHandlers.ofString to BodyHandlers.ofPublisher in order to not close the connections properly, so reproducing the too many concurrent streams issue is actually expected. Sorry for the confusion.

Code without fs2 (HTTP 2 and intended connection leakage)
import java.net.URI
import java.net.http.HttpClient.Version
import java.net.http.HttpRequest.BodyPublishers
import java.net.http.HttpResponse.BodyHandlers
import java.net.http.{HttpClient, HttpRequest}
import java.time.{Instant, Duration => JDuration}
import java.util.concurrent.{CancellationException, CompletionException, CompletionStage}

import cats.effect._
import cats.effect.implicits._
import cats.implicits._

import scala.concurrent.duration._

object Main extends IOApp {

  override def run(args: List[String]): IO[ExitCode] =
    for {
      client <- IO(HttpClient.newHttpClient())
      runRequest = fromCompletionStage(
        IO(
          client.sendAsync(
            HttpRequest
              .newBuilder()
              .uri(URI.create("https://discordapp.com/api/gateway/bot"))
              .method("GET", BodyPublishers.noBody())
              .header("Authorization", s"Bot $token")
              .version(Version.HTTP_2)
              .build(),
            BodyHandlers.ofPublisher()
          )
        )
      )
      start <- IO(Instant.now())
      ec <- (runRequest <* IO.sleep(5.seconds)).flatMap(s =>
        IO {
          println(s.statusCode)
          val now = Instant.now()
          println(s"[$now] ${JDuration.between(start, now).toSeconds} seconds")
        }
      ).foreverM
    } yield ec

  def fromCompletionStage[F[_], CF[x] <: CompletionStage[x], A](
      fcs: F[CF[A]]
  )(implicit F: Concurrent[F], CS: ContextShift[F]): F[A] =
    fcs.flatMap { cs =>
      F.async[A] { cb =>
          cs.handle[Unit] { (result, err) =>
            err match {
              case null => cb(Right(result))
              case _: CancellationException => ()
              case ex: CompletionException if ex.getCause ne null => cb(Left(ex.getCause))
              case ex => cb(Left(ex))
            }
          }
          ()
        }
        .guarantee(CS.shift)
    }

  val token = "foo"

}

In addition to the synchronous "plain Java" version above, here is also a version with sendAsync.

Code without fs2 (HTTP 2)
import java.net.*;
import java.net.http.*;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.*;
import java.util.concurrent.*;

class Main {
  public static void main(String[] args) throws Exception {
    System.out.println("starting");
    var http = HttpClient.newHttpClient();
    var token = args[0];
    var req = HttpRequest.newBuilder()
                  .uri(URI.create("https://discordapp.com/api/gateway/bot"))
                  .header("Authorization", "Bot " + token)
                  .version(HttpClient.Version.HTTP_2)
                  .build();
    int s = 0;
    var fut = CompletableFuture.completedFuture(0);
    while (true) {
      Thread.sleep(5000);
      s += 5;
      final var ss = s;
      fut =
          fut.thenCompose(
                 $ -> http.sendAsync(req, HttpResponse.BodyHandlers.ofString()))
              .thenApply(res -> {
                System.out.println("status code " + res.statusCode());
                System.out.println(ss);
                return 0;
              });
    }
  }
}

Both HTTP 1.1 and HTTP 2 work fine, without any freezes.

@amesgen
Member

amesgen commented Apr 3, 2020

This GH issue is already pretty big, and we have two (probably unrelated) subissues, neither of which is directly related to http4s or http4s-jdk-http-client. Do we want to move the discussion elsewhere?

@amesgen
Member

amesgen commented Apr 3, 2020

Ha, I just got a freeze with this "plain Java" code:

Code
import java.net.*;
import java.net.http.*;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.*;
import java.util.concurrent.*;

class Main {
  public static void main(String[] args) throws Exception {
    System.out.println("starting");
    var http = HttpClient.newHttpClient();
    var token = args[0];
    var req = HttpRequest.newBuilder()
                  .uri(URI.create("https://discordapp.com/api/gateway/bot"))
                  .header("Authorization", "Bot " + token)
                  .version(HttpClient.Version.HTTP_2)
                  .build();
    int s = 0;
    var fut = CompletableFuture.completedFuture(0);
    while (true) {
      Thread.sleep(5000);
      s += 5;
      final var ss = s;
      fut =
          fut.thenCompose(
                 $ -> http.sendAsync(req, HttpResponse.BodyHandlers.ofPublisher()))
              .thenApply(res -> {
                System.out.println("status code " + res.statusCode());
                System.out.println(ss);
                return 0;
              });
    }
  }
}

Now this seems to indicate that the freeze issue is indeed a Java 11 issue.

@Billzabob Can you confirm this?

@Billzabob
Author

Yep! That froze for me too after 1280 seconds.

@Billzabob
Author

I tried modifying your example and changing BodyHandlers.ofPublisher() to BodyHandlers.discarding() and now it's been going for 2850 seconds without hanging. Think it could have something to do with that? I'll leave it running to make sure. I know nothing about java.util.concurrent.Flow though.

@amesgen
Member

amesgen commented Apr 4, 2020

Yes, that is still confusing me:

  • When using BodyHandlers.ofPublisher like in the "intended connection leakage" example from here, it results in a too many concurrent streams exception. This seems very reasonable.
  • When using BodyHandlers.ofPublisher with the plain Java version, we get a freeze (I confirmed that it still is the SSL issue). This is weird; I would expect a too many concurrent streams exception.

But when using an "eager" BodyHandler like discarding or ofString, the cats-effect version does freeze (see here), although plain Java seems to work fine.

@Billzabob
Author

Wow. That is extremely confusing. This type of stuff is such a pain to track down. Especially since it takes minutes or hours to determine if it's going to freeze or not.

@lhns

lhns commented Aug 9, 2020

I think I'm also having this issue with my simple one-file http-retry-proxy microservice:
https://github.com/LolHens/http-wait/blob/cd87c443e76812dbecaeae9b7c51f5004f6aa181/src/main/scala/de/lolhens/httpwait/Main.scala
I am not using websockets, just simple http requests. I run this as a graal native-image in a docker container.
This sometimes runs fine for a few days and sometimes for weeks, but now and then it just locks up and I also get 100% CPU on the container.
Maybe this info helps to narrow it down. For now I will try switching to another client backend.

@amesgen
Member

amesgen commented Aug 9, 2020

Thanks for reminding me of this issue @LolHens

Note that this issue is not caused by http4s-jdk-http-client, but rather by some weird interaction between cats-effect and TLSv1.3 (maybe cats-effect only increases the likelihood of this TLS bug occurring).

There are two easy workarounds (also see our docs):

  • Disable TLSv1.3 (also see here)
  • Use Java > 11

@lhns

lhns commented Aug 9, 2020

Thanks for the quick info! I will try disabling TLSv1.3 then.

amesgen added a commit to amesgen/http4s-jdk-http-client that referenced this issue Aug 9, 2020
amesgen added a commit to amesgen/http4s-jdk-http-client that referenced this issue Aug 9, 2020
amesgen added a commit that referenced this issue Aug 10, 2020

5 participants