diff --git a/conf/jdbc-mysql.xml b/conf/jdbc-mysql.xml new file mode 100644 index 0000000000..0dfa106350 --- /dev/null +++ b/conf/jdbc-mysql.xml @@ -0,0 +1,63 @@ + + + + + + + + + + + + + + + + + + + diff --git a/conf/jdbc-pg.xml b/conf/jdbc-pg.xml new file mode 100644 index 0000000000..3f42602d98 --- /dev/null +++ b/conf/jdbc-pg.xml @@ -0,0 +1,59 @@ + + + + + + + + + + + + + + + + + + + diff --git a/src/org/jgroups/protocols/JDBC_PING2.java b/src/org/jgroups/protocols/JDBC_PING2.java index 81945d27bb..7f1da16eb3 100644 --- a/src/org/jgroups/protocols/JDBC_PING2.java +++ b/src/org/jgroups/protocols/JDBC_PING2.java @@ -55,11 +55,23 @@ public class JDBC_PING2 extends FILE_PING { "coord boolean, " + "PRIMARY KEY (address) )"; + @Property(description="List of stored procedure definitions to be created after schema initialization. The list " + + "needs to be separated by stored_proc_separator") + protected String stored_procedures; + + @Property(description="The separator to separate stored procedures") + protected String stored_proc_separator="||"; + + @Property(description="A stored procedure which deletes an existing row and inserts a new one. Used only if " + + "non-null (as an optimization of calling delete, then insert (1 SQL statement instead of 2). Needs to accept " + + "address (varchar), name (varchar), cluster (varchar), ip (varchar) and coord (boolean") + protected String insert_sp; + @Property(description="SQL used to insert a new row") protected String insert_single_sql="INSERT INTO " + table_name + " values (?, ?, ?, ?, ?)"; @Property(description="SQL used to delete a row") - protected String delete_single_sql="DELETE FROM " + table_name + " WHERE address=? AND cluster=?"; + protected String delete_single_sql="DELETE FROM " + table_name + " WHERE address=?"; @Property(description="SQL to clear the table") protected String clear_sql="DELETE from " + table_name + " WHERE cluster=?"; @@ -98,8 +110,14 @@ protected void createRootDir() { public JDBC_PING2 setConnectionDriver(String c) {this.connection_driver=c; return this;} public String getInitializeSql() {return initialize_sql;} public JDBC_PING2 setInitializeSql(String i) {this.initialize_sql=i; return this;} + public String getStoredProcedures() {return stored_procedures;} + public JDBC_PING2 setStoredProcedures(String s) {stored_procedures=s; return this;} + public String getStoredProcSeparator() {return stored_proc_separator;} + public JDBC_PING2 setStoredProcSeparator(String s) {stored_proc_separator=s; return this;} public String getInsertSingleSql() {return insert_single_sql;} public JDBC_PING2 setInsertSingleSql(String i) {this.insert_single_sql=i; return this;} + public String getInsertSp() {return insert_sp;} + public JDBC_PING2 setInsertSp(String sp) {insert_sp=sp; return this;} public String getDeleteSingleSql() {return delete_single_sql;} public JDBC_PING2 setDeleteSingleSql(String d) {this.delete_single_sql=d; return this;} public String getClearSql() {return clear_sql;} @@ -132,7 +150,8 @@ public void init() throws Exception { loadDriver(); } } - attemptSchemaInitialization(); + createSchema(); + createStoredProcedures(); } @@ -157,9 +176,12 @@ protected void write(List list, String clustername) { // additional configuration and testing on each database. See JGRP-1440 protected synchronized void writeToDB(PingData data, String clustername) throws SQLException { try(Connection connection=getConnection()) { - // todo: make a stored procedure - delete(connection, clustername, data.getAddress()); - insert(connection, data, clustername, data.getAddress()); + if(insert_sp != null) + insertSp(connection, data, clustername); + else { + delete(connection, clustername, data.getAddress()); + insert(connection, data, clustername); + } } } @@ -232,14 +254,13 @@ protected static PreparedStatement prepare(final Connection conn, final String s } } - protected void attemptSchemaInitialization() { + protected void createSchema() { if(initialize_sql == null) { log.debug("Table creation step skipped: initialize_sql attribute is missing"); return; } try(Connection conn=getConnection(); PreparedStatement ps=conn.prepareStatement(initialize_sql)) { - if(log.isTraceEnabled()) - log.trace("SQL for initializing schema: %s", ps); + log.trace("SQL for initializing schema: %s", ps); ps.execute(); log.debug("table " + table_name + " created for JDBC_PING discovery protocol"); } @@ -248,10 +269,27 @@ protected void attemptSchemaInitialization() { "To suppress this message, set initialize_sql to an empty value. Cause: %s", e.getMessage()); } } + protected void createStoredProcedures() throws SQLException { + if(stored_procedures == null) + return; + List stored_procs=Util.parseStringList(stored_procedures, stored_proc_separator); + try(Connection conn=getConnection()) { + for(String stored_proc: stored_procs) { + try(PreparedStatement ps=conn.prepareStatement(stored_proc)) { + log.trace("attempting to create stored procedure %s", stored_proc); + ps.execute(); + log.debug("successfully created stored procedure %s", stored_proc); + } + catch(SQLException ex) { + log.debug("failed creating stored procedure %s (probably already exists): %s", stored_proc, ex.getMessage()); + } + } + } + } protected void loadDriver() { assertNonNull("connection_driver", connection_driver); - log.debug("registering JDBC driver %s", connection_driver); + log.debug("loading JDBC driver %s", connection_driver); try { Util.loadClass(connection_driver, this.getClass().getClassLoader()); } @@ -272,8 +310,28 @@ protected Connection getConnection() throws SQLException { DriverManager.getConnection(connection_url, connection_username, connection_password); } - protected synchronized void insert(Connection connection, PingData data, String clustername, Address address) throws SQLException { + protected synchronized void insert(Connection connection, PingData data, String clustername) throws SQLException { try(PreparedStatement ps=connection.prepareStatement(insert_single_sql)) { + Address address=data.getAddress(); + String addr=Util.addressToString(address); + String name=address instanceof SiteUUID? ((SiteUUID)address).getName() : NameCache.get(address); + IpAddress ip_addr=(IpAddress)data.getPhysicalAddr(); + String ip=ip_addr.toString(); + ps.setString(1, addr); + ps.setString(2, name); + ps.setString(3, clustername); + ps.setString(4, ip); + ps.setBoolean(5, data.isCoord()); + if(log.isTraceEnabled()) + log.trace("%s: SQL for insertion: %s", local_addr, ps); + ps.executeUpdate(); + log.debug("inserted %s for cluster %s", address, clustername); + } + } + + protected synchronized void insertSp(Connection connection, PingData data, String clustername) throws SQLException { + try(PreparedStatement ps=connection.prepareStatement(insert_sp)) { + Address address=data.getAddress(); String addr=Util.addressToString(address); String name=address instanceof SiteUUID? ((SiteUUID)address).getName() : NameCache.get(address); IpAddress ip_addr=(IpAddress)data.getPhysicalAddr(); @@ -286,7 +344,7 @@ protected synchronized void insert(Connection connection, PingData data, String if(log.isTraceEnabled()) log.trace("%s: SQL for insertion: %s", local_addr, ps); ps.executeUpdate(); - log.debug("Inserted %s for cluster %s into database", address, clustername); + log.debug("inserted (via stored proc) %s for cluster %s", address, clustername); } } @@ -294,11 +352,10 @@ protected synchronized void delete(Connection conn, String clustername, Address try(PreparedStatement ps=conn.prepareStatement(delete_single_sql)) { String addr=Util.addressToString(addressToDelete); ps.setString(1, addr); - ps.setString(2, clustername); if(log.isTraceEnabled()) log.trace("%s: SQL for deletion: %s", local_addr, ps); ps.executeUpdate(); - log.debug("Removed %s for cluster %s from database", addressToDelete, clustername); + log.debug("removed %s for cluster %s from database", addressToDelete, clustername); } }