diff --git a/README.md b/README.md index 96830f61..e70cd8eb 100644 --- a/README.md +++ b/README.md @@ -242,6 +242,32 @@ client.on('log', (level, loggerName, message, furtherInfo) => { The `level` being passed to the listener can be `verbose`, `info`, `warning` or `error`. Visit the [logging documentation][doc-logging] for more information. +## WebSockets + +You can use websocket as transport. But Cassandra doesn't support this protocol +so some proxy should be deployed in front of Cassandra, which can handle this transport protocol. + +```javascript + const client = new cassandra.Client({ + transport: 'WebSocket', + contactPoints: [ + // some proxies that support websocket transport + '127.0.0.1:9043', + 'localhost:9044' + ], + webSocketOptions: { + // some client websocket options + protocolVersion: 13, + ... + } +}); +``` + +You can configure your websocket client with `webSocketOptions`. +To properly configure it follow [websocket/ws doc][ws-doc]. + +You also can use websockets over SSL by passing `transport: 'SecureWebSocket'`. + ## Compatibility The driver supports all versions of Node.js, Cassandra, and DSE that are not EOL at the time of release. Only LTS eligible branches (i.e. even numbered releases) are supported for Node.js. See the [project documentation] for more information about the Node.js release cycle. @@ -296,4 +322,5 @@ Unless required by applicable law or agreed to in writing, software distributed [streams2]: https://nodejs.org/api/stream.html#stream_class_stream_readable [cql-udt]: https://cassandra.apache.org/doc/latest/cql/types.html#udts [dse]: https://www.datastax.com/products/datastax-enterprise -[astra]: https://www.datastax.com/products/datastax-astra \ No newline at end of file +[astra]: https://www.datastax.com/products/datastax-astra +[ws-doc]: https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketaddress-protocols-options \ No newline at end of file diff --git a/index.d.ts b/index.d.ts index cf44d76e..994f246b 100644 --- a/index.d.ts +++ b/index.d.ts @@ -24,6 +24,7 @@ import { metrics } from './lib/metrics'; import { tracker } from './lib/tracker'; import { metadata } from './lib/metadata'; import { datastax } from './lib/datastax/'; +import { ClientRequestArgs } from 'http'; import Long = types.Long; import Uuid = types.Uuid; import graph = datastax.graph; @@ -191,7 +192,11 @@ export interface ExecutionOptions { setHints(hints: string[]): void; } +export type WebSocketClientOptions = (ClientOptions | ClientRequestArgs) + & {protocols?: string | string[] | undefined}; + export interface ClientOptions { + transport?: 'SecureWebSocket' | 'WebSocket' | undefined contactPoints?: string[]; localDataCenter?: string; keyspace?: string; @@ -253,6 +258,7 @@ export interface ClientOptions { tcpNoDelay?: boolean; }; sslOptions?: tls.ConnectionOptions; + webSocketOptions?: WebSocketClientOptions; } export interface QueryOptions { diff --git a/lib/connection.js b/lib/connection.js index 84382872..de52feb2 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -32,6 +32,7 @@ const StreamIdStack = require('./stream-id-stack'); const OperationState = require('./operation-state'); const promiseUtils = require('./promise-utils'); const { ExecutionOptions } = require('./execution-options'); +const { WebSocketWrapper } = require('./websocket'); /** * Represents a connection to a Cassandra node @@ -171,30 +172,70 @@ class Connection extends events.EventEmitter { const self = this; this.log('info', `Connecting to ${this.endpointFriendlyName}`); - if (!this.options.sslOptions) { - this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold }); - this.netClient.connect(this.port, this.address, function connectCallback() { - self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`); - self.bindSocketListeners(); - self.startup(callback); - }); - } - else { - // Use TLS - const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions); + if (this.options.transport) { + if (this.options.transport.toLowerCase() === 'securewebsocket') { + // Use secure WebSocket + const options = utils.extend({ rejectUnauthorized: false, transport: this.options.transport }, + this.options.webSocketOptions); + + if (!options.protocols) { + options.protocols = ['cql']; + } + + this.netClient = new WebSocketWrapper(options); + + this.netClient.connect(this.port, this.address, function connectCallback() { + self.log('verbose', `Secure WebSocket to ${self.endpointFriendlyName}`); + self.bindSocketListeners(); + self.startup(callback); + }); + } else { + // Use WebSocket + const options = utils.extend({ + transport: this.options.transport, + highWaterMark: this.options.socketOptions.coalescingThreshold, + handshakeTimeout: this.options.socketOptions.connectTimeout, + }, this.options.webSocketOptions); + + if (!options.protocols) { + options.protocols = ['cql']; + } - if (this.options.sni) { - sslOptions.servername = this._serverName; + this.netClient = new WebSocketWrapper(options); + + this.netClient.connect(this.port, this.address, function connectCallback() { + self.log('verbose', `WebSocket connected to ${self.endpointFriendlyName}`); + self.bindSocketListeners(); + self.startup(callback); + }); } + } else { + // Use Socket + if (!this.options.sslOptions) { + this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold }); + + this.netClient.connect(this.port, this.address, function connectCallback() { + self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`); + self.bindSocketListeners(); + self.startup(callback); + }); + } else { + // Use Socket with TLS + const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions); - this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() { - self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`); - self.bindSocketListeners(); - self.startup(callback); - }); + if (this.options.sni) { + sslOptions.servername = this._serverName; + } - // TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version) - this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold); + this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() { + self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`); + self.bindSocketListeners(); + self.startup(callback); + }); + + // TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version) + this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold); + } } this.netClient.once('error', function socketError(err) { diff --git a/lib/websocket.js b/lib/websocket.js new file mode 100644 index 00000000..8102121b --- /dev/null +++ b/lib/websocket.js @@ -0,0 +1,91 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const { EventEmitter } = require('events'); +const { WebSocket } = require('ws'); + +/** + * WebSocketWrapper is a wrapper on the `ws.Websocket` which implements + * `net.Socket` interface to be used by the `cassandra.Connection` + */ +class WebSocketWrapper extends EventEmitter { + /** + * Creates a websocket wrapper instance. To connect use `connect` method + * @param {object} options client options for a websocket + */ + constructor(options) { + super(); + this.options = options; + } + + /** + * Creates an instance of a websocket and connects + * @param {String} port + * @param {String} address + * @param {() => void} connectionCallback is called when connection is successfully established + * @returns {WebSocketWrapper} wrapper itself + */ + connect(port, address, connectionCallback) { + const schema = this.options.transport.toLowerCase() === 'securewebsocket' ? 'wss' : 'ws'; + + this.ws = new WebSocket(schema+'://'+address+':'+port, this.options.protocols, this.options); + + if (connectionCallback) { + this.ws.on('open', connectionCallback); + } + + const stream = WebSocket.createWebSocketStream(this.ws, this.options); + + stream.on('error', err => { + this.emit('error', err); + }); + stream.on('drain', () => { + this.emit('drain'); + }); + stream.on('close', () => { + this.emit('close'); + }); + stream.on('end', () => { + this.emit('end'); + }); + + this.write = stream.write.bind(stream); + this.pipe = stream.pipe.bind(stream); + this.end = stream.end.bind(stream); + this.destroy = stream.destroy.bind(stream); + + return this; + } + + /** + * It is not implemented because `ws` lib doesn't provide API to work with + */ + setTimeout() {} + + /** + * It is not implemented because `ws` lib doesn't provide API to work with + */ + setKeepAlive() {} + + /** + * It is not implemented because `ws` lib doesn't provide API to work with + */ + setNoDelay() {} +} + +module.exports.WebSocketWrapper = WebSocketWrapper; diff --git a/package-lock.json b/package-lock.json index c30bacf4..079c269a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,22 +9,23 @@ "version": "4.7.2", "license": "Apache-2.0", "dependencies": { - "@types/long": "^5.0.0", + "@types/long": "~5.0.0", "@types/node": ">=8", - "adm-zip": "^0.5.10", - "long": "^5.2.3" + "adm-zip": "~0.5.10", + "long": "~5.2.3", + "ws": "^8.16.0" }, "devDependencies": { - "chai": "^4.3.8", - "kerberos": "^2.0.3", - "mocha": "^10.2.0", - "mocha-jenkins-reporter": "^0.4.8", + "chai": "~4.3.8", + "kerberos": "~2.0.3", + "mocha": "~10.2.0", + "mocha-jenkins-reporter": "~0.4.8", "proxyquire": "~2.1.3", - "sinon": "^15.2.0", + "sinon": "~15.2.0", "temp": ">= 0.8.3" }, "engines": { - "node": ">=8" + "node": ">=16" } }, "node_modules/@sinonjs/commons": { @@ -1747,6 +1748,26 @@ "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", "dev": true }, + "node_modules/ws": { + "version": "8.16.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.16.0.tgz", + "integrity": "sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/xml": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/xml/-/xml-1.0.1.tgz", diff --git a/package.json b/package.json index f4464be5..aaa42721 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,8 @@ "@types/long": "~5.0.0", "@types/node": ">=8", "adm-zip": "~0.5.10", - "long": "~5.2.3" + "long": "~5.2.3", + "ws": "^8.16.0" }, "devDependencies": { "chai": "~4.3.8",