Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use http2 for streaming when is nodeJS #360

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 25 additions & 13 deletions src/_http.js
Expand Up @@ -55,6 +55,27 @@ HttpClient.prototype.syncLastTxnTime = function(time) {
}
}

/**
* Get HTTP request headers
*
* @param {?Object} options The request options.
*/
HttpClient.prototype.getHeaders = function(options) {
options = util.defaults(options, {})

var secret = options.secret || this._secret
var queryTimeout = options.queryTimeout || this._queryTimeout
var headers = this._headers

headers['Authorization'] = secret && secretHeader(secret)
headers['X-FaunaDB-API-Version'] = APIVersion
headers['X-Fauna-Driver'] = 'Javascript'
headers['X-Last-Seen-Txn'] = this._lastSeen
headers['X-Query-Timeout'] = queryTimeout

return util.removeNullAndUndefinedValues(headers)
}

/**
* Executes an HTTP request.
*
Expand All @@ -71,28 +92,19 @@ HttpClient.prototype.syncLastTxnTime = function(time) {
* @returns {Promise} The response promise.
*/
HttpClient.prototype.execute = function(method, path, body, query, options) {
var url = parse(this._baseUrl)
url.set('pathname', path)
url.set('query', query)
options = util.defaults(options, {})

var signal = options.signal
var fetch = options.fetch || this._fetch
var secret = options.secret || this._secret
var queryTimeout = options.queryTimeout || this._queryTimeout

var headers = this._headers
headers['Authorization'] = secret && secretHeader(secret)
headers['X-FaunaDB-API-Version'] = APIVersion
headers['X-Fauna-Driver'] = 'Javascript'
headers['X-Last-Seen-Txn'] = this._lastSeen
headers['X-Query-Timeout'] = queryTimeout
var url = parse(this._baseUrl)
url.set('pathname', path)
url.set('query', query)

return fetch(url.href, {
agent: this._keepAliveEnabledAgent,
body: body,
signal: signal,
headers: util.removeNullAndUndefinedValues(headers),
headers: this.getHeaders(options),
method: method,
timeout: this._timeout,
})
Expand Down
39 changes: 39 additions & 0 deletions src/_http2.js
@@ -0,0 +1,39 @@
var http2 = require('http2')

function request({
url,
body,
headers,
timeout,
onError,
scheme = 'https',
} = {}) {
var method = http2.constants.HTTP2_METHOD_POST
var client = http2.connect(url)

client.on('error', onError)

var request = client.request(
{
...headers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ES5 compatible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I think this is not.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

atm indeed "Uncaught ReferenceError: http2 is not defined"

[http2.constants.HTTP2_HEADER_SCHEME]: scheme,
[http2.constants.HTTP2_HEADER_METHOD]: method,
[http2.constants.HTTP2_HEADER_PATH]: `/stream`,
'Content-Length': buffer.length,
},
{ endStream: false }
)

if (timeout) {
request.setTimeout(timeout)
}

request.setEncoding('utf8')
request.write(Buffer.from(body))

return request
}

module.exports = {
request,
}
36 changes: 25 additions & 11 deletions src/stream.js
Expand Up @@ -19,6 +19,7 @@ var http = require('./_http')
var json = require('./_json')
var q = require('./query')
var util = require('./_util')
var http2 = require('./_http2')

var DefaultEvents = ['start', 'error', 'version', 'history_rewrite']
var DocumentStreamEvents = DefaultEvents.concat(['snapshot'])
Expand Down Expand Up @@ -49,7 +50,6 @@ function StreamClient(client, expression, options, onEvent) {
this._onEvent = onEvent
this._query = q.wrap(expression)
this._urlParams = options.fields ? { fields: options.fields.join(',') } : null
this._fetch = platformCompatibleFetchOverride()
this._abort = new AbortController()
this._state = 'idle'

Expand Down Expand Up @@ -194,7 +194,7 @@ StreamClient.prototype.subscribe = function() {
function platformSpecificEventRead(response) {
try {
if (util.isNodeEnv()) {
response.body.on('data', onData).on('error', onError)
response.on('data', onData).on('error', onError)
} else {
// ATENTION: The following code is meant to run in browsers and is not
// covered by current test automation. Manual testing on major browsers
Expand All @@ -221,17 +221,31 @@ StreamClient.prototype.subscribe = function() {
}
}

self._client._http
.execute('POST', 'stream', body, self._urlParams, {
fetch: self._fetch,
signal: self._abort.signal,
if (util.isNodeEnv()) {
const http = self._client._http
const request = http2.request({
url: http._baseUrl,
scheme: http._scheme,
timeout: http._timeout,
headers: http.getHeaders(),
body,
onError,
})
.then(function(response) {
return handleResponse(response).then(function() {
platformSpecificEventRead(response, onData, onError)

platformSpecificEventRead(request)
} else {
self._client._http
.execute('POST', 'stream', body, self._urlParams, {
fetch: self._fetch,
signal: self._abort.signal,
})
})
.catch(onError)
.then(function(response) {
return handleResponse(response).then(function() {
platformSpecificEventRead(response, onData, onError)
})
})
.catch(onError)
}
}

/** Closes the stream subscription by aborting its underlying http request. */
Expand Down