Skip to content

Commit

Permalink
[feature] postgres: Added primitive support for transactions and curs…
Browse files Browse the repository at this point in the history
…ors.
  • Loading branch information
nrs135 committed Dec 7, 2012
1 parent 767d18a commit 1c8b82a
Showing 1 changed file with 170 additions and 22 deletions.
192 changes: 170 additions & 22 deletions lib/stdlib/apis/postgres/postgres.opa
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type Postgres.failure =
/ { bad_format : int } / { bad_format : int }
/ { bad_ssl_response : string } / { bad_ssl_response : string }
/ { bad_type : string } / { bad_type : string }
/ { sql : string }
/ { no_key } / { no_key }
/ { not_found } / { not_found }


Expand Down Expand Up @@ -180,27 +181,28 @@ type Postgres.rows = (list(Postgres.rowdesc),list(Postgres.row))
/** A Postgres driver connection object. /** A Postgres driver connection object.
*/ */
type Postgres.connection = { type Postgres.connection = {
name : string /** A name for the connection */ name : string /** A name for the connection */
secure : option(SSL.secure_type) /** Optional SSL security information */ secure : option(SSL.secure_type) /** Optional SSL security information */
ssl_accepted : bool /** The server has accepted an SSL connection */ ssl_accepted : bool /** The server has accepted an SSL connection */
conn : ApigenLib.connection /** The underlying ApigenLib connection object */ conn : ApigenLib.connection /** The underlying ApigenLib connection object */
major_version : int /** The major version for the protocol */ major_version : int /** The major version for the protocol */
minor_version : int /** The minor version for the protocol */ minor_version : int /** The minor version for the protocol */
dbase : string /** The name of the database to connected to */ dbase : string /** The name of the database to connected to */
params : stringmap(string) /** Map of parameter values returned during authentication */ params : stringmap(string) /** Map of parameter values returned during authentication */
processid : int /** The processid for the connection at the server */ processid : int /** The processid for the connection at the server */
secret_key : int /** The connection's secret data (used in cancel requests) */ secret_key : int /** The connection's secret data (used in cancel requests) */
query : string /** A note of the last query command */ query : string /** A note of the last query command */
status : string /** The last received status code from the server */ status : string /** The last received status code from the server */
suspended : bool /** Whether an execution was suspended or not */ suspended : bool /** Whether an execution was suspended or not */
error : option(Postgres.failure) /** The last received error value (driver internal) */ in_transaction : bool /** Set during a block (not for external queries) */
empty : bool /** Whether the last query returned an empty reply */ error : option(Postgres.failure) /** The last received error value (driver internal) */
completed : list(string) /** List of [CommandComplete] messages received */ empty : bool /** Whether the last query returned an empty reply */
rows : int /** The number of rows received during a query */ completed : list(string) /** List of [CommandComplete] messages received */
rowdescs : Postgres.rowdescs /** Stored value of the last row description received */ rows : int /** The number of rows received during a query */
paramdescs : list(int) /** List of the last-received parameter descriptions */ rowdescs : Postgres.rowdescs /** Stored value of the last row description received */
handlers : intmap((string,OpaType.ty,Postgres.abstract_handler)) /** Handlers for unknown data types */ paramdescs : list(int) /** List of the last-received parameter descriptions */
backhandlers : stringmap(int) /** Reverse map for outgoing data */ handlers : intmap((string,OpaType.ty,Postgres.abstract_handler)) /** Handlers for unknown data types */
backhandlers : stringmap(int) /** Reverse map for outgoing data */
} }


/** Defines whether an operation is for a prepared statement or a portal */ /** Defines whether an operation is for a prepared statement or a portal */
Expand All @@ -217,6 +219,12 @@ type Postgres.listener_def = {
on_notice : option(Postgres.connection, Postgres.msg -> void) on_notice : option(Postgres.connection, Postgres.msg -> void)
} }


/** Cursor direction flags. */
type Postgres.cursor_direction = {forward} / {backward}

/** Cursor amount indicator. */
type Postgres.cursor_amount = {num:int} / {all} / {next} / {prior}

/** /**
* {1 Interface} * {1 Interface}
*/ */
Expand Down Expand Up @@ -263,7 +271,7 @@ Postgres = {{
{success={ ~name ~secure ~conn ~dbase ssl_accepted=false {success={ ~name ~secure ~conn ~dbase ssl_accepted=false
major_version=default_major_version minor_version=default_minor_version major_version=default_major_version minor_version=default_minor_version
params=StringMap.empty processid=-1 secret_key=-1 params=StringMap.empty processid=-1 secret_key=-1
query="" status="" suspended=false error=none query="" status="" suspended=false in_transaction=false error=none
empty=true completed=[] paramdescs=[] rows=0 rowdescs=[] empty=true completed=[] paramdescs=[] rows=0 rowdescs=[]
handlers=IntMap.empty backhandlers=StringMap.empty }} handlers=IntMap.empty backhandlers=StringMap.empty }}
| {~failure} -> {~failure} | {~failure} -> {~failure}
Expand Down Expand Up @@ -807,6 +815,15 @@ Postgres = {{
end end
| _ -> {failure=(conn,{bad_type="create_enum: bad type {raw_ty}"})} | _ -> {failure=(conn,{bad_type="create_enum: bad type {raw_ty}"})}
/** Fold a function over a list, with connection.
*
* Fold a function f(conn, element) for each element in a list.
*
* @param f The function to fold.
* @param conn The connection object.
* @param l The list of elements.
* @returns The final result or the first error.
*/
fold(f:Postgres.connection, 'a -> Postgres.result, conn:Postgres.connection, l:list('a)) : Postgres.result = fold(f:Postgres.connection, 'a -> Postgres.result, conn:Postgres.connection, l:list('a)) : Postgres.result =
rec aux(conn, l) = rec aux(conn, l) =
match l with match l with
Expand All @@ -819,6 +836,19 @@ Postgres = {{
end end
aux(conn, l) aux(conn, l)
/** Create a table on the PostgreSQL server for a given type instantiating all enum types.
*
* A table is created to host the given value type (must be a record).
* If there are any enum-suitable types in the record, a handler will
* be generated and installed in the connection.
*
* @param conn The connection object.
* @param dbase The name of the database.
* @param temp Whether the table is temporary or not.
* @param value An instance of the type to be used (not added to the table).
* @param k The listener.
* @returns An updated connection object.
*/
create_table(conn:Postgres.connection, dbase:string, temp:bool, value:'a, k:Postgres.listener) : Postgres.connection = create_table(conn:Postgres.connection, dbase:string, temp:bool, value:'a, k:Postgres.listener) : Postgres.connection =
enums = PostgresTypes.record_enum_types(@typeval('a)) enums = PostgresTypes.record_enum_types(@typeval('a))
do jlog("enums:{enums}") do jlog("enums:{enums}")
Expand All @@ -830,15 +860,133 @@ Postgres = {{
| {failure=(conn,failure)} -> error(conn,failure,k) | {failure=(conn,failure)} -> error(conn,failure,k)
end end
/** Insert a row into a database.
*
* @param conn The connection object.
* @param dbase The name of the database.
* @param value The value to be inserted.
* @param k The listener.
* @returns An updated connection object.
*/
insert(conn:Postgres.connection, dbase:string, value:'a, k:Postgres.listener) : Postgres.connection = insert(conn:Postgres.connection, dbase:string, value:'a, k:Postgres.listener) : Postgres.connection =
query(conn,PostgresTypes.insert(conn, dbase, value),k) query(conn,PostgresTypes.insert(conn, dbase, value),k)
/** Update rows in a database. Sets individual fields only.
*
* @param conn The connection object.
* @param dbase The name of the database.
* @param value A value representing the fields to be updated.
* @param select A value representing which fields select the data to be updated.
* @param k The listener.
* @returns An updated connection object.
*/
update(conn:Postgres.connection, dbase:string, value:'a, select:'b, k:Postgres.listener) : Postgres.connection = update(conn:Postgres.connection, dbase:string, value:'a, select:'b, k:Postgres.listener) : Postgres.connection =
query(conn,PostgresTypes.update(conn, dbase, value, select),k) query(conn,PostgresTypes.update(conn, dbase, value, select),k)
/** Delete a value from a database.
*
* @param conn The connection object.
* @param dbase The name of the database.
* @param select A value representing which fields select the rows to be deleted.
* @param k The listener.
* @returns An updated connection object.
*/
delete(conn:Postgres.connection, dbase:string, select:'b, k:Postgres.listener) : Postgres.connection = delete(conn:Postgres.connection, dbase:string, select:'b, k:Postgres.listener) : Postgres.connection =
query(conn,PostgresTypes.delete(conn, dbase, select),k) query(conn,PostgresTypes.delete(conn, dbase, select),k)
/** Open a transaction block.
*
* @param conn The connection object.
* @param k The listener.
* @returns An updated connection object.
*/
begin(conn:Postgres.connection, k:Postgres.listener) : Postgres.connection =
{query(conn,"BEGIN",k) with in_transaction=true}
/** Close a transaction block.
*
* @param conn The connection object.
* @param k The listener.
* @returns An updated connection object.
*/
commit(conn:Postgres.connection, k:Postgres.listener) : Postgres.connection =
{query(conn,"COMMIT",k) with in_transaction=false}
/** Roll back a transaction.
*
* @param conn The connection object.
* @param k The listener.
* @returns An updated connection object.
*/
rollback(conn:Postgres.connection, k:Postgres.listener) : Postgres.connection =
{query(conn,"ROLLBACK",k) with in_transaction=false}
/** Declare a cursor.
*
* @param conn The connection object.
* @param binary Whether the data should be binary or not.
* @param name A name for the cursor.
* @param query The query string for the cursor.
* @param k The listener.
* @returns An updated connection object.
*/
declare_cursor(conn:Postgres.connection, binary:bool, name:string, query:string, k:Postgres.listener) : Postgres.connection =
if conn.in_transaction
then Postgres.query(conn,"DECLARE {name} {if binary then "BINARY " else ""}CURSOR FOR {query}",k)
else error(conn, {sql="Cannot declare cursor outside of transaction"}, k)
@private string_of_cursor_direction(cd:option(Postgres.cursor_direction)) : string =
match cd with
| {some={forward}} -> "FORWARD "
| {some={backward}} -> "BACKWARD "
| {none} -> ""
@private string_of_cursor_amount(ca:option(Postgres.cursor_amount)) : string =
match ca with
| {some={~num}} -> Int.to_string(num)^" "
| {some={all}} -> "ALL "
| {some={next}} -> "NEXT "
| {some={prior}} -> "PRIOR "
| {none} -> ""
/** Fetch rows from a cursor.
*
* @param conn The connection object.
* @param name A name for the cursor.
* @param direction Optional direction flag (default: forward).
* @param amount Optional number of rows to return (default: all).
* @param k The listener.
* @returns An updated connection object.
*/
fetch(conn:Postgres.connection, name:string,
direction:option(Postgres.cursor_direction), amount:option(Postgres.cursor_amount),
k:Postgres.listener) : Postgres.connection =
query(conn,"FETCH {string_of_cursor_direction(direction)}{string_of_cursor_amount(amount)}FROM {name}",k)
/** Move a cursor pointer.
*
* @param conn The connection object.
* @param name A name for the cursor.
* @param direction Optional direction flag (default: forward).
* @param amount Optional number of rows to move (default: all).
* @param k The listener.
* @returns An updated connection object.
*/
move(conn:Postgres.connection, name:string,
direction:option(Postgres.cursor_direction), amount:option(Postgres.cursor_amount),
k:Postgres.listener) : Postgres.connection =
query(conn,"MOVE {string_of_cursor_direction(direction)}{string_of_cursor_amount(amount)}IN {name}",k)
/** Close and destroy a cursor.
*
* @param conn The connection object.
* @param name A name for the cursor.
* @param k The listener.
* @returns An updated connection object.
*/
close_cursor(conn:Postgres.connection, name:string, k:Postgres.listener) : Postgres.connection =
query(conn,"CLOSE {name}",k)
}} }}
// End of file postgres.opa // End of file postgres.opa

0 comments on commit 1c8b82a

Please sign in to comment.