forked from codex-digital/cypher-stream
/
CypherStream.js
85 lines (67 loc) · 1.95 KB
/
CypherStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
'use strict';
var $ = require('highland');
var neo4j = require('neo4j-driver').v1;
var normalize = require('./util/normalize-query-statement');
var observableToStream = require('./util/observable-to-stream');
var R = require('ramda');
var Readable = require('stream').Readable;
var toNative = require('./util/to-native');
// Local aliases for the Ramda helpers used throughout this module, so the
// definitions below can be written without the `R.` prefix.
var compose = R.compose;
var cond = R.cond;
var curry = R.curry;
var has = R.has;
var map = R.map;
var prop = R.prop;
// var tap = R.tap;
// var log = tap(console.log.bind(console));
/**
 * Curried helper that executes a single statement on a runner.
 *
 * Shape: runner => statement => result of `runner.run`.
 *
 * @param {Object} runner - Object exposing `run(text, parameters)`.
 * @param {Object} query - Statement descriptor; its `statement` property is
 *   passed as the first argument to `runner.run` and its `parameters`
 *   property as the second.
 * @returns {*} Whatever `runner.run` returns (the original note on this
 *   helper describes it as an observable).
 */
var run = curry(function (runner, query) {
  return runner.run(query.statement, query.parameters);
});
/**
 * Curried helper: runs a statement on a runner and hands the result to
 * `observableToStream`.
 *
 * @param {Object} runner - Forwarded to `run`.
 * @param {Object} statement - Forwarded to `run`.
 * @returns {*} The value produced by `observableToStream`.
 */
var runStream = curry(function (runner, statement) {
  var result = run(runner, statement);
  return observableToStream(result);
});
/**
 * Curried helper that emits an `'error'` event on an emitter.
 *
 * @param {Object} stream - Object with an `emit(eventName, payload)` method.
 * @param {*} error - Payload sent with the `'error'` event.
 * @returns {*} The return value of `stream.emit`.
 */
var emitError = curry(function (stream, error) {
  return stream.emit('error', error);
});
/**
 * Builds a handler for error payloads that carry a `fields` collection.
 *
 * The returned handler maps over `fields`; each entry is turned into a
 * `neo4j.Neo4jError` built from that entry's `message` and `code`, and the
 * new error is passed to `emit`.
 *
 * @param {Function} emit - Called once per entry with the constructed error.
 * @returns {Function} Handler taking the payload and returning the result of
 *   mapping over its `fields`.
 */
var handleNeo4jError = function (emit) {
  var emitAsNeo4jError = function (field) {
    return emit(new neo4j.Neo4jError(field.message, field.code));
  };

  return function (response) {
    return map(emitAsNeo4jError, prop('fields', response));
  };
};
/**
 * Builds the error dispatcher for the stream pipeline.
 *
 * Errors that satisfy `isNeo4jError` are routed to `handleNeo4jError(emit)`;
 * every other error is passed to `emit` directly.
 *
 * @param {Function} emit - Receives each error that should be surfaced.
 * @returns {Function} Dispatcher created by `cond`.
 */
var handleError = function (emit) {
  var neo4jCase = [isNeo4jError, handleNeo4jError(emit)];
  var fallbackCase = [R.T, emit];

  return cond([neo4jCase, fallbackCase]);
};
/**
 * Predicate used by `handleError`: true when the given value has an own
 * `fields` property.
 */
var isNeo4jError = has('fields');
/**
 * Object-mode Readable that runs one or more statements on a runner and
 * pushes every resulting item to its consumers.
 */
class CypherStream extends Readable {
  /**
   * @param {Object} runner - Object whose `run` method executes statements.
   * @param {*} statements - Input handed to `normalize` to obtain the
   *   individual statement descriptors.
   * @param {Object} [options] - Optional settings. When `returnType` is
   *   `'neo4j'`, results are pushed without the `toNative` conversion.
   */
  constructor(runner, statements, options) {
    super({ objectMode: true });
    this.runner = runner;
    this.statements = statements;
    this.options = options || {};
    // Execution begins as soon as the stream is constructed.
    this.start();
  }

  /**
   * Builds the processing pipeline and sets it flowing: normalize the input,
   * keep only entries that have a `statement` property, run each one, route
   * errors to this stream's `'error'` event, and push results.
   */
  start() {
    var executeStatement = statement => {
      var results = runStream(this.runner, statement);
      if (this.options.returnType !== 'neo4j') {
        results = results.map(toNative);
      }
      // A per-statement callback receives an observer of that statement's
      // results.
      if (statement.callback) {
        statement.callback(results.observe());
      }
      return results;
    };

    var pipeline = $([this.statements])
      .flatMap(normalize)
      .filter(has('statement'))
      .flatMap(executeStatement)
      .errors(handleError(emitError(this)))
      .doto(record => this.push(record));

    // Signal end-of-stream to consumers once the pipeline is exhausted.
    pipeline
      .on('end', () => this.push(null))
      .resume();
  }

  /**
   * Intentionally a no-op: data is pushed from the pipeline built in
   * `start()` rather than being produced on demand.
   */
  _read() {}
}

module.exports = CypherStream;