Skip to content

Commit

Permalink
addressing comments #1
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-hui committed Aug 1, 2019
1 parent 5cfa39c commit 3277402
Show file tree
Hide file tree
Showing 19 changed files with 60 additions and 62 deletions.
5 changes: 2 additions & 3 deletions Firestore/Example/Tests/Integration/FSTDatastoreTests.mm
Expand Up @@ -160,7 +160,6 @@ @interface FSTDatastoreTests : XCTestCase
@implementation FSTDatastoreTests {
std::shared_ptr<AsyncQueue> _testWorkerQueue;
FSTLocalStore *_localStore;
EmptyCredentialsProvider _credentials;

DatabaseInfo _databaseInfo;
std::shared_ptr<Datastore> _datastore;
Expand All @@ -184,8 +183,8 @@ - (void)setUp {
dispatch_queue_t queue = dispatch_queue_create(
"com.google.firestore.FSTDatastoreTestsWorkerQueue", DISPATCH_QUEUE_SERIAL);
_testWorkerQueue = std::make_shared<AsyncQueue>(absl::make_unique<ExecutorLibdispatch>(queue));
_datastore = std::make_shared<Datastore>(
_databaseInfo, _testWorkerQueue, std::shared_ptr<EmptyCredentialsProvider>(&_credentials));
_datastore = std::make_shared<Datastore>(_databaseInfo, _testWorkerQueue,
std::make_shared<EmptyCredentialsProvider>());

_remoteStore =
absl::make_unique<RemoteStore>(_localStore, _datastore, _testWorkerQueue, [](OnlineState) {});
Expand Down
4 changes: 2 additions & 2 deletions Firestore/Example/Tests/SpecTests/FSTMockDatastore.h
Expand Up @@ -38,7 +38,7 @@ class MockDatastore : public Datastore {
public:
MockDatastore(const core::DatabaseInfo& database_info,
const std::shared_ptr<util::AsyncQueue>& worker_queue,
auth::CredentialsProvider* credentials);
std::shared_ptr<auth::CredentialsProvider> credentials);

std::shared_ptr<WatchStream> CreateWatchStream(WatchStreamCallback* callback) override;
std::shared_ptr<WriteStream> CreateWriteStream(WriteStreamCallback* callback) override;
Expand Down Expand Up @@ -93,7 +93,7 @@ class MockDatastore : public Datastore {
// reduces the number of test-only methods in `Datastore`.
const core::DatabaseInfo* database_info_ = nullptr;
std::shared_ptr<util::AsyncQueue> worker_queue_;
auth::CredentialsProvider* credentials_ = nullptr;
std::shared_ptr<auth::CredentialsProvider> credentials_;

std::shared_ptr<MockWatchStream> watch_stream_;
std::shared_ptr<MockWriteStream> write_stream_;
Expand Down
12 changes: 5 additions & 7 deletions Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm
Expand Up @@ -66,13 +66,12 @@
class MockWatchStream : public WatchStream {
public:
MockWatchStream(const std::shared_ptr<AsyncQueue>& worker_queue,
CredentialsProvider* credentials_provider,
std::shared_ptr<CredentialsProvider> credentials_provider,
FSTSerializerBeta* serializer,
GrpcConnection* grpc_connection,
WatchStreamCallback* callback,
MockDatastore* datastore)
: WatchStream{worker_queue, std::shared_ptr<CredentialsProvider>(credentials_provider),
serializer, grpc_connection, callback},
: WatchStream{worker_queue, credentials_provider, serializer, grpc_connection, callback},
datastore_{datastore},
callback_{callback} {
}
Expand Down Expand Up @@ -158,13 +157,12 @@ void WriteWatchChange(const WatchChange& change, SnapshotVersion snap) {
class MockWriteStream : public WriteStream {
public:
MockWriteStream(const std::shared_ptr<AsyncQueue>& worker_queue,
CredentialsProvider* credentials_provider,
std::shared_ptr<CredentialsProvider> credentials_provider,
FSTSerializerBeta* serializer,
GrpcConnection* grpc_connection,
WriteStreamCallback* callback,
MockDatastore* datastore)
: WriteStream{worker_queue, std::shared_ptr<CredentialsProvider>(credentials_provider),
serializer, grpc_connection, callback},
: WriteStream{worker_queue, credentials_provider, serializer, grpc_connection, callback},
datastore_{datastore},
callback_{callback} {
}
Expand Down Expand Up @@ -242,7 +240,7 @@ int sent_mutations_count() const {

MockDatastore::MockDatastore(const core::DatabaseInfo& database_info,
const std::shared_ptr<util::AsyncQueue>& worker_queue,
auth::CredentialsProvider* credentials)
std::shared_ptr<auth::CredentialsProvider> credentials)
: Datastore{database_info, worker_queue, std::shared_ptr<CredentialsProvider>(credentials),
CreateNoOpConnectivityMonitor()},
database_info_{&database_info},
Expand Down
4 changes: 2 additions & 2 deletions Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm
Expand Up @@ -149,7 +149,6 @@ @implementation FSTSyncEngineTestDriver {

DatabaseInfo _databaseInfo;
User _currentUser;
EmptyCredentialsProvider _credentialProvider;

std::shared_ptr<MockDatastore> _datastore;
}
Expand Down Expand Up @@ -180,7 +179,8 @@ - (instancetype)initWithPersistence:(id<FSTPersistence>)persistence
_persistence = persistence;
_localStore = [[FSTLocalStore alloc] initWithPersistence:persistence initialUser:initialUser];

_datastore = std::make_shared<MockDatastore>(_databaseInfo, _workerQueue, &_credentialProvider);
_datastore = std::make_shared<MockDatastore>(_databaseInfo, _workerQueue,
std::make_shared<EmptyCredentialsProvider>());
_remoteStore = absl::make_unique<RemoteStore>(
_localStore, _datastore, _workerQueue, [self](OnlineState onlineState) {
[self.syncEngine applyChangedOnlineState:onlineState];
Expand Down
2 changes: 1 addition & 1 deletion Firestore/Source/API/FIRFirestore+Internal.h
Expand Up @@ -65,7 +65,7 @@ NS_ASSUME_NONNULL_BEGIN
+ (FIRFirestore *)recoverFromFirestore:(std::shared_ptr<api::Firestore>)firestore;

/**
* Shuts down this FirebaseFirestore instance.
* Shuts down this `FIRFirestore` instance.
*
* After shutdown only the `clearPersistence` method may be used. Any other method
* will throw an error.
Expand Down
10 changes: 7 additions & 3 deletions Firestore/Source/API/FSTFirestoreComponent.mm
Expand Up @@ -124,10 +124,14 @@ - (void)removeInstance:(NSString *)database {
#pragma mark - FIRComponentLifecycleMaintainer

- (void)appWillBeDeleted:(FIRApp *)app {
for (NSString *key in _instances) {
[_instances[key] shutdownInternalWithCompletion:nil];
NSDictionary<NSString *, FIRFirestore *> *instances;
@synchronized(self.instances) {
instances = [_instances copy];
[_instances removeAllObjects];
}
for (NSString *key in instances) {
[instances[key] shutdownInternalWithCompletion:nil];
}
[_instances removeAllObjects];
}

#pragma mark - Object Lifecycle
Expand Down
4 changes: 2 additions & 2 deletions Firestore/Source/Core/FSTFirestoreClient.h
Expand Up @@ -68,8 +68,8 @@ NS_ASSUME_NONNULL_BEGIN
*/
+ (instancetype)clientWithDatabaseInfo:(const core::DatabaseInfo &)databaseInfo
settings:(const api::Settings &)settings
credentialsProvider:(std::shared_ptr<auth::CredentialsProvider>)
credentialsProvider // no passing ownership
credentialsProvider:
(std::shared_ptr<auth::CredentialsProvider>)credentialsProvider
userExecutor:(std::shared_ptr<util::Executor>)userExecutor
workerQueue:(std::shared_ptr<util::AsyncQueue>)workerQueue;

Expand Down
12 changes: 5 additions & 7 deletions Firestore/Source/Core/FSTFirestoreClient.mm
Expand Up @@ -99,8 +99,7 @@ @interface FSTFirestoreClient () {

- (instancetype)initWithDatabaseInfo:(const DatabaseInfo &)databaseInfo
settings:(const Settings &)settings
credentialsProvider:(std::shared_ptr<CredentialsProvider>)
credentialsProvider // no passing ownership
credentialsProvider:(std::shared_ptr<CredentialsProvider>)credentialsProvider
userExecutor:(std::shared_ptr<Executor>)userExecutor
workerQueue:(std::shared_ptr<AsyncQueue>)queue NS_DESIGNATED_INITIALIZER;

Expand Down Expand Up @@ -148,8 +147,7 @@ - (bool)isShutdown {

+ (instancetype)clientWithDatabaseInfo:(const DatabaseInfo &)databaseInfo
settings:(const Settings &)settings
credentialsProvider:(std::shared_ptr<CredentialsProvider>)
credentialsProvider // no passing ownership
credentialsProvider:(std::shared_ptr<CredentialsProvider>)credentialsProvider
userExecutor:(std::shared_ptr<Executor>)userExecutor
workerQueue:(std::shared_ptr<AsyncQueue>)workerQueue {
return [[FSTFirestoreClient alloc] initWithDatabaseInfo:databaseInfo
Expand All @@ -161,8 +159,7 @@ + (instancetype)clientWithDatabaseInfo:(const DatabaseInfo &)databaseInfo

- (instancetype)initWithDatabaseInfo:(const DatabaseInfo &)databaseInfo
settings:(const Settings &)settings
credentialsProvider:(std::shared_ptr<CredentialsProvider>)
credentialsProvider // no passing ownership
credentialsProvider:(std::shared_ptr<CredentialsProvider>)credentialsProvider
userExecutor:(std::shared_ptr<Executor>)userExecutor
workerQueue:(std::shared_ptr<AsyncQueue>)workerQueue {
if (self = [super init]) {
Expand Down Expand Up @@ -318,8 +315,9 @@ - (void)shutdownWithCallback:(util::StatusCallback)callback {
_remoteStore->Shutdown();
[self.persistence shutdown];
});

// This separate enqueue ensures if shutdown is called multiple times
// every time the callback is triggered. If it is in the previous
// every time the callback is triggered. If it is in the above
// enqueue, it might not get executed because after first shutdown
// all operations are not executed.
_workerQueue->EnqueueEvenAfterShutdown([self, callback] {
Expand Down
Expand Up @@ -99,6 +99,10 @@ explicit ConnectivityMonitorApple(
return;
}

// Reachability events are fairly infrequent, we avoid using dispatch queue
// from `work_queue` and use main queue here to avoid leaking implemetation
// details of `work_queue`. The callback itself is still executed on
// `work_queue`.
success = SCNetworkReachabilitySetDispatchQueue(reachability_,
dispatch_get_main_queue());
if (!success) {
Expand Down
4 changes: 2 additions & 2 deletions Firestore/core/src/firebase/firestore/remote/datastore.h
Expand Up @@ -76,7 +76,7 @@ class Datastore : public std::enable_shared_from_this<Datastore> {

Datastore(const core::DatabaseInfo& database_info,
const std::shared_ptr<util::AsyncQueue>& worker_queue,
const std::shared_ptr<auth::CredentialsProvider> credentials);
std::shared_ptr<auth::CredentialsProvider> credentials);

virtual ~Datastore() {
}
Expand Down Expand Up @@ -142,7 +142,7 @@ class Datastore : public std::enable_shared_from_this<Datastore> {
/** Test-only constructor */
Datastore(const core::DatabaseInfo& database_info,
const std::shared_ptr<util::AsyncQueue>& worker_queue,
const std::shared_ptr<auth::CredentialsProvider> credentials,
std::shared_ptr<auth::CredentialsProvider> credentials,
std::unique_ptr<ConnectivityMonitor> connectivity_monitor);

/** Test-only method */
Expand Down
6 changes: 3 additions & 3 deletions Firestore/core/src/firebase/firestore/remote/datastore.mm
Expand Up @@ -91,17 +91,17 @@ void LogGrpcCallFinished(absl::string_view rpc_name,

Datastore::Datastore(const DatabaseInfo& database_info,
const std::shared_ptr<AsyncQueue>& worker_queue,
const std::shared_ptr<CredentialsProvider> credentials)
std::shared_ptr<CredentialsProvider> credentials)
: Datastore{database_info, worker_queue, credentials,
ConnectivityMonitor::Create(worker_queue)} {
}

Datastore::Datastore(const DatabaseInfo& database_info,
const std::shared_ptr<AsyncQueue>& worker_queue,
const std::shared_ptr<CredentialsProvider> credentials,
std::shared_ptr<CredentialsProvider> credentials,
std::unique_ptr<ConnectivityMonitor> connectivity_monitor)
: worker_queue_{NOT_NULL(worker_queue)},
credentials_{credentials},
credentials_{std::move(credentials)},
rpc_executor_{CreateExecutor()},
connectivity_monitor_{std::move(connectivity_monitor)},
grpc_connection_{database_info, worker_queue, &grpc_queue_,
Expand Down
2 changes: 1 addition & 1 deletion Firestore/core/src/firebase/firestore/remote/stream.h
Expand Up @@ -120,7 +120,7 @@ class Stream : public GrpcStreamObserver,
};

Stream(const std::shared_ptr<util::AsyncQueue>& worker_queue,
const std::shared_ptr<auth::CredentialsProvider> credentials_provider,
std::shared_ptr<auth::CredentialsProvider> credentials_provider,
GrpcConnection* grpc_connection,
util::TimerId backoff_timer_id,
util::TimerId idle_timer_id);
Expand Down
4 changes: 2 additions & 2 deletions Firestore/core/src/firebase/firestore/remote/stream.mm
Expand Up @@ -53,13 +53,13 @@
} // namespace

Stream::Stream(const std::shared_ptr<AsyncQueue>& worker_queue,
const std::shared_ptr<CredentialsProvider> credentials_provider,
std::shared_ptr<CredentialsProvider> credentials_provider,
GrpcConnection* grpc_connection,
TimerId backoff_timer_id,
TimerId idle_timer_id)
: backoff_{worker_queue, backoff_timer_id, kBackoffFactor,
kBackoffInitialDelay, kBackoffMaxDelay},
credentials_provider_{credentials_provider},
credentials_provider_{std::move(credentials_provider)},
worker_queue_{worker_queue},
grpc_connection_{grpc_connection},
idle_timer_id_{idle_timer_id} {
Expand Down
11 changes: 5 additions & 6 deletions Firestore/core/src/firebase/firestore/remote/watch_stream.h
Expand Up @@ -81,12 +81,11 @@ class WatchStreamCallback {
*/
class WatchStream : public Stream {
public:
WatchStream(
const std::shared_ptr<util::AsyncQueue>& async_queue,
const std::shared_ptr<auth::CredentialsProvider> credentials_provider,
FSTSerializerBeta* serializer,
GrpcConnection* grpc_connection,
WatchStreamCallback* callback);
WatchStream(const std::shared_ptr<util::AsyncQueue>& async_queue,
std::shared_ptr<auth::CredentialsProvider> credentials_provider,
FSTSerializerBeta* serializer,
GrpcConnection* grpc_connection,
WatchStreamCallback* callback);

/**
* Registers interest in the results of the given query. If the query includes
Expand Down
Expand Up @@ -37,7 +37,7 @@

WatchStream::WatchStream(
const std::shared_ptr<AsyncQueue>& async_queue,
const std::shared_ptr<CredentialsProvider> credentials_provider,
std::shared_ptr<CredentialsProvider> credentials_provider,
FSTSerializerBeta* serializer,
GrpcConnection* grpc_connection,
WatchStreamCallback* callback)
Expand Down
11 changes: 5 additions & 6 deletions Firestore/core/src/firebase/firestore/remote/write_stream.h
Expand Up @@ -99,12 +99,11 @@ class WriteStreamCallback {
*/
class WriteStream : public Stream {
public:
WriteStream(
const std::shared_ptr<util::AsyncQueue>& async_queue,
const std::shared_ptr<auth::CredentialsProvider> credentials_provider,
FSTSerializerBeta* serializer,
GrpcConnection* grpc_connection,
WriteStreamCallback* callback);
WriteStream(const std::shared_ptr<util::AsyncQueue>& async_queue,
std::shared_ptr<auth::CredentialsProvider> credentials_provider,
FSTSerializerBeta* serializer,
GrpcConnection* grpc_connection,
WriteStreamCallback* callback);

void SetLastStreamToken(NSData* token);
/**
Expand Down
Expand Up @@ -36,7 +36,7 @@

WriteStream::WriteStream(
const std::shared_ptr<AsyncQueue>& async_queue,
const std::shared_ptr<CredentialsProvider> credentials_provider,
std::shared_ptr<CredentialsProvider> credentials_provider,
FSTSerializerBeta* serializer,
GrpcConnection* grpc_connection,
WriteStreamCallback* callback)
Expand Down
11 changes: 5 additions & 6 deletions Firestore/core/src/firebase/firestore/util/async_queue.cc
Expand Up @@ -28,7 +28,6 @@ namespace util {
AsyncQueue::AsyncQueue(std::unique_ptr<Executor> executor)
: executor_{std::move(executor)} {
is_operation_in_progress_ = false;
is_shutting_down_ = false;
}

// TODO(varconst): assert in destructor that the queue is empty.
Expand All @@ -50,10 +49,10 @@ void AsyncQueue::VerifyIsCurrentQueue() const {
}

void AsyncQueue::ExecuteBlocking(const Operation& operation) {
// This is not guarded by `shut_down_mutex_` and `is_shutting_down_`
// because it is the execution of the operation, not scheduling. Checking
// `is_shutting_down_` here would mean *all* operations will not run after
// shutdown, which is not expected.
// This is not guarded by `is_shutting_down_` because it is the execution
// of the operation, not scheduling. Checking `is_shutting_down_` here
// would mean *all* operations will not run after shutdown, which is not
// intended.
VerifyIsCurrentExecutor();
HARD_ASSERT(!is_operation_in_progress_,
"ExecuteBlocking may not be called "
Expand Down Expand Up @@ -84,7 +83,7 @@ void AsyncQueue::EnqueueEvenAfterShutdown(const Operation& operation) {
executor_->Execute(Wrap(operation));
}

bool AsyncQueue::is_shutting_down() {
bool AsyncQueue::is_shutting_down() const {
return is_shutting_down_;
}

Expand Down
12 changes: 5 additions & 7 deletions Firestore/core/src/firebase/firestore/util/async_queue.h
Expand Up @@ -98,27 +98,25 @@ class AsyncQueue {
// destroyed may invoke `Enqueue`).
//
// After the shutdown process has initiated (`is_shutting_down()` is true),
// calling `Enqueue` will return, but the operation will *NOT* be run.
// calling `Enqueue` is a no-op.
void Enqueue(const Operation& operation);

// Like `Enqueue`, but also starts the shutdown process. Once the shutdown
// process has started, this queue will not run any operations requested
// via `Enqueue*` methods. They will not report error, they will simply
// return with the requested operations not scheduled.
// process has started, calling any Enqueue* methods becomes a no-op
//
// The exception is `EnqueueEvenAfterShutdown`, operations requsted via
// this will still be scheduled.
void EnqueueAndInitializeShutdown(const Operation& operation);

// Like `Enqueue`, but it will proceed scheduling the requested operation
// regardless if the queue is shut down or not.
// regardless of whether the queue is shut down or not.
void EnqueueEvenAfterShutdown(const Operation& operation);

// Like `Enqueue`, but without applying any prerequisite checks.
void EnqueueRelaxed(const Operation& operation);

// Whether the queue has initiated its shutdown process.
bool is_shutting_down();
bool is_shutting_down() const;

// Puts the `operation` on the queue to be executed `delay` milliseconds from
// now, and returns a handle that allows to cancel the operation (provided it
Expand Down Expand Up @@ -185,7 +183,7 @@ class AsyncQueue {
std::atomic<bool> is_operation_in_progress_;
std::unique_ptr<Executor> executor_;

bool is_shutting_down_;
bool is_shutting_down_ = false;
mutable std::mutex shut_down_mutex_;
};

Expand Down

0 comments on commit 3277402

Please sign in to comment.