Skip to content
This repository has been archived by the owner on Dec 19, 2019. It is now read-only.

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
brianc committed Oct 7, 2013
0 parents commit 5d27cf2
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules
116 changes: 116 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
var path = require('path')

var resultPath = path.dirname(require.resolve('pg.js')) + '/lib/result'
var Result = require(resultPath)
var Client = require('pg.js').Client

var Cursor = function(text, values) {
this.text = text
this.values = values
this._connection = null
}

Cursor.prototype._connect = function(cb) {
if(this._connected) return setImmediate(cb);
this._connected = true
var self = this
var client = new Client()
client.connect(function(err) {
if(err) return cb(err);

//remove all listeners from
//client's connection and discard the client
self.connection = client.connection
self.connection.removeAllListeners()

var con = self.connection

con.parse({
text: self.text
}, true)

con.bind({
values: self.values
}, true)

con.describe({
type: 'P',
name: '' //use unamed portal
}, true)

con.flush()

var onError = function(err) {
cb(err)
con.end()
}

con.once('error', onError)

con.on('rowDescription', function(msg) {
self.rowDescription = msg
con.removeListener('error', onError)
cb(null, con)
})

var onRow = function(msg) {
var row = self.result.parseRow(msg.fields)
self.result.addRow(row)
}

con.on('dataRow', onRow)

con.once('readyForQuery', function() {
con.end()
})

con.once('commandComplete', function() {
self._complete = true
con.sync()
})
})
}

Cursor.prototype._getRows = function(con, n, cb) {
if(this._done) {
return cb(null, [], false)
}
var msg = {
portal: '',
rows: n
}
con.execute(msg, true)
con.flush()
this.result = new Result()
this.result.addFields(this.rowDescription.fields)

var self = this

var onComplete = function() {
self._done = true
cb(null, self.result.rows, self.result)
}
con.once('commandComplete', onComplete)

con.once('portalSuspended', function() {
cb(null, self.result.rows, self.result)
con.removeListener('commandComplete', onComplete)
})
}

Cursor.prototype.end = function(cb) {
this.connection.end()
this.connection.stream.once('end', cb)
}

Cursor.prototype.read = function(rows, cb) {
var self = this
this._connect(function(err) {
if(err) return cb(err);
self._getRows(self.connection, rows, cb)
})
}

module.exports = function(query, params) {
return new Cursor(query, params)
}
20 changes: 20 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"name": "node-pg-cursor",
"version": "0.0.0",
"description": "",
"main": "index.js",
"directories": {
"test": "test"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "BSD",
"devDependencies": {
"gonna": "0.0.0"
},
"dependencies": {
"pg.js": "~2.7.0"
}
}
94 changes: 94 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
var assert = require('assert')
var pgCursor = require('../')
var gonna = require('gonna')

var text = 'SELECT generate_series as num FROM generate_series(0, 5)'
var values = []

var test = function(name, fn, timeout) {
timeout = timeout || 1000
var done = gonna(name, timeout, function(err) {
console.log(name)
assert.ifError(err)
})
fn(done)
}

test('fetch 6 when asking for 10', function(done) {
var cursor = pgCursor(text)
cursor.read(10, function(err, res) {
assert.ifError(err)
assert.equal(res.length, 6)
done()
})
})

test('end before reading to end', function(done) {
var cursor = pgCursor(text)
cursor.read(3, function(err, res) {
assert.equal(res.length, 3)
cursor.end(done)
})
})

test('callback with error', function(done) {
var cursor = pgCursor('select asdfasdf')
cursor.read(1, function(err) {
assert(err)
done()
})
})


test('read a partial chunk of data', function(done) {
var cursor = pgCursor(text)
cursor.read(2, function(err, res) {
assert.equal(res.length, 2)
cursor.read(3, function(err, res) {
assert.equal(res.length, 3)
cursor.read(1, function(err, res) {
assert.equal(res.length, 1)
cursor.read(1, function(err, res) {
assert.ifError(err)
assert.strictEqual(res.length, 0)
done()
})
})
})
})
})

test('read return length 0 past the end', function(done) {
var cursor = pgCursor(text)
cursor.read(2, function(err, res) {
cursor.read(100, function(err, res) {
assert.equal(res.length, 4)
cursor.read(100, function(err, res) {
assert.equal(res.length, 0)
done()
})
})
})
})

test('read huge result', function(done) {
var text = 'SELECT generate_series as num FROM generate_series(0, 1000000)'
var values = []
cursor = pgCursor(text, values);
var count = 0;
var more = function() {
cursor.read(1000, function(err, rows) {
if(err) return done(err);
if(!rows.length) {
assert.equal(count, 1000001)
return done()
}
count += rows.length;
if(count%100000 == 0) {
console.log(count)
}
setImmediate(more)
})
}
more()
}, 100000)

0 comments on commit 5d27cf2

Please sign in to comment.