Skip to content

Commit

Permalink
chore(datastore): add subscribe() impl (#4846)
Browse files Browse the repository at this point in the history
  • Loading branch information
Equartey committed May 13, 2024
1 parent 102e991 commit 69a6d71
Show file tree
Hide file tree
Showing 66 changed files with 2,194 additions and 508 deletions.
107 changes: 100 additions & 7 deletions packages/amplify_datastore/ios/Classes/FlutterApiPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,19 @@ import Combine
public class FlutterApiPlugin: APICategoryPlugin
{
public var key: PluginKey = "awsAPIPlugin"
init() {

private let apiAuthFactory: APIAuthProviderFactory
private let nativeApiPlugin: NativeApiPlugin
private let nativeSubscriptionEvents: PassthroughSubject<NativeGraphQLSubscriptionResponse, Never>
private var cancellables = Set<AnyCancellable>()

init(
apiAuthProviderFactory: APIAuthProviderFactory,
nativeApiPlugin: NativeApiPlugin,
subscriptionEventBus: PassthroughSubject<NativeGraphQLSubscriptionResponse, Never>
) {
self.apiAuthFactory = apiAuthProviderFactory
self.nativeApiPlugin = nativeApiPlugin
self.nativeSubscriptionEvents = subscriptionEventBus
}

// TODO: Implment in Async Swift v2
Expand All @@ -23,15 +34,97 @@ public class FlutterApiPlugin: APICategoryPlugin
preconditionFailure("method not supported")
}

// TODO: Implment in Async Swift v2
public func subscribe<R>(request: GraphQLRequest<R>) -> AmplifyAsyncThrowingSequence<GraphQLSubscriptionEvent<R>> where R : Decodable {
preconditionFailure("method not supported")
public func subscribe<R: Decodable>(
request: GraphQLRequest<R>
) -> AmplifyAsyncThrowingSequence<GraphQLSubscriptionEvent<R>> where R : Decodable {
var subscriptionId: String? = ""

// TODO: write a e2e test to ensure we don't go over 100 AppSync connections
func unsubscribe(subscriptionId: String?){
if let subscriptionId {
DispatchQueue.main.async {
self.nativeApiPlugin.unsubscribe(subscriptionId: subscriptionId) {}
}
}
}

// TODO: shouldn't there be a timeout if there is no start_ack returned in a certain period of time
let (sequence, cancellable) = nativeSubscriptionEvents
.receive(on: DispatchQueue.global())
.filter { $0.subscriptionId == subscriptionId }
.handleEvents(receiveCompletion: {_ in
unsubscribe(subscriptionId: subscriptionId)
}, receiveCancel: {
unsubscribe(subscriptionId: subscriptionId)
})
.compactMap { [weak self] event -> GraphQLSubscriptionEvent<R>? in
switch event.type {
case "connecting":
return .connection(.connecting)
case "start_ack":
return .connection(.connected)
case "complete":
return .connection(.disconnected)
case "data":
if let responseDecoded: GraphQLResponse<R> =
try? self?.decodeGraphQLPayloadJson(request: request, payload: event.payloadJson)
{
return .data(responseDecoded)
}
return nil
case "error":
if let payload = event.payloadJson {
return .data(.fromAppSyncSubscriptionErrorResponse(string: payload))
}
return nil
default:
print("ERROR unsupported subscription event type! \(String(describing: event.type))")
return nil
}
}
.toAmplifyAsyncThrowingSequence()
cancellables.insert(cancellable) // the subscription is bind with class instance lifecycle, it should be released when stream is finished or unsubscribed
sequence.send(.connection(.connecting))
DispatchQueue.main.async {
self.nativeApiPlugin.subscribe(request: request.toNativeGraphQLRequest()) { response in
subscriptionId = response.subscriptionId
}
}
return sequence
}

private func decodeGraphQLPayloadJson<R>(
request: GraphQLRequest<R>,
payload: String?
) throws -> GraphQLResponse<R> {
guard let payload else {
throw DataStoreError.decodingError("Request payload could not be empty", "")
}

return GraphQLResponse<R>.fromAppSyncResponse(
string: payload,
decodePath: request.decodePath
)
}

private func decodeGraphQLSubscriptionPayloadJson<R>(
request: GraphQLRequest<R>,
payload: String?
) throws -> GraphQLResponse<R> {
guard let payload else {
throw DataStoreError.decodingError("Request payload could not be empty", "")
}

return GraphQLResponse<R>.fromAppSyncSubscriptionResponse(
string: payload,
decodePath: request.decodePath
)
}

public func configure(using configuration: Any?) throws { }

public func apiAuthProviderFactory() -> APIAuthProviderFactory {
preconditionFailure("method not supported")
return self.apiAuthFactory
}

public func add(interceptor: any URLRequestInterceptor, for apiName: String) throws {
Expand Down Expand Up @@ -67,7 +160,7 @@ public class FlutterApiPlugin: APICategoryPlugin
}

public func reachabilityPublisher() throws -> AnyPublisher<ReachabilityUpdate, Never>? {
preconditionFailure("method not supported")
return nil
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class SwiftAmplifyDataStorePlugin: NSObject, FlutterPlugin, NativeAmplify
private let customTypeSchemaRegistry: FlutterSchemaRegistry
private let dataStoreObserveEventStreamHandler: DataStoreObserveEventStreamHandler?
private let dataStoreHubEventStreamHandler: DataStoreHubEventStreamHandler?
private let nativeSubscriptionEventBus = PassthroughSubject<NativeGraphQLSubscriptionResponse, Never>()
private var channel: FlutterMethodChannel?
private var observeSubscription: AnyCancellable?
private let nativeAuthPlugin: NativeAuthPlugin
Expand Down Expand Up @@ -88,7 +89,14 @@ public class SwiftAmplifyDataStorePlugin: NSObject, FlutterPlugin, NativeAmplify
AWSAuthorizationType(rawValue: $0)
}
try Amplify.add(
plugin: FlutterApiPlugin()
plugin: FlutterApiPlugin(
apiAuthProviderFactory: FlutterAuthProviders(
authProviders: authProviders,
nativeApiPlugin: nativeApiPlugin
),
nativeApiPlugin: nativeApiPlugin,
subscriptionEventBus: nativeSubscriptionEventBus
)
)
return completion(.success(()))
} catch let apiError as APIError {
Expand Down Expand Up @@ -608,7 +616,7 @@ public class SwiftAmplifyDataStorePlugin: NSObject, FlutterPlugin, NativeAmplify
}

func sendSubscriptionEvent(event: NativeGraphQLSubscriptionResponse, completion: @escaping (Result<Void, any Error>) -> Void) {
fatalError("not implemented")
nativeSubscriptionEventBus.send(event)
}

private func checkArguments(args: Any) throws -> [String: Any] {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//


import Foundation
import Amplify

extension GraphQLRequest {
func toNativeGraphQLRequest() -> NativeGraphQLRequest {
let variablesJson = self.variables
.flatMap { try? JSONSerialization.data(withJSONObject: $0, options: []) }
.flatMap { String(data: $0, encoding: .utf8) }

return NativeGraphQLRequest(
document: self.document,
apiName: self.apiName,
variablesJson: variablesJson ?? "{}",
responseType: String(describing: self.responseType),
decodePath: self.decodePath
)
}
}
Loading

0 comments on commit 69a6d71

Please sign in to comment.