Skip to content

Commit

Permalink
feat: SQLite driver should emit "connection" event when new connectio…
Browse files Browse the repository at this point in the history
…n is created
  • Loading branch information
cyjake committed Sep 13, 2021
1 parent 22a343a commit 0a77637
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 125 deletions.
82 changes: 82 additions & 0 deletions src/drivers/sqlite/connection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
'use strict';

// SELECT users.id AS "users:id", ...
// => [ { users: { id, ... } } ]
function nest(rows, fields, spell) {
const { Model } = spell;
const { tableAlias } = Model;
const results = [];

for (const row of rows) {
const result = {};
const qualified = Object.keys(row).some(entry => entry.includes(':'));
for (const key in row) {
const parts = key.split(':');
const [qualifier, column] = qualified
? (parts.length > 1 ? parts : ['', key])
: [Model.attributeMap.hasOwnProperty(key) ? tableAlias : '', key];
const obj = result[qualifier] || (result[qualifier] = {});
obj[column] = row[key];
}
results.push(result);
}

return { rows: results, fields };
}

class Connection {
constructor({ client, database, mode, pool }) {
const { Database, OPEN_READWRITE, OPEN_CREATE } = client;
if (mode == null) mode = OPEN_READWRITE | OPEN_CREATE;
this.database = new Database(database, mode);
this.pool = pool;
}

async query(query, values, spell) {
const { sql, nestTables } = query.sql ? query : { sql: query };

if (/^(?:pragma|select)/i.test(sql)) {
const result = await this.all(sql, values);
if (nestTables) return nest(result.rows, result.fields, spell);
return result;
}
return await this.run(sql, values);
}

all(sql, values) {
return new Promise((resolve, reject) => {
this.database.all(sql, values, (err, rows, fields) => {
if (err) reject(err);
else resolve({ rows, fields });
});
});
}

run(sql, values) {
return new Promise((resolve, reject) => {
this.database.run(sql, values, function Leoric_sqliteRun(err) {
if (err) reject(err);
else resolve({ insertId: this.lastID, affectedRows: this.changes });
});
});
}

release() {
this.pool.releaseConnection(this);
}

async end() {
const { connections } = this.pool;
const index = connections.indexOf(this);
if (index >= 0) connections.splice(index, 1);

return await new Promise((resolve, reject) => {
this.database.close(function(err) {
if (err) reject(err);
resolve();
});
});
}
}

module.exports = Connection;
126 changes: 1 addition & 125 deletions src/drivers/sqlite/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';

const EventEmitter = require('events');
const strftime = require('strftime');

const AbstractDriver = require('../abstract');
Expand All @@ -9,130 +8,7 @@ const DataTypes = require('./data_types');
const { escapeId, escape } = require('./sqlstring');
const schema = require('./schema');
const spellbook = require('./spellbook');

// SELECT users.id AS "users:id", ...
// => [ { users: { id, ... } } ]
function nest(rows, fields, spell) {
const { Model } = spell;
const { tableAlias } = Model;
const results = [];

for (const row of rows) {
const result = {};
const qualified = Object.keys(row).some(entry => entry.includes(':'));
for (const key in row) {
const parts = key.split(':');
const [qualifier, column] = qualified
? (parts.length > 1 ? parts : ['', key])
: [Model.attributeMap.hasOwnProperty(key) ? tableAlias : '', key];
const obj = result[qualifier] || (result[qualifier] = {});
obj[column] = row[key];
}
results.push(result);
}

return { rows: results, fields };
}

class Connection {
constructor({ client, database, mode, pool }) {
const { Database, OPEN_READWRITE, OPEN_CREATE } = client;
if (mode == null) mode = OPEN_READWRITE | OPEN_CREATE;
this.database = new Database(database, mode);
this.pool = pool;
}

async query(query, values, spell) {
const { sql, nestTables } = query.sql ? query : { sql: query };

if (/^(?:pragma|select)/i.test(sql)) {
const result = await this.all(sql, values);
if (nestTables) return nest(result.rows, result.fields, spell);
return result;
}
return await this.run(sql, values);
}

all(sql, values) {
return new Promise((resolve, reject) => {
this.database.all(sql, values, (err, rows, fields) => {
if (err) reject(err);
else resolve({ rows, fields });
});
});
}

run(sql, values) {
return new Promise((resolve, reject) => {
this.database.run(sql, values, function Leoric_sqliteRun(err) {
if (err) reject(err);
else resolve({ insertId: this.lastID, affectedRows: this.changes });
});
});
}

release() {
this.pool.releaseConnection(this);
}

async end() {
const { connections } = this.pool;
const index = connections.indexOf(this);
if (index >= 0) connections.splice(index, 1);

return await new Promise((resolve, reject) => {
this.database.close(function(err) {
if (err) reject(err);
resolve();
});
});
}
}

class Pool extends EventEmitter {
constructor(opts) {
super(opts);
this.options = {
connectionLimit: 10,
...opts,
client: opts.client || 'sqlite3',
};
this.client = require(this.options.client);
this.connections = [];
this.queue = [];
}

async getConnection() {
const { connections, queue, client, options } = this;
for (const connection of connections) {
if (connection.idle) {
connection.idle = false;
this.emit('acquire', connection);
return connection;
}
}
if (connections.length < options.connectionLimit) {
const connection = new Connection({ ...options, client, pool: this });
connections.push(connection);
this.emit('acquire', connection);
return connection;
}
await new Promise(resolve => queue.push(resolve));
return await this.getConnection();
}

releaseConnection(connection) {
connection.idle = true;
this.emit('release', connection);

const { queue } = this;
while (queue.length > 0) {
const task = queue.shift();
task();
}
}
}

const Pool = require('./pool');
class SqliteDriver extends AbstractDriver {
constructor(opts = {}) {
super(opts);
Expand Down
51 changes: 51 additions & 0 deletions src/drivers/sqlite/pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
'use strict';

const EventEmitter = require('events');
const Connection = require('./connection');

class Pool extends EventEmitter {
constructor(opts) {
super(opts);
this.options = {
connectionLimit: 10,
...opts,
client: opts.client || 'sqlite3',
};
this.client = require(this.options.client);
this.connections = [];
this.queue = [];
}

async getConnection() {
const { connections, queue, client, options } = this;
for (const connection of connections) {
if (connection.idle) {
connection.idle = false;
this.emit('acquire', connection);
return connection;
}
}
if (connections.length < options.connectionLimit) {
const connection = new Connection({ ...options, client, pool: this });
connections.push(connection);
this.emit('connection', connection);
this.emit('acquire', connection);
return connection;
}
await new Promise(resolve => queue.push(resolve));
return await this.getConnection();
}

releaseConnection(connection) {
connection.idle = true;
this.emit('release', connection);

const { queue } = this;
while (queue.length > 0) {
const task = queue.shift();
task();
}
}
}

module.exports = Pool;
17 changes: 17 additions & 0 deletions test/unit/drivers/sqlite/pool.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

const assert = require('assert').strict;
const Pool = require('../../../../src/drivers/sqlite/pool');

describe('=> SQLite driver.pool', function() {
it('should emit connection', async function() {
const pool = new Pool({
database: '/tmp/leoric.sqlite3',
});
let result;
pool.on('connection', function(connection) {
result = connection;
});
assert.equal(await pool.getConnection(), result);
});
});

0 comments on commit 0a77637

Please sign in to comment.