Skip to content

Commit

Permalink
Merge branch 'series/0.23' into sanitize-0.23
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker committed Sep 20, 2021
2 parents 303bbf7 + 56c50a9 commit 4295e05
Show file tree
Hide file tree
Showing 293 changed files with 5,078 additions and 4,359 deletions.
13 changes: 0 additions & 13 deletions .github/workflows/ci.yml
Expand Up @@ -138,19 +138,6 @@ jobs:
- name: Publish docs
env:
SSH_PRIVATE_KEY: ${{ secrets.SSH_PRIVATE_KEY }}
run: |
eval "$(ssh-agent -s)"
echo "$SSH_PRIVATE_KEY" | ssh-add -
git config --global user.name "GitHub Actions CI"
git config --global user.email "ghactions@invalid"
sbt ++2.12.15 docs/makeSite docs/ghpagesPushSite
website:
name: Build website
strategy:
Expand Down
2 changes: 1 addition & 1 deletion NOTICE
@@ -1,5 +1,5 @@
http4s
Copyright 2013-2018 http4s.org
Copyright 2013-2021 http4s.org
Licensed under Apache License 2.0 (see LICENSE)

This software contains portions of code derived from akka-http
Expand Down
2 changes: 1 addition & 1 deletion README.md
@@ -1,4 +1,4 @@
# Http4s [![Build Status](https://travis-ci.org/http4s/http4s.svg?branch=master)](https://travis-ci.org/http4s/http4s) [![Gitter chat](https://badges.gitter.im/http4s/http4s.png)](https://gitter.im/http4s/http4s) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.http4s/http4s-core_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.http4s/http4s-core_2.12) [![Typelevel library](https://img.shields.io/badge/typelevel-library-green.svg)](https://typelevel.org/projects/#http4s) <a href="https://typelevel.org/cats/"><img src="https://typelevel.org/cats/img/cats-badge.svg" height="40px" align="right" alt="Cats friendly" /></a>
# Http4s [![Build Status](https://github.com/http4s/http4s/workflows/Continuous%20Integration/badge.svg?branch=main)](https://github.com/http4s/http4s/actions?query=branch%3Amain+workflow%3A%22Continuous+Integration%22) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.http4s/http4s-core_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.http4s/http4s-core_2.12) [![Typelevel library](https://img.shields.io/badge/typelevel-library-green.svg)](https://typelevel.org/projects/#http4s) <a href="https://typelevel.org/cats/"><img src="https://typelevel.org/cats/img/cats-badge.svg" height="40px" align="right" alt="Cats friendly" /></a>

Http4s is a minimal, idiomatic Scala interface for HTTP services. Http4s is
Scala's answer to Ruby's Rack, Python's WSGI, Haskell's WAI, and Java's
Expand Down
13 changes: 8 additions & 5 deletions SECURITY.md
Expand Up @@ -6,11 +6,14 @@ We are currently providing security updates to the following versions:

| Version | Supported |
| ------- | ------------------ |
| 0.21.x | :white_check_mark: |
| 0.20.x | :white_check_mark: |
| 0.19.x | :x: |
| 0.18.x | :x: |
| < 0.18 | :x: |
| 1.0.x | :white_check_mark: |
| 0.23.x | :white_check_mark: |
| 0.22.x | :white_check_mark: |
| 0.21.x | :white_check_mark: |
| 0.20.x | :x: |
| 0.19.x | :x: |
| 0.18.x | :x: |
| < 0.18 | :x: |

## Reporting a Vulnerability

Expand Down
Expand Up @@ -19,9 +19,9 @@ package asynchttpclient
package client

import cats.effect._
import cats.effect.concurrent._
import cats.effect.kernel.{Async, Resource}
import cats.effect.std.Dispatcher
import cats.syntax.all._
import cats.effect.implicits._
import fs2.Stream._
import fs2._
import fs2.interop.reactivestreams.{StreamSubscriber, StreamUnicastPublisher}
Expand All @@ -48,28 +48,38 @@ object AsyncHttpClient {
.setCookieStore(new NoOpCookieStore)
.build()

def apply[F[_]](httpClient: AsyncHttpClient)(implicit F: ConcurrentEffect[F]): Client[F] =
Client[F] { req =>
Resource(F.async[(Response[F], F[Unit])] { cb =>
httpClient.executeRequest(toAsyncRequest(req), asyncHandler(cb))
()
})
/** Create a HTTP client with an existing AsyncHttpClient client. The supplied client is NOT
* closed by this Resource!
*/
def fromClient[F[_]](httpClient: AsyncHttpClient)(implicit F: Async[F]): Resource[F, Client[F]] =
Dispatcher[F].flatMap { dispatcher =>
val client = Client[F] { req =>
Resource(F.async[(Response[F], F[Unit])] { cb =>
F.delay(httpClient
.executeRequest(toAsyncRequest(req, dispatcher), asyncHandler(cb, dispatcher)))
.as(None)
})
}

Resource.eval(F.pure(client))
}

/** Allocates a Client and its shutdown mechanism for freeing resources.
*/
def allocate[F[_]](config: AsyncHttpClientConfig = defaultConfig)(implicit
F: ConcurrentEffect[F]): F[(Client[F], F[Unit])] =
F.delay(new DefaultAsyncHttpClient(config))
.map(c => (apply(c), F.delay(c.close())))
F: Async[F]): F[(Client[F], F[Unit])] =
resource(config).allocated

/** Create an HTTP client based on the AsyncHttpClient library
*
* @param config configuration for the client
*/
def resource[F[_]](config: AsyncHttpClientConfig = defaultConfig)(implicit
F: ConcurrentEffect[F]): Resource[F, Client[F]] =
Resource(allocate(config))
F: Async[F]): Resource[F, Client[F]] =
Resource.make(F.delay(new DefaultAsyncHttpClient(config)))(c => F.delay(c.close())).flatMap {
httpClient =>
fromClient(httpClient)
}

/** Create a bracketed HTTP client based on the AsyncHttpClient library.
*
Expand All @@ -78,7 +88,7 @@ object AsyncHttpClient {
* shutdown when the stream terminates.
*/
def stream[F[_]](config: AsyncHttpClientConfig = defaultConfig)(implicit
F: ConcurrentEffect[F]): Stream[F, Client[F]] =
F: Async[F]): Stream[F, Client[F]] =
Stream.resource(resource(config))

/** Create a custom AsyncHttpClientConfig
Expand All @@ -93,8 +103,8 @@ object AsyncHttpClient {
configurationFn(defaultConfigBuilder).build()
}

private def asyncHandler[F[_]](cb: Callback[(Response[F], F[Unit])])(implicit
F: ConcurrentEffect[F]) =
private def asyncHandler[F[_]](cb: Callback[(Response[F], F[Unit])], dispatcher: Dispatcher[F])(
implicit F: Async[F]) =
new StreamedAsyncHandler[Unit] {
var state: State = State.CONTINUE
var response: Response[F] = Response()
Expand All @@ -106,7 +116,7 @@ object AsyncHttpClient {
val eff = for {
_ <- onStreamCalled.set(true)

subscriber <- StreamSubscriber[F, HttpResponseBodyPart]
subscriber <- StreamSubscriber[F, HttpResponseBodyPart](dispatcher)

subscribeF = F.delay(publisher.subscribe(subscriber))

Expand All @@ -117,7 +127,7 @@ object AsyncHttpClient {
body =
subscriber
.stream(bodyDisposal.set(F.unit) >> subscribeF)
.flatMap(part => chunk(Chunk.bytes(part.getBodyPartBytes)))
.flatMap(part => chunk(Chunk.array(part.getBodyPartBytes)))
.mergeHaltBoth(Stream.eval(deferredThrowable.get.flatMap(F.raiseError[Byte])))

responseWithBody = response.copy(body = body)
Expand All @@ -126,7 +136,7 @@ object AsyncHttpClient {
invokeCallbackF[F](cb(Right(responseWithBody -> (dispose >> bodyDisposal.get.flatten))))
} yield ()

eff.runAsync(_ => IO.unit).unsafeRunSync()
dispatcher.unsafeRunSync(eff)

state
}
Expand All @@ -144,39 +154,46 @@ object AsyncHttpClient {
state
}

override def onThrowable(throwable: Throwable): Unit =
onStreamCalled.get
override def onThrowable(throwable: Throwable): Unit = {
val fa = onStreamCalled.get
.ifM(
ifTrue = deferredThrowable.complete(throwable),
ifTrue = deferredThrowable.complete(throwable).void,
ifFalse = invokeCallbackF(cb(Left(throwable))))
.runAsync(_ => IO.unit)
.unsafeRunSync()

override def onCompleted(): Unit =
onStreamCalled.get
dispatcher.unsafeRunSync(fa)
}

override def onCompleted(): Unit = {
val fa = onStreamCalled.get
.ifM(ifTrue = F.unit, ifFalse = invokeCallbackF[F](cb(Right(response -> dispose))))
.runAsync(_ => IO.unit)
.unsafeRunSync()

dispatcher.unsafeRunSync(fa)
}
}

// use fibers to access the ContextShift and ensure that we get off of the AHC thread pool
private def invokeCallbackF[F[_]](invoked: => Unit)(implicit F: Concurrent[F]): F[Unit] =
F.start(F.delay(invoked)).flatMap(_.join)
private def invokeCallbackF[F[_]](invoked: => Unit)(implicit F: Async[F]): F[Unit] =
F.start(F.delay(invoked)).flatMap(_.joinWithNever)

private def toAsyncRequest[F[_]: ConcurrentEffect](request: Request[F]): AsyncRequest = {
private def toAsyncRequest[F[_]: Async](
request: Request[F],
dispatcher: Dispatcher[F]): AsyncRequest = {
val headers = new DefaultHttpHeaders
for (h <- request.headers.headers)
headers.add(h.name.toString, h.value)
new RequestBuilder(request.method.renderString)
.setUrl(request.uri.renderString)
.setHeaders(headers)
.setBody(getBodyGenerator(request))
.setBody(getBodyGenerator(request, dispatcher))
.build()
}

private def getBodyGenerator[F[_]: ConcurrentEffect](req: Request[F]): BodyGenerator = {
private def getBodyGenerator[F[_]: Async](
req: Request[F],
dispatcher: Dispatcher[F]): BodyGenerator = {
val publisher = StreamUnicastPublisher(
req.body.chunks.map(chunk => Unpooled.wrappedBuffer(chunk.toArray)))
req.body.chunks.map(chunk => Unpooled.wrappedBuffer(chunk.toArray)),
dispatcher)
if (req.isChunked) new ReactiveStreamsBodyGenerator(publisher, -1)
else
req.contentLength match {
Expand Down
Expand Up @@ -23,7 +23,7 @@ import org.asynchttpclient.DefaultAsyncHttpClient
import org.asynchttpclient.HostStats
import org.http4s.client.{Client, ClientRouteTestBattery, DefaultClient, defaults}

class AsyncHttpClientSpec extends ClientRouteTestBattery("AsyncHttpClient") with Http4sSuite {
class AsyncHttpClientSuite extends ClientRouteTestBattery("AsyncHttpClient") with Http4sSuite {

def clientResource: Resource[IO, Client[IO]] = AsyncHttpClient.resource[IO]()

Expand Down Expand Up @@ -76,15 +76,13 @@ class AsyncHttpClientSpec extends ClientRouteTestBattery("AsyncHttpClient") with
}

test("AsyncHttpClientStats should correctly get the stats from the underlying ClientStats") {

val clientWithStats: Resource[IO, Client[IO]] = Resource(
IO.delay(new DefaultAsyncHttpClient(AsyncHttpClient.defaultConfig))
.map(c =>
(
new ClientWithStats(
AsyncHttpClient.apply(c),
new AsyncHttpClientStats[IO](c.getClientStats)),
IO.delay(c.close()))))
val clientWithStats: Resource[IO, Client[IO]] =
for {
httpClient <- Resource.make(
IO.delay(new DefaultAsyncHttpClient(AsyncHttpClient.defaultConfig)))(client =>
IO(client.close()))
client <- AsyncHttpClient.fromClient[IO](httpClient)
} yield new ClientWithStats(client, new AsyncHttpClientStats[IO](httpClient.getClientStats))

val clientStats: Resource[IO, AsyncHttpClientStats[IO]] = clientWithStats.map {
case client: ClientWithStats => client.getStats
Expand Down
1 change: 1 addition & 0 deletions bench/src/main/scala/org/http4s/bench/CirceJsonBench.scala
Expand Up @@ -20,6 +20,7 @@ package bench
import java.util.concurrent.TimeUnit

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.circe._
import io.circe.parser._
import org.http4s.circe._
Expand Down
Expand Up @@ -20,8 +20,8 @@ import java.util.concurrent.TimeUnit

import org.http4s._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.openjdk.jmh.annotations._
import cats.effect.ContextShift
import org.http4s.ember.core.Parser

// sbt "bench/Jmh/run -i 5 -wi 5 -f 1 -t 1 org.http4s.ember.bench.EmberParserBench"
Expand Down Expand Up @@ -54,8 +54,6 @@ class EmberParserBench {

object EmberParserBench {

implicit val CS: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)

@State(Scope.Benchmark)
class BenchState {
val maxHeaderSize = 256 * 1024
Expand Down

0 comments on commit 4295e05

Please sign in to comment.