Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

[feature] postgres: Added primitive support for transactions and curs…

…ors.
  • Loading branch information...
commit 1c8b82ae4b82ae881d2736ef144bc0030ffae02a 1 parent 767d18a
@nrs135 nrs135 authored
Showing with 170 additions and 22 deletions.
  1. +170 −22 lib/stdlib/apis/postgres/postgres.opa
View
192 lib/stdlib/apis/postgres/postgres.opa
@@ -116,6 +116,7 @@ type Postgres.failure =
/ { bad_format : int }
/ { bad_ssl_response : string }
/ { bad_type : string }
+ / { sql : string }
/ { no_key }
/ { not_found }
@@ -180,27 +181,28 @@ type Postgres.rows = (list(Postgres.rowdesc),list(Postgres.row))
/** A Postgres driver connection object.
*/
type Postgres.connection = {
- name : string /** A name for the connection */
- secure : option(SSL.secure_type) /** Optional SSL security information */
- ssl_accepted : bool /** The server has accepted an SSL connection */
- conn : ApigenLib.connection /** The underlying ApigenLib connection object */
- major_version : int /** The major version for the protocol */
- minor_version : int /** The minor version for the protocol */
- dbase : string /** The name of the database to connected to */
- params : stringmap(string) /** Map of parameter values returned during authentication */
- processid : int /** The processid for the connection at the server */
- secret_key : int /** The connection's secret data (used in cancel requests) */
- query : string /** A note of the last query command */
- status : string /** The last received status code from the server */
- suspended : bool /** Whether an execution was suspended or not */
- error : option(Postgres.failure) /** The last received error value (driver internal) */
- empty : bool /** Whether the last query returned an empty reply */
- completed : list(string) /** List of [CommandComplete] messages received */
- rows : int /** The number of rows received during a query */
- rowdescs : Postgres.rowdescs /** Stored value of the last row description received */
- paramdescs : list(int) /** List of the last-received parameter descriptions */
- handlers : intmap((string,OpaType.ty,Postgres.abstract_handler)) /** Handlers for unknown data types */
- backhandlers : stringmap(int) /** Reverse map for outgoing data */
+ name : string /** A name for the connection */
+ secure : option(SSL.secure_type) /** Optional SSL security information */
+ ssl_accepted : bool /** The server has accepted an SSL connection */
+ conn : ApigenLib.connection /** The underlying ApigenLib connection object */
+ major_version : int /** The major version for the protocol */
+ minor_version : int /** The minor version for the protocol */
+ dbase : string /** The name of the database to connected to */
+ params : stringmap(string) /** Map of parameter values returned during authentication */
+ processid : int /** The processid for the connection at the server */
+ secret_key : int /** The connection's secret data (used in cancel requests) */
+ query : string /** A note of the last query command */
+ status : string /** The last received status code from the server */
+ suspended : bool /** Whether an execution was suspended or not */
+ in_transaction : bool /** Set during a block (not for external queries) */
+ error : option(Postgres.failure) /** The last received error value (driver internal) */
+ empty : bool /** Whether the last query returned an empty reply */
+ completed : list(string) /** List of [CommandComplete] messages received */
+ rows : int /** The number of rows received during a query */
+ rowdescs : Postgres.rowdescs /** Stored value of the last row description received */
+ paramdescs : list(int) /** List of the last-received parameter descriptions */
+ handlers : intmap((string,OpaType.ty,Postgres.abstract_handler)) /** Handlers for unknown data types */
+ backhandlers : stringmap(int) /** Reverse map for outgoing data */
}
/** Defines whether an operation is for a prepared statement or a portal */
@@ -217,6 +219,12 @@ type Postgres.listener_def = {
on_notice : option(Postgres.connection, Postgres.msg -> void)
}
+/** Cursor direction flags. */
+type Postgres.cursor_direction = {forward} / {backward}
+
+/** Cursor amount indicator. */
+type Postgres.cursor_amount = {num:int} / {all} / {next} / {prior}
+
/**
* {1 Interface}
*/
@@ -263,7 +271,7 @@ Postgres = {{
{success={ ~name ~secure ~conn ~dbase ssl_accepted=false
major_version=default_major_version minor_version=default_minor_version
params=StringMap.empty processid=-1 secret_key=-1
- query="" status="" suspended=false error=none
+ query="" status="" suspended=false in_transaction=false error=none
empty=true completed=[] paramdescs=[] rows=0 rowdescs=[]
handlers=IntMap.empty backhandlers=StringMap.empty }}
| {~failure} -> {~failure}
@@ -807,6 +815,15 @@ Postgres = {{
end
| _ -> {failure=(conn,{bad_type="create_enum: bad type {raw_ty}"})}
+ /** Fold a function over a list, with connection.
+ *
+ * Fold a function f(conn, element) for each element in a list.
+ *
+ * @param f The function to fold.
+ * @param conn The connection object.
+ * @param l The list of elements.
+ * @returns The final result or the first error.
+ */
fold(f:Postgres.connection, 'a -> Postgres.result, conn:Postgres.connection, l:list('a)) : Postgres.result =
rec aux(conn, l) =
match l with
@@ -819,6 +836,19 @@ Postgres = {{
end
aux(conn, l)
+ /** Create a table on the PostgreSQL server for a given type instantiating all enum types.
+ *
+ * A table is created to host the given value type (must be a record).
+ * If there are any enum-suitable types in the record, a handler will
+ * be generated and installed in the connection.
+ *
+ * @param conn The connection object.
+ * @param dbase The name of the database.
+ * @param temp Whether the table is temporary or not.
+ * @param value An instance of the type to be used (not added to the table).
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
create_table(conn:Postgres.connection, dbase:string, temp:bool, value:'a, k:Postgres.listener) : Postgres.connection =
enums = PostgresTypes.record_enum_types(@typeval('a))
do jlog("enums:{enums}")
@@ -830,15 +860,133 @@ Postgres = {{
| {failure=(conn,failure)} -> error(conn,failure,k)
end
+ /** Insert a row into a database.
+ *
+ * @param conn The connection object.
+ * @param dbase The name of the database.
+ * @param value The value to be inserted.
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
insert(conn:Postgres.connection, dbase:string, value:'a, k:Postgres.listener) : Postgres.connection =
query(conn,PostgresTypes.insert(conn, dbase, value),k)
+ /** Update rows in a database. Sets individual fields only.
+ *
+ * @param conn The connection object.
+ * @param dbase The name of the database.
+ * @param value A value representing the fields to be updated.
+ * @param select A value representing which fields select the data to be updated.
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
update(conn:Postgres.connection, dbase:string, value:'a, select:'b, k:Postgres.listener) : Postgres.connection =
query(conn,PostgresTypes.update(conn, dbase, value, select),k)
+ /** Delete a value from a database.
+ *
+ * @param conn The connection object.
+ * @param dbase The name of the database.
+ * @param select A value representing which fields select the rows to be deleted.
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
delete(conn:Postgres.connection, dbase:string, select:'b, k:Postgres.listener) : Postgres.connection =
query(conn,PostgresTypes.delete(conn, dbase, select),k)
+ /** Open a transaction block.
+ *
+ * @param conn The connection object.
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
+ begin(conn:Postgres.connection, k:Postgres.listener) : Postgres.connection =
+ {query(conn,"BEGIN",k) with in_transaction=true}
+
+ /** Close a transaction block.
+ *
+ * @param conn The connection object.
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
+ commit(conn:Postgres.connection, k:Postgres.listener) : Postgres.connection =
+ {query(conn,"COMMIT",k) with in_transaction=false}
+
+ /** Roll back a transaction.
+ *
+ * @param conn The connection object.
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
+ rollback(conn:Postgres.connection, k:Postgres.listener) : Postgres.connection =
+ {query(conn,"ROLLBACK",k) with in_transaction=false}
+
+ /** Declare a cursor.
+ *
+ * @param conn The connection object.
+ * @param binary Whether the data should be binary or not.
+ * @param name A name for the cursor.
+ * @param query The query string for the cursor.
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
+ declare_cursor(conn:Postgres.connection, binary:bool, name:string, query:string, k:Postgres.listener) : Postgres.connection =
+ if conn.in_transaction
+ then Postgres.query(conn,"DECLARE {name} {if binary then "BINARY " else ""}CURSOR FOR {query}",k)
+ else error(conn, {sql="Cannot declare cursor outside of transaction"}, k)
+
+ @private string_of_cursor_direction(cd:option(Postgres.cursor_direction)) : string =
+ match cd with
+ | {some={forward}} -> "FORWARD "
+ | {some={backward}} -> "BACKWARD "
+ | {none} -> ""
+
+ @private string_of_cursor_amount(ca:option(Postgres.cursor_amount)) : string =
+ match ca with
+ | {some={~num}} -> Int.to_string(num)^" "
+ | {some={all}} -> "ALL "
+ | {some={next}} -> "NEXT "
+ | {some={prior}} -> "PRIOR "
+ | {none} -> ""
+
+ /** Fetch rows from a cursor.
+ *
+ * @param conn The connection object.
+ * @param name A name for the cursor.
+ * @param direction Optional direction flag (default: forward).
+ * @param amount Optional number of rows to return (default: all).
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
+ fetch(conn:Postgres.connection, name:string,
+ direction:option(Postgres.cursor_direction), amount:option(Postgres.cursor_amount),
+ k:Postgres.listener) : Postgres.connection =
+ query(conn,"FETCH {string_of_cursor_direction(direction)}{string_of_cursor_amount(amount)}FROM {name}",k)
+
+ /** Move a cursor pointer.
+ *
+ * @param conn The connection object.
+ * @param name A name for the cursor.
+ * @param direction Optional direction flag (default: forward).
+ * @param amount Optional number of rows to move (default: all).
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
+ move(conn:Postgres.connection, name:string,
+ direction:option(Postgres.cursor_direction), amount:option(Postgres.cursor_amount),
+ k:Postgres.listener) : Postgres.connection =
+ query(conn,"MOVE {string_of_cursor_direction(direction)}{string_of_cursor_amount(amount)}IN {name}",k)
+
+ /** Close and destroy a cursor.
+ *
+ * @param conn The connection object.
+ * @param name A name for the cursor.
+ * @param k The listener.
+ * @returns An updated connection object.
+ */
+ close_cursor(conn:Postgres.connection, name:string, k:Postgres.listener) : Postgres.connection =
+ query(conn,"CLOSE {name}",k)
+
}}
// End of file postgres.opa
Please sign in to comment.
Something went wrong with that request. Please try again.