Skip to content

Commit

Permalink
Query get() method for RTDB (#3812)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmwski committed Nov 17, 2020
1 parent d5baaff commit 34973cd
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changeset/many-snails-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@firebase/database": minor
"firebase": minor
---

Add a `get` method for database queries that returns server result when connected
7 changes: 7 additions & 0 deletions packages/database/src/api/Query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ export class Query {
this.repo.removeEventCallbackForQuery(this, container);
}

/**
* Get the server-value for this query, or return a cached value if not connected.
*/
get(): Promise<DataSnapshot> {

This comment has been minimized.

Copy link
@Feiyang1

Feiyang1 Nov 19, 2020

Member

@IanWyszynski Can you please add it to the typings file - firebase/index.d.ts?

return this.repo.getValue(this);
}

/**
* Attaches a listener, waits for the first event, and then removes the listener
* @param {!string} eventType
Expand Down
82 changes: 81 additions & 1 deletion packages/database/src/core/PersistentConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import {
isValidFormat,
isMobileCordova,
isReactNative,
isNodeSdk
isNodeSdk,
Deferred
} from '@firebase/util';

import { error, log, logWrapper, warn, ObjectToUniqueKey } from './util/util';
Expand All @@ -44,6 +45,7 @@ import { SDK_VERSION } from './version';

const RECONNECT_MIN_DELAY = 1000;
const RECONNECT_MAX_DELAY_DEFAULT = 60 * 5 * 1000; // 5 minutes in milliseconds (Case: 1858)
const GET_CONNECT_TIMEOUT = 3 * 1000;
const RECONNECT_MAX_DELAY_FOR_ADMINS = 30 * 1000; // 30 seconds for admin clients (likely to be a backend server)
const RECONNECT_DELAY_MULTIPLIER = 1.3;
const RECONNECT_DELAY_RESET_TIMEOUT = 30000; // Reset delay back to MIN_DELAY after being connected for 30sec.
Expand Down Expand Up @@ -75,6 +77,11 @@ interface OutstandingPut {
onComplete: (a: string, b?: string) => void;
}

interface OutstandingGet {
request: object;
onComplete: (response: { [k: string]: unknown }) => void;
}

/**
* Firebase connection. Abstracts wire protocol and handles reconnecting.
*
Expand All @@ -93,7 +100,9 @@ export class PersistentConnection extends ServerActions {
Map</* queryId */ string, ListenSpec>
> = new Map();
private outstandingPuts_: OutstandingPut[] = [];
private outstandingGets_: OutstandingGet[] = [];
private outstandingPutCount_ = 0;
private outstandingGetCount_ = 0;
private onDisconnectRequestQueue_: OnDisconnectRequest[] = [];
private connected_ = false;
private reconnectDelay_ = RECONNECT_MIN_DELAY;
Expand Down Expand Up @@ -184,6 +193,57 @@ export class PersistentConnection extends ServerActions {
}
}

get(query: Query): Promise<string> {
const deferred = new Deferred<string>();
const request = {
p: query.path.toString(),
q: query.queryObject()
};
const outstandingGet = {
action: 'g',
request,
onComplete: (message: { [k: string]: unknown }) => {
const payload = message['d'] as string;
if (message['s'] === 'ok') {
this.onDataUpdate_(
request['p'],
payload,
/*isMerge*/ false,
/*tag*/ null
);
deferred.resolve(payload);
} else {
deferred.reject(payload);
}
}
};
this.outstandingGets_.push(outstandingGet);
this.outstandingGetCount_++;
const index = this.outstandingGets_.length - 1;

if (!this.connected_) {
setTimeout(() => {
const get = this.outstandingGets_[index];
if (get === undefined || outstandingGet !== get) {
return;
}
delete this.outstandingGets_[index];
this.outstandingGetCount_--;
if (this.outstandingGetCount_ === 0) {
this.outstandingGets_ = [];
}
this.log_('get ' + index + ' timed out on connection');
deferred.reject(new Error('Client is offline.'));
}, GET_CONNECT_TIMEOUT);
}

if (this.connected_) {
this.sendGet_(index);
}

return deferred.promise;
}

/**
* @inheritDoc
*/
Expand Down Expand Up @@ -221,6 +281,20 @@ export class PersistentConnection extends ServerActions {
}
}

private sendGet_(index: number) {
const get = this.outstandingGets_[index];
this.sendRequest('g', get.request, (message: { [k: string]: unknown }) => {
delete this.outstandingGets_[index];
this.outstandingGetCount_--;
if (this.outstandingGetCount_ === 0) {
this.outstandingGets_ = [];
}
if (get.onComplete) {
get.onComplete(message);
}
});
}

private sendListen_(listenSpec: ListenSpec) {
const query = listenSpec.query;
const pathString = query.path.toString();
Expand Down Expand Up @@ -950,6 +1024,12 @@ export class PersistentConnection extends ServerActions {
request.onComplete
);
}

for (let i = 0; i < this.outstandingGets_.length; i++) {
if (this.outstandingGets_[i]) {
this.sendGet_(i);
}
}
}

/**
Expand Down
44 changes: 43 additions & 1 deletion packages/database/src/core/ReadonlyRestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
* limitations under the License.
*/

import { assert, jsonEval, safeGet, querystring } from '@firebase/util';
import {
assert,
jsonEval,
safeGet,
querystring,
Deferred
} from '@firebase/util';
import { logWrapper, warn } from './util/util';

import { ServerActions } from './ServerActions';
Expand Down Expand Up @@ -139,6 +145,42 @@ export class ReadonlyRestClient extends ServerActions {
delete this.listens_[listenId];
}

get(query: Query): Promise<string> {
const queryStringParameters = query
.getQueryParams()
.toRestQueryStringParameters();

const pathString = query.path.toString();

const deferred = new Deferred<string>();

this.restRequest_(
pathString + '.json',
queryStringParameters,
(error, result) => {
let data = result;

if (error === 404) {
data = null;
error = null;
}

if (error === null) {
this.onDataUpdate_(
pathString,
data,
/*isMerge=*/ false,
/*tag=*/ null
);
deferred.resolve(data as string);
} else {
deferred.reject(new Error(data as string));
}
}
);
return deferred.promise;
}

/** @inheritDoc */
refreshAuthToken(token: string) {
// no-op since we just always call getToken.
Expand Down
66 changes: 66 additions & 0 deletions packages/database/src/core/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { ReadonlyRestClient } from './ReadonlyRestClient';
import { FirebaseApp } from '@firebase/app-types';
import { RepoInfo } from './RepoInfo';
import { Database } from '../api/Database';
import { DataSnapshot } from '../api/DataSnapshot';
import { ServerActions } from './ServerActions';
import { Query } from '../api/Query';
import { EventRegistration } from './view/EventRegistration';
Expand Down Expand Up @@ -304,6 +305,71 @@ export class Repo {
return this.nextWriteId_++;
}

/**
* The purpose of `getValue` is to return the latest known value
* satisfying `query`.
*
* If the client is connected, this method will send a request
* to the server. If the client is not connected, then either:
*
* 1. The client was once connected, but not anymore.
* 2. The client has never connected, this is the first operation
* this repo is handling.
*
* In case (1), it's possible that the client still has an active
* listener, with cached data. Since this is the latest known
* value satisfying the query, that's what getValue will return.
* If there is no cached data, `getValue` surfaces an "offline"
* error.
*
* In case (2), `getValue` will trigger a time-limited connection
* attempt. If the client is unable to connect to the server, it
* will surface an "offline" error because there cannot be any
* cached data. On the other hand, if the client is able to connect,
* `getValue` will return the server's value for the query, if one
* exists.
*
* @param query - The query to surface a value for.
*/
getValue(query: Query): Promise<DataSnapshot> {
return this.server_.get(query).then(
payload => {
const node = nodeFromJSON(payload as string);
const events = this.serverSyncTree_.applyServerOverwrite(
query.path,
node
);
this.eventQueue_.raiseEventsAtPath(query.path, events);
return Promise.resolve(
new DataSnapshot(
node,
query.getRef(),
query.getQueryParams().getIndex()
)
);
},
err => {
this.log_(
'get for query ' +
stringify(query) +
' falling back to cache after error: ' +
err
);
const cached = this.serverSyncTree_.calcCompleteEventCache(query.path);
if (!cached.isEmpty()) {
return Promise.resolve(
new DataSnapshot(
cached,
query.getRef(),
query.getQueryParams().getIndex()
)
);
}
return Promise.reject(new Error(err as string));
}
);
}

setWithPriority(
path: Path,
newVal: unknown,
Expand Down
5 changes: 5 additions & 0 deletions packages/database/src/core/ServerActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ export abstract class ServerActions {
*/
abstract unlisten(query: Query, tag: number | null): void;

/**
* Get the server value satisfying this query.
*/
abstract get(query: Query): Promise<string>;

/**
* @param {string} pathString
* @param {*} data
Expand Down

0 comments on commit 34973cd

Please sign in to comment.