diff --git a/dotCMS/src/main/java/com/dotcms/dotpubsub/JDBCPubSubImpl.java b/dotCMS/src/main/java/com/dotcms/dotpubsub/JDBCPubSubImpl.java index 1f4f8c665103..ea8d1bf821a7 100644 --- a/dotCMS/src/main/java/com/dotcms/dotpubsub/JDBCPubSubImpl.java +++ b/dotCMS/src/main/java/com/dotcms/dotpubsub/JDBCPubSubImpl.java @@ -23,24 +23,34 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; + +/** + * Provides notifications for the postgres pub/sub connection. + * The secret sauce is that it borrows 1 DB connection and keeps it open forever. + * With this one long term connection, you "listen" to the topics on + * postgres and continually do a "SELECT 1" in that connection to be notified + * of any new messages to those topics. + */ public class JDBCPubSubImpl implements DotPubSubProvider { private enum RUNSTATE { STOPPED, STARTED } - static final String PG_NOTIFY_SQL = "SELECT pg_notify(?,?)"; + private static final String PG_NOTIFY_SQL = "SELECT pg_notify(?,?)"; private final String serverId; private static final int KILL_ON_FAILURES = Config.getIntProperty("PGLISTENER_KILL_ON_FAILURES", 100); private static final int SLEEP_BETWEEN_RUNS = Config.getIntProperty("PGLISTENER_SLEEP_BETWEEN_RUNS", 500); - /** - * provides db connection information for the postgres pub/sub connection - */ - private PGListener internalListener; + /** + * creates a long term listener that holds the connection open + * and constantly polls for new incoming messages. If the listener + * errors out, it will reconnect and resubscribe to those messages + * @return + */ private PGListener listener() { if (internalListener != null && internalListener.isListening()) { @@ -75,23 +85,15 @@ private PGListener listener() { @Override public DotPubSubProvider start() { - if (DbConnectionFactory.isPostgres()) { - listener(); - int numberOfServers = Try.of(() -> APILocator.getServerAPI().getAliveServers().size()).getOrElse(1); - Logger.info(JDBCPubSubImpl.class, () -> "Starting JDBCPubSubImpl. Have servers:" + numberOfServers); + listener(); + Logger.info(JDBCPubSubImpl.class, () -> "Starting to listen for Postgres notifications."); - try { - for (DotPubSubTopic topic : topicMap.values()) { - subscribeToTopicSQL(topic.getKey().toString()); - } - } catch (Exception e) { - Logger.warnAndDebug(getClass(), e); - } - } else { - Logger.debug(this, "JDBCPubSubImpl only runs on Postgres, for: " + DbConnectionFactory.getDBType() + - ", use another implementation."); + + for (DotPubSubTopic topic : topicMap.values()) { + subscribeToTopicSQL(topic.getKey().toString()); } + return this; } @@ -116,18 +118,17 @@ class PGListener extends Thread { private RUNSTATE runstate = RUNSTATE.STARTED; private final Set topics = ConcurrentHashMap.newKeySet(); - private final Lazy connection = Lazy.of(() -> Try.of(() -> DbConnectionFactory.getDataSource().getConnection()).getOrElseThrow(DotRuntimeException::new)); - private final Lazy pgConnection = Lazy.of(() -> Try.of(() -> connection.get().unwrap(PGConnection.class)).getOrElseThrow(DotRuntimeException::new)); - + private final Pattern validTopicRegEx = Pattern.compile("[a-z0-9_]"); PGListener() { - //init our db connections + // init our db connection. The pgConnection opens the underlying + // db connection pgConnection.get(); } - final Pattern validTopicRegEx = Pattern.compile("[a-z0-9_]"); + private long failures = 0; @@ -136,8 +137,8 @@ boolean subscribeTopic(String topic) { return false; } - // topic is checked against an alphanumeric regex to prevent - // SQL Injection + // topic is checked against an alphanumeric + // regex to prevent SQL Injection if (!validTopicRegEx.matcher(topic).find()) { throw new DotRuntimeException("Invalid Topic Name:" + topic + ". Must match pattern" + validTopicRegEx); } @@ -217,14 +218,18 @@ public void runInternal() { } } + // throws an error if failures grow > KILL_ON_FAILURES private void logFailure(Throwable e) { Logger.warn(JDBCPubSubImpl.class, e.getMessage()); if (++failures > KILL_ON_FAILURES) { - Logger.fatal(JDBCPubSubImpl.class, "PGListener failed " + KILL_ON_FAILURES + " times. Dieing", e); + Logger.fatal(JDBCPubSubImpl.class, "PGListener failed " + KILL_ON_FAILURES + " times. Dying", e); throw new DotRuntimeException(e); } } + /** + * decodes and dispatches incoming events + */ private int notify(PGNotification[] notifications) { if (notifications == null) { return 0; @@ -264,9 +269,6 @@ private void subscribeToTopicSQL(@NotNull String topic) { - - - @Override public DotPubSubProvider subscribe(DotPubSubTopic topic) { this.topicMap.put(topic.getKey().toString().toLowerCase(), topic);