Skip to content

Commit

Permalink
- Support for stored procedures in JDBC_PING2
Browse files Browse the repository at this point in the history
- Sample configs for Postgres and MySql
  • Loading branch information
belaban committed May 8, 2024
1 parent e393ee6 commit 9d352fd
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 13 deletions.
63 changes: 63 additions & 0 deletions conf/jdbc-mysql.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<!--
JDBC_PING2 for MySql
-->
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
<TCP
bind_addr="${jgroups.bind_addr:match-address:192.168.1.110}"
bind_port="${jgroups.bind_port:7800}"
port_range="50"
recv_buf_size="150000"
send_buf_size="640000"
sock_conn_timeout="300"
thread_pool.enabled="true"
thread_pool.min_threads="1"
thread_pool.max_threads="50"
thread_pool.keep_alive_time="60000"
/>

<!-- Comment/remove attributes 'stored_procedures' and 'insert_sp' if you don't want to use stored procedures -->
<JDBC_PING2
connection_driver="com.mysql.cj.jdbc.Driver"
connection_url="jdbc:mysql://localhost/test"
connection_username="bela"
connection_password="password"
remove_all_data_on_view_change="true"
register_shutdown_hook="true"
return_entire_cache="false"

stored_procedures="CREATE PROCEDURE deleteAndInsert
(IN addr varchar(200), IN name varchar(200), IN cluster varchar(200),
IN ip varchar(200), IN coord boolean)
BEGIN
DELETE FROM jgroups WHERE address = addr;
INSERT INTO jgroups VALUES (addr, name, cluster, ip, coord);
END"
insert_sp="call deleteAndInsert(?,?,?,?,?);"

/>
<MERGE3 min_interval="10000"
max_interval="30000"/>
<FD_SOCK2/>
<FD_ALL3 timeout="40000" interval="5000" />
<VERIFY_SUSPECT2 timeout="5000" />
<pbcast.NAKACK2
use_mcast_xmit="false"
xmit_interval="100"/>
<UNICAST3
xmit_interval="100"/>
<pbcast.STABLE
desired_avg_gossip="5000"
max_bytes="1000000"/>
<pbcast.GMS
print_local_addr="false"
join_timeout="1000"
max_join_attempts="1"/>
<UFC max_credits="2000000"
min_threshold="0.40"/>
<MFC max_credits="2000000"
min_threshold="0.4"/>
<FRAG3 frag_size="60000" />
<pbcast.STATE_TRANSFER/>
</config>
59 changes: 59 additions & 0 deletions conf/jdbc-pg.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<!--
JDBC_PING2 for Postgresql
-->
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
<TCP
bind_addr="${jgroups.bind_addr:site_local}"
bind_port="${jgroups.bind_port:7800}"
port_range="50"
recv_buf_size="150000"
send_buf_size="640000"
sock_conn_timeout="300"
/>

<!-- Comment/remove attributes 'stored_procedures' and 'insert_sp' if you don't want to use stored procedures -->
<JDBC_PING2
connection_driver="org.postgresql.Driver"
connection_url="jdbc:postgresql://localhost:5432/bela"
connection_username="bela"
connection_password="secret"
remove_all_data_on_view_change="true"
register_shutdown_hook="true"
return_entire_cache="false"

stored_procedures="CREATE PROCEDURE deleteAndInsert
(addr varchar(200), name varchar(200), cluster varchar(200), ip varchar(200), coord boolean)
LANGUAGE SQL
BEGIN ATOMIC
DELETE from jgroups where address = addr;
INSERT INTO jgroups VALUES(addr, name, cluster, ip, coord);
END"
insert_sp="call deleteAndInsert(?,?,?,?,?);"

/>
<MERGE3 min_interval="10000"
max_interval="30000"/>
<FD_SOCK2/>
<FD_ALL3 timeout="40000" interval="5000" />
<VERIFY_SUSPECT2 />
<pbcast.NAKACK2
use_mcast_xmit="false"
xmit_interval="500"/>
<UNICAST3
xmit_interval="500"/>
<pbcast.STABLE
desired_avg_gossip="5000"
max_bytes="1000000"/>
<pbcast.GMS
print_local_addr="false"
join_timeout="1000"
max_join_attempts="1"/>
<UFC max_credits="2M"
min_threshold="0.40"/>
<MFC max_credits="2M"
min_threshold="0.4"/>
<FRAG3 frag_size="60000" />
<pbcast.STATE_TRANSFER/>
</config>
83 changes: 70 additions & 13 deletions src/org/jgroups/protocols/JDBC_PING2.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,23 @@ public class JDBC_PING2 extends FILE_PING {
"coord boolean, " +
"PRIMARY KEY (address) )";

@Property(description="List of stored procedure definitions to be created after schema initialization. The list " +
"needs to be separated by stored_proc_separator")
protected String stored_procedures;

@Property(description="The separator to separate stored procedures")
protected String stored_proc_separator="||";

@Property(description="A stored procedure which deletes an existing row and inserts a new one. Used only if " +
"non-null (as an optimization of calling delete, then insert (1 SQL statement instead of 2). Needs to accept " +
"address (varchar), name (varchar), cluster (varchar), ip (varchar) and coord (boolean")
protected String insert_sp;

@Property(description="SQL used to insert a new row")
protected String insert_single_sql="INSERT INTO " + table_name + " values (?, ?, ?, ?, ?)";

@Property(description="SQL used to delete a row")
protected String delete_single_sql="DELETE FROM " + table_name + " WHERE address=? AND cluster=?";
protected String delete_single_sql="DELETE FROM " + table_name + " WHERE address=?";

@Property(description="SQL to clear the table")
protected String clear_sql="DELETE from " + table_name + " WHERE cluster=?";
Expand Down Expand Up @@ -98,8 +110,14 @@ protected void createRootDir() {
public JDBC_PING2 setConnectionDriver(String c) {this.connection_driver=c; return this;}
public String getInitializeSql() {return initialize_sql;}
public JDBC_PING2 setInitializeSql(String i) {this.initialize_sql=i; return this;}
public String getStoredProcedures() {return stored_procedures;}
public JDBC_PING2 setStoredProcedures(String s) {stored_procedures=s; return this;}
public String getStoredProcSeparator() {return stored_proc_separator;}
public JDBC_PING2 setStoredProcSeparator(String s) {stored_proc_separator=s; return this;}
public String getInsertSingleSql() {return insert_single_sql;}
public JDBC_PING2 setInsertSingleSql(String i) {this.insert_single_sql=i; return this;}
public String getInsertSp() {return insert_sp;}
public JDBC_PING2 setInsertSp(String sp) {insert_sp=sp; return this;}
public String getDeleteSingleSql() {return delete_single_sql;}
public JDBC_PING2 setDeleteSingleSql(String d) {this.delete_single_sql=d; return this;}
public String getClearSql() {return clear_sql;}
Expand Down Expand Up @@ -132,7 +150,8 @@ public void init() throws Exception {
loadDriver();
}
}
attemptSchemaInitialization();
createSchema();
createStoredProcedures();
}


Expand All @@ -157,9 +176,12 @@ protected void write(List<PingData> list, String clustername) {
// additional configuration and testing on each database. See JGRP-1440
protected synchronized void writeToDB(PingData data, String clustername) throws SQLException {
try(Connection connection=getConnection()) {
// todo: make a stored procedure
delete(connection, clustername, data.getAddress());
insert(connection, data, clustername, data.getAddress());
if(insert_sp != null)
insertSp(connection, data, clustername);
else {
delete(connection, clustername, data.getAddress());
insert(connection, data, clustername);
}
}
}

Expand Down Expand Up @@ -232,14 +254,13 @@ protected static PreparedStatement prepare(final Connection conn, final String s
}
}

protected void attemptSchemaInitialization() {
protected void createSchema() {
if(initialize_sql == null) {
log.debug("Table creation step skipped: initialize_sql attribute is missing");
return;
}
try(Connection conn=getConnection(); PreparedStatement ps=conn.prepareStatement(initialize_sql)) {
if(log.isTraceEnabled())
log.trace("SQL for initializing schema: %s", ps);
log.trace("SQL for initializing schema: %s", ps);
ps.execute();
log.debug("table " + table_name + " created for JDBC_PING discovery protocol");
}
Expand All @@ -248,10 +269,27 @@ protected void attemptSchemaInitialization() {
"To suppress this message, set initialize_sql to an empty value. Cause: %s", e.getMessage());
}
}
protected void createStoredProcedures() throws SQLException {
if(stored_procedures == null)
return;
List<String> stored_procs=Util.parseStringList(stored_procedures, stored_proc_separator);
try(Connection conn=getConnection()) {
for(String stored_proc: stored_procs) {
try(PreparedStatement ps=conn.prepareStatement(stored_proc)) {
log.trace("attempting to create stored procedure %s", stored_proc);
ps.execute();
log.debug("successfully created stored procedure %s", stored_proc);
}
catch(SQLException ex) {
log.debug("failed creating stored procedure %s (probably already exists): %s", stored_proc, ex.getMessage());
}
}
}
}

protected void loadDriver() {
assertNonNull("connection_driver", connection_driver);
log.debug("registering JDBC driver %s", connection_driver);
log.debug("loading JDBC driver %s", connection_driver);
try {
Util.loadClass(connection_driver, this.getClass().getClassLoader());
}
Expand All @@ -272,8 +310,28 @@ protected Connection getConnection() throws SQLException {
DriverManager.getConnection(connection_url, connection_username, connection_password);
}

protected synchronized void insert(Connection connection, PingData data, String clustername, Address address) throws SQLException {
protected synchronized void insert(Connection connection, PingData data, String clustername) throws SQLException {
try(PreparedStatement ps=connection.prepareStatement(insert_single_sql)) {
Address address=data.getAddress();
String addr=Util.addressToString(address);
String name=address instanceof SiteUUID? ((SiteUUID)address).getName() : NameCache.get(address);
IpAddress ip_addr=(IpAddress)data.getPhysicalAddr();
String ip=ip_addr.toString();
ps.setString(1, addr);
ps.setString(2, name);
ps.setString(3, clustername);
ps.setString(4, ip);
ps.setBoolean(5, data.isCoord());
if(log.isTraceEnabled())
log.trace("%s: SQL for insertion: %s", local_addr, ps);
ps.executeUpdate();
log.debug("inserted %s for cluster %s", address, clustername);
}
}

protected synchronized void insertSp(Connection connection, PingData data, String clustername) throws SQLException {
try(PreparedStatement ps=connection.prepareStatement(insert_sp)) {
Address address=data.getAddress();
String addr=Util.addressToString(address);
String name=address instanceof SiteUUID? ((SiteUUID)address).getName() : NameCache.get(address);
IpAddress ip_addr=(IpAddress)data.getPhysicalAddr();
Expand All @@ -286,19 +344,18 @@ protected synchronized void insert(Connection connection, PingData data, String
if(log.isTraceEnabled())
log.trace("%s: SQL for insertion: %s", local_addr, ps);
ps.executeUpdate();
log.debug("Inserted %s for cluster %s into database", address, clustername);
log.debug("inserted (via stored proc) %s for cluster %s", address, clustername);
}
}

protected synchronized void delete(Connection conn, String clustername, Address addressToDelete) throws SQLException {
try(PreparedStatement ps=conn.prepareStatement(delete_single_sql)) {
String addr=Util.addressToString(addressToDelete);
ps.setString(1, addr);
ps.setString(2, clustername);
if(log.isTraceEnabled())
log.trace("%s: SQL for deletion: %s", local_addr, ps);
ps.executeUpdate();
log.debug("Removed %s for cluster %s from database", addressToDelete, clustername);
log.debug("removed %s for cluster %s from database", addressToDelete, clustername);
}
}

Expand Down

0 comments on commit 9d352fd

Please sign in to comment.