Skip to content

Commit

Permalink
feat: add jans-lock-event library to publish messages to event server…
Browse files Browse the repository at this point in the history
… from jans-auth (#6893)

* feat: if DB column is boolean convert string "true" and '1' to true

* feat: add jans-lock-event library to publish messages to event server from jans-auth #6538
  • Loading branch information
yurem committed Dec 2, 2023
1 parent 45ce059 commit c49f8f1
Show file tree
Hide file tree
Showing 21 changed files with 1,844 additions and 0 deletions.
84 changes: 84 additions & 0 deletions jans-core/message/pom.xml
@@ -0,0 +1,84 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<artifactId>jans-core-message</artifactId>
<name>Caches support</name>

<parent>
<groupId>io.jans</groupId>
<artifactId>jans-core-parent</artifactId>
<version>1.0.20</version>
</parent>

<prerequisites>
<maven>${maven.min-version}</maven>
</prerequisites>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/*.xml</include>
<include>**/services/*</include>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/*.xml</include>
<include>**/services/*</include>
<include>**/*.properties</include>
</includes>
</testResource>
</testResources>
</build>

<dependencies>
<!-- Gluu -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jans-core-cache</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jans-orm-core</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jans-orm-sql</artifactId>
</dependency>

<!-- CDI -->
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.enterprise</groupId>
<artifactId>jakarta.enterprise.cdi-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
</dependency>
<!-- Cache -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>

<!-- Tests -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>

</dependencies>

</project>
@@ -0,0 +1,68 @@
/*
* Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text.
*
* Copyright (c) 2023, Janssen Project
*/

package io.jans.service;

import org.slf4j.Logger;

import io.jans.service.message.MessageInterface;
import io.jans.service.message.MessageProvider;
import io.jans.service.message.pubsub.PubSubInterface;
import jakarta.inject.Inject;

/**
* Provides operations with messages
*
* @author Yuriy Movchan Date: 30/11/2023
*/
public abstract class BaseMessageService implements MessageInterface {

public static int DEFAULT_EXPIRATION = 60;

@Inject
private Logger log;

public void subscribe(PubSubInterface pubSubAdapter, String... channels) {
MessageProvider<?> messageProvider = getMessageProvider();
if (messageProvider == null) {
log.error("Message provider is invalid!");
return;
}

log.trace("Subscribe '{}' for channels '{}'", pubSubAdapter, channels);
messageProvider.subscribe(pubSubAdapter, channels);
}

public void unsubscribe(PubSubInterface pubSubAdapter) {
MessageProvider<?> messageProvider = getMessageProvider();
if (messageProvider == null) {
log.error("Message provider is invalid!");
return;
}

log.trace("Unsubscribe '{}'", pubSubAdapter);
messageProvider.unsubscribe(pubSubAdapter);
}

public boolean publish(String channel, String message) {
MessageProvider<?> messageProvider = getMessageProvider();
if (messageProvider == null) {
log.error("Message provider is invalid!");
return false;
}

if (log.isTraceEnabled()) {
log.trace("Publish '{}' to channel '{}'", message, channel);
}

boolean result = messageProvider.publish(channel, message);

return result;
}

protected abstract MessageProvider getMessageProvider();

}
@@ -0,0 +1,29 @@
/*
* Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text.
*
* Copyright (c) 2023, Janssen Project
*/

package io.jans.service;

import io.jans.service.message.MessageProvider;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

/**
* Provides operations with messages
*
* @author Yuriy Movchan Date: 2023/12/03
*/
@ApplicationScoped
public class MessageService extends BaseMessageService {

@Inject
private MessageProvider messageProvider;

@Override
protected MessageProvider getMessageProvider() {
return messageProvider;
}

}
@@ -0,0 +1,22 @@
/*
* Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text.
*
* Copyright (c) 2023, Janssen Project
*/

package io.jans.service.message;

import java.util.concurrent.ExecutorService;

/**
* Interface for each message provider
*
* @author Yuriy Movchan Date: 30/11/2023
*/
public abstract class AbstractMessageProvider<T> extends MessageProvider<T> {

public abstract void create(ExecutorService executorService);

public abstract void destroy();

}
@@ -0,0 +1,24 @@
/*
* Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text.
*
* Copyright (c) 2023, Janssen Project
*/

package io.jans.service.message;

import io.jans.service.message.pubsub.PubSubInterface;

/**
* Interface for each message provider
*
* @author Yuriy Movchan Date: 30/11/2023
*/
public interface MessageInterface {

void subscribe(PubSubInterface pubSubAdapter, String... channels);

void unsubscribe(PubSubInterface pubSubAdapter);

boolean publish(String channel, String message);

}
@@ -0,0 +1,27 @@
/*
* Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text.
*
* Copyright (c) 2023, Janssen Project
*/

package io.jans.service.message;

import io.jans.service.message.model.config.MessageProviderType;

/**
* Interface for each message provider
*
* @author Yuriy Movchan Date: 30/11/2023
*/
public abstract class MessageProvider<T> implements MessageInterface {

/*
* Delegate internal connection object
*/
public abstract T getDelegate();

public abstract MessageProviderType getProviderType();

public abstract void shutdown();

}
@@ -0,0 +1,127 @@
/*
* Janssen Project software is available under the Apache License (2004). See http://www.apache.org/licenses/ for full text.
*
* Copyright (c) 2023, Janssen Project
*/

package io.jans.service.message;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;

import io.jans.service.message.model.config.MessageConfiguration;
import io.jans.service.message.model.config.MessageProviderType;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;

/**
* Message provider factory
*
* @author Yuriy Movchan Date: 30/11/2023
*/
@ApplicationScoped
public class MessageProviderFactory {

public static final String MESSAGE_PROVIDER_THREAD_NAME = "MessageProviderThread";

@Inject
private Logger log;

@Inject
private MessageConfiguration messageConfiguration;

@Inject
@Any
private Instance<MessageProvider> instance;

private ExecutorService executorService;
private MessageProvider messageProvider;

@PostConstruct
public void create() {
executorService = Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName(MESSAGE_PROVIDER_THREAD_NAME);
thread.setDaemon(true);
return thread;
}
});
}

@PreDestroy
public void destroy() {
shutdown();
}

@Produces
@ApplicationScoped
public MessageProvider getMessageProvider() {
log.debug("Started to create message provider");

messageProvider = getCacheProvider(messageConfiguration);

return messageProvider;
}

public MessageProvider getCacheProvider(MessageConfiguration messageConfiguration) {
MessageProviderType messageProviderType = messageConfiguration.getMessageProviderType();

// Create proxied bean
AbstractMessageProvider<?> messageProvider = null;
switch (messageProviderType) {
case NULL:
messageProvider = instance.select(NullMessageProvider.class).get();
break;
case REDIS:
messageProvider = instance.select(RedisMessageProvider.class).get();
break;
case POSTGRES:
messageProvider = instance.select(PostgresMessageProvider.class).get();
break;
}

if (messageProvider == null) {
throw new RuntimeException(
"Failed to initialize messageProvider, messageProvider is unsupported: " + messageProviderType);
}

messageProvider.create(executorService);

return messageProvider;
}

public int getActiveCount() {
return ((ThreadPoolExecutor) executorService).getActiveCount();
}

public int getPoolSize() {
return ((ThreadPoolExecutor) executorService).getPoolSize();
}

public void shutdown() {
if (messageProvider != null) {
log.info("Starting message provider shutdown...");
messageProvider.shutdown();
messageProvider = null;
}

if (executorService != null) {
log.info("Starting message provider thread pool shutdown...");
executorService.shutdownNow();
executorService = null;
}

log.info("Successfully stopped message provider pool");
}

}

0 comments on commit c49f8f1

Please sign in to comment.