Skip to content

Commit

Permalink
Merge branch 'master' into fix-whenNothingPending
Browse files Browse the repository at this point in the history
  • Loading branch information
ericyhwang committed Apr 10, 2019
2 parents 065ed9d + 5394fa4 commit fe1b7c0
Show file tree
Hide file tree
Showing 30 changed files with 1,602 additions and 154 deletions.
98 changes: 85 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Community Provided Pub/Sub Adapters
### Listening to WebSocket connections

```js
var WebSocketJSONStream = require('websocket-json-stream');
var WebSocketJSONStream = require('@teamwork/websocket-json-stream');

// 'ws' is a websocket server connection, as passed into
// new (require('ws').Server).on('connection', ...)
Expand Down Expand Up @@ -149,17 +149,24 @@ Register a new middleware.
* `'afterSubmit'`: An operation was successfully submitted to
the database.
* `'receive'`: Received a message from a client
* `fn` _(Function(request, callback))_
* `'reply'`: About to send a non-error reply to a client message
* `fn` _(Function(context, callback))_
Call this function at the time specified by `action`.
`request` contains a subset of the following properties, as relevant for the action:
* `action`: The action this middleware is handing
* `agent`: An object corresponding to the server agent handing this client
* `req`: The HTTP request being handled
* `collection`: The collection name being handled
* `id`: The document id being handled
* `snapshots`: The retrieved snapshots for the `readSnapshots` action
* `query`: The query object being handled
* `op`: The op being handled
* `context` will always have the following properties:
* `action`: The action this middleware is hanlding
* `agent`: A reference to the server agent handling this client
* `backend`: A reference to this ShareDB backend instance
* `context` can also have additional properties, as relevant for the action:
* `collection`: The collection name being handled
* `id`: The document id being handled
* `op`: The op being handled
* `req`: HTTP request being handled, if provided to `share.listen` (for 'connect')
* `stream`: The duplex Stream provided to `share.listen` (for 'connect')
* `query`: The query object being handled (for 'query')
* `snapshots`: Array of retrieved snapshots (for 'readSnapshots')
* `data`: Received client message (for 'receive')
* `request`: Client message being replied to (for 'reply')
* `reply`: Reply to be sent to the client (for 'reply')

### Projections

Expand All @@ -182,6 +189,27 @@ share.addProjection('users_limited', 'users', { name:true, profileUrl:true });

Note that only the [JSON0 OT type](https://github.com/ottypes/json0) is supported for projections.

### Logging

By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service.

Methods can be overridden by passing a [`console`-like object](https://developer.mozilla.org/en-US/docs/Web/API/console) to `logger.setMethods`:

```javascript
var ShareDB = require('sharedb');
ShareDB.logger.setMethods({
info: () => {}, // Silence info
warn: () => alerts.warn(arguments), // Forward warnings to alerting service
error: () => alerts.critical(arguments) // Remap errors to critical alerts
});
```

ShareDB only supports the following logger methods:

- `info`
- `warn`
- `error`

### Shutdown

`share.close(callback)`
Expand Down Expand Up @@ -235,7 +263,28 @@ Get a read-only snapshot of a document at the requested version.
* `id` _(String)_
ID of the snapshot
* `version` _(number) [optional]_
The version number of the desired snapshot
The version number of the desired snapshot. If `null`, the latest version is fetched.
* `callback` _(Function)_
Called with `(error, snapshot)`, where `snapshot` takes the following form:

```javascript
{
id: string; // ID of the snapshot
v: number; // version number of the snapshot
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
data: any; // the snapshot
}
```

`connection.fetchSnapshotByTimestamp(collection, id, timestamp, callback): void;`
Get a read-only snapshot of a document at the requested version.

* `collection` _(String)_
Collection name of the snapshot
* `id` _(String)_
ID of the snapshot
* `timestamp` _(number) [optional]_
The timestamp of the desired snapshot. The returned snapshot will be the latest snapshot before the provided timestamp. If `null`, the latest version is fetched.
* `callback` _(Function)_
Called with `(error, snapshot)`, where `snapshot` takes the following form:

Expand Down Expand Up @@ -267,7 +316,7 @@ Populate the fields on `doc` with a snapshot of the document from the server, an
fire events on subsequent changes.

`doc.ingestSnapshot(snapshot, callback)`
Ingest snapshot data. This data must include a version, snapshot and type. This method is generally called interally as a result of fetch or subscribe and not directly. However, it may be called directly to pass data that was transferred to the client external to the client's ShareDB connection, such as snapshot data sent along with server rendering of a webpage.
Ingest snapshot data. The `snapshot` param must include the fields `v` (doc version), `data`, and `type` (OT type). This method is generally called interally as a result of fetch or subscribe and not directly from user code. However, it may still be called directly from user code to pass data that was transferred to the client external to the client's ShareDB connection, such as snapshot data sent along with server rendering of a webpage.

`doc.destroy()`
Unsubscribe and stop firing events.
Expand Down Expand Up @@ -358,6 +407,27 @@ after a sequence of diffs are handled.
`query.on('extra', function() {...}))`
(Only fires on subscription queries) `query.extra` changed.

### Logging

By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service.

Methods can be overridden by passing a [`console`-like object](https://developer.mozilla.org/en-US/docs/Web/API/console) to `logger.setMethods`

```javascript
var ShareDB = require('sharedb/lib/client');
ShareDB.logger.setMethods({
info: () => {}, // Silence info
warn: () => alerts.warn(arguments), // Forward warnings to alerting service
error: () => alerts.critical(arguments) // Remap errors to critical alerts
});
```

ShareDB only supports the following logger methods:

- `info`
- `warn`
- `error`


## Error codes

Expand Down Expand Up @@ -422,3 +492,5 @@ The `41xx` and `51xx` codes are reserved for use by ShareDB DB adapters, and the
* 5018 - Required QueryEmitter listener not assigned
* 5019 - getMilestoneSnapshot MilestoneDB method unimplemented
* 5020 - saveMilestoneSnapshot MilestoneDB method unimplemented
* 5021 - getMilestoneSnapshotAtOrBeforeTime MilestoneDB method unimplemented
* 5022 - getMilestoneSnapshotAtOrAfterTime MilestoneDB method unimplemented
2 changes: 1 addition & 1 deletion examples/textarea/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ var doc = connection.get('examples', 'textarea');
doc.subscribe(function(err) {
if (err) throw err;

var binding = new StringBinding(element, doc);
var binding = new StringBinding(element, doc, ['content']);
binding.setup();
});
2 changes: 1 addition & 1 deletion examples/textarea/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ function createDoc(callback) {
doc.fetch(function(err) {
if (err) throw err;
if (doc.type === null) {
doc.create('', callback);
doc.create({ content: '' }, callback);
return;
}
callback();
Expand Down
61 changes: 41 additions & 20 deletions lib/agent.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var hat = require('hat');
var util = require('./util');
var types = require('./types');
var logger = require('./logger');

/**
* Agent deserializes the wire protocol messages received from the stream and
Expand Down Expand Up @@ -47,7 +48,7 @@ module.exports = Agent;
// Close the agent with the client.
Agent.prototype.close = function(err) {
if (err) {
console.warn('Agent closed due to error', this.clientId, err.stack || err);
logger.warn('Agent closed due to error', this.clientId, err.stack || err);
}
if (this.closed) return;
// This will end the writable stream and emit 'finish'
Expand Down Expand Up @@ -95,7 +96,7 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
// Log then silently ignore errors in a subscription stream, since these
// may not be the client's fault, and they were not the result of a
// direct request by the client
console.error('Doc subscription stream error', collection, id, data.error);
logger.error('Doc subscription stream error', collection, id, data.error);
return;
}
if (agent._isOwnOp(collection, data)) return;
Expand All @@ -104,7 +105,7 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
stream.on('end', function() {
// The op stream is done sending, so release its reference
var streams = agent.subscribedDocs[collection];
if (!streams) return;
if (!streams || streams[id] !== stream) return;
delete streams[id];
if (util.hasKeys(streams)) return;
delete agent.subscribedDocs[collection];
Expand Down Expand Up @@ -137,7 +138,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query
// Log then silently ignore errors in a subscription stream, since these
// may not be the client's fault, and they were not the result of a
// direct request by the client
console.error('Query subscription stream error', collection, query, err);
logger.error('Query subscription stream error', collection, query, err);
};

emitter.onOp = function(op) {
Expand Down Expand Up @@ -186,23 +187,29 @@ Agent.prototype._sendOps = function(collection, id, ops) {
}
};

function getReplyErrorObject(err) {
if (typeof err === 'string') {
return {
code: 4001,
message: err
};
} else {
if (err.stack) {
logger.warn(err.stack);
}
return {
code: err.code,
message: err.message
};
}
}

Agent.prototype._reply = function(request, err, message) {
var agent = this;
var backend = agent.backend;
if (err) {
if (typeof err === 'string') {
request.error = {
code: 4001,
message: err
};
} else {
if (err.stack) {
console.warn(err.stack);
}
request.error = {
code: err.code,
message: err.message
};
}
this.send(request);
request.error = getReplyErrorObject(err);
agent.send(request);
return;
}
if (!message) message = {};
Expand All @@ -216,7 +223,15 @@ Agent.prototype._reply = function(request, err, message) {
if (request.b && !message.data) message.b = request.b;
}

this.send(message);
var middlewareContext = {request: request, reply: message};
backend.trigger(backend.MIDDLEWARE_ACTIONS.reply, agent, middlewareContext, function(err) {
if (err) {
request.error = getReplyErrorObject(err);
agent.send(request);
} else {
agent.send(middlewareContext.reply);
}
});
};

// Start processing events from the stream
Expand Down Expand Up @@ -302,6 +317,8 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._submit(request.c, request.d, op, callback);
case 'nf':
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case 'nt':
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
default:
callback({code: 4000, message: 'Invalid or unknown message'});
}
Expand Down Expand Up @@ -588,3 +605,7 @@ Agent.prototype._createOp = function(request) {
Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};

Agent.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
};
Loading

0 comments on commit fe1b7c0

Please sign in to comment.