Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import java.util.stream.Collectors
*
* @author github.com/Kilemonn
*/
class RedisAuthenticator: MultiQueueAuthenticator()
class RedisAuthenticator(private val prefix: String): MultiQueueAuthenticator()
{
companion object
{
Expand All @@ -33,7 +33,7 @@ class RedisAuthenticator: MultiQueueAuthenticator()
{
if (!isInNoneMode())
{
return setOf(RESTRICTED_KEY)
return setOf("$prefix$RESTRICTED_KEY")
}
return setOf()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import au.kilemon.messagequeue.logging.HasLogger
import au.kilemon.messagequeue.logging.Messages
import au.kilemon.messagequeue.message.QueueMessage
import au.kilemon.messagequeue.queue.MultiQueue
import au.kilemon.messagequeue.queue.cache.CacheKeyManager
import au.kilemon.messagequeue.queue.cache.redis.RedisMultiQueue
import au.kilemon.messagequeue.queue.inmemory.InMemoryMultiQueue
import au.kilemon.messagequeue.queue.nosql.mongo.MongoMultiQueue
Expand Down Expand Up @@ -45,10 +46,6 @@ class QueueConfiguration : HasLogger
@Autowired
private lateinit var messageSource: ReloadableResourceBundleMessageSource

@Autowired
@Lazy
private lateinit var redisTemplate: RedisTemplate<String, QueueMessage>

/**
* Initialise the [MultiQueue] [Bean] based on the [MessageQueueSettings.storageMedium].
*/
Expand All @@ -61,7 +58,7 @@ class QueueConfiguration : HasLogger
var queue: MultiQueue = InMemoryMultiQueue()
when (messageQueueSettings.storageMedium.uppercase()) {
StorageMedium.REDIS.toString() -> {
queue = RedisMultiQueue(messageQueueSettings.redisPrefix, redisTemplate)
queue = RedisMultiQueue(messageQueueSettings.redisPrefix)
}
StorageMedium.SQL.toString() -> {
queue = SqlMultiQueue()
Expand Down Expand Up @@ -118,7 +115,7 @@ class QueueConfiguration : HasLogger
var authenticator: MultiQueueAuthenticator = InMemoryAuthenticator()
when (messageQueueSettings.storageMedium.uppercase()) {
StorageMedium.REDIS.toString() -> {
authenticator = RedisAuthenticator()
authenticator = RedisAuthenticator(messageQueueSettings.redisPrefix)
}
StorageMedium.SQL.toString() -> {
authenticator = SqlAuthenticator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import au.kilemon.messagequeue.authentication.AuthenticationMatrix
import au.kilemon.messagequeue.authentication.RestrictionMode
import au.kilemon.messagequeue.logging.HasLogger
import au.kilemon.messagequeue.message.QueueMessage
import au.kilemon.messagequeue.queue.cache.redis.RedisCacheKeyManager
import au.kilemon.messagequeue.settings.MessageQueueSettings
import io.lettuce.core.RedisURI
import org.slf4j.Logger
Expand Down Expand Up @@ -257,4 +258,21 @@ class RedisConfiguration: HasLogger
template.keySerializer = StringRedisSerializer()
return template
}

@Bean
@ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="REDIS")
fun getRedisCacheKeyManagerRedisTemplate(): RedisTemplate<String, String>
{
val template = RedisTemplate<String, String>()
template.connectionFactory = getConnectionFactory()
template.keySerializer = StringRedisSerializer()
return template
}

@Bean
@ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="REDIS")
fun getRedisCacheKeyManager(): RedisCacheKeyManager
{
return RedisCacheKeyManager()
}
}
6 changes: 1 addition & 5 deletions src/main/kotlin/au/kilemon/messagequeue/queue/MultiQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,7 @@ abstract class MultiQueue: Queue<QueueMessage>, HasLogger
*/
fun keys(includeEmpty: Boolean = true): Set<String>
{
val keysSet = keysInternal(includeEmpty)

// Remove the reserved key(s)
multiQueueAuthenticator.getReservedSubQueues().forEach { reservedSubQueue -> keysSet.remove(reservedSubQueue) }
return keysSet
return keysInternal(includeEmpty)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package au.kilemon.messagequeue.queue.cache

/**
* To optimise how we determine the key list/sub queue list when using a cache, this class is used to store and
* manage the subqueue list for the cache backed [au.kilemon.messagequeue.queue.MultiQueue].
*
* @author github.com/Kilemonn
*/
abstract class CacheKeyManager(protected val prefix: String = "")
{
companion object
{
const val CACHE_KEYS_KEY: String = "messagequeue-cache-keys"
}

fun getReservedKeys(): Set<String>
{
return setOf("$prefix$CACHE_KEYS_KEY")
}

abstract fun add(key: String)

abstract fun remove(key: String)

abstract fun getKeys(): HashSet<String>

/**
* Used for tests.
*/
abstract fun contains(key: String): Boolean

/**
* Used for tests.
*/
abstract fun clear()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package au.kilemon.messagequeue.queue.cache

import java.util.stream.Collectors

/**
* A marker interface for Cache backed [au.kilemon.messagequeue.queue.MultiQueue].
*
* @author github.com/Kilemonn
*/
interface CacheMultiQueue
{
/**
* Append the [getPrefix] to the provided [subQueue] [String].
*
* @param subQueue the [String] to add the prefix to
* @return a [String] with the provided [subQueue] with the [getPrefix] appended to the beginning.
*/
fun appendPrefix(subQueue: String): String
{
if (hasPrefix() && !subQueue.startsWith(getPrefix()))
{
return "${getPrefix()}$subQueue"
}
return subQueue
}

/**
* @return whether the [getPrefix] is [String.isNotBlank]
*/
fun hasPrefix(): Boolean
{
return getPrefix().isNotBlank()
}

/**
* If [getPrefix] is set, removes this from all provided [keys].
* If [getPrefix] is null or blank, then the provided [keys] [Set] is immediately returned.
*
* @param keys the [Set] of [String] to remove the [getPrefix] from
* @return the updated [Set] of [String] with the [getPrefix] removed
*/
fun removePrefix(keys: Set<String>): Set<String>
{
if (!hasPrefix())
{
return keys
}

val prefixLength = getPrefix().length
return keys.stream().filter { key -> key.startsWith(getPrefix()) }
.map { key -> key.substring(prefixLength) }
.collect(Collectors.toSet())
}

fun getPrefix(): String
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package au.kilemon.messagequeue.queue.cache.redis

import au.kilemon.messagequeue.queue.cache.CacheKeyManager
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.data.redis.core.RedisTemplate

/**
* To optimise how we determine the key list/sub queue list when using a cache, this class is used to store and
* manage the subqueue list for the [RedisMultiQueue].
*
* @author github.com/Kilemonn
*/
class RedisCacheKeyManager: CacheKeyManager()
{
@Autowired
private lateinit var redisTemplate: RedisTemplate<String, String>

override fun add(key: String)
{
redisTemplate.opsForSet().add(CACHE_KEYS_KEY, key)
}

override fun remove(key: String)
{
redisTemplate.opsForSet().remove(CACHE_KEYS_KEY, key)
}

override fun contains(key: String): Boolean
{
return redisTemplate.opsForSet().isMember(CACHE_KEYS_KEY, key)
}

override fun getKeys(): HashSet<String>
{
return HashSet(redisTemplate.opsForSet().members(CACHE_KEYS_KEY))
}

override fun clear()
{
redisTemplate.delete(CACHE_KEYS_KEY)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package au.kilemon.messagequeue.queue.cache.redis
import au.kilemon.messagequeue.logging.HasLogger
import au.kilemon.messagequeue.message.QueueMessage
import au.kilemon.messagequeue.queue.MultiQueue
import au.kilemon.messagequeue.queue.cache.CacheMultiQueue
import au.kilemon.messagequeue.queue.exception.IllegalSubQueueIdentifierException
import au.kilemon.messagequeue.queue.exception.MessageUpdateException
import au.kilemon.messagequeue.settings.MessageQueueSettings
import org.slf4j.Logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.core.ScanOptions
import java.util.Optional
import java.util.Queue

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.stream.Collectors
import kotlin.collections.HashSet
import kotlin.jvm.Throws

/**
* A `Redis` specific implementation of the [MultiQueue].
Expand All @@ -22,59 +25,24 @@ import kotlin.collections.HashSet
*
* @author github.com/Kilemonn
*/
class RedisMultiQueue(private val prefix: String = "", private val redisTemplate: RedisTemplate<String, QueueMessage>) : MultiQueue(), HasLogger
class RedisMultiQueue(private val prefix: String) : MultiQueue(), HasLogger, CacheMultiQueue
{
override val LOG: Logger = this.initialiseLogger()

/**
* Append the [MessageQueueSettings.redisPrefix] to the provided [subQueue] [String].
*
* @param subQueue the [String] to add the prefix to
* @return a [String] with the provided [subQueue] with the [MessageQueueSettings.redisPrefix] appended to the beginning.
*/
private fun appendPrefix(subQueue: String): String
{
if (hasPrefix() && !subQueue.startsWith(getPrefix()))
{
return "${getPrefix()}$subQueue"
}
return subQueue
}
@Autowired
private lateinit var redisTemplate: RedisTemplate<String, QueueMessage>

/**
* @return whether the [prefix] is [String.isNotBlank]
*/
internal fun hasPrefix(): Boolean
{
return getPrefix().isNotBlank()
}
@Autowired
private lateinit var cacheKeyManager: RedisCacheKeyManager

/**
* @return [prefix]
*/
internal fun getPrefix(): String
override fun getPrefix(): String
{
return prefix
}

/**
* If [prefix] is set, removes this from all provided [keys].
* If [prefix] is null or blank, then the provided [keys] [Set] is immediately returned.
*
* @param keys the [Set] of [String] to remove the [prefix] from
* @return the updated [Set] of [String] with the [prefix] removed
*/
fun removePrefix(keys: Set<String>): Set<String>
{
if (!hasPrefix())
{
return keys
}

val prefixLength = getPrefix().length
return keys.stream().filter { key -> key.startsWith(getPrefix()) }.map { key -> key.substring(prefixLength) }.collect(Collectors.toSet())
}

/**
* Attempts to append the prefix before requesting the underlying redis entry if the provided [subQueue] is not prefixed with [MessageQueueSettings.redisPrefix].
*/
Expand Down Expand Up @@ -125,8 +93,16 @@ class RedisMultiQueue(private val prefix: String = "", private val redisTemplate
return Optional.empty()
}

@Throws(IllegalSubQueueIdentifierException::class)
override fun addInternal(element: QueueMessage): Boolean
{
if (cacheKeyManager.getReservedKeys().contains(element.subQueue)
|| cacheKeyManager.getReservedKeys().contains(appendPrefix(element.subQueue)))
{
throw IllegalSubQueueIdentifierException(element.subQueue)
}

cacheKeyManager.add(appendPrefix(element.subQueue))
val result = redisTemplate.opsForSet().add(appendPrefix(element.subQueue), element)
return result != null && result > 0
}
Expand All @@ -151,6 +127,7 @@ class RedisMultiQueue(private val prefix: String = "", private val redisTemplate
{
LOG.debug("Attempting to clear non-existent sub-queue [{}]. No messages cleared.", subQueue)
}
cacheKeyManager.remove(appendPrefix(subQueue))
return amountRemoved
}

Expand All @@ -171,10 +148,7 @@ class RedisMultiQueue(private val prefix: String = "", private val redisTemplate

override fun keysInternal(includeEmpty: Boolean): HashSet<String>
{
val scanOptions = ScanOptions.scanOptions().match(appendPrefix("*")).build()
val cursor = redisTemplate.scan(scanOptions)
val keys = HashSet<String>()
cursor.forEach { element -> keys.add(element) }
val keys = cacheKeyManager.getKeys()
if (includeEmpty)
{
LOG.debug("Including all empty queue keys in call to keys(). Total queue keys [{}].", keys.size)
Expand Down
Loading
Loading