Skip to content

Commit

Permalink
I can't sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker committed Sep 22, 2018
1 parent 1dfdbf2 commit 630aaa1
Showing 1 changed file with 26 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package org.http4s.client.okhttp

import java.io.IOException

import cats.data._
import cats.effect._
import cats.implicits._
import cats.effect.implicits._
Expand All @@ -21,7 +20,7 @@ import okhttp3.{
}
import okio.BufferedSink
import org.http4s.{Header, Headers, HttpVersion, Method, Request, Response, Status}
import org.http4s.client.{Client, DisposableResponse}
import org.http4s.client.Client
import org.http4s.internal.invokeCallback
import org.log4s.getLogger
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -62,14 +61,14 @@ sealed abstract class OkHttpBuilder[F[_]] private (
*
* The shutdown method on this client is a no-op. $WHYNOSHUTDOWN
*/
def create(implicit F: ConcurrentEffect[F], cs: ContextShift[F]): Client[F] = Client(open, F.unit)
def create(implicit F: ConcurrentEffect[F], cs: ContextShift[F]): Client[F] = Client(run)

/** Creates the [[org,http4s.client.Client]] as a resource.
*
* The release on this resource is a no-op. $WHYNOSHUTDOWN
*/
def resource(implicit F: ConcurrentEffect[F], cs: ContextShift[F]): Resource[F, Client[F]] =
Resource.make(F.delay(create))(_.shutdown)
Resource.make(F.delay(create))(_ => F.unit)

/** Creates the [[org,http4s.client.Client]] as a singleton stream.
*
Expand All @@ -78,15 +77,13 @@ sealed abstract class OkHttpBuilder[F[_]] private (
def stream(implicit F: ConcurrentEffect[F], cs: ContextShift[F]): Stream[F, Client[F]] =
Stream.resource(resource)

private def open(implicit F: ConcurrentEffect[F], cs: ContextShift[F]) =
Kleisli { req: Request[F] =>
F.async[DisposableResponse[F]] { cb =>
okHttpClient.newCall(toOkHttpRequest(req)).enqueue(handler(cb))
()
}
}
private def run(req: Request[F])(implicit F: ConcurrentEffect[F], cs: ContextShift[F]) =
Resource.suspend(F.async[Resource[F, Response[F]]] { cb =>
okHttpClient.newCall(toOkHttpRequest(req)).enqueue(handler(cb))
()
})

private def handler(cb: Either[Throwable, DisposableResponse[F]] => Unit)(
private def handler(cb: Either[Throwable, Resource[F, Response[F]]] => Unit)(
implicit F: ConcurrentEffect[F],
cs: ContextShift[F]): Callback =
new Callback {
Expand All @@ -102,24 +99,31 @@ sealed abstract class OkHttpBuilder[F[_]] private (
}
val status = Status.fromInt(response.code())
val bodyStream = response.body.byteStream()
val dr = status
val body = readInputStream(F.pure(bodyStream), 1024, blockingExecutionContext, false)
val dispose = F.delay {
bodyStream.close()
()
}
val r = status
.map { s =>
new DisposableResponse[F](
Response[F](headers = getHeaders(response), httpVersion = protocol)
.withStatus(s)
.withBodyStream(
readInputStream(F.pure(bodyStream), 1024, blockingExecutionContext, true)),
F.delay({
bodyStream.close(); ()
})
Resource[F, Response[F]](
F.pure(
(
Response[F](
status = s,
headers = getHeaders(response),
httpVersion = protocol,
body = body),
dispose
))
)
}
.leftMap { t =>
// we didn't understand the status code, close the body and return a failure
bodyStream.close()
t
}
invokeCallback(logger)(cb(dr))
invokeCallback(logger)(cb(r))
}
}

Expand Down

0 comments on commit 630aaa1

Please sign in to comment.