-
Notifications
You must be signed in to change notification settings - Fork 435
Add server connection state machine #1760
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,365 @@ | ||
| /* | ||
| * Copyright 2024, gRPC Authors All rights reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import NIOCore | ||
| import NIOHTTP2 | ||
|
|
||
| extension ServerConnectionHandler { | ||
| /// Tracks the state of TCP connections at the server. | ||
| /// | ||
| /// The state machine manages the state for the graceful shutdown procedure as well as policing | ||
| /// client-side keep alive. | ||
| struct StateMachine { | ||
| /// The current state of the connection: active, closing or closed. | ||
| private var state: State | ||
|
|
||
| /// Opaque data sent to the client in a PING frame after emitting the first GOAWAY frame | ||
| /// as part of graceful shutdown. | ||
| private let goAwayPingData: HTTP2PingData | ||
|
|
||
| /// Create a new state machine in the active state with no open streams. | ||
| /// | ||
| /// - Parameters: | ||
| /// - allowKeepAliveWithoutCalls: Whether the client is permitted to send keep alive pings | ||
| /// when there are no active calls. | ||
| /// - minPingReceiveIntervalWithoutCalls: The minimum time interval required between keep | ||
| /// alive pings when there are no active calls. | ||
| /// - goAwayPingData: Opaque data sent to the client in a PING frame when the server | ||
| /// initiates graceful shutdown. Defaults to a randomly generated value. | ||
| init( | ||
| allowKeepAliveWithoutCalls: Bool, | ||
| minPingReceiveIntervalWithoutCalls: TimeAmount, | ||
| goAwayPingData: HTTP2PingData = HTTP2PingData(withInteger: .random(in: .min ... .max)) | ||
| ) { | ||
| let keepAlive = KeepAlive( | ||
| allowWithoutCalls: allowKeepAliveWithoutCalls, | ||
| minPingReceiveIntervalWithoutCalls: minPingReceiveIntervalWithoutCalls | ||
| ) | ||
|
|
||
| self.state = .active(State.Active(keepAlive: keepAlive)) | ||
| self.goAwayPingData = goAwayPingData | ||
| } | ||
|
|
||
| /// Record that the stream with the given ID has been opened. Ignored if the connection is closed. | ||
| mutating func streamOpened(_ id: HTTP2StreamID) { | ||
| switch self.state { | ||
| case .active(var state): | ||
| state.lastStreamID = id | ||
| let (inserted, _) = state.openStreams.insert(id) | ||
| assert(inserted, "Can't open stream \(Int(id)), it's already open") | ||
| self.state = .active(state) | ||
|
|
||
| case .closing(var state): | ||
| state.lastStreamID = id | ||
| let (inserted, _) = state.openStreams.insert(id) | ||
| assert(inserted, "Can't open stream \(Int(id)), it's already open") | ||
| self.state = .closing(state) | ||
|
|
||
| case .closed: | ||
| () | ||
| } | ||
| } | ||
|
|
||
| enum OnStreamClosed: Equatable { | ||
| /// Start the idle timer, after which the connection should be closed gracefully. | ||
| case startIdleTimer | ||
| /// Close the connection. | ||
| case close | ||
| /// Do nothing. | ||
| case none | ||
| } | ||
|
|
||
| /// Record that the stream with the given ID has been closed and return the action to take. | ||
| mutating func streamClosed(_ id: HTTP2StreamID) -> OnStreamClosed { | ||
| let onStreamClosed: OnStreamClosed | ||
|
|
||
| switch self.state { | ||
| case .active(var state): | ||
| let removedID = state.openStreams.remove(id) | ||
| assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open") | ||
| onStreamClosed = state.openStreams.isEmpty ? .startIdleTimer : .none | ||
| self.state = .active(state) | ||
|
|
||
| case .closing(var state): | ||
| let removedID = state.openStreams.remove(id) | ||
| assert(removedID != nil, "Can't close stream \(Int(id)), it wasn't open") | ||
| // If the second GOAWAY hasn't been sent it isn't safe to close if there are no open | ||
| // streams: the client may have opened a stream which the server doesn't know about yet. | ||
| let canClose = state.sentSecondGoAway && state.openStreams.isEmpty | ||
| onStreamClosed = canClose ? .close : .none | ||
| self.state = .closing(state) | ||
|
|
||
| case .closed: | ||
| onStreamClosed = .none | ||
| } | ||
|
|
||
| return onStreamClosed | ||
| } | ||
|
|
||
| enum OnPing: Equatable { | ||
| /// Send a GOAWAY frame with the code "enhance your calm" and immediately close the connection. The associated value is the ID of the most recently opened stream. | ||
| case enhanceYourCalmThenClose(HTTP2StreamID) | ||
| /// Acknowledge the ping. | ||
| case sendAck | ||
| /// Ignore the ping. | ||
| case none | ||
| } | ||
|
|
||
| /// Received a ping with the given data. | ||
| /// | ||
| /// - Parameters: | ||
| /// - time: The time at which the ping was received. | ||
| /// - data: The data sent with the ping (not currently inspected by this method). | ||
| mutating func receivedPing(atTime time: NIODeadline, data: HTTP2PingData) -> OnPing { | ||
| let onPing: OnPing | ||
|
|
||
| switch self.state { | ||
| case .active(var state): | ||
| let tooManyPings = state.keepAlive.receivedPing( | ||
| atTime: time, | ||
| hasOpenStreams: !state.openStreams.isEmpty | ||
| ) | ||
|
|
||
| if tooManyPings { | ||
| onPing = .enhanceYourCalmThenClose(state.lastStreamID) | ||
| self.state = .closed | ||
| } else { | ||
| onPing = .sendAck | ||
| self.state = .active(state) | ||
| } | ||
|
|
||
| case .closing(var state): | ||
| let tooManyPings = state.keepAlive.receivedPing( | ||
| atTime: time, | ||
| hasOpenStreams: !state.openStreams.isEmpty | ||
| ) | ||
|
|
||
| if tooManyPings { | ||
| onPing = .enhanceYourCalmThenClose(state.lastStreamID) | ||
| self.state = .closed | ||
| } else { | ||
| onPing = .sendAck | ||
| self.state = .closing(state) | ||
| } | ||
|
|
||
| case .closed: | ||
| onPing = .none | ||
| } | ||
|
|
||
| return onPing | ||
| } | ||
|
|
||
| enum OnPingAck: Equatable { | ||
| /// Send a GOAWAY frame with no error and the given last stream ID, optionally closing the | ||
| /// connection immediately afterwards. | ||
| case sendGoAway(lastStreamID: HTTP2StreamID, close: Bool) | ||
| /// Ignore the ack. | ||
| case none | ||
| } | ||
|
|
||
| /// Received a PING frame with the 'ack' flag set and the given data. | ||
| mutating func receivedPingAck(data: HTTP2PingData) -> OnPingAck { | ||
| let onPingAck: OnPingAck | ||
|
|
||
| switch self.state { | ||
| case .closing(var state): | ||
| // If only one GOAWAY has been sent and the data matches the data from the GOAWAY ping then | ||
| // the server should send another GOAWAY ratcheting down the last stream ID. If no streams | ||
| // are open then the server can close the connection immediately after, otherwise it must | ||
| // wait until all streams are closed. | ||
| if !state.sentSecondGoAway, data == self.goAwayPingData { | ||
| state.sentSecondGoAway = true | ||
|
|
||
| if state.openStreams.isEmpty { | ||
| self.state = .closed | ||
| onPingAck = .sendGoAway(lastStreamID: state.lastStreamID, close: true) | ||
| } else { | ||
| self.state = .closing(state) | ||
| onPingAck = .sendGoAway(lastStreamID: state.lastStreamID, close: false) | ||
| } | ||
| } else { | ||
| onPingAck = .none | ||
| } | ||
|
|
||
| case .active, .closed: | ||
| onPingAck = .none | ||
| } | ||
|
|
||
| return onPingAck | ||
| } | ||
|
|
||
| enum OnStartGracefulShutdown: Equatable { | ||
| /// Initiate graceful shutdown by sending a GOAWAY frame with the last stream ID set as the max | ||
| /// stream ID and no error. Follow it immediately with a PING frame with the given data. | ||
| case sendGoAwayAndPing(HTTP2PingData) | ||
| /// Ignore the request to start graceful shutdown. | ||
| case none | ||
| } | ||
|
|
||
| /// Request that the connection begins graceful shutdown. Ignored unless the connection is active. | ||
| mutating func startGracefulShutdown() -> OnStartGracefulShutdown { | ||
| let onStartGracefulShutdown: OnStartGracefulShutdown | ||
|
|
||
| switch self.state { | ||
| case .active(let state): | ||
| self.state = .closing(State.Closing(from: state)) | ||
| onStartGracefulShutdown = .sendGoAwayAndPing(self.goAwayPingData) | ||
|
|
||
| case .closing, .closed: | ||
| onStartGracefulShutdown = .none | ||
| } | ||
|
|
||
| return onStartGracefulShutdown | ||
| } | ||
|
|
||
| /// Reset the state of keep-alive policing. Has no effect if the connection is closed. | ||
| mutating func resetKeepAliveState() { | ||
| switch self.state { | ||
| case .active(var state): | ||
| state.keepAlive.reset() | ||
| self.state = .active(state) | ||
|
|
||
| case .closing(var state): | ||
| state.keepAlive.reset() | ||
| self.state = .closing(state) | ||
|
|
||
| case .closed: | ||
| () | ||
| } | ||
| } | ||
|
|
||
| /// Marks the state as closed, regardless of the current state. | ||
| mutating func markClosed() { | ||
| self.state = .closed | ||
| } | ||
| } | ||
| } | ||
|
|
||
| extension ServerConnectionHandler.StateMachine { | ||
| fileprivate struct KeepAlive { | ||
| /// Whether the client is allowed to send keep alive pings when there are no active calls. | ||
| private let allowWithoutCalls: Bool | ||
|
|
||
| /// The minimum time interval permitted between pings. It is applied when there are open streams or when pings are allowed without calls; otherwise a two hour interval is used. | ||
| private let minPingReceiveIntervalWithoutCalls: TimeAmount | ||
|
|
||
| /// The maximum number of "bad" pings sent by the client the server tolerates before closing | ||
| /// the connection. | ||
| private let maxPingStrikes: Int | ||
|
|
||
| /// The number of "bad" pings sent by the client. This can be reset when the server sends | ||
| /// DATA or HEADERS frames. | ||
| /// | ||
| /// Ping strikes account for pings occasionally being used for purposes other than keep | ||
| /// alive (a low number of strikes is therefore expected and okay). | ||
| private var pingStrikes: Int | ||
|
|
||
| /// The last time a valid ping happened. This is `nil` if there is no such | ||
| /// time (for example the connection is new or the keep alive state has been reset). | ||
| /// | ||
| /// Note: `distantPast` isn't used to indicate no previous valid ping as `NIODeadline` uses | ||
| /// the monotonic clock on Linux which uses an undefined starting point and in some cases isn't | ||
| /// always that distant. | ||
| private var lastValidPingTime: NIODeadline? | ||
|
|
||
| init(allowWithoutCalls: Bool, minPingReceiveIntervalWithoutCalls: TimeAmount) { | ||
| self.allowWithoutCalls = allowWithoutCalls | ||
| self.minPingReceiveIntervalWithoutCalls = minPingReceiveIntervalWithoutCalls | ||
| self.maxPingStrikes = 2 | ||
| self.pingStrikes = 0 | ||
| self.lastValidPingTime = nil | ||
| } | ||
|
|
||
| /// Reset ping strikes and the time of the last valid ping. | ||
| mutating func reset() { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're never calling this, which makes me think we haven't yet implemented the ping strike reset logic when receiving
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. This will have to be done when the connection handler is done as it requires input from the stream channels as well (which is a bit annoying...). There's effectively nothing to do in the state machine beyond calling reset though.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll add a test that the reset actually works though as that's missing right now. |
||
| self.lastValidPingTime = nil | ||
| self.pingStrikes = 0 | ||
| } | ||
|
|
||
| /// Records a ping received at the given time and returns whether the client has sent too many pings. | ||
| mutating func receivedPing(atTime time: NIODeadline, hasOpenStreams: Bool) -> Bool { | ||
| let interval: TimeAmount | ||
|
|
||
| if hasOpenStreams || self.allowWithoutCalls { | ||
| interval = self.minPingReceiveIntervalWithoutCalls | ||
| } else { | ||
| // If there are no open streams and keep alive pings aren't allowed without calls then | ||
| // use an interval of two hours. | ||
| // | ||
| // This comes from gRFC A8: https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md | ||
| interval = .hours(2) | ||
| } | ||
|
|
||
| // If there's no last valid ping time then this ping is acceptable. | ||
| let isAcceptablePing = self.lastValidPingTime.map { $0 + interval <= time } ?? true | ||
| let tooManyPings: Bool | ||
|
|
||
| if isAcceptablePing { | ||
| self.lastValidPingTime = time | ||
| tooManyPings = false | ||
|
Comment on lines
+310
to
+312
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume the bad pings don't have to be consecutive, right? Otherwise, we should reset the ping strikes to 0.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that's my understanding from the various gRFCs. |
||
| } else { | ||
| self.pingStrikes += 1 | ||
| tooManyPings = self.pingStrikes > self.maxPingStrikes | ||
| } | ||
|
|
||
| return tooManyPings | ||
| } | ||
| } | ||
| } | ||
|
|
||
| extension ServerConnectionHandler.StateMachine { | ||
| fileprivate enum State { | ||
| /// The connection is active. | ||
| struct Active { | ||
| /// The IDs of the currently open streams. | ||
| var openStreams: Set<HTTP2StreamID> | ||
| /// The ID of the most recently opened stream (zero indicates no streams have been opened yet). | ||
| var lastStreamID: HTTP2StreamID | ||
| /// The state of keep alive. | ||
| var keepAlive: KeepAlive | ||
|
|
||
| init(keepAlive: KeepAlive) { | ||
| self.openStreams = [] | ||
| self.lastStreamID = .rootStream | ||
| self.keepAlive = keepAlive | ||
| } | ||
| } | ||
|
|
||
| /// The connection is closing gracefully, an initial GOAWAY frame has been sent (with the | ||
| /// last stream ID set to max). | ||
| struct Closing { | ||
| /// The IDs of the currently open streams. | ||
| var openStreams: Set<HTTP2StreamID> | ||
| /// The ID of the most recently opened stream (zero indicates no streams have been opened yet). | ||
| var lastStreamID: HTTP2StreamID | ||
| /// The state of keep alive. | ||
| var keepAlive: KeepAlive | ||
| /// Whether the second GOAWAY frame has been sent with a lower stream ID. | ||
| var sentSecondGoAway: Bool | ||
|
|
||
| init(from state: Active) { | ||
| self.openStreams = state.openStreams | ||
| self.lastStreamID = state.lastStreamID | ||
| self.keepAlive = state.keepAlive | ||
| self.sentSecondGoAway = false | ||
| } | ||
| } | ||
|
|
||
| case active(Active) | ||
| case closing(Closing) | ||
| case closed | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| /* | ||
| * Copyright 2024, gRPC Authors All rights reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| // Temporary namespace holding the connection `StateMachine`. Will be replaced with a channel handler. | ||
| enum ServerConnectionHandler { | ||
| } |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two cases are pretty similar, except for the state that is set in the else branch. Could the duplication be avoided or do we want to see clearly the steps for each case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's somewhat unavoidable here as the associated state types are different. IMO deduplicating here would also result in less readable code.