Skip to content

Upgrade core to cats-effect-3 #3784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 28, 2020
Merged

Upgrade core to cats-effect-3 #3784

merged 11 commits into from
Oct 28, 2020

Conversation

domaspoliakas
Copy link
Contributor

Hello.

Took me a little bit of time to get my bearings with regards to figuring out the right artifacts to import and such. For now this is a pretty simple set of changes just to make everything compile.

  • I got rid of AsyncSyntax; I don't think it made any sense to keep it.
  • Removed all mention of Blocker and ContextShift (naturally)
  • There's a couple of instances where unsafeRunSync was done where I used the global IORuntime. I'm not sure if that's the right thing to do. If that could be avoided core could probably import just kernel and std.
  • I don't know if I used the dispatcher correctly. That is, I added Dispatcher as an implicit in a couple of places, which I'm concerned about since it might have a knock-on effect and I don't know if that's desired.

I was primarily focused on getting this to compile and trying to understand how to migrate without changes, but now that it compiles I think that some places where Sync is used could be loosened due to the presence of Files.

Anyway, let me know how I could improve this. I will try to run tests tomorrow.

def textFile[F[_]](file: File, blocker: Blocker)(implicit
F: Sync[F],
cs: ContextShift[F]): EntityDecoder[F, File] =
def textFile[F[_]: Files](file: File)(implicit F: Sync[F]): EntityDecoder[F, File] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Sync here? Also if Sync is unavoidable, you can remove Files from there

Copy link
Contributor Author

@domaspoliakas domaspoliakas Oct 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a single call that needs to be suspended downstream. I'll try to see if I can eliminate that.
Nevermind, I was thinking of a different place. This can be changed to Concurrent.

@@ -186,6 +186,7 @@ sealed trait Message[F[_]] extends Media[F] { self =>
object Message {
private[http4s] val logger = getLogger
object Keys {
import cats.effect.unsafe.implicits.global
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can avoid this by using SyncIO for this

@@ -495,6 +496,7 @@ object Request {
final case class Connection(local: InetSocketAddress, remote: InetSocketAddress, secure: Boolean)

object Keys {
import cats.effect.unsafe.implicits.global
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

@@ -25,15 +26,13 @@ object StaticFile {

val DefaultBufferSize = 10240

def fromString[F[_]: Sync: ContextShift](
def fromString[F[_]: Files: Sync](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't want Files: Sync. It it works with just Files plus non effect capturing classes (e.g. Monad or Concurrent), then great, if not Sync is enough.

@@ -116,37 +111,31 @@ object StaticFile {
Sync[F].delay(
if (f.isFile) s"${f.lastModified().toHexString}-${f.length().toHexString}" else "")

def fromFile[F[_]: Sync: ContextShift](
def fromFile[F[_]: Files: Sync](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

.handleErrorWith { err =>
logger.error(err)("Caught error during file cleanup for multipart")
//Swallow and report io exceptions in case
F.unit
}

private[this] def tailrecPartsFileStream[F[_]: Sync: ContextShift](
private[this] def tailrecPartsFileStream[F[_]: Sync: Files](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

case None => Part(hdrs, body)
}

/** Split the stream on `values`, but when
*/
private def splitWithFileStream[F[_]](
private def splitWithFileStream[F[_]: Files](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

@@ -7,10 +7,10 @@
package org.http4s
package multipart

import cats.effect.{Blocker, ContextShift, Sync}
import cats.effect.kernel.Sync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

blocker: Blocker,
headers: Header*): Part[F] =
fileData(name, file.getName, readAll[F](file.toPath, blocker, ChunkSize), headers: _*)
def fileData[F[_]: Sync: Files](name: String, file: File, headers: Header*): Part[F] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

@@ -9,7 +9,7 @@ package syntax

import cats.{Functor, ~>}
import cats.syntax.functor._
import cats.effect.Sync
import cats.effect.kernel.Sync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

@SystemFw
Copy link
Member

SystemFw commented Oct 22, 2020

Left a lot of comments but they are all instances of the same 4-5 things. @djspiewak two interesting points btw:

  • do we want to encourage importing from kernel?
  • look at Dispatcher, there are a couple of "mini-ends-of-the-world" cases here

@djspiewak
Copy link
Contributor

IMO if you have core, import from it. Kernel imports are for when you don't depend on core.

@djspiewak
Copy link
Contributor

I'll look at the dispatcher call sites when I get back to my laptop

private[http4s] def unsafeRunAsync[F[_]: Async, A](fa: F[A])(
f: Either[Throwable, A] => F[Unit])(implicit
dispatcher: Dispatcher[F],
ec: ExecutionContext): Unit =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you actually need to thread the ExecutionContext through anymore? Cats Effect contains its own EC, so in theory it shouldn't need to appear anywhere in the signatures.

F: ConcurrentEffect[F]): Unit =
F.runAsync(F.start(F.delay(f)).flatMap(_.join))(loggingAsyncCallback(logger)).unsafeRunSync()
private[http4s] def invokeCallback[F[_]](logger: Logger)(
f: => Unit)(implicit F: Async[F], dispatcher: Dispatcher[F]): Unit =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't generally recommend making the Dispatcher implicit. You can, but it's a bit weird given that it needs to live within a Resource.

@deprecated(
"Replaced by cats.effect.Async.fromFuture. You will need a ContextShift[F].",
"0.21.4")
@deprecated("Replaced by cats.effect.Async.fromFuture", "0.21.4")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this should just be removed entirely since this has to be a bincompat breaking release anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I like to err on the side of keeping deprecations for graceful migrations, but this is private, so we can be more aggressive here.

})
F.delay { cf.cancel(true); () }
})) >>
F.delay(Some(F.delay(cf.cancel(true)).void))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want F.pure for the outer call here.

Copy link
Member

@rossabaker rossabaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This is an excellent start.

As you address the reviews (thanks, @SystemFw, @RaasAhsan, and @djspiewak. We're in good hands here!), please just add commits instead of force-pushing. This will make it easier for everyone to review the incremental changes as this progresses.

f: Either[Throwable, A] => F[Unit])(implicit
dispatcher: Dispatcher[F],
ec: ExecutionContext): Unit =
dispatcher.unsafeRunSync(fa.evalOn(ec).attemptTap(f).void)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the usages are in blaze. We could punt it to blaze-core right now, and see if we can eliminate entirely when we get to that backend.

@deprecated(
"Replaced by cats.effect.Async.fromFuture. You will need a ContextShift[F].",
"0.21.4")
@deprecated("Replaced by cats.effect.Async.fromFuture", "0.21.4")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I like to err on the side of keeping deprecations for graceful migrations, but this is private, so we can be more aggressive here.

@djspiewak
Copy link
Contributor

Do you actually need to thread the ExecutionContext through anymore? Cats Effect contains its own EC, so in theory it shouldn't need to appear anywhere in the signatures.

All the usages are in blaze. We could punt it to blaze-core right now, and see if we can eliminate entirely when we get to that backend.

Oh is this the worker pool for blaze? Or is it blaze's Future-shifting pool? If it's the latter, then Async[F].executionContext should give you what you need (an F[ExecutionContext] that produces the compute pool). If it's the former, then continuing to thread it through makes sense, though it does result in some unfortunate API shenanigans higher up in the stack (my fault, I know!).

@domaspoliakas
Copy link
Contributor Author

I think I resolved most of the feedback now.

  • Farewell kernel imports
  • : Sync: Files eliminated; it is now either just : Files or : Files: Concurrent
  • global IORuntime imports removed in favour of SyncIO
  • couple of misc things

The things that are incomplete right now:

  • the unsafeRunAsync that @rossabaker mentioned should go into blaze core
  • One section on multipart parser (line 756). I'll leave a comment outlining my confusion there.

} yield split)
.handleErrorWith(e => Pull.eval(cleanupFile(path)) >> Pull.raiseError[F](e))
}
???
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My confusion is as thus: it seems a temp file is created but not always cleaned up. That does not sound right to me, so I tried to read through it to get a complete picture but I have to admit - I don't quite understand what's supposed to be happening here so I am left a little confused. Any help would be greatly appreciated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I might be able to work it through. I'll give it a shot now.

split <- streamAndWrite(s, state, Stream.empty, racc, 0, path)
} yield split)
.handleErrorWith(e => Pull.eval(cleanupFile(path)) >> Pull.raiseError[F](e))
.eval(Files[F].tempFile(???, "", "").allocated)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone could check if this logic correctly replaces previous logic that'd be very good

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ??? is the system property "java.io.tmpdir". The underlying Java API supports null there, and I don't know what else you'd do if that property is None, but I don't know what fs2 would do with that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the logic is right. .allocate is always a smell. There's not a stream finalizer on the happy path to clean up the resource, which makes me think this function should return a Resource, which would bubble up and force the caller to use the temp files before they are disposed. I don't know what ramifications that would have, but we could track that as a separate issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked java.io.File temp file creation allows nulls, but the nio temp file creation requires the dir property to be non-null and forces you to use the method which has no dir property for the default directory. So one way or the other the fs2 interface has to change unless we introduce a Sync requirement here again, but that'd be rather unfortunate. I'll see if I can get a change merged to fs2 to allow this.

Copy link
Member

@rossabaker rossabaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great progress.

f: Either[Throwable, A] => F[Unit])(implicit
dispatcher: Dispatcher[F],
ec: ExecutionContext): Unit =
dispatcher.unsafeRunSync(fa.evalOn(ec).attemptTap(f).void)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move it to blaze-core, under its natural package. That will help get core clean so other ports can commence, and give me more time to relearn and answer to @djspiewak's question. 😄

split <- streamAndWrite(s, state, Stream.empty, racc, 0, path)
} yield split)
.handleErrorWith(e => Pull.eval(cleanupFile(path)) >> Pull.raiseError[F](e))
.eval(Files[F].tempFile(???, "", "").allocated)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ??? is the system property "java.io.tmpdir". The underlying Java API supports null there, and I don't know what else you'd do if that property is None, but I don't know what fs2 would do with that.

split <- streamAndWrite(s, state, Stream.empty, racc, 0, path)
} yield split)
.handleErrorWith(e => Pull.eval(cleanupFile(path)) >> Pull.raiseError[F](e))
.eval(Files[F].tempFile(???, "", "").allocated)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the logic is right. .allocate is always a smell. There's not a stream finalizer on the happy path to clean up the resource, which makes me think this function should return a Resource, which would bubble up and force the caller to use the temp files before they are disposed. I don't know what ramifications that would have, but we could track that as a separate issue.

@domaspoliakas
Copy link
Contributor Author

core now compiles; I will proceed to fixing tests

Copy link
Member

@rossabaker rossabaker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fantastic work. I'm happy going module-by-module on the PRs to keep them tractable and let others jump in.

val catsEffectTesting = "0.4.1"
val circe = "0.13.0"
val cryptobits = "1.3"
val disciplineSpecs2 = "1.1.0"
val dropwizardMetrics = "4.1.13"
val fs2 = "3.0-5158029"
val fs2 = "3.0-cd73a32"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, one question: Does this have anything that -M2 doesn't?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the tempFile and tempDirectory changes which unblock this PR, and were added today

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@domaspoliakas
Copy link
Contributor Author

Great! I presume I need to "undraft" this PR now, so I'll do that and then make a new branch for test upgrade.

@domaspoliakas domaspoliakas marked this pull request as ready for review October 27, 2020 14:23
@rossabaker rossabaker changed the title [WIP] Core upgrade Upgrade core to cats-effect-3 Oct 27, 2020
// have an ExecutionContext but not a Timer.
private[http4s] def unsafeRunAsync[F[_], A](fa: F[A])(
f: Either[Throwable, A] => IO[Unit])(implicit F: Effect[F], ec: ExecutionContext): Unit =
F.runAsync(Async.shift(ec) *> fa)(f).unsafeRunSync()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really keen to get to blaze and destroy this

.handleError(_ => ())
.as(Some(Response(NotModified)))
}
})
}

def calcETag[F[_]: Sync]: File => F[String] =
def calcETag[F[_]: Files: MonadError[*[_], Throwable]]: File => F[String] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just Functor here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must've mistakenly replaced it along with some other Functors with MonadError while testing the waters. I'll get it changed to Functor.

@rossabaker rossabaker merged commit ffc0444 into http4s:cats-effect-3 Oct 28, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants