Skip to content

Commit

Permalink
feat: Vanilla Postgres Driver PubSub Impl
Browse files Browse the repository at this point in the history
ref: #26019
  • Loading branch information
wezell committed Sep 7, 2023
1 parent 3a24869 commit e36b561
Showing 1 changed file with 32 additions and 30 deletions.
62 changes: 32 additions & 30 deletions dotCMS/src/main/java/com/dotcms/dotpubsub/JDBCPubSubImpl.java
Expand Up @@ -23,24 +23,34 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;


/**
* Provides notifications for the postgres pub/sub connection.
* The secret sauce is that it borrows 1 DB connection and keeps it open forever.
* With this one long term connection, you "listen" to the topics on
* postgres and continually do a "SELECT 1" in that connection to be notified
* of any new messages to those topics.
*/
public class JDBCPubSubImpl implements DotPubSubProvider {

private enum RUNSTATE {
STOPPED, STARTED
}

static final String PG_NOTIFY_SQL = "SELECT pg_notify(?,?)";
private static final String PG_NOTIFY_SQL = "SELECT pg_notify(?,?)";
private final String serverId;

private static final int KILL_ON_FAILURES = Config.getIntProperty("PGLISTENER_KILL_ON_FAILURES", 100);
private static final int SLEEP_BETWEEN_RUNS = Config.getIntProperty("PGLISTENER_SLEEP_BETWEEN_RUNS", 500);

/**
* provides db connection information for the postgres pub/sub connection
*/

private PGListener internalListener;

/**
* creates a long term listener that holds the connection open
* and constantly polls for new incoming messages. If the listener
* errors out, it will reconnect and resubscribe to those messages
* @return
*/
private PGListener listener() {

if (internalListener != null && internalListener.isListening()) {
Expand Down Expand Up @@ -75,23 +85,15 @@ private PGListener listener() {
@Override
public DotPubSubProvider start() {

if (DbConnectionFactory.isPostgres()) {
listener();
int numberOfServers = Try.of(() -> APILocator.getServerAPI().getAliveServers().size()).getOrElse(1);
Logger.info(JDBCPubSubImpl.class, () -> "Starting JDBCPubSubImpl. Have servers:" + numberOfServers);
listener();
Logger.info(JDBCPubSubImpl.class, () -> "Starting to listen for Postgres notifications.");

try {
for (DotPubSubTopic topic : topicMap.values()) {
subscribeToTopicSQL(topic.getKey().toString());
}
} catch (Exception e) {
Logger.warnAndDebug(getClass(), e);
}
} else {
Logger.debug(this, "JDBCPubSubImpl only runs on Postgres, for: " + DbConnectionFactory.getDBType() +
", use another implementation.");

for (DotPubSubTopic topic : topicMap.values()) {
subscribeToTopicSQL(topic.getKey().toString());
}


return this;
}

Expand All @@ -116,18 +118,17 @@ class PGListener extends Thread {

private RUNSTATE runstate = RUNSTATE.STARTED;
private final Set<String> topics = ConcurrentHashMap.newKeySet();

private final Lazy<Connection> connection = Lazy.of(() -> Try.of(() -> DbConnectionFactory.getDataSource().getConnection()).getOrElseThrow(DotRuntimeException::new));

private final Lazy<PGConnection> pgConnection = Lazy.of(() -> Try.of(() -> connection.get().unwrap(PGConnection.class)).getOrElseThrow(DotRuntimeException::new));

private final Pattern validTopicRegEx = Pattern.compile("[a-z0-9_]");

PGListener() {
//init our db connections
// init our db connection. The pgConnection opens the underlying
// db connection
pgConnection.get();
}

final Pattern validTopicRegEx = Pattern.compile("[a-z0-9_]");


private long failures = 0;

Expand All @@ -136,8 +137,8 @@ boolean subscribeTopic(String topic) {
return false;
}

// topic is checked against an alphanumeric regex to prevent
// SQL Injection
// topic is checked against an alphanumeric
// regex to prevent SQL Injection
if (!validTopicRegEx.matcher(topic).find()) {
throw new DotRuntimeException("Invalid Topic Name:" + topic + ". Must match pattern" + validTopicRegEx);
}
Expand Down Expand Up @@ -217,14 +218,18 @@ public void runInternal() {
}
}

// throws an error if failures grow > KILL_ON_FAILURES
private void logFailure(Throwable e) {
Logger.warn(JDBCPubSubImpl.class, e.getMessage());
if (++failures > KILL_ON_FAILURES) {
Logger.fatal(JDBCPubSubImpl.class, "PGListener failed " + KILL_ON_FAILURES + " times. Dieing", e);
Logger.fatal(JDBCPubSubImpl.class, "PGListener failed " + KILL_ON_FAILURES + " times. Dying", e);
throw new DotRuntimeException(e);
}
}

/**
* decodes and dispatches incoming events
*/
private int notify(PGNotification[] notifications) {
if (notifications == null) {
return 0;
Expand Down Expand Up @@ -264,9 +269,6 @@ private void subscribeToTopicSQL(@NotNull String topic) {






@Override
public DotPubSubProvider subscribe(DotPubSubTopic topic) {
this.topicMap.put(topic.getKey().toString().toLowerCase(), topic);
Expand Down

0 comments on commit e36b561

Please sign in to comment.