Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade core to cats-effect-3 #3784

Merged
merged 11 commits into from Oct 28, 2020
7 changes: 7 additions & 0 deletions blaze-core/src/main/scala/org/http4s/blazecore/package.scala
Expand Up @@ -10,6 +10,13 @@ import cats.effect.{Resource, Sync}
import org.http4s.blaze.util.{Cancelable, TickWheelExecutor}

package object blazecore {

// Like fs2.async.unsafeRunAsync before 1.0. Convenient for when we
// have an ExecutionContext but not a Timer.
private[http4s] def unsafeRunAsync[F[_], A](fa: F[A])(
f: Either[Throwable, A] => IO[Unit])(implicit F: Effect[F], ec: ExecutionContext): Unit =
F.runAsync(Async.shift(ec) *> fa)(f).unsafeRunSync()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really keen to get to blaze and destroy this


private[http4s] def tickWheelResource[F[_]](implicit F: Sync[F]): Resource[F, TickWheelExecutor] =
Resource(F.delay {
val s = new TickWheelExecutor()
Expand Down
19 changes: 8 additions & 11 deletions core/src/main/scala/org/http4s/EntityDecoder.scala
Expand Up @@ -7,13 +7,14 @@
package org.http4s

import cats.{Applicative, Functor, Monad, SemigroupK}
import cats.effect.{Blocker, ContextShift, Sync}
import cats.effect.Sync
import cats.implicits._
import fs2._
import fs2.io.file.writeAll
import fs2.io.file.Files
import java.io.File
import org.http4s.multipart.{Multipart, MultipartDecoder}
import scala.annotation.implicitNotFound
import cats.effect.Concurrent

/** A type that can be used to decode a [[Message]]
* EntityDecoder is used to attempt to decode a [[Message]] returning the
Expand Down Expand Up @@ -224,23 +225,19 @@ object EntityDecoder {
text.map(_.toArray)

// File operations
def binFile[F[_]](file: File, blocker: Blocker)(implicit
F: Sync[F],
cs: ContextShift[F]): EntityDecoder[F, File] =
def binFile[F[_]: Files: Concurrent](file: File): EntityDecoder[F, File] =
EntityDecoder.decodeBy(MediaRange.`*/*`) { msg =>
val pipe = writeAll[F](file.toPath, blocker)
val pipe = Files[F].writeAll(file.toPath)
DecodeResult.success(msg.body.through(pipe).compile.drain).map(_ => file)
}

def textFile[F[_]](file: File, blocker: Blocker)(implicit
F: Sync[F],
cs: ContextShift[F]): EntityDecoder[F, File] =
def textFile[F[_]: Files: Concurrent](file: File): EntityDecoder[F, File] =
EntityDecoder.decodeBy(MediaRange.`text/*`) { msg =>
val pipe = writeAll[F](file.toPath, blocker)
val pipe = Files[F].writeAll(file.toPath)
DecodeResult.success(msg.body.through(pipe).compile.drain).map(_ => file)
}

implicit def multipart[F[_]: Sync]: EntityDecoder[F, Multipart[F]] =
implicit def multipart[F[_]: Concurrent]: EntityDecoder[F, Multipart[F]] =
MultipartDecoder.decoder

/** An entity decoder that ignores the content and returns unit. */
Expand Down
23 changes: 10 additions & 13 deletions core/src/main/scala/org/http4s/EntityEncoder.scala
Expand Up @@ -7,10 +7,10 @@
package org.http4s

import cats.{Contravariant, Show}
import cats.effect.{Blocker, ContextShift, Effect, Sync}
import cats.effect.Sync
import cats.implicits._
import fs2.{Chunk, Stream}
import fs2.io.file.readAll
import fs2.io.file.Files
import fs2.io.readInputStream
import java.io._
import java.nio.CharBuffer
Expand Down Expand Up @@ -146,36 +146,33 @@ object EntityEncoder {

// TODO parameterize chunk size
// TODO if Header moves to Entity, can add a Content-Disposition with the filename
def fileEncoder[F[_]](
blocker: Blocker)(implicit F: Effect[F], cs: ContextShift[F]): EntityEncoder[F, File] =
filePathEncoder[F](blocker).contramap(_.toPath)
def fileEncoder[F[_]: Files]: EntityEncoder[F, File] =
filePathEncoder[F].contramap(_.toPath)

// TODO parameterize chunk size
// TODO if Header moves to Entity, can add a Content-Disposition with the filename
def filePathEncoder[F[_]: Sync: ContextShift](blocker: Blocker): EntityEncoder[F, Path] =
def filePathEncoder[F[_]: Files]: EntityEncoder[F, Path] =
encodeBy[F, Path](`Transfer-Encoding`(TransferCoding.chunked)) { p =>
Entity(readAll[F](p, blocker, 4096)) //2 KB :P
Entity(Files[F].readAll(p, 4096)) //2 KB :P
}

// TODO parameterize chunk size
def inputStreamEncoder[F[_]: Sync: ContextShift, IS <: InputStream](
blocker: Blocker): EntityEncoder[F, F[IS]] =
def inputStreamEncoder[F[_]: Sync, IS <: InputStream]: EntityEncoder[F, F[IS]] =
entityBodyEncoder[F].contramap { (in: F[IS]) =>
readInputStream[F](in.widen[InputStream], DefaultChunkSize, blocker)
readInputStream[F](in.widen[InputStream], DefaultChunkSize)
}

// TODO parameterize chunk size
implicit def readerEncoder[F[_], R <: Reader](blocker: Blocker)(implicit
implicit def readerEncoder[F[_], R <: Reader](implicit
F: Sync[F],
cs: ContextShift[F],
charset: Charset = DefaultCharset): EntityEncoder[F, F[R]] =
entityBodyEncoder[F].contramap { (fr: F[R]) =>
// Shared buffer
val charBuffer = CharBuffer.allocate(DefaultChunkSize)
def readToBytes(r: Reader): F[Option[Chunk[Byte]]] =
for {
// Read into the buffer
readChars <- blocker.delay(r.read(charBuffer))
readChars <- F.blocking(r.read(charBuffer))
} yield {
// Flip to read
charBuffer.flip()
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/org/http4s/HttpDate.scala
Expand Up @@ -12,7 +12,6 @@ import org.http4s.util.{Renderable, Writer}
import cats.Functor
import cats.implicits._
import cats.effect.Clock
import scala.concurrent.duration.SECONDS

/** An HTTP-date value represents time as an instance of Coordinated Universal
* Time (UTC). It expresses time at a resolution of one second. By using it
Expand Down Expand Up @@ -70,7 +69,7 @@ object HttpDate {
* problem for future generations.
*/
def current[F[_]: Functor: Clock]: F[HttpDate] =
Clock[F].realTime(SECONDS).map(unsafeFromEpochSecond)
Clock[F].realTime.map(v => unsafeFromEpochSecond(v.toSeconds))

/** The `HttpDate` equal to `Thu, Jan 01 1970 00:00:00 GMT` */
val Epoch: HttpDate =
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/http4s/Message.scala
Expand Up @@ -9,7 +9,7 @@ package org.http4s
import cats.{Applicative, Functor, Monad, ~>}
import cats.data.NonEmptyList
import cats.implicits._
import cats.effect.IO
import cats.effect.SyncIO
import fs2.{Pure, Stream}
import fs2.text.utf8Encode
import _root_.io.chrisdavenport.vault._
Expand Down Expand Up @@ -186,7 +186,7 @@ sealed trait Message[F[_]] extends Media[F] { self =>
object Message {
private[http4s] val logger = getLogger
object Keys {
private[this] val trailerHeaders: Key[Any] = Key.newKey[IO, Any].unsafeRunSync()
private[this] val trailerHeaders: Key[Any] = Key.newKey[SyncIO, Any].unsafeRunSync()
def TrailerHeaders[F[_]]: Key[F[Headers]] = trailerHeaders.asInstanceOf[Key[F[Headers]]]
}
}
Expand Down Expand Up @@ -495,10 +495,10 @@ object Request {
final case class Connection(local: InetSocketAddress, remote: InetSocketAddress, secure: Boolean)

object Keys {
val PathInfoCaret: Key[Int] = Key.newKey[IO, Int].unsafeRunSync()
val PathTranslated: Key[File] = Key.newKey[IO, File].unsafeRunSync()
val ConnectionInfo: Key[Connection] = Key.newKey[IO, Connection].unsafeRunSync()
val ServerSoftware: Key[ServerSoftware] = Key.newKey[IO, ServerSoftware].unsafeRunSync()
val PathInfoCaret: Key[Int] = Key.newKey[SyncIO, Int].unsafeRunSync()
val PathTranslated: Key[File] = Key.newKey[SyncIO, File].unsafeRunSync()
val ConnectionInfo: Key[Connection] = Key.newKey[SyncIO, Connection].unsafeRunSync()
val ServerSoftware: Key[ServerSoftware] = Key.newKey[SyncIO, ServerSoftware].unsafeRunSync()
}
}

Expand Down