From e70a3f54bf228fa09b8d6a71f756a272e960c401 Mon Sep 17 00:00:00 2001 From: Tomasz Trela Date: Fri, 8 Mar 2024 18:13:18 +0100 Subject: [PATCH] fix(DataStore): dataStore cannot connect to model's sync subscriptions (AWS_LAMBDA auth type) #3549 --- .../Operation/RetryableGraphQLOperation.swift | 49 +++++++++- .../InitialSync/InitialSyncOperation.swift | 3 +- ...omingAsyncSubscriptionEventPublisher.swift | 91 ++++++++++++++++--- 3 files changed, 124 insertions(+), 19 deletions(-) diff --git a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift index ed2a6e2753..edee063afc 100644 --- a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift +++ b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift @@ -12,6 +12,7 @@ public protocol AnyGraphQLOperation { associatedtype Success associatedtype Failure: Error typealias ResultListener = (Result) -> Void + typealias ErrorListener = (Failure) -> Void } /// Abastraction for a retryable GraphQLOperation. @@ -24,6 +25,7 @@ public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger { typealias RequestFactory = () async -> GraphQLRequest typealias OperationFactory = (GraphQLRequest, @escaping OperationResultListener) -> OperationType typealias OperationResultListener = OperationType.ResultListener + typealias OperationErrorListener = OperationType.ErrorListener /// Operation unique identifier var id: UUID { get } @@ -45,9 +47,12 @@ public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger { var operationFactory: OperationFactory { get } var resultListener: OperationResultListener { get } + + var errorListener: OperationErrorListener { get } init(requestFactory: @escaping RequestFactory, maxRetries: Int, + errorListener: @escaping OperationErrorListener, resultListener: @escaping OperationResultListener, _ operationFactory: @escaping OperationFactory) @@ -71,6 +76,11 @@ extension RetryableGraphQLOperationBehavior { attempts += 1 log.debug("[\(id)] - Try [\(attempts)/\(maxRetries)]") let wrappedResultListener: OperationResultListener = { result in + if case let .failure(error) = result { + // Give an operation a chance to prepare itself for a retry after a failure + self.errorListener(error) + } + if case let .failure(error) = result, self.shouldRetry(error: error as? APIError) { self.log.debug("\(error)") Task { @@ -103,17 +113,20 @@ public final class RetryableGraphQLOperation: Operation, Ret public var attempts: Int = 0 public var requestFactory: RequestFactory public var underlyingOperation: AtomicValue?> = AtomicValue(initialValue: nil) + public var errorListener: OperationErrorListener public var resultListener: OperationResultListener public var operationFactory: OperationFactory public init(requestFactory: @escaping RequestFactory, maxRetries: Int, + errorListener: @escaping OperationErrorListener, resultListener: @escaping OperationResultListener, _ operationFactory: @escaping OperationFactory) { self.id = UUID() self.maxRetries = max(1, maxRetries) self.requestFactory = requestFactory self.operationFactory = operationFactory + self.errorListener = errorListener self.resultListener = resultListener } @@ -154,17 +167,21 @@ public final class RetryableGraphQLSubscriptionOperation: Op public var attempts: Int = 0 public var underlyingOperation: AtomicValue?> = AtomicValue(initialValue: nil) public var requestFactory: RequestFactory + public var errorListener: OperationErrorListener public var resultListener: OperationResultListener public var operationFactory: OperationFactory - + private var filterLimitRetried: Bool = false + public init(requestFactory: @escaping RequestFactory, maxRetries: Int, + errorListener: @escaping OperationErrorListener, resultListener: @escaping OperationResultListener, _ operationFactory: @escaping OperationFactory) { self.id = UUID() self.maxRetries = max(1, maxRetries) self.requestFactory = requestFactory self.operationFactory = operationFactory + self.errorListener = errorListener self.resultListener = resultListener } public override func main() { @@ -178,9 +195,35 @@ public final class RetryableGraphQLSubscriptionOperation: Op } public func shouldRetry(error: APIError?) -> Bool { - return attempts < maxRetries +// return attempts < maxRetries + + guard case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error else { + return false + } + + if let authError = underlyingError as? AuthError { + switch authError { + case .signedOut, .notAuthorized: + return attempts < maxRetries + default: + return false + } + } + + // TODO: - How to distinguish errors? + // TODO: - Handle other errors + if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") && + filterLimitRetried == false { + + // Just to be sure that endless retry won't happen + filterLimitRetried = true + maxRetries += 1 + + return true + } + + return false } - } // MARK: GraphQLOperation - GraphQLSubscriptionOperation + AnyGraphQLOperation diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift index e01f235b88..90856e75d7 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift @@ -194,7 +194,8 @@ final class InitialSyncOperation: AsynchronousOperation { lastSync: lastSyncTime, authType: authTypes.next()) }, - maxRetries: authTypes.count, + maxRetries: authTypes.count, + errorListener: { _ in }, resultListener: completionListener) { nextRequest, wrappedCompletionListener in api.query(request: nextRequest, listener: wrappedCompletionListener) }.main() diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift index 0066b7ab3a..d33f19037a 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift @@ -73,19 +73,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { // onCreate operation let onCreateValueListener = onCreateValueListenerHandler(event:) - let onCreateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, + var onCreateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, operations: [.create, .read]) + var onCreateAuthType: AWSAuthorizationType? = onCreateAuthTypeProvider.next() + var onCreateModelPredicate = modelPredicate + self.onCreateValueListener = onCreateValueListener self.onCreateOperation = RetryableGraphQLSubscriptionOperation( requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor( for: modelSchema, - where: modelPredicate, + where: { onCreateModelPredicate }, subscriptionType: .onCreate, api: api, auth: auth, awsAuthService: self.awsAuthService, - authTypeProvider: onCreateAuthTypeProvider), + authTypeProvider: { onCreateAuthType }), maxRetries: onCreateAuthTypeProvider.count, + errorListener: { error in + // TODO: - How to distinguish errors? + // TODO: - Handle other errors + if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") { + onCreateModelPredicate = nil + + } else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error, + let authError = underlyingError as? AuthError { + + switch authError { + case .signedOut, .notAuthorized: + onCreateAuthType = onCreateAuthTypeProvider.next() + default: + return + } + } + }, resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in api.subscribe(request: nextRequest, valueListener: onCreateValueListener, @@ -95,19 +115,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { // onUpdate operation let onUpdateValueListener = onUpdateValueListenerHandler(event:) - let onUpdateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, + var onUpdateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, operations: [.update, .read]) + var onUpdateAuthType: AWSAuthorizationType? = onUpdateAuthTypeProvider.next() + var onUpdateModelPredicate = modelPredicate + self.onUpdateValueListener = onUpdateValueListener self.onUpdateOperation = RetryableGraphQLSubscriptionOperation( requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor( for: modelSchema, - where: modelPredicate, + where: { onUpdateModelPredicate }, subscriptionType: .onUpdate, api: api, auth: auth, awsAuthService: self.awsAuthService, - authTypeProvider: onUpdateAuthTypeProvider), + authTypeProvider: { onUpdateAuthType }), maxRetries: onUpdateAuthTypeProvider.count, + errorListener: { error in + // TODO: - How to distinguish errors? + // TODO: - Handle other errors + if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") { + onUpdateModelPredicate = nil + + } else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error, + let authError = underlyingError as? AuthError { + + switch authError { + case .signedOut, .notAuthorized: + onUpdateAuthType = onUpdateAuthTypeProvider.next() + default: + return + } + } + }, resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in api.subscribe(request: nextRequest, valueListener: onUpdateValueListener, @@ -117,19 +157,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { // onDelete operation let onDeleteValueListener = onDeleteValueListenerHandler(event:) - let onDeleteAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, + var onDeleteAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, operations: [.delete, .read]) + var onDeleteAuthType: AWSAuthorizationType? = onDeleteAuthTypeProvider.next() + var onDeleteModelPredicate = modelPredicate + self.onDeleteValueListener = onDeleteValueListener self.onDeleteOperation = RetryableGraphQLSubscriptionOperation( requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor( for: modelSchema, - where: modelPredicate, + where: { onDeleteModelPredicate }, subscriptionType: .onDelete, api: api, auth: auth, awsAuthService: self.awsAuthService, - authTypeProvider: onDeleteAuthTypeProvider), + authTypeProvider: { onDeleteAuthType }), maxRetries: onUpdateAuthTypeProvider.count, + errorListener: { error in + // TODO: - How to distinguish errors? + // TODO: - Handle other errors + if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") { + onDeleteModelPredicate = nil + + } else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error, + let authError = underlyingError as? AuthError { + + switch authError { + case .signedOut, .notAuthorized: + onDeleteAuthType = onDeleteAuthTypeProvider.next() + default: + return + } + } + }, resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in api.subscribe(request: nextRequest, valueListener: onDeleteValueListener, @@ -204,6 +264,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { auth: AuthCategoryBehavior?, authType: AWSAuthorizationType?, awsAuthService: AWSAuthServiceBehavior) async -> GraphQLRequest { + let request: GraphQLRequest if modelSchema.hasAuthenticationRules, let _ = auth, @@ -303,20 +364,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { // MARK: - IncomingAsyncSubscriptionEventPublisher + API request factory extension IncomingAsyncSubscriptionEventPublisher { static func apiRequestFactoryFor(for modelSchema: ModelSchema, - where predicate: QueryPredicate?, + where predicate: @escaping () -> QueryPredicate?, subscriptionType: GraphQLSubscriptionType, api: APICategoryGraphQLBehaviorExtended, auth: AuthCategoryBehavior?, awsAuthService: AWSAuthServiceBehavior, - authTypeProvider: AWSAuthorizationTypeIterator) -> RetryableGraphQLOperation.RequestFactory { - var authTypes = authTypeProvider + authTypeProvider: @escaping () -> AWSAuthorizationType?) -> RetryableGraphQLOperation.RequestFactory { + return { - return await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema, - where: predicate, + await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema, + where: predicate(), subscriptionType: subscriptionType, api: api, auth: auth, - authType: authTypes.next(), + authType: authTypeProvider(), awsAuthService: awsAuthService) } }