Skip to content

DbClientPg

do- edited this page Mar 2, 2025 · 74 revisions

DbClientPg is a DbClient descendant for PostgreSQL: a wrapper above pg for using in doix Jobs.

Methods

See parent class

General

invoke (name, params)

Calls a PostgreSQL function or a user defined DbFunction / DbProcedure (found by name). For DbProcedure, returns a DbCall instance; otherwise, the scalar value returned by the function.

Data fetching

getStream (q, p, options)

This inherited method has special support for the PostgreSQL COPY statement. Unlike for SELECT, getStream ('COPY ... TO STDOUT') returns a pg-copy-streams' to binary mode stream.

peek (queue)

For a given DbViewQueuePg, this asynchronous method returns the first record in queue.order, or null if the underlying SQL view is empty. This is a superposition of getObject and genPeekSql.

Data manipulation

insertRecord (tableName, record, options)

This inherited method supports an extra option:

Name Type Description
onlyIfMissing Boolean If true, the INSERT statement is appended with the ON CONFLICT DO NOTHING clause, so in case of any unique key violation no data is changed at all. (Note: the ON CONFLICT DO UPDATE feature is available through the use of the upsert method)
result String Type of the expected return value (see below)

result options

Name Default Return value
call unless onlyIfMissing The DbCall instance, as per the inherited implementation
status when onlyIfMissing true if a new record was inserted, false otherwise
record The record just created, if any, undefined otherwise

upsert (tableName, record, options)

This asynchronous method asserts that there exists a record in a given table with given values.

The record may be created or updated to the new state. Technically, an INSERT ... ON CONFLICT ... DO UPDATE statement is executed. (Note: the ON CONFLICT DO NOTHING feature is available through the use of the insert method with the onlyIfMissing option).

await db.upsert (
  'user_options', 
   {
     id_user: 1, 
     id_option: 10, 
     value: true.
   }, 
   {key: [
     'id_user', 
     'id_option',
   ]}
)

The 1st argument is the name of a DbRelation (most probably DbTable) in a db.model.

The 2nd agrument is the data object: the record to be stored. Normally, its properties must correspond by name to the columns. Unknown properties are silently ignored. And so are undefined properties -- unlike null valued fields that do appear in the generated SQL (and may cause errors when set to NOT NULL columns).

The 3rd agrument is the object containing the key option: the array of column names to be mentioned in the ON CONFLICT clause.

putObjectStream (tableName, columns, options)

This asynchronous method returns an instance of DbCsvPrinter piped into the putBinaryStream (tableName, columns, options) result. So, this is a writable object stream ready to accept records to be inserted into tableName. Only columns will be written, other records' fields are ignored.

putBinaryStream (tableName, columns, options)

This asynchronous method returns a binary Writable stream corresponding to a COPY FROM STDIN statement.

The result is an instance of a class provided by pg-copy-streams.

const {db} = this, os = await db.putStream (
  'payments'
  , ['article', 'amount'] // mandatory
  , {                     // optional; names are case insensitive
//  FORMAT: 'text',
//  FREEZE: false,
//  DELIMITER: '\t',
//  NULL '',
//  HEADER: false,
//  QUOTE: '"',
//  ESCAPE: '\',
//  FORCE_QUOTE: ['article'], // '*'
//  FORCE_NOT_NULL: ['article'],
//  FORCE_NULL: ['article'],
//  ENCODING: 'utf-8'    
  }
)

await new Promise ((ok, fail) => {

  os.on ('error', fail)
  os.on ('finish', ok)

  fs.createReadStream ('/tmp/payments.txt').pipe (os)

}

Transaction support

isAutoCommit ()

Returns the boolean value indicating whether txn is not set.

begin ()

This asynchronous method executes BEGIN and sets the txn property to {}.

commit ()

With txn previously set, this asynchronous method executes COMMIT and sets the txn property to null. Otherwise, does nothing. Called automatically by release () unless this.job.error is set.

rollback ()

With txn previously set, this asynchronous method executes ROLLBACK and sets the txn property to null. Otherwise, does nothing. Called automatically by release () if this.job.error is set.

Pool management

release ()

This asynchronous method is called automatically at Job's finish event, returns the raw connection to the pool.

If there is an active statement, it gets interrupted with terminate ().

Otherwise, unless isAutoCommit () is true on enter, either commit () or rollback () is called prior to releasing, based on this.job.error.

terminate ()

This asynchronous method ends the backing PostgreSQL session with pg_terminate_backend and removes the current connection from the pool.

Special

getStreamOfExistingTables ()

This asynchronous method fetches the list of all tables of a currently connected database as a Readable stream of DbTable objects.

Events

Name Description
released emitted after a successful release call
Clone this wiki locally