
cats-effect-3.0.0-RC2 and friends #4471

Merged: 18 commits, Feb 25, 2021

Conversation

rossabaker (Member) commented on Feb 18, 2021:

This is the new #4414.

Makes breaking changes to the ember interface to better support ip4s, which is probably something we should do on the other backends. But it only broke ember, so that goes first.
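
For concreteness, the ip4s types involved look like this (a minimal sketch of the ip4s library types only, not the ember builder API; the value names are made up for illustration):

```scala
import com.comcast.ip4s._

// Compile-time-validated ip4s values of the kind the reworked signatures
// traffic in (Option[Host] / Option[Port] instead of java.net types).
// Names here are illustrative, not ember identifiers.
val bindHost: Host = ip"0.0.0.0"
val bindPort: Port = port"8080"
val bindAddr: SocketAddress[Host] = SocketAddress(bindHost, bindPort)

// The Option-returning parsers line up with the Option[Host] / Option[Port]
// parameters in the signatures quoted further down.
val maybeHost: Option[Host] = Host.fromString("localhost")
val maybePort: Option[Port] = Port.fromInt(8080)
```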

yanns (Contributor) left a comment:

Not an expert, but the changes look right to me.

rossabaker (Member, Author):

There'll be a couple more changes to get Ember compiling.

yanns (Contributor) commented on Feb 18, 2021:

Could you reformat the sources so that it does not fail at this point?
Do you need help?

rossabaker (Member, Author):

I'm trying to get ember-server done, and then I think it's ready. I might push a WIP, as I've already exceeded my timebox.

rossabaker (Member, Author):

@yanns I think it's down to two errors in ember-server, and if you've got time, help would be splendid!

rossabaker (Member, Author):

EmberServerSuite is no longer releasing the port it binds. This is the failing test.

```diff
@@ -51,10 +51,9 @@ private[server] object ServerHelpers {
     Response(Status.InternalServerError).putHeaders(org.http4s.headers.`Content-Length`.zero)

   def server[F[_]](
-      bindAddress: InetSocketAddress,
+      serverResource: Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])],
```

RaasAhsan (Member) commented on Feb 19, 2021:

Did fs2 change the signature here? IIRC it was `Stream[F, Resource[F, Socket[F]]]` (on the RHS of the tuple).

rossabaker (Member, Author):

Yes. @yanns called that out on his PR merged into this. I think his misgivings were right, but I'm flying around too fast working on other dependencies. I think our problems start here, though.

RaasAhsan (Member) commented on Feb 19, 2021:

So I'm not sure if there is a relationship between this and the server not shutting down yet, but this signature isn't going to work for graceful shutdowns. By allocating the sockets on the server stream, there's no way we can interrupt that stream on shutdown while letting handler streams proceed for some more time.

RaasAhsan (Member) commented on Feb 19, 2021:

Honestly, I feel like these signatures might be bad news:

```scala
def server(
    address: Option[Host],
    port: Option[Port],
    options: List[SocketOption]
): Stream[F, Socket[F]]

def serverResource(
    address: Option[Host],
    port: Option[Port],
    options: List[SocketOption]
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])]
```

Think about it: the server is a Stream loop that keeps allocating a new Socket resource and then moves on to accepting the next one. It's going to cause a memory leak if it runs long enough, because the socket finalizers are piling up! Kind of in a similar vein to foreverM on a Resource with a finalizer. Am I utterly wrong about that?

Member:

Okay, maybe the finalizers won't stack up like that: https://github.com/typelevel/fs2/blob/main/io/src/main/scala/fs2/io/net/SocketGroup.scala#L181

The `++` before the next accept means that the current scope introduced by `Stream.resource` is cleaned up first.
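
To make the two shapes under discussion concrete, here is a small self-contained sketch (toy resources, not ember or fs2 internals): recursing with `++` lets each per-element scope close as the loop advances, while nesting with `flatMap` keeps every scope and finalizer open until the whole stream ends, which is the leak shape raised above.

```scala
import cats.effect.{IO, IOApp, Resource}
import fs2.Stream

// Self-contained sketch contrasting the two looping shapes discussed above.
object ScopeDemo extends IOApp.Simple {

  // Stand-in for an accepted socket: prints on acquire and on release.
  def conn(i: Int): Resource[IO, Int] =
    Resource.make(IO.println(s"open $i").as(i))(_ => IO.println(s"close $i"))

  // Shape used by the accept loop: `++` closes each element's scope before
  // the next acquire, so finalizers do not accumulate as the loop runs.
  def appendLoop(i: Int): Stream[IO, Int] =
    Stream.resource(conn(i)) ++ appendLoop(i + 1)

  // The worrying shape: nesting via flatMap keeps every scope open until the
  // whole stream terminates, so finalizers pile up (akin to foreverM on a
  // Resource with a finalizer).
  def nestedLoop(i: Int): Stream[IO, Int] =
    Stream.resource(conn(i)).flatMap(x => Stream.emit(x) ++ nestedLoop(i + 1))

  def run: IO[Unit] =
    appendLoop(0).take(3).compile.drain >>  // open 0, close 0, open 1, close 1, open 2, close 2
      nestedLoop(0).take(3).compile.drain   // open 0, open 1, open 2, close 2, close 1, close 0
}
```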

```diff
@@ -232,7 +228,7 @@ private[server] object ServerHelpers {
           resp.headers.get(Connection).exists(_.hasClose)
         )
       }
       .drain ++ Stream.eval(socket.close).drain
```

Member:

I think we can actually get rid of this on series/0.21 and series/0.22 as well? The socket resource is lifted into the stream on a new scope, so it should be closed at the end here anyway. Maybe we just wanted to be explicit here?

rossabaker (Member, Author):

I think you're correct. I only noticed it here because it disappeared in fs2-3.

```diff
@@ -190,7 +186,7 @@ private[server] object ServerHelpers {
       onWriteFailure: (Option[Request[F]], Response[F], Throwable) => F[Unit]
   ): Stream[F, Nothing] = {
     val _ = logger
-    val read: F[Option[Chunk[Byte]]] = socket.read(receiveBufferSize, durationToFinite(idleTimeout))
+    val read: F[Option[Chunk[Byte]]] = timeoutMaybe(socket.read(receiveBufferSize), idleTimeout)
```

Member:

I wonder what the impact of not using the underlying async timeout mechanism is here. It seems like fs2 doesn't expose that anymore.

rossabaker (Member, Author):

Good question. It might change the failure mode?

> If a timeout is specified and the timeout elapses before the operation completes then the operation completes with the exception InterruptedByTimeoutException. Where a timeout occurs, and the implementation cannot guarantee that bytes have not been read, or will not be read from the channel into the given buffer, then further attempts to read from the channel will cause an unspecific runtime exception to be thrown.

rossabaker (Member, Author):

If we close while something is pending:

> Any outstanding asynchronous operations upon this channel will complete with the exception AsynchronousCloseException. After a channel is closed, further attempts to initiate asynchronous I/O operations complete immediately with cause ClosedChannelException.

I think if we cancel a read, and then perform another, we'd get a ReadPendingException because nothing canceled the underlying channel operation? I'm just speculating.
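
For reference, `timeoutMaybe` in the diff above presumably boils down to a cats-effect-level timeout applied only when the idle timeout is finite; a hedged sketch of that shape (the signature and behavior here are assumptions, not the actual ember helper):

```scala
import scala.concurrent.duration.{Duration, FiniteDuration}
import cats.effect.Temporal

// Assumed shape of a timeoutMaybe-style helper: wrap the read in a
// cats-effect timeout only when the idle timeout is finite. Unlike the old
// NIO2 per-operation timeout, cancellation happens at the effect level;
// whether the underlying channel read is also cancelled is exactly the
// failure mode speculated about above.
def timeoutMaybe[F[_], A](fa: F[A], d: Duration)(implicit F: Temporal[F]): F[A] =
  d match {
    case fd: FiniteDuration => F.timeout(fa, fd)
    case _                  => fa
  }
```

The key difference from the old per-read async timeout is that the waiting fiber is cancelled rather than the channel operation, which is why the channel-level exceptions quoted above are the interesting question.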

rossabaker mentioned this pull request on Feb 21, 2021.
m-sp mentioned this pull request on Feb 21, 2021.
amesgen (Member) commented on Feb 25, 2021:

Is this now blocked on typelevel/fs2#2289?

RaasAhsan (Member) commented on Feb 25, 2021:

It seems like we've got traction on converging to a solution for the desired behavior on fs2-3, so I feel more comfortable merging in regressed behavior for now. I've got a branch that reverts part of graceful shutdown and some other fixes, so I'll PR it here in a bit.

RaasAhsan (Member):

#4515

rossabaker (Member, Author):

Oh, looks like I didn't need to merge #4515. I'll lop that off so it's not all in here twice, and we'll merge this.
