Skip to content

Commit

Permalink
fix: Fix implicitly unwrapped subscriptionItem (#35)
Browse files Browse the repository at this point in the history
* fix: Fix data race in staleConnectionTimeout
* chore: Enable ThreadSanitizer for tests
* chore: Fix data race in Timer tests
* chore: Update Pod dependencies
  • Loading branch information
palpatim committed Nov 4, 2020
1 parent 89a0e5e commit 860152e
Show file tree
Hide file tree
Showing 17 changed files with 494 additions and 358 deletions.
6 changes: 5 additions & 1 deletion AppSyncRealTimeClient.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
FA91B20A24D306430017404D /* LaunchScreen.storyboard in Resources */ = {isa = PBXBuildFile; fileRef = FA91B20824D306430017404D /* LaunchScreen.storyboard */; };
FA91B20F24D306550017404D /* amplifyconfiguration.json in Resources */ = {isa = PBXBuildFile; fileRef = 21D38B4B2409B6C000EC2A8D /* amplifyconfiguration.json */; };
FA91B21024D306730017404D /* AppSyncRTCProvider.swift in Sources */ = {isa = PBXBuildFile; fileRef = FAB5AA1D24D1CD31001F370F /* AppSyncRTCProvider.swift */; };
FAAEF946255229DA009DA4D5 /* AtomicValue.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA2EFABB2550CAC6007698C7 /* AtomicValue.swift */; };
FAB5AA1F24D1CD84001F370F /* amplifyconfiguration.json in Resources */ = {isa = PBXBuildFile; fileRef = 21D38B4B2409B6C000EC2A8D /* amplifyconfiguration.json */; };
FAB7E91224D2644E00DF1EA1 /* RealtimeConnectionProvider+StaleConnection.swift in Sources */ = {isa = PBXBuildFile; fileRef = 217F39B12406E98300F1A0B3 /* RealtimeConnectionProvider+StaleConnection.swift */; };
/* End PBXBuildFile section */
Expand Down Expand Up @@ -190,6 +191,7 @@
BBF14AA5E9A4D036CA87B952 /* Pods-AppSyncRTCSample.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AppSyncRTCSample.debug.xcconfig"; path = "Target Support Files/Pods-AppSyncRTCSample/Pods-AppSyncRTCSample.debug.xcconfig"; sourceTree = "<group>"; };
C73EA796F50FB0196FDF4865 /* Pods-AppSyncRealTimeClient.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AppSyncRealTimeClient.release.xcconfig"; path = "Target Support Files/Pods-AppSyncRealTimeClient/Pods-AppSyncRealTimeClient.release.xcconfig"; sourceTree = "<group>"; };
E65A19C826699DA299A49284 /* Pods-HostApp-AppSyncRealTimeClientIntegrationTests.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-HostApp-AppSyncRealTimeClientIntegrationTests.release.xcconfig"; path = "Target Support Files/Pods-HostApp-AppSyncRealTimeClientIntegrationTests/Pods-HostApp-AppSyncRealTimeClientIntegrationTests.release.xcconfig"; sourceTree = "<group>"; };
FA2EFABB2550CAC6007698C7 /* AtomicValue.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AtomicValue.swift; sourceTree = "<group>"; };
FA67507724D3244A005A1345 /* MockWebsocketProvider.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockWebsocketProvider.swift; sourceTree = "<group>"; };
FA67507A24D338C6005A1345 /* Error+Extension.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Error+Extension.swift"; sourceTree = "<group>"; };
FA67507C24D338FA005A1345 /* RealtimeConnectionProviderTestBase.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RealtimeConnectionProviderTestBase.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -381,6 +383,7 @@
21D38B6C240A262800EC2A8D /* AppSyncJSONHelper.swift */,
217F39C92406E98400F1A0B3 /* AppSyncJSONValue.swift */,
217F39C72406E98400F1A0B3 /* AppSyncLogger.swift */,
FA2EFABB2550CAC6007698C7 /* AtomicValue.swift */,
FA67508124D33A7A005A1345 /* CountdownTimer.swift */,
21D38B93240C4A2A00EC2A8D /* OIDCAuthProvider.swift */,
217F39CB2406E98400F1A0B3 /* SubscriptionConnectionType.swift */,
Expand Down Expand Up @@ -419,8 +422,8 @@
217F39EE2406EA4000F1A0B3 /* Support */ = {
isa = PBXGroup;
children = (
217F39EF2406EA4000F1A0B3 /* RealtimeGatewayURLInterceptorTests.swift */,
FA67508324D33ACC005A1345 /* CountdownTimerTests.swift */,
217F39EF2406EA4000F1A0B3 /* RealtimeGatewayURLInterceptorTests.swift */,
);
path = Support;
sourceTree = "<group>";
Expand Down Expand Up @@ -1083,6 +1086,7 @@
217F39DB2406E98400F1A0B3 /* AppSyncSubscriptionConnection+DataHandler.swift in Sources */,
21D38B94240C4A2A00EC2A8D /* OIDCAuthProvider.swift in Sources */,
217F39DF2406E98400F1A0B3 /* SubscriptionItem.swift in Sources */,
FAAEF946255229DA009DA4D5 /* AtomicValue.swift in Sources */,
217F39CF2406E98400F1A0B3 /* AppSyncMessage+Encodable.swift in Sources */,
217F39CD2406E98400F1A0B3 /* InterceptableConnection.swift in Sources */,
21D38B8E240A3C2300EC2A8D /* ConnectionProviderFactory.swift in Sources */,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
shouldUseLaunchSchemeArgsEnv = "YES"
enableThreadSanitizer = "YES">
<Testables>
<TestableReference
skipped = "NO">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
shouldUseLaunchSchemeArgsEnv = "YES"
enableThreadSanitizer = "YES">
<Testables>
<TestableReference
skipped = "NO">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,22 @@ extension AppSyncSubscriptionConnection {
// MARK: - Private implementations

private func startSubscription() {
guard subscriptionState == .notSubscribed else {
guard
let subscriptionItem = subscriptionItem,
subscriptionState == .notSubscribed
else {
return
}

subscriptionState = .inProgress
let payload = convertToPayload(for: subscriptionItem.requestString, variables: subscriptionItem.variables)

guard let payload = convertToPayload(
for: subscriptionItem.requestString,
variables: subscriptionItem.variables
) else {
return
}

let message = AppSyncMessage(
id: subscriptionItem.identifier,
payload: payload,
Expand All @@ -39,7 +50,12 @@ extension AppSyncSubscriptionConnection {
connectionProvider?.write(message)
}

private func convertToPayload(for query: String, variables: [String: Any?]?) -> AppSyncMessage.Payload {
private func convertToPayload(for query: String, variables: [String: Any?]?) -> AppSyncMessage.Payload? {
guard let subscriptionItem = subscriptionItem else {
AppSyncLogger.debug("\(#function): no subscription item")
return nil
}

var dataDict: [String: Any] = ["query": query]
if let subVariables = variables {
dataDict["variables"] = subVariables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,16 @@ import Foundation
extension AppSyncSubscriptionConnection {

func handleDataEvent(response: AppSyncResponse) {
guard let subscriptionItem = subscriptionItem else {
AppSyncLogger.debug("\(#function): no subscription item")
return
}

guard response.id == subscriptionItem.identifier else {
AppSyncLogger.verbose("\(#function): ignoring data event for \(response.id ?? "(null)")")
return
}

switch response.responseType {
case .data:
let jsonEncode = JSONEncoder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import Starscream

extension AppSyncSubscriptionConnection {
func handleError(error: Error) {
guard let subscriptionItem = subscriptionItem else {
AppSyncLogger.debug("\(#function): no subscription item")
return
}

// If the error identifier is not for the this connection
// we return immediately without handling the error.
if case let ConnectionProviderError.subscription(identifier, _) = error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public class AppSyncSubscriptionConnection: SubscriptionConnection, RetryableCon
/// The current state of subscription
var subscriptionState: SubscriptionState = .notSubscribed

/// Current item that is subscriped
var subscriptionItem: SubscriptionItem!
/// Current item that is subscribed
private(set) var subscriptionItem: SubscriptionItem?

/// Retry logic to handle
var retryHandler: ConnectionRetryHandler?
Expand All @@ -37,11 +37,12 @@ public class AppSyncSubscriptionConnection: SubscriptionConnection, RetryableCon
variables: [String: Any?]?,
eventHandler: @escaping (SubscriptionItemEvent, SubscriptionItem) -> Void
) -> SubscriptionItem {
subscriptionItem = SubscriptionItem(
let subscriptionItem = SubscriptionItem(
requestString: requestString,
variables: variables,
eventHandler: eventHandler
)
self.subscriptionItem = subscriptionItem
addListener()
subscriptionItem.subscriptionEventHandler(.connection(.connecting), subscriptionItem)
connectionProvider?.connect()
Expand All @@ -50,13 +51,35 @@ public class AppSyncSubscriptionConnection: SubscriptionConnection, RetryableCon

public func unsubscribe(item: SubscriptionItem) {
AppSyncLogger.debug("Unsubscribe - \(item.identifier)")

let message = AppSyncMessage(id: item.identifier, type: .unsubscribe("stop"))
connectionProvider?.write(message)
connectionProvider?.removeListener(identifier: subscriptionItem.identifier)

guard let connectionProvider = connectionProvider else {
AppSyncLogger.debug("\(#function): no connection provider")
return
}

guard let subscriptionItem = subscriptionItem else {
AppSyncLogger.debug("\(#function): no subscription item")
return
}

connectionProvider.write(message)
connectionProvider.removeListener(identifier: subscriptionItem.identifier)
}

private func addListener() {
connectionProvider?.addListener(identifier: subscriptionItem.identifier) { [weak self] event in
guard let connectionProvider = connectionProvider else {
AppSyncLogger.debug("\(#function): no connection provider")
return
}

guard let subscriptionItem = subscriptionItem else {
AppSyncLogger.debug("\(#function): no subscription item")
return
}

connectionProvider.addListener(identifier: subscriptionItem.identifier) { [weak self] event in
guard let self = self else {
AppSyncLogger.debug("Self is nil, listener is not called.")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ extension RealtimeConnectionProvider {

/// Start a stale connection timer, first invalidating and destroying any existing timer
func startStaleConnectionTimer() {
AppSyncLogger.debug("Starting stale connection timer for \(staleConnectionTimeout)s")
AppSyncLogger.debug("Starting stale connection timer for \(staleConnectionTimeout.get())s")
if staleConnectionTimer != nil {
stopStaleConnectionTimer()
}
staleConnectionTimer = CountdownTimer(interval: staleConnectionTimeout) {
staleConnectionTimer = CountdownTimer(interval: staleConnectionTimeout.get()) {
self.disconnectStaleConnection()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,80 +42,92 @@ extension RealtimeConnectionProvider: AppSyncWebsocketDelegate {
}

// MARK: - Handle websocket response

func handleResponse(_ response: RealtimeConnectionProviderResponse) {
resetStaleConnectionTimer()

switch response.responseType {
case .connectionAck:

// Only from in progress state, the connection can transition to connected state.
// The below guard statement make sure that. If we get connectionAck in other
// state means that we have initiated a disconnect parallely.
guard status == .inProgress else {
return
handleConnectionAck(response: response)
case .error:
handleError(response: response)
case .subscriptionAck, .unsubscriptionAck, .data:
if let appSyncResponse = response.toAppSyncResponse() {
updateCallback(event: .data(appSyncResponse))
}
serialConnectionQueue.async {[weak self] in
guard let self = self else {
return
}
self.status = .connected
self.updateCallback(event: .connection(self.status))
case .keepAlive:
AppSyncLogger.debug("\(self) received keepAlive")
}
}

// If the service returns a connection timeout, use that instead of the default
if case let .number(value) = response.payload?["connectionTimeoutMs"] {
let interval = value / 1_000
if interval != self.staleConnectionTimer?.interval {
AppSyncLogger.debug(
"""
Resetting keep alive timer in response to service timeout \
instructions \(interval)s
"""
)
self.staleConnectionTimeout = interval
self.startStaleConnectionTimer()
}
}
private func handleConnectionAck(response: RealtimeConnectionProviderResponse) {
// Only from in progress state, the connection can transition to connected state.
// The below guard statement make sure that. If we get connectionAck in other
// state means that we have initiated a disconnect parallely.
guard status == .inProgress else {
return
}
serialConnectionQueue.async {[weak self] in
guard let self = self else {
return
}
self.status = .connected
self.updateCallback(event: .connection(self.status))

case .error:
// If we get an error in connection inprogress state, return back as connection error.
if status == .inProgress {
serialConnectionQueue.async {[weak self] in
guard let self = self else {
return
}
self.status = .notConnected
self.updateCallback(event: .error(ConnectionProviderError.connection))
}
// If the service returns a connection timeout, use that instead of the default
guard case let .number(value) = response.payload?["connectionTimeoutMs"] else {
return
}

// Return back as generic error if there is no identifier.
guard let identifier = response.id else {
let genericError = ConnectionProviderError.other
updateCallback(event: .error(genericError))
let interval = value / 1_000

guard interval != self.staleConnectionTimer?.interval else {
return
}

// Map to limit exceed error if we get MaxSubscriptionsReachedException
if let errorType = response.payload?["errorType"],
errorType == "MaxSubscriptionsReachedException" {
let limitExceedError = ConnectionProviderError.limitExceeded(identifier)
updateCallback(event: .error(limitExceedError))
return
AppSyncLogger.debug(
"""
Resetting keep alive timer in response to service timeout \
instructions: \(interval)s
"""
)
self.staleConnectionTimeout.set(interval)
self.startStaleConnectionTimer()
}
}

private func handleError(response: RealtimeConnectionProviderResponse) {
// If we get an error in connection inprogress state, return back as connection error.
if status == .inProgress {
serialConnectionQueue.async {[weak self] in
guard let self = self else {
return
}
self.status = .notConnected
self.updateCallback(event: .error(ConnectionProviderError.connection))
}
return
}

let subscriptionError = ConnectionProviderError.subscription(identifier, response.payload)
updateCallback(event: .error(subscriptionError))
// Return back as generic error if there is no identifier.
guard let identifier = response.id else {
let genericError = ConnectionProviderError.other
updateCallback(event: .error(genericError))
return
}

case .subscriptionAck, .unsubscriptionAck, .data:
if let appSyncResponse = response.toAppSyncResponse() {
updateCallback(event: .data(appSyncResponse))
}
case .keepAlive:
AppSyncLogger.debug("\(self) received keepAlive")
// Map to limit exceed error if we get MaxSubscriptionsReachedException
if let errorType = response.payload?["errorType"],
errorType == "MaxSubscriptionsReachedException" {
let limitExceedError = ConnectionProviderError.limitExceeded(identifier)
updateCallback(event: .error(limitExceedError))
return
}

let subscriptionError = ConnectionProviderError.subscription(identifier, response.payload)
updateCallback(event: .error(subscriptionError))
}

}

extension RealtimeConnectionProviderResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class RealtimeConnectionProvider: ConnectionProvider {

/// Maximum number of seconds a connection may go without receiving a keep alive
/// message before we consider it stale and force a disconnect
var staleConnectionTimeout: TimeInterval = 5 * 60
let staleConnectionTimeout = AtomicValue<TimeInterval>(initialValue: 5 * 60)

/// A timer that automatically disconnects the current connection if it goes longer
/// than `staleConnectionTimeout` without activity. Receiving any data or "keep
Expand Down
Loading

0 comments on commit 860152e

Please sign in to comment.