Skip to content

Commit

Permalink
fix(DataStore): dataStore cannot connect to model's sync subscription…
Browse files Browse the repository at this point in the history
…s (AWS_LAMBDA auth type) aws-amplify#3549
  • Loading branch information
MuniekMg committed Mar 8, 2024
1 parent b96fbda commit e70a3f5
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 19 deletions.
49 changes: 46 additions & 3 deletions Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public protocol AnyGraphQLOperation {
associatedtype Success
associatedtype Failure: Error
typealias ResultListener = (Result<Success, Failure>) -> Void
typealias ErrorListener = (Failure) -> Void
}

/// Abastraction for a retryable GraphQLOperation.
Expand All @@ -24,6 +25,7 @@ public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger {
typealias RequestFactory = () async -> GraphQLRequest<Payload>
typealias OperationFactory = (GraphQLRequest<Payload>, @escaping OperationResultListener) -> OperationType
typealias OperationResultListener = OperationType.ResultListener
typealias OperationErrorListener = OperationType.ErrorListener

/// Operation unique identifier
var id: UUID { get }
Expand All @@ -45,9 +47,12 @@ public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger {
var operationFactory: OperationFactory { get }

var resultListener: OperationResultListener { get }

Check warning on line 50 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Lines should not have trailing whitespace (trailing_whitespace)
var errorListener: OperationErrorListener { get }

init(requestFactory: @escaping RequestFactory,
maxRetries: Int,
errorListener: @escaping OperationErrorListener,
resultListener: @escaping OperationResultListener,
_ operationFactory: @escaping OperationFactory)

Expand All @@ -71,6 +76,11 @@ extension RetryableGraphQLOperationBehavior {
attempts += 1
log.debug("[\(id)] - Try [\(attempts)/\(maxRetries)]")
let wrappedResultListener: OperationResultListener = { result in
if case let .failure(error) = result {
// Give an operation a chance to prepare itself for a retry after a failure
self.errorListener(error)
}

Check warning on line 83 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Lines should not have trailing whitespace (trailing_whitespace)
if case let .failure(error) = result, self.shouldRetry(error: error as? APIError) {
self.log.debug("\(error)")
Task {
Expand Down Expand Up @@ -103,17 +113,20 @@ public final class RetryableGraphQLOperation<Payload: Decodable>: Operation, Ret
public var attempts: Int = 0
public var requestFactory: RequestFactory
public var underlyingOperation: AtomicValue<GraphQLOperation<Payload>?> = AtomicValue(initialValue: nil)
public var errorListener: OperationErrorListener
public var resultListener: OperationResultListener
public var operationFactory: OperationFactory

public init(requestFactory: @escaping RequestFactory,
maxRetries: Int,
errorListener: @escaping OperationErrorListener,
resultListener: @escaping OperationResultListener,
_ operationFactory: @escaping OperationFactory) {
self.id = UUID()
self.maxRetries = max(1, maxRetries)
self.requestFactory = requestFactory
self.operationFactory = operationFactory
self.errorListener = errorListener
self.resultListener = resultListener
}

Expand Down Expand Up @@ -154,17 +167,21 @@ public final class RetryableGraphQLSubscriptionOperation<Payload: Decodable>: Op
public var attempts: Int = 0
public var underlyingOperation: AtomicValue<GraphQLSubscriptionOperation<Payload>?> = AtomicValue(initialValue: nil)
public var requestFactory: RequestFactory
public var errorListener: OperationErrorListener
public var resultListener: OperationResultListener
public var operationFactory: OperationFactory

private var filterLimitRetried: Bool = false

Check warning on line 174 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Lines should not have trailing whitespace (trailing_whitespace)
public init(requestFactory: @escaping RequestFactory,
maxRetries: Int,
errorListener: @escaping OperationErrorListener,
resultListener: @escaping OperationResultListener,
_ operationFactory: @escaping OperationFactory) {
self.id = UUID()
self.maxRetries = max(1, maxRetries)
self.requestFactory = requestFactory
self.operationFactory = operationFactory
self.errorListener = errorListener
self.resultListener = resultListener
}
public override func main() {
Expand All @@ -178,9 +195,35 @@ public final class RetryableGraphQLSubscriptionOperation<Payload: Decodable>: Op
}

public func shouldRetry(error: APIError?) -> Bool {
return attempts < maxRetries
// return attempts < maxRetries

Check warning on line 199 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Lines should not have trailing whitespace (trailing_whitespace)
guard case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error else {
return false
}

Check warning on line 203 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Lines should not have trailing whitespace (trailing_whitespace)
if let authError = underlyingError as? AuthError {
switch authError {
case .signedOut, .notAuthorized:
return attempts < maxRetries
default:
return false
}
}

Check warning on line 212 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Lines should not have trailing whitespace (trailing_whitespace)
// TODO: - How to distinguish errors?

Check warning on line 213 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

TODOs should be resolved (- How to distinguish errors?) (todo)
// TODO: - Handle other errors

Check warning on line 214 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

TODOs should be resolved (- Handle other errors) (todo)
if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") &&
filterLimitRetried == false {

// Just to be sure that endless retry won't happen
filterLimitRetried = true
maxRetries += 1

return true
}

return false
}

}

// MARK: GraphQLOperation - GraphQLSubscriptionOperation + AnyGraphQLOperation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ final class InitialSyncOperation: AsynchronousOperation {
lastSync: lastSyncTime,
authType: authTypes.next())
},
maxRetries: authTypes.count,
maxRetries: authTypes.count,
errorListener: { _ in },
resultListener: completionListener) { nextRequest, wrappedCompletionListener in
api.query(request: nextRequest, listener: wrappedCompletionListener)
}.main()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {

// onCreate operation
let onCreateValueListener = onCreateValueListenerHandler(event:)
let onCreateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
var onCreateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
operations: [.create, .read])
var onCreateAuthType: AWSAuthorizationType? = onCreateAuthTypeProvider.next()
var onCreateModelPredicate = modelPredicate

self.onCreateValueListener = onCreateValueListener
self.onCreateOperation = RetryableGraphQLSubscriptionOperation(
requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
for: modelSchema,
where: modelPredicate,
where: { onCreateModelPredicate },
subscriptionType: .onCreate,
api: api,
auth: auth,
awsAuthService: self.awsAuthService,
authTypeProvider: onCreateAuthTypeProvider),
authTypeProvider: { onCreateAuthType }),
maxRetries: onCreateAuthTypeProvider.count,
errorListener: { error in
// TODO: - How to distinguish errors?
// TODO: - Handle other errors
if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") {
onCreateModelPredicate = nil

} else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error,
let authError = underlyingError as? AuthError {

Check failure on line 99 in AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Opening braces should be preceded by a single space and on the same line as the declaration (opening_brace)

switch authError {
case .signedOut, .notAuthorized:
onCreateAuthType = onCreateAuthTypeProvider.next()
default:
return
}
}
},
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
api.subscribe(request: nextRequest,
valueListener: onCreateValueListener,
Expand All @@ -95,19 +115,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {

// onUpdate operation
let onUpdateValueListener = onUpdateValueListenerHandler(event:)
let onUpdateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
var onUpdateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
operations: [.update, .read])
var onUpdateAuthType: AWSAuthorizationType? = onUpdateAuthTypeProvider.next()
var onUpdateModelPredicate = modelPredicate

self.onUpdateValueListener = onUpdateValueListener
self.onUpdateOperation = RetryableGraphQLSubscriptionOperation(
requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
for: modelSchema,
where: modelPredicate,
where: { onUpdateModelPredicate },
subscriptionType: .onUpdate,
api: api,
auth: auth,
awsAuthService: self.awsAuthService,
authTypeProvider: onUpdateAuthTypeProvider),
authTypeProvider: { onUpdateAuthType }),
maxRetries: onUpdateAuthTypeProvider.count,
errorListener: { error in
// TODO: - How to distinguish errors?
// TODO: - Handle other errors
if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") {
onUpdateModelPredicate = nil

} else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error,
let authError = underlyingError as? AuthError {

Check failure on line 141 in AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Opening braces should be preceded by a single space and on the same line as the declaration (opening_brace)

switch authError {
case .signedOut, .notAuthorized:
onUpdateAuthType = onUpdateAuthTypeProvider.next()
default:
return
}
}
},
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
api.subscribe(request: nextRequest,
valueListener: onUpdateValueListener,
Expand All @@ -117,19 +157,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {

// onDelete operation
let onDeleteValueListener = onDeleteValueListenerHandler(event:)
let onDeleteAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
var onDeleteAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema,
operations: [.delete, .read])
var onDeleteAuthType: AWSAuthorizationType? = onDeleteAuthTypeProvider.next()
var onDeleteModelPredicate = modelPredicate

self.onDeleteValueListener = onDeleteValueListener
self.onDeleteOperation = RetryableGraphQLSubscriptionOperation(
requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor(
for: modelSchema,
where: modelPredicate,
where: { onDeleteModelPredicate },
subscriptionType: .onDelete,
api: api,
auth: auth,
awsAuthService: self.awsAuthService,
authTypeProvider: onDeleteAuthTypeProvider),
authTypeProvider: { onDeleteAuthType }),
maxRetries: onUpdateAuthTypeProvider.count,
errorListener: { error in
// TODO: - How to distinguish errors?
// TODO: - Handle other errors
if error.debugDescription.contains("Filters combination exceed maximum limit 10 for subscription.") {
onDeleteModelPredicate = nil

} else if case let .operationError(errorDescription, recoverySuggestion, underlyingError) = error,
let authError = underlyingError as? AuthError {

Check failure on line 183 in AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Opening braces should be preceded by a single space and on the same line as the declaration (opening_brace)

switch authError {
case .signedOut, .notAuthorized:
onDeleteAuthType = onDeleteAuthTypeProvider.next()
default:
return
}
}
},
resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
api.subscribe(request: nextRequest,
valueListener: onDeleteValueListener,
Expand Down Expand Up @@ -204,6 +264,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
auth: AuthCategoryBehavior?,
authType: AWSAuthorizationType?,
awsAuthService: AWSAuthServiceBehavior) async -> GraphQLRequest<Payload> {

let request: GraphQLRequest<Payload>
if modelSchema.hasAuthenticationRules,
let _ = auth,
Expand Down Expand Up @@ -303,20 +364,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
// MARK: - IncomingAsyncSubscriptionEventPublisher + API request factory
extension IncomingAsyncSubscriptionEventPublisher {
static func apiRequestFactoryFor(for modelSchema: ModelSchema,
where predicate: QueryPredicate?,
where predicate: @escaping () -> QueryPredicate?,
subscriptionType: GraphQLSubscriptionType,
api: APICategoryGraphQLBehaviorExtended,
auth: AuthCategoryBehavior?,
awsAuthService: AWSAuthServiceBehavior,
authTypeProvider: AWSAuthorizationTypeIterator) -> RetryableGraphQLOperation<Payload>.RequestFactory {
var authTypes = authTypeProvider
authTypeProvider: @escaping () -> AWSAuthorizationType?) -> RetryableGraphQLOperation<Payload>.RequestFactory {

return {
return await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema,
where: predicate,
await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema,
where: predicate(),
subscriptionType: subscriptionType,
api: api,
auth: auth,
authType: authTypes.next(),
authType: authTypeProvider(),
awsAuthService: awsAuthService)
}
}
Expand Down

0 comments on commit e70a3f5

Please sign in to comment.