diff --git a/jans-core/message/pom.xml b/jans-core/message/pom.xml new file mode 100644 index 00000000000..d2cd8afdd80 --- /dev/null +++ b/jans-core/message/pom.xml @@ -0,0 +1,84 @@ + + + 4.0.0 + jans-core-message + Caches support + + + io.jans + jans-core-parent + 1.0.20 + + + + ${maven.min-version} + + + + + + src/main/resources + true + + **/*.xml + **/services/* + **/*.properties + + + + + + src/test/resources + true + + **/*.xml + **/services/* + **/*.properties + + + + + + + + + ${project.groupId} + jans-core-cache + + + ${project.groupId} + jans-orm-core + + + ${project.groupId} + jans-orm-sql + + + + + jakarta.annotation + jakarta.annotation-api + + + jakarta.enterprise + jakarta.enterprise.cdi-api + + + jakarta.inject + jakarta.inject-api + + + + redis.clients + jedis + + + + + org.testng + testng + + + + + \ No newline at end of file diff --git a/jans-core/message/src/main/java/io/jans/service/BaseMessageService.java b/jans-core/message/src/main/java/io/jans/service/BaseMessageService.java new file mode 100644 index 00000000000..9e9e0357579 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/BaseMessageService.java @@ -0,0 +1,68 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service; + +import org.slf4j.Logger; + +import io.jans.service.message.MessageInterface; +import io.jans.service.message.MessageProvider; +import io.jans.service.message.pubsub.PubSubInterface; +import jakarta.inject.Inject; + +/** + * Provides operations with messages + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +public abstract class BaseMessageService implements MessageInterface { + + public static int DEFAULT_EXPIRATION = 60; + + @Inject + private Logger log; + + public void subscribe(PubSubInterface pubSubAdapter, String... channels) { + MessageProvider messageProvider = getMessageProvider(); + if (messageProvider == null) { + log.error("Message provider is invalid!"); + return; + } + + log.trace("Subscribe '{}' for channels '{}'", pubSubAdapter, channels); + messageProvider.subscribe(pubSubAdapter, channels); + } + + public void unsubscribe(PubSubInterface pubSubAdapter) { + MessageProvider messageProvider = getMessageProvider(); + if (messageProvider == null) { + log.error("Message provider is invalid!"); + return; + } + + log.trace("Unsubscribe '{}'", pubSubAdapter); + messageProvider.unsubscribe(pubSubAdapter); + } + + public boolean publish(String channel, String message) { + MessageProvider messageProvider = getMessageProvider(); + if (messageProvider == null) { + log.error("Message provider is invalid!"); + return false; + } + + if (log.isTraceEnabled()) { + log.trace("Publish '{}' to channel '{}'", message, channel); + } + + boolean result = messageProvider.publish(channel, message); + + return result; + } + + protected abstract MessageProvider getMessageProvider(); + +} diff --git a/jans-core/message/src/main/java/io/jans/service/MessageService.java b/jans-core/message/src/main/java/io/jans/service/MessageService.java new file mode 100644 index 00000000000..92e7bf13a06 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/MessageService.java @@ -0,0 +1,29 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service; + +import io.jans.service.message.MessageProvider; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +/** + * Provides operations with messages + * + * @author Yuriy Movchan Date: 2023/12/03 + */ +@ApplicationScoped +public class MessageService extends BaseMessageService { + + @Inject + private MessageProvider messageProvider; + + @Override + protected MessageProvider getMessageProvider() { + return messageProvider; + } + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/AbstractMessageProvider.java b/jans-core/message/src/main/java/io/jans/service/message/AbstractMessageProvider.java new file mode 100644 index 00000000000..740ffbb69ea --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/AbstractMessageProvider.java @@ -0,0 +1,22 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message; + +import java.util.concurrent.ExecutorService; + +/** + * Interface for each message provider + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +public abstract class AbstractMessageProvider extends MessageProvider { + + public abstract void create(ExecutorService executorService); + + public abstract void destroy(); + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/MessageInterface.java b/jans-core/message/src/main/java/io/jans/service/message/MessageInterface.java new file mode 100644 index 00000000000..703ff55d228 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/MessageInterface.java @@ -0,0 +1,24 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message; + +import io.jans.service.message.pubsub.PubSubInterface; + +/** + * Interface for each message provider + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +public interface MessageInterface { + + void subscribe(PubSubInterface pubSubAdapter, String... channels); + + void unsubscribe(PubSubInterface pubSubAdapter); + + boolean publish(String channel, String message); + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/MessageProvider.java b/jans-core/message/src/main/java/io/jans/service/message/MessageProvider.java new file mode 100644 index 00000000000..466fb9e17c4 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/MessageProvider.java @@ -0,0 +1,27 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message; + +import io.jans.service.message.model.config.MessageProviderType; + +/** + * Interface for each message provider + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +public abstract class MessageProvider implements MessageInterface { + + /* + * Delegate internal connection object + */ + public abstract T getDelegate(); + + public abstract MessageProviderType getProviderType(); + + public abstract void shutdown(); + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/MessageProviderFactory.java b/jans-core/message/src/main/java/io/jans/service/message/MessageProviderFactory.java new file mode 100644 index 00000000000..c6bf7c4d91c --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/MessageProviderFactory.java @@ -0,0 +1,127 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; + +import org.slf4j.Logger; + +import io.jans.service.message.model.config.MessageConfiguration; +import io.jans.service.message.model.config.MessageProviderType; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; + +/** + * Message provider factory + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +@ApplicationScoped +public class MessageProviderFactory { + + public static final String MESSAGE_PROVIDER_THREAD_NAME = "MessageProviderThread"; + + @Inject + private Logger log; + + @Inject + private MessageConfiguration messageConfiguration; + + @Inject + @Any + private Instance instance; + + private ExecutorService executorService; + private MessageProvider messageProvider; + + @PostConstruct + public void create() { + executorService = Executors.newCachedThreadPool(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setName(MESSAGE_PROVIDER_THREAD_NAME); + thread.setDaemon(true); + return thread; + } + }); + } + + @PreDestroy + public void destroy() { + shutdown(); + } + + @Produces + @ApplicationScoped + public MessageProvider getMessageProvider() { + log.debug("Started to create message provider"); + + messageProvider = getCacheProvider(messageConfiguration); + + return messageProvider; + } + + public MessageProvider getCacheProvider(MessageConfiguration messageConfiguration) { + MessageProviderType messageProviderType = messageConfiguration.getMessageProviderType(); + + // Create proxied bean + AbstractMessageProvider messageProvider = null; + switch (messageProviderType) { + case NULL: + messageProvider = instance.select(NullMessageProvider.class).get(); + break; + case REDIS: + messageProvider = instance.select(RedisMessageProvider.class).get(); + break; + case POSTGRES: + messageProvider = instance.select(PostgresMessageProvider.class).get(); + break; + } + + if (messageProvider == null) { + throw new RuntimeException( + "Failed to initialize messageProvider, messageProvider is unsupported: " + messageProviderType); + } + + messageProvider.create(executorService); + + return messageProvider; + } + + public int getActiveCount() { + return ((ThreadPoolExecutor) executorService).getActiveCount(); + } + + public int getPoolSize() { + return ((ThreadPoolExecutor) executorService).getPoolSize(); + } + + public void shutdown() { + if (messageProvider != null) { + log.info("Starting message provider shutdown..."); + messageProvider.shutdown(); + messageProvider = null; + } + + if (executorService != null) { + log.info("Starting message provider thread pool shutdown..."); + executorService.shutdownNow(); + executorService = null; + } + + log.info("Successfully stopped message provider pool"); + } + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/NullMessageProvider.java b/jans-core/message/src/main/java/io/jans/service/message/NullMessageProvider.java new file mode 100644 index 00000000000..48655ef6400 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/NullMessageProvider.java @@ -0,0 +1,88 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message; + +import java.util.concurrent.ExecutorService; + +import org.slf4j.Logger; + +import io.jans.service.cache.RedisConfiguration; +import io.jans.service.message.model.config.MessageConfiguration; +import io.jans.service.message.model.config.MessageProviderType; +import io.jans.service.message.pubsub.PubSubInterface; +import io.jans.util.security.StringEncrypter; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +/** + * Null message provider + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +@ApplicationScoped +public class NullMessageProvider extends AbstractMessageProvider { + + @Inject + private Logger log; + + @PostConstruct + public void init() { + } + + @Override + public void create(ExecutorService executorService) { + } + + public void create(RedisConfiguration redisConfiguration) { + log.debug("Starting NullProvider messages ... configuration {}", redisConfiguration); + log.debug("NullProvider message started."); + } + + public void configure(MessageConfiguration messageConfiguration, StringEncrypter stringEncrypter) { + } + + public void setMessageConfiguration(MessageConfiguration messageConfiguration) { + } + + public boolean isConnected() { + return true; + } + + @PreDestroy + public void destroy() { + } + + @Override + public Object getDelegate() { + return null; + } + + @Override + public MessageProviderType getProviderType() { + return MessageProviderType.NULL; + } + + @Override + public void subscribe(PubSubInterface pubSub, String... channels) { + } + + @Override + public void unsubscribe(PubSubInterface pubSub) { + } + + @Override + public boolean publish(String channel, String message) { + return true; + } + + @Override + public void shutdown() { + } + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/PostgresMessageProvider.java b/jans-core/message/src/main/java/io/jans/service/message/PostgresMessageProvider.java new file mode 100644 index 00000000000..76fbed80940 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/PostgresMessageProvider.java @@ -0,0 +1,307 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.jans.orm.sql.operation.impl.SqConnectionProviderPool; +import io.jans.service.cache.RedisProvider; +import io.jans.service.message.model.config.MessageConfiguration; +import io.jans.service.message.model.config.MessageProviderType; +import io.jans.service.message.model.config.PostgresMessageConfiguration; +import io.jans.service.message.pubsub.PubSubInterface; +import io.jans.util.security.StringEncrypter; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +/** + * Postgres message provider + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +@ApplicationScoped +public class PostgresMessageProvider extends AbstractMessageProvider { + + @Inject + private Logger log; + + @Inject + private MessageConfiguration messageConfiguration; + + @Inject + private StringEncrypter stringEncrypter; + + private ConcurrentHashMap> subscibedPubSubs; + + private SqConnectionProviderPool сonnectionProviderPool; + private ExecutorService executorService; + + @PostConstruct + public void init() { + } + + @PreDestroy + public void destroy() { + log.debug("Destroying PostgresProvider"); + + shutdown(); + if (сonnectionProviderPool != null) { + сonnectionProviderPool.destroy(); + } + + log.debug("Destroyed PostgresProvider"); + } + + @Override + public void create(ExecutorService executorService) { + this.executorService = executorService; + this.subscibedPubSubs = new ConcurrentHashMap<>(); + + try { + PostgresMessageConfiguration postgresMessageConfiguration = messageConfiguration + .getPostgresMessageConfiguration(); + Properties connectionProperties = toPostgresProperties(postgresMessageConfiguration); + log.debug("Starting PostgresMessageProvider messages ... configuration {}", postgresMessageConfiguration); + + сonnectionProviderPool = new SqConnectionProviderPool(connectionProperties); + сonnectionProviderPool.create(); + if (!сonnectionProviderPool.isCreated()) { + throw new IllegalStateException( + String.format("Failed to create SQL connection pool for messaging! Result code: '%d'", + сonnectionProviderPool.getCreationResultCode())); + } + log.debug("PostgresMessageProvider message was started."); + } catch (Exception ex) { + log.error("Failed to start PostgresProvider messages", ex); + throw new IllegalStateException("Failed to create SQL connection pool for messaging!", ex); + } + } + + public void configure(MessageConfiguration messageConfiguration, StringEncrypter stringEncrypter) { + this.log = LoggerFactory.getLogger(RedisProvider.class); + this.messageConfiguration = messageConfiguration; + this.stringEncrypter = stringEncrypter; + } + + private Properties toPostgresProperties(PostgresMessageConfiguration postgresMessageConfiguration) { + Properties connectionProperties = new Properties(); + connectionProperties.setProperty("db.schema.name", postgresMessageConfiguration.getDbSchemaName()); + connectionProperties.setProperty("connection.uri", postgresMessageConfiguration.getConnectionUri()); + connectionProperties.setProperty("auth.userName", postgresMessageConfiguration.getAuthUserName()); + + try { + String encryptedPassword = postgresMessageConfiguration.getAuthUserPassword(); + if (StringUtils.isNotBlank(encryptedPassword)) { + connectionProperties.setProperty("auth.userPassword", stringEncrypter.decrypt(encryptedPassword)); + log.trace("Decrypted Postgres password successfully."); + } + } catch (StringEncrypter.EncryptionException e) { + log.error("Error during Postgres password decryption", e); + } + + if (postgresMessageConfiguration.getConnectionPoolMaxTotal() != null) { + connectionProperties.setProperty("connection.pool.max-total", + postgresMessageConfiguration.getConnectionPoolMaxTotal().toString()); + } + + if (postgresMessageConfiguration.getConnectionPoolMaxIdle() != null) { + connectionProperties.setProperty("connection.pool.max-idle", + postgresMessageConfiguration.getConnectionPoolMaxIdle().toString()); + } + + if (postgresMessageConfiguration.getConnectionPoolMinIdle() != null) { + connectionProperties.setProperty("connection.pool.min-idle", + postgresMessageConfiguration.getConnectionPoolMinIdle().toString()); + } + + return connectionProperties; + } + + public boolean isConnected() { + return сonnectionProviderPool.isConnected(); + } + + @Override + public SqConnectionProviderPool getDelegate() { + return сonnectionProviderPool; + } + + @Override + public MessageProviderType getProviderType() { + return MessageProviderType.POSTGRES; + } + + @Override + public void subscribe(PubSubInterface pubSub, String... channels) { + log.info("Starting new thread(s) for subscribing to Postgres channels {}", Arrays.asList(channels)); + + List listeners = new ArrayList<>(); + int countChannels = 0; + for (String channel : channels) { + Connection conn = сonnectionProviderPool.getConnection(); + try { + PostgresMessageListener postgresMessageListener = new PostgresMessageListener(pubSub, conn); + postgresMessageListener.subscribe(channel); + listeners.add(postgresMessageListener); + + executorService.execute(postgresMessageListener); + pubSub.onSubscribe(channel, ++countChannels); + } catch (SQLException ex) { + log.error(String.format("Failed to subscribe to Postgres channel {}", channel)); + if (conn != null) { + try { + conn.close(); + } catch (Exception ex2) { + log.error(String.format( + "Failed to release connection after subscribe attempt to Postgres channel {}", + channel)); + } + } + throw new IllegalStateException(String.format("Failed to subscribe to Postgres channel {}", channel), + ex); + } + } + subscibedPubSubs.put(System.identityHashCode(pubSub), listeners); + } + + @Override + public void unsubscribe(PubSubInterface pubSub) { + log.info("Starting end subscription to Postgres for {}", pubSub); + + int pubSubIdentifier = System.identityHashCode(pubSub); + List listeners = subscibedPubSubs.get(pubSubIdentifier); + if (listeners == null) { + log.warn("PubSub {} in unsubscribe request is not registered", pubSub); + return; + } + + unsubscribe(listeners); + subscibedPubSubs.remove(pubSubIdentifier); + log.info("Sent request to end subscription to Postgres for {}", pubSub); + } + + private void unsubscribe(List listeners) { + for (Iterator it = listeners.iterator(); it.hasNext();) { + PostgresMessageListener listener = (PostgresMessageListener) it.next(); + try { + listener.unsubscribe(); + it.remove(); + + PubSubInterface pubSub = listener.getPubSub(); + pubSub.onUnsubscribe(listener.getChannel(), listeners.size()); + } catch (Throwable ex) { + log.error("Failed to unsubscribe for {}", listener.getPubSub()); + } + } + } + + @Override + public boolean publish(String channel, String message) { + CompletableFuture.runAsync(() -> { + try (Connection conn = сonnectionProviderPool.getConnection()) { + try (Statement stmt = conn.createStatement()) { + stmt.execute( + String.format("NOTIFY %s, '%s'", channel, Base64.encodeBase64String(message.getBytes()))); + } + } catch (SQLException ex) { + log.error("Failed to publish message to channel {}", channel, ex); + } + }); + + return true; + } + + @Override + public void shutdown() { + for (List listeners : subscibedPubSubs.values()) { + unsubscribe(listeners); + } + subscibedPubSubs.clear(); + } + + class PostgresMessageListener implements Runnable { + private PubSubInterface pubSub; + private String channel; + + private Connection conn; + private org.postgresql.PGConnection pgConn; + + private boolean active; + + PostgresMessageListener(PubSubInterface pubSub, Connection conn) throws SQLException { + this.pubSub = pubSub; + this.conn = conn; + this.pgConn = conn.unwrap(org.postgresql.PGConnection.class); + this.active = true; + } + + public PubSubInterface getPubSub() { + return pubSub; + } + + public String getChannel() { + return channel; + } + + public void subscribe(String channel) throws SQLException { + this.channel = channel; + try (Statement stmt = conn.createStatement()) { + stmt.execute("LISTEN " + channel); + } + } + + public void unsubscribe() throws SQLException { + active = false; + try (Statement stmt = conn.createStatement()) { + stmt.execute("UNLISTEN " + channel); + } + conn.close(); + } + + public void run() { + PostgresMessageConfiguration postgresMessageConfiguration = messageConfiguration + .getPostgresMessageConfiguration(); + int messageWaitMillis = postgresMessageConfiguration.getMessageWaitMillis(); + int messageSleepThreadTime = postgresMessageConfiguration.getMessageSleepThreadTime(); + + try { + while (active) { + org.postgresql.PGNotification notifications[] = pgConn.getNotifications(messageWaitMillis); + + if (notifications != null) { + for (int i = 0; i < notifications.length; i++) { + pubSub.onMessage(notifications[i].getName(), notifications[i].getParameter()); + } + } + + Thread.sleep(messageSleepThreadTime); + } + } catch (SQLException ex) { + } catch (InterruptedException ex) { + log.error("Error during reading messages", ex); + } + } + } + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/RedisMessageProvider.java b/jans-core/message/src/main/java/io/jans/service/message/RedisMessageProvider.java new file mode 100644 index 00000000000..db251a73980 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/RedisMessageProvider.java @@ -0,0 +1,205 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.jans.service.cache.AbstractRedisProvider; +import io.jans.service.cache.RedisConfiguration; +import io.jans.service.cache.RedisProvider; +import io.jans.service.cache.RedisProviderFactory; +import io.jans.service.message.model.config.MessageConfiguration; +import io.jans.service.message.model.config.MessageProviderType; +import io.jans.service.message.pubsub.PubSubInterface; +import io.jans.service.message.pubsub.PubSubRedisAdapter; +import io.jans.util.security.StringEncrypter; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.ShardedJedisPool; + +/** + * Redis message provider + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +@ApplicationScoped +public class RedisMessageProvider extends AbstractMessageProvider { + + @Inject + private Logger log; + + @Inject + private MessageConfiguration messageConfiguration; + + @Inject + private StringEncrypter stringEncrypter; + + private ConcurrentHashMap subscibedPubSubs; + + private AbstractRedisProvider redisProvider; + private ExecutorService executorService; + + @PostConstruct + public void init() { + } + + @PreDestroy + public void destroy() { + log.debug("Destroying RedisProvider"); + + shutdown(); + if (redisProvider != null) { + redisProvider.destroy(); + } + + log.debug("Destroyed RedisMessageProvider"); + } + + @Override + public void create(ExecutorService executorService) { + this.executorService = executorService; + this.subscibedPubSubs = new ConcurrentHashMap<>(); + + try { + RedisConfiguration redisConfiguration = messageConfiguration.getRedisConfiguration(); + decryptPassword(redisConfiguration); + log.debug("Starting RedisMessageProvider messages ... configuration {}", redisConfiguration); + redisProvider = RedisProviderFactory.create(redisConfiguration); + redisProvider.create(); + log.debug("RedisMessageProvider message started."); + } catch (Exception ex) { + log.error("Failed to start RedisMessageProvider messages", ex); + throw new IllegalStateException("Error starting RedisMessageProvider messages", ex); + } + } + + public void configure(MessageConfiguration messageConfiguration, StringEncrypter stringEncrypter) { + this.log = LoggerFactory.getLogger(RedisProvider.class); + this.messageConfiguration = messageConfiguration; + this.stringEncrypter = stringEncrypter; + } + + private void decryptPassword(RedisConfiguration redisConfiguration) { + try { + String encryptedPassword = redisConfiguration.getPassword(); + if (StringUtils.isNotBlank(encryptedPassword)) { + redisConfiguration.setPassword(stringEncrypter.decrypt(encryptedPassword)); + log.trace("Decrypted redis password successfully."); + } + } catch (StringEncrypter.EncryptionException e) { + log.error("Error during redis password decryption", e); + } + } + + public boolean isConnected() { + return redisProvider.isConnected(); + } + + @Override + public AbstractRedisProvider getDelegate() { + return redisProvider; + } + + @Override + public MessageProviderType getProviderType() { + return MessageProviderType.REDIS; + } + + @Override + public void subscribe(PubSubInterface pubSub, String... channels) { + log.info("Starting new thread for subscribing to Redis channels {}", Arrays.asList(channels)); + Object objectPool = redisProvider.getDelegate(); + if (objectPool instanceof JedisPool) { + executorService.execute(() -> { + Jedis jedis = ((JedisPool) objectPool).getResource(); + PubSubRedisAdapter pubSubRedisAdapter = new PubSubRedisAdapter(pubSub); + subscibedPubSubs.put(System.identityHashCode(pubSub), pubSubRedisAdapter); + + jedis.subscribe(pubSubRedisAdapter, channels); + }); + } else if (objectPool instanceof JedisCluster) { + executorService.execute(() -> { + JedisCluster jedis = ((JedisCluster) objectPool); + PubSubRedisAdapter pubSubRedisAdapter = new PubSubRedisAdapter(pubSub); + subscibedPubSubs.put(System.identityHashCode(pubSub), pubSubRedisAdapter); + + jedis.subscribe(pubSubRedisAdapter, channels); + }); + } else if (objectPool instanceof ShardedJedisPool) { + // Not supported in current lib, also Sharded is deprecated in 5.x lib + throw new UnsupportedOperationException("Sharded pool not provides PubSub in 3.9.x API"); + } + log.info("Stopping thread after subscription end to Redis from channels {}", Arrays.asList(channels)); + } + + @Override + public void unsubscribe(PubSubInterface pubSub) { + log.info("Starting end subscription to Redis for {}", pubSub); + + int pubSubIdentifier = System.identityHashCode(pubSub); + PubSubRedisAdapter pubSubRedisAdapter = subscibedPubSubs.get(pubSubIdentifier); + if (pubSubRedisAdapter == null) { + log.warn("PubSub {} in unsubscribe request is not registered", pubSub); + return; + } + + pubSubRedisAdapter.unsubscribe(); + subscibedPubSubs.remove(pubSubIdentifier); + log.info("Sent request to end subscription to Redis for {}", pubSub); + } + + @Override + public boolean publish(String channel, String message) { + Object objectPool = redisProvider.getDelegate(); + if (objectPool instanceof JedisPool) { + CompletableFuture.runAsync(() -> { + JedisPool pool = ((JedisPool) objectPool); + Jedis jedis = pool.getResource(); + try { + jedis.publish(channel, message); + } finally { + pool.returnResource(jedis); + } + }); + } else if (objectPool instanceof JedisCluster) { + CompletableFuture.runAsync(() -> { + Jedis jedis = ((JedisPool) objectPool).getResource(); + jedis.publish(channel, message); + }); + } else if (objectPool instanceof ShardedJedisPool) { + // Not supported in current lib, also Sharded is deprecatged in 5.x lib + throw new UnsupportedOperationException("Sharded pool not provides PubSub in 3.9.x API"); + } + + return true; + } + + @Override + public void shutdown() { + for (PubSubRedisAdapter pubSubRedisAdapter : subscibedPubSubs.values()) { + try { + pubSubRedisAdapter.unsubscribe(); + } catch (Throwable ex) { + log.error("Failed to unsubscribe for {}", pubSubRedisAdapter.getPubSub()); + } + } + subscibedPubSubs.clear(); + } + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/StandaloneMessageProviderFactory.java b/jans-core/message/src/main/java/io/jans/service/message/StandaloneMessageProviderFactory.java new file mode 100644 index 00000000000..ac321626b82 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/StandaloneMessageProviderFactory.java @@ -0,0 +1,114 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.jans.service.message.model.config.MessageConfiguration; +import io.jans.service.message.model.config.MessageProviderType; +import io.jans.util.security.StringEncrypter; + +/** + * Message provider for non CDI applications + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +public class StandaloneMessageProviderFactory { + + private static final Logger LOG = LoggerFactory.getLogger(StandaloneMessageProviderFactory.class); + + private StringEncrypter stringEncrypter; + private ExecutorService executorService; + + public StandaloneMessageProviderFactory(ExecutorService executorService, StringEncrypter stringEncrypter) { + this.executorService = executorService; + this.stringEncrypter = stringEncrypter; + } + + public StandaloneMessageProviderFactory(StringEncrypter stringEncrypter) { + this.executorService = Executors.newCachedThreadPool(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setName(MessageProviderFactory.MESSAGE_PROVIDER_THREAD_NAME); + thread.setDaemon(true); + return thread; + } + }); + this.stringEncrypter = stringEncrypter; + } + + public MessageProvider getMessageProvider(MessageConfiguration messageConfiguration) { + MessageProviderType messageProviderType = messageConfiguration.getMessageProviderType(); + + if (messageProviderType == null) { + LOG.error("Failed to initialize messageProvider, messageProviderType is null. Fallback to NULL type."); + messageProviderType = MessageProviderType.NULL; + } + + // Create bean + AbstractMessageProvider messageProvider = null; + switch (messageProviderType) { + case NULL: + if (stringEncrypter == null) { + throw new RuntimeException("Factory is not initialized properly. stringEncrypter is not specified"); + } + + NullMessageProvider nullMessageProvider = new NullMessageProvider(); + nullMessageProvider.configure(messageConfiguration, stringEncrypter); + nullMessageProvider.init(); + + messageProvider = nullMessageProvider; + break; + case REDIS: + if (stringEncrypter == null) { + throw new RuntimeException("Factory is not initialized properly. stringEncrypter is not specified"); + } + + RedisMessageProvider redisMessageProvider = new RedisMessageProvider(); + redisMessageProvider.configure(messageConfiguration, stringEncrypter); + redisMessageProvider.init(); + + messageProvider = redisMessageProvider; + break; + case POSTGRES: + if (stringEncrypter == null) { + throw new RuntimeException("Factory is not initialized properly. stringEncrypter is not specified"); + } + + PostgresMessageProvider postgresMessageProvider = new PostgresMessageProvider(); + postgresMessageProvider.configure(messageConfiguration, stringEncrypter); + postgresMessageProvider.init(); + + messageProvider = postgresMessageProvider; + break; + } + + if (messageProvider == null) { + throw new RuntimeException( + "Failed to initialize messageProvider, messageProviderType is unsupported: " + messageProvider); + } + + messageProvider.create(executorService); + + return messageProvider; + } + + public int getActiveCount() { + return ((ThreadPoolExecutor) executorService).getActiveCount(); + } + + public int getPoolSize() { + return ((ThreadPoolExecutor) executorService).getPoolSize(); + } + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/model/config/MessageConfiguration.java b/jans-core/message/src/main/java/io/jans/service/message/model/config/MessageConfiguration.java new file mode 100644 index 00000000000..658b7c6641d --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/model/config/MessageConfiguration.java @@ -0,0 +1,71 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message.model.config; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import io.jans.service.cache.RedisConfiguration; +import jakarta.enterprise.inject.Vetoed; + +/** + * @author Yuriy Movchan Date: 30/11/2023 + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@Vetoed +public class MessageConfiguration implements Serializable { + + private static final long serialVersionUID = 5047285980342633402L; + + private MessageProviderType messageProviderType = MessageProviderType.NULL; + + private NullMessageConfiguration nullConfiguration = new NullMessageConfiguration(); + + private RedisConfiguration redisConfiguration; + + private PostgresMessageConfiguration postgresMessageConfiguration; + + public MessageProviderType getMessageProviderType() { + return messageProviderType; + } + + public void setMessageProviderType(MessageProviderType messageProviderType) { + this.messageProviderType = messageProviderType; + } + + public NullMessageConfiguration getNullConfiguration() { + return nullConfiguration; + } + + public void setNullConfiguration(NullMessageConfiguration nullConfiguration) { + this.nullConfiguration = nullConfiguration; + } + + public PostgresMessageConfiguration getPostgresMessageConfiguration() { + return postgresMessageConfiguration; + } + + public void setPostgresMessageConfiguration(PostgresMessageConfiguration postgresMessageConfiguration) { + this.postgresMessageConfiguration = postgresMessageConfiguration; + } + + public RedisConfiguration getRedisConfiguration() { + return redisConfiguration; + } + + public void setRedisConfiguration(RedisConfiguration redisConfiguration) { + this.redisConfiguration = redisConfiguration; + } + + @Override + public String toString() { + return "CacheConfiguration{" + "cacheProviderType=" + messageProviderType + ", nullConfiguration=" + + nullConfiguration + ", redisConfiguration=" + redisConfiguration + ", postgresMessageConfiguration=" + + postgresMessageConfiguration + '}'; + } +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/model/config/MessageProviderType.java b/jans-core/message/src/main/java/io/jans/service/message/model/config/MessageProviderType.java new file mode 100644 index 00000000000..c83dbdf4a16 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/model/config/MessageProviderType.java @@ -0,0 +1,19 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message.model.config; + +import jakarta.xml.bind.annotation.XmlEnum; + +/** + * @author Yuriy Movchan Date: 30/11/2023 + */ +@XmlEnum(String.class) +public enum MessageProviderType { + + NULL, REDIS, POSTGRES + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/model/config/NullMessageConfiguration.java b/jans-core/message/src/main/java/io/jans/service/message/model/config/NullMessageConfiguration.java new file mode 100644 index 00000000000..8eef215af9b --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/model/config/NullMessageConfiguration.java @@ -0,0 +1,21 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message.model.config; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** + * @author Yuriy Movchan Date: 30/11/2023 + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class NullMessageConfiguration implements Serializable { + + private static final long serialVersionUID = 7544731515017051209L; + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/model/config/PostgresMessageConfiguration.java b/jans-core/message/src/main/java/io/jans/service/message/model/config/PostgresMessageConfiguration.java new file mode 100644 index 00000000000..cf8ce6e1b57 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/model/config/PostgresMessageConfiguration.java @@ -0,0 +1,129 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message.model.config; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import jakarta.xml.bind.annotation.XmlElement; + +/** + * @author Yuriy Movchan Date: 30/11/2023 + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class PostgresMessageConfiguration implements Serializable { + + @XmlElement(name = "db.schema.name") + private String dbSchemaName; + + @XmlElement(name = "connection.uri") + private String connectionUri; + + @XmlElement(name = "auth.userName") + private String authUserName; + + @XmlElement(name = "auth.userPassword") + private String authUserPassword; + + @XmlElement(name = "connection.pool.max-total") + private Integer connectionPoolMaxTotal; + + @XmlElement(name = "connection.pool.max-idle") + private Integer connectionPoolMaxIdle; + + @XmlElement(name = "connection.pool.min-idle") + private Integer connectionPoolMinIdle; + + @XmlElement(name = "message.wait-millis") + private Integer messageWaitMillis; + + @XmlElement(name = "message.sleep-thread-millis") + private Integer messageSleepThreadTime; + + public String getDbSchemaName() { + return dbSchemaName; + } + + public void setDbSchemaName(String dbSchemaName) { + this.dbSchemaName = dbSchemaName; + } + + public String getConnectionUri() { + return connectionUri; + } + + public void setConnectionUri(String connectionUri) { + this.connectionUri = connectionUri; + } + + public String getAuthUserName() { + return authUserName; + } + + public void setAuthUserName(String authUserName) { + this.authUserName = authUserName; + } + + public String getAuthUserPassword() { + return authUserPassword; + } + + public void setAuthUserPassword(String authUserPassword) { + this.authUserPassword = authUserPassword; + } + + public Integer getConnectionPoolMaxTotal() { + return connectionPoolMaxTotal; + } + + public void setConnectionPoolMaxTotal(Integer connectionPoolMaxTotal) { + this.connectionPoolMaxTotal = connectionPoolMaxTotal; + } + + public Integer getConnectionPoolMaxIdle() { + return connectionPoolMaxIdle; + } + + public void setConnectionPoolMaxIdle(Integer connectionPoolMaxIdle) { + this.connectionPoolMaxIdle = connectionPoolMaxIdle; + } + + public Integer getConnectionPoolMinIdle() { + return connectionPoolMinIdle; + } + + public void setConnectionPoolMinIdle(Integer connectionPoolMinIdle) { + this.connectionPoolMinIdle = connectionPoolMinIdle; + } + + public Integer getMessageWaitMillis() { + return messageWaitMillis; + } + + public void setMessageWaitMillis(Integer messageWaitMillis) { + this.messageWaitMillis = messageWaitMillis; + } + + public Integer getMessageSleepThreadTime() { + return messageSleepThreadTime; + } + + public void setMessageSleepThreadTime(Integer messageSleepThreadTime) { + this.messageSleepThreadTime = messageSleepThreadTime; + } + + @Override + public String toString() { + return "PostgresMessageConfiguration [dbSchemaName=" + dbSchemaName + ", connectionUri=" + connectionUri + + ", authUserName=" + authUserName + ", authUserPassword=" + authUserPassword + + ", connectionPoolMaxTotal=" + connectionPoolMaxTotal + ", connectionPoolMaxIdle=" + + connectionPoolMaxIdle + ", connectionPoolMinIdle=" + connectionPoolMinIdle + ", messageWaitMillis=" + + messageWaitMillis + ", messageSleepThreadTime=" + messageSleepThreadTime + "]"; + } + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/pubsub/PubSubInterface.java b/jans-core/message/src/main/java/io/jans/service/message/pubsub/PubSubInterface.java new file mode 100644 index 00000000000..0a74c75d872 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/pubsub/PubSubInterface.java @@ -0,0 +1,22 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message.pubsub; + +/** + * Listener for PubSub messages + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +public interface PubSubInterface { + + void onMessage(String channel, String message); + + void onSubscribe(String channel, int subscribedChannels); + + void onUnsubscribe(String channel, int subscribedChannels); + +} diff --git a/jans-core/message/src/main/java/io/jans/service/message/pubsub/PubSubRedisAdapter.java b/jans-core/message/src/main/java/io/jans/service/message/pubsub/PubSubRedisAdapter.java new file mode 100644 index 00000000000..b9d3e8cfe08 --- /dev/null +++ b/jans-core/message/src/main/java/io/jans/service/message/pubsub/PubSubRedisAdapter.java @@ -0,0 +1,50 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message.pubsub; + +import redis.clients.jedis.JedisPubSub; + +/** + * Listener converter for PubSub messages from Redis listener + * + * @author Yuriy Movchan Date: 30/11/2023 + */ +public class PubSubRedisAdapter extends JedisPubSub implements PubSubInterface { + + private PubSubInterface pubSub; + private long messagesCount; + + public PubSubRedisAdapter(PubSubInterface pubSub) { + this.pubSub = pubSub; + this.messagesCount = 0; + } + + public PubSubInterface getPubSub() { + return pubSub; + } + + @Override + public void onMessage(String channel, String message) { + this.pubSub.onMessage(channel, message); + messagesCount++; + } + + public long getMessagesCount() { + return messagesCount; + } + + @Override + public void unsubscribe() { + super.unsubscribe(); + } + + @Override + public boolean isSubscribed() { + return super.isSubscribed(); + } + +} diff --git a/jans-core/message/src/main/resources/META-INF/beans.xml b/jans-core/message/src/main/resources/META-INF/beans.xml new file mode 100644 index 00000000000..b7930c568e8 --- /dev/null +++ b/jans-core/message/src/main/resources/META-INF/beans.xml @@ -0,0 +1,6 @@ + + + diff --git a/jans-core/message/src/test/java/io/jans/service/message/test/dev/StandalonePostresMessageTest.java b/jans-core/message/src/test/java/io/jans/service/message/test/dev/StandalonePostresMessageTest.java new file mode 100644 index 00000000000..ce0f9178415 --- /dev/null +++ b/jans-core/message/src/test/java/io/jans/service/message/test/dev/StandalonePostresMessageTest.java @@ -0,0 +1,101 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message.test.dev; + +import io.jans.service.message.MessageProvider; +import io.jans.service.message.StandaloneMessageProviderFactory; +import io.jans.service.message.model.config.MessageConfiguration; +import io.jans.service.message.model.config.MessageProviderType; +import io.jans.service.message.model.config.PostgresMessageConfiguration; +import io.jans.service.message.pubsub.PubSubInterface; +import io.jans.util.security.StringEncrypter; +import io.jans.util.security.StringEncrypter.EncryptionException; + +/** + * @author Yuriy Movchan Date: 30/11/2023 + */ +public class StandalonePostresMessageTest { + + public static void main(String[] args) throws EncryptionException, InterruptedException { + StringEncrypter stringEncrypter = StringEncrypter.instance("aOm7B9mrWT66roqZCNcUr7ox"); + + MessageConfiguration messageConfiguration = new MessageConfiguration(); + messageConfiguration.setMessageProviderType(MessageProviderType.POSTGRES); + + PostgresMessageConfiguration postgresMessageConfiguration = new PostgresMessageConfiguration(); + postgresMessageConfiguration.setDbSchemaName("public"); + postgresMessageConfiguration.setConnectionUri("jdbc:postgresql://localhost:5432/postgres"); + postgresMessageConfiguration.setAuthUserName("postgres"); + postgresMessageConfiguration.setAuthUserPassword("rgy1GUg+1kY="); // secret + postgresMessageConfiguration.setMessageWaitMillis(100); + postgresMessageConfiguration.setMessageSleepThreadTime(200); + + messageConfiguration.setPostgresMessageConfiguration(postgresMessageConfiguration); + + StandaloneMessageProviderFactory messageProviderFactory = new StandaloneMessageProviderFactory(stringEncrypter); + MessageProvider messageProvider = messageProviderFactory.getMessageProvider(messageConfiguration); + + PubSubInterface pubSubAdapter = new PubSubInterface() { + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + System.out.println(String.format("onUnsubscribe %s : %d", channel, subscribedChannels)); + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + System.out.println(String.format("onSubscribe %s : %d", channel, subscribedChannels)); + } + + @Override + public void onMessage(String channel, String message) { + System.out.println(String.format("onMessage %s : %s", channel, message)); + } + }; + + System.out.printf("First test...\n"); + messageProvider.subscribe(pubSubAdapter, "test1", "test2", "test3"); + for (int i = 0; i < 1000; i++) { + messageProvider.publish("test1", "1111111"); + messageProvider.publish("test2", "1111112"); + messageProvider.publish("test3", "1111113"); + } + + Thread.sleep(5 * 1000L); + messageProvider.unsubscribe(pubSubAdapter); + messageProvider.shutdown(); + System.out.printf("Active count %d, total: %d \n", messageProviderFactory.getActiveCount(), + messageProviderFactory.getPoolSize()); + + Thread.sleep(1 * 1000L); + System.out.printf("Active count %d, total: %d \n", messageProviderFactory.getActiveCount(), + messageProviderFactory.getPoolSize()); + + Thread.sleep(5 * 1000L); + System.out.printf("Second test...\n"); + + messageProvider.subscribe(pubSubAdapter, "test1"); + for (int i = 0; i < 1000; i++) { + messageProvider.publish("test1", "1111111"); + messageProvider.publish("test2", "1111112"); + messageProvider.publish("test3", "1111113"); + } + + Thread.sleep(5 * 1000L); + messageProvider.unsubscribe(pubSubAdapter); + messageProvider.shutdown(); + System.out.printf("Active count %d, total: %d \n", messageProviderFactory.getActiveCount(), + messageProviderFactory.getPoolSize()); + + Thread.sleep(1 * 1000L); + System.out.printf("Active count %d, total: %d \n", messageProviderFactory.getActiveCount(), + messageProviderFactory.getPoolSize()); + + System.out.printf("End test...\n"); + } + +} diff --git a/jans-core/message/src/test/java/io/jans/service/message/test/dev/StandaloneRedisMessageTest.java b/jans-core/message/src/test/java/io/jans/service/message/test/dev/StandaloneRedisMessageTest.java new file mode 100644 index 00000000000..cf3ac354712 --- /dev/null +++ b/jans-core/message/src/test/java/io/jans/service/message/test/dev/StandaloneRedisMessageTest.java @@ -0,0 +1,97 @@ +/* + * Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text. + * + * Copyright (c) 2023, Janssen Project + */ + +package io.jans.service.message.test.dev; + +import io.jans.service.cache.RedisConfiguration; +import io.jans.service.message.MessageProvider; +import io.jans.service.message.StandaloneMessageProviderFactory; +import io.jans.service.message.model.config.MessageConfiguration; +import io.jans.service.message.model.config.MessageProviderType; +import io.jans.service.message.pubsub.PubSubInterface; +import io.jans.util.security.StringEncrypter; +import io.jans.util.security.StringEncrypter.EncryptionException; + +/** + * @author Yuriy Movchan Date: 30/11/2023 + */ +public class StandaloneRedisMessageTest { + + public static void main(String[] args) throws EncryptionException, InterruptedException { + StringEncrypter stringEncrypter = StringEncrypter.instance("aOm7B9mrWT66roqZCNcUr7ox"); + + MessageConfiguration messageConfiguration = new MessageConfiguration(); + messageConfiguration.setMessageProviderType(MessageProviderType.REDIS); + + RedisConfiguration redisMessageConfiguration = new RedisConfiguration(); + redisMessageConfiguration.setServers("192.168.1.151:6379"); + redisMessageConfiguration.setPassword("rgy1GUg+1kY="); // secret + + messageConfiguration.setRedisConfiguration(redisMessageConfiguration); + + StandaloneMessageProviderFactory messageProviderFactory = new StandaloneMessageProviderFactory(stringEncrypter); + MessageProvider messageProvider = messageProviderFactory.getMessageProvider(messageConfiguration); + + PubSubInterface pubSubAdapter = new PubSubInterface() { + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + System.out.println(String.format("onUnsubscribe %s : %d", channel, subscribedChannels)); + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + System.out.println(String.format("onSubscribe %s : %d", channel, subscribedChannels)); + } + + @Override + public void onMessage(String channel, String message) { + System.out.println(String.format("onMessage %s : %s", channel, message)); + } + }; + + System.out.printf("First test...\n"); + messageProvider.subscribe(pubSubAdapter, "test1", "test2", "test3"); + for (int i = 0; i < 1000; i++) { + messageProvider.publish("test1", "1111111"); + messageProvider.publish("test2", "1111112"); + messageProvider.publish("test3", "1111113"); + } + + Thread.sleep(5 * 1000L); + messageProvider.unsubscribe(pubSubAdapter); + messageProvider.shutdown(); + System.out.printf("Active count %d, total: %d \n", messageProviderFactory.getActiveCount(), + messageProviderFactory.getPoolSize()); + + Thread.sleep(1 * 1000L); + System.out.printf("Active count %d, total: %d \n", messageProviderFactory.getActiveCount(), + messageProviderFactory.getPoolSize()); + + Thread.sleep(5 * 1000L); + System.out.printf("Second test...\n"); + + messageProvider.subscribe(pubSubAdapter, "test1"); + for (int i = 0; i < 1000; i++) { + messageProvider.publish("test1", "1111111"); + messageProvider.publish("test2", "1111112"); + messageProvider.publish("test3", "1111113"); + } + + Thread.sleep(5 * 1000L); + messageProvider.unsubscribe(pubSubAdapter); + messageProvider.shutdown(); + System.out.printf("Active count %d, total: %d \n", messageProviderFactory.getActiveCount(), + messageProviderFactory.getPoolSize()); + + Thread.sleep(1 * 1000L); + System.out.printf("Active count %d, total: %d \n", messageProviderFactory.getActiveCount(), + messageProviderFactory.getPoolSize()); + + System.out.printf("End test...\n"); + } + +} diff --git a/jans-orm/sql/src/main/java/io/jans/orm/sql/operation/impl/SqConnectionProviderPool.java b/jans-orm/sql/src/main/java/io/jans/orm/sql/operation/impl/SqConnectionProviderPool.java new file mode 100644 index 00000000000..e8cc387ca63 --- /dev/null +++ b/jans-orm/sql/src/main/java/io/jans/orm/sql/operation/impl/SqConnectionProviderPool.java @@ -0,0 +1,233 @@ +package io.jans.orm.sql.operation.impl; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map.Entry; +import java.util.Properties; + +import org.apache.commons.dbcp2.ConnectionFactory; +import org.apache.commons.dbcp2.DriverManagerConnectionFactory; +import org.apache.commons.dbcp2.PoolableConnection; +import org.apache.commons.dbcp2.PoolableConnectionFactory; +import org.apache.commons.dbcp2.PoolingDataSource; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.jans.orm.exception.operation.ConfigurationException; +import io.jans.orm.exception.operation.ConnectionException; +import io.jans.orm.sql.model.ResultCode; +import io.jans.orm.util.PropertiesHelper; +import io.jans.orm.util.StringHelper; + +public class SqConnectionProviderPool { + + private static final Logger LOG = LoggerFactory.getLogger(SqConnectionProviderPool.class); + + private static final String DRIVER_PROPERTIES_PREFIX = "connection.driver-property"; + + protected Properties props; + + private String connectionUri; + private Properties connectionProperties; + + private GenericObjectPoolConfig objectPoolConfig; + protected PoolingDataSource poolingDataSource; + + protected int creationResultCode; + + protected SqConnectionProviderPool() {} + + public SqConnectionProviderPool(Properties props) { + this.props = props; + } + + public void create() { + try { + init(); + } catch (Exception ex) { + this.creationResultCode = ResultCode.OPERATIONS_ERROR_INT_VALUE; + + Properties clonedProperties = (Properties) props.clone(); + if (clonedProperties.getProperty("auth.userName") != null) { + clonedProperties.setProperty("auth.userPassword", "REDACTED"); + } + + LOG.error("Failed to create connection pool with properties: '{}'. Exception: {}", clonedProperties, ex); + } + } + + protected void init() throws Exception { + if (!props.containsKey("db.schema.name")) { + throw new ConfigurationException("Property 'db.schema.name' is mandatory!"); + } + + if (!props.containsKey("connection.uri")) { + throw new ConfigurationException("Property 'connection.uri' is mandatory!"); + } + this.connectionUri = props.getProperty("connection.uri"); + + Properties filteredDriverProperties = PropertiesHelper.findProperties(props, DRIVER_PROPERTIES_PREFIX, "."); + this.connectionProperties = new Properties(); + for (Entry driverPropertyEntry : filteredDriverProperties.entrySet()) { + String key = StringHelper.toString(driverPropertyEntry.getKey()).substring(DRIVER_PROPERTIES_PREFIX.length() + 1); + String value = StringHelper.toString(driverPropertyEntry.getValue()); + + connectionProperties.put(key, value); + } + + String userName = props.getProperty("auth.userName"); + String userPassword = props.getProperty("auth.userPassword"); + + connectionProperties.setProperty("user", userName); + connectionProperties.setProperty("password", userPassword); + + this.objectPoolConfig = new GenericObjectPoolConfig<>(); + + Integer cpMaxTotal = StringHelper.toInteger(props.getProperty("connection.pool.max-total"), null); + if (cpMaxTotal != null) { + objectPoolConfig.setMaxTotal(cpMaxTotal); + } + + Integer cpMaxIdle = StringHelper.toInteger(props.getProperty("connection.pool.max-idle"), null); + if (cpMaxIdle != null) { + objectPoolConfig.setMaxIdle(cpMaxIdle); + } + + Integer cpMinIdle = StringHelper.toInteger(props.getProperty("connection.pool.min-idle"), null); + if (cpMinIdle != null) { + objectPoolConfig.setMinIdle(cpMinIdle); + } + + Integer cpMaxWaitTimeMillis = StringHelper.toInteger(props.getProperty("connection.pool.max-wait-time-millis"), + null); + if (cpMaxWaitTimeMillis != null) { + objectPoolConfig.setMaxWaitMillis(cpMaxWaitTimeMillis); + } + + Integer cpMinEvictableIdleTimeMillis = StringHelper + .toInteger(props.getProperty("connection.pool.min-evictable-idle-time-millis"), null); + if (cpMaxWaitTimeMillis != null) { + objectPoolConfig.setMinEvictableIdleTimeMillis(cpMinEvictableIdleTimeMillis); + } + + Boolean testOnCreate = StringHelper.toBoolean(props.getProperty("connection.pool.test-on-create"), null); + if (testOnCreate != null) { + objectPoolConfig.setTestOnCreate(testOnCreate); + } + + Boolean testOnReturn = StringHelper.toBoolean(props.getProperty("connection.pool.test-on-return"), null); + if (testOnReturn != null) { + objectPoolConfig.setTestOnReturn(testOnReturn); + } + + openWithWaitImpl(); + + this.creationResultCode = ResultCode.SUCCESS_INT_VALUE; + + LOG.info("Created connection pool"); + } + + private void openWithWaitImpl() throws Exception { + long connectionMaxWaitTimeMillis = StringHelper.toLong(props.getProperty("connection.pool.create-max-wait-time-millis"), 30 * 1000L); + LOG.debug("Using connection timeout: '{}'", connectionMaxWaitTimeMillis); + + Exception lastException = null; + + int attempt = 0; + long currentTime = System.currentTimeMillis(); + long maxWaitTime = currentTime + connectionMaxWaitTimeMillis; + do { + attempt++; + if (attempt > 0) { + LOG.info("Attempting to create connection pool: '{}'", attempt); + } + + try { + open(); + if (isConnected()) { + break; + } else { + LOG.info("Failed to connect to DB"); + destroy(); + throw new ConnectionException("Failed to create connection pool"); + } + } catch (Exception ex) { + lastException = ex; + } + + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + LOG.error("Exception happened in sleep", ex); + return; + } + currentTime = System.currentTimeMillis(); + } while (maxWaitTime > currentTime); + + if (lastException != null) { + throw lastException; + } + } + + private void open() { + ConnectionFactory connectionFactory = new DriverManagerConnectionFactory(connectionUri, connectionProperties); + PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(connectionFactory, null); + ObjectPool objectPool = new GenericObjectPool<>(poolableConnectionFactory, objectPoolConfig); + + this.poolingDataSource = new PoolingDataSource<>(objectPool); + poolableConnectionFactory.setPool(objectPool); + } + + public int getCreationResultCode() { + return creationResultCode; + } + + public boolean isCreated() { + return ResultCode.SUCCESS_INT_VALUE == creationResultCode; + } + + public boolean isConnected() { + if (this.poolingDataSource == null) { + return false; + } + + boolean isConnected = true; + try (Connection con = this.poolingDataSource.getConnection()) { + return con.isValid(30); + } catch (Exception ex) { + LOG.error("Failed to check connection", ex); + isConnected = false; + } + + return isConnected; + } + + public Connection getConnection() { + try { + return this.poolingDataSource.getConnection(); + } catch (SQLException ex) { + throw new ConnectionException("Failed to get connection from pool", ex); + } + } + + public boolean destroy() { + boolean result = true; + if (this.poolingDataSource != null) { + try { + this.poolingDataSource.close(); + } catch (RuntimeException ex) { + LOG.error("Failed to close connection pool", ex); + result = false; + } catch (SQLException ex) { + LOG.error("Failed to close connection pool. Erorr code: '{}'", ex.getErrorCode(), ex); + result = false; + } + } + + return result; + } + +} \ No newline at end of file