Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSockets support #404

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,32 @@ client.on('log', (level, loggerName, message, furtherInfo) => {
The `level` being passed to the listener can be `verbose`, `info`, `warning` or `error`. Visit the [logging
documentation][doc-logging] for more information.

## WebSockets

You can use websocket as transport. But Cassandra doesn't support this protocol
so some proxy should be deployed in front of Cassandra, which can handle this transport protocol.

```javascript
const client = new cassandra.Client({
transport: 'WebSocket',
contactPoints: [
// some proxies that support websocket transport
'127.0.0.1:9043',
'localhost:9044'
],
webSocketOptions: {
// some client websocket options
protocolVersion: 13,
...
}
});
```

You can configure your websocket client with `webSocketOptions`.
To properly configure it follow [websocket/ws doc][ws-doc].

You also can use websockets over SSL by passing `transport: 'SecureWebSocket'`.

## Compatibility

The driver supports all versions of Node.js, Cassandra, and DSE that are not EOL at the time of release. Only LTS eligible branches (i.e. even numbered releases) are supported for Node.js. See the [project documentation] for more information about the Node.js release cycle.
Expand Down Expand Up @@ -296,4 +322,5 @@ Unless required by applicable law or agreed to in writing, software distributed
[streams2]: https://nodejs.org/api/stream.html#stream_class_stream_readable
[cql-udt]: https://cassandra.apache.org/doc/latest/cql/types.html#udts
[dse]: https://www.datastax.com/products/datastax-enterprise
[astra]: https://www.datastax.com/products/datastax-astra
[astra]: https://www.datastax.com/products/datastax-astra
[ws-doc]: https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketaddress-protocols-options
6 changes: 6 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { metrics } from './lib/metrics';
import { tracker } from './lib/tracker';
import { metadata } from './lib/metadata';
import { datastax } from './lib/datastax/';
import { ClientRequestArgs } from 'http';
import Long = types.Long;
import Uuid = types.Uuid;
import graph = datastax.graph;
Expand Down Expand Up @@ -191,7 +192,11 @@ export interface ExecutionOptions {
setHints(hints: string[]): void;
}

export type WebSocketClientOptions = (ClientOptions | ClientRequestArgs)
& {protocols?: string | string[] | undefined};

export interface ClientOptions {
transport?: 'SecureWebSocket' | 'WebSocket' | undefined
contactPoints?: string[];
localDataCenter?: string;
keyspace?: string;
Expand Down Expand Up @@ -253,6 +258,7 @@ export interface ClientOptions {
tcpNoDelay?: boolean;
};
sslOptions?: tls.ConnectionOptions;
webSocketOptions?: WebSocketClientOptions;
}

export interface QueryOptions {
Expand Down
81 changes: 61 additions & 20 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const StreamIdStack = require('./stream-id-stack');
const OperationState = require('./operation-state');
const promiseUtils = require('./promise-utils');
const { ExecutionOptions } = require('./execution-options');
const { WebSocketWrapper } = require('./websocket');

/**
* Represents a connection to a Cassandra node
Expand Down Expand Up @@ -171,30 +172,70 @@ class Connection extends events.EventEmitter {
const self = this;
this.log('info', `Connecting to ${this.endpointFriendlyName}`);

if (!this.options.sslOptions) {
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });
this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
}
else {
// Use TLS
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);
if (this.options.transport) {
if (this.options.transport.toLowerCase() === 'securewebsocket') {
// Use secure WebSocket
const options = utils.extend({ rejectUnauthorized: false, transport: this.options.transport },
this.options.webSocketOptions);

if (!options.protocols) {
options.protocols = ['cql'];
}

this.netClient = new WebSocketWrapper(options);

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Secure WebSocket to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
} else {
// Use WebSocket
const options = utils.extend({
transport: this.options.transport,
highWaterMark: this.options.socketOptions.coalescingThreshold,
handshakeTimeout: this.options.socketOptions.connectTimeout,
}, this.options.webSocketOptions);

if (!options.protocols) {
options.protocols = ['cql'];
}

if (this.options.sni) {
sslOptions.servername = this._serverName;
this.netClient = new WebSocketWrapper(options);

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `WebSocket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
}
} else {
// Use Socket
if (!this.options.sslOptions) {
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
} else {
// Use Socket with TLS
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);

this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
self.bindSocketListeners();
self.startup(callback);
});
if (this.options.sni) {
sslOptions.servername = this._serverName;
}

// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
self.bindSocketListeners();
self.startup(callback);
});

// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
}
}

this.netClient.once('error', function socketError(err) {
Expand Down
91 changes: 91 additions & 0 deletions lib/websocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const { EventEmitter } = require('events');
const { WebSocket } = require('ws');

/**
* WebSocketWrapper is a wrapper on the `ws.Websocket` which implements
* `net.Socket` interface to be used by the `cassandra.Connection`
*/
class WebSocketWrapper extends EventEmitter {
/**
* Creates a websocket wrapper instance. To connect use `connect` method
* @param {object} options client options for a websocket
*/
constructor(options) {
super();
this.options = options;
}

/**
* Creates an instance of a websocket and connects
* @param {String} port
* @param {String} address
* @param {() => void} connectionCallback is called when connection is successfully established
* @returns {WebSocketWrapper} wrapper itself
*/
connect(port, address, connectionCallback) {
const schema = this.options.transport.toLowerCase() === 'securewebsocket' ? 'wss' : 'ws';

this.ws = new WebSocket(schema+'://'+address+':'+port, this.options.protocols, this.options);

if (connectionCallback) {
this.ws.on('open', connectionCallback);
}

const stream = WebSocket.createWebSocketStream(this.ws, this.options);

stream.on('error', err => {
this.emit('error', err);
});
stream.on('drain', () => {
this.emit('drain');
});
stream.on('close', () => {
this.emit('close');
});
stream.on('end', () => {
this.emit('end');
});

this.write = stream.write.bind(stream);
this.pipe = stream.pipe.bind(stream);
this.end = stream.end.bind(stream);
this.destroy = stream.destroy.bind(stream);

return this;
}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setTimeout() {}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setKeepAlive() {}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setNoDelay() {}
}

module.exports.WebSocketWrapper = WebSocketWrapper;
39 changes: 30 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"@types/long": "~5.0.0",
"@types/node": ">=8",
"adm-zip": "~0.5.10",
"long": "~5.2.3"
"long": "~5.2.3",
"ws": "^8.16.0"
},
"devDependencies": {
"chai": "~4.3.8",
Expand Down