WIP Imemorykafka
Signed-off-by: mineme0110 <shailesh.patil@iohk.io>
mineme0110 committed Jul 15, 2024
1 parent 47ed8c1 commit 3ab832a
Showing 4 changed files with 287 additions and 53 deletions.
@@ -35,6 +35,7 @@ import org.hyperledger.identus.iam.wallet.http.controller.WalletManagementContro
import org.hyperledger.identus.issue.controller.IssueControllerImpl
import org.hyperledger.identus.mercury.*
import org.hyperledger.identus.messaging.kafka.{ZKafkaMessagingServiceImpl, ZKafkaProducerImpl}
import org.hyperledger.identus.messaging.kafka.InMemoryMessagingService
import org.hyperledger.identus.messaging.ByteArrayWrapper
import org.hyperledger.identus.oid4vci.controller.CredentialIssuerControllerImpl
import org.hyperledger.identus.oid4vci.service.OIDCCredentialIssuerServiceImpl
@@ -223,9 +224,12 @@ object MainApp extends ZIOAppDefault {
// HTTP client
SystemModule.zioHttpClientLayer,
Scope.default,
ZKafkaMessagingServiceImpl.layer(List("kafka:9092")),
ZKafkaProducerImpl.layer[UUID, WalletIdAndRecordId],
ZKafkaProducerImpl.layer[ByteArrayWrapper, ByteArrayWrapper]
// ZKafkaMessagingServiceImpl.layer(List("kafka:9092")),
// ZKafkaProducerImpl.layer[UUID, WalletIdAndRecordId],
// ZKafkaProducerImpl.layer[ByteArrayWrapper, ByteArrayWrapper]
InMemoryMessagingService.messagingServiceLayer,
InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId],
InMemoryMessagingService.producerLayer[ByteArrayWrapper, ByteArrayWrapper]
)
} yield app

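For reference, the swapped-in layers can be exercised end to end outside the agent. Below is a minimal sketch, assuming the given Serde[String] instances from org.hyperledger.identus.messaging are in scope (as the test suite in this commit relies on); the topic, group, key, and value names are illustrative placeholders, not identifiers used by the agent.

import org.hyperledger.identus.messaging.*
import org.hyperledger.identus.messaging.kafka.InMemoryMessagingService
import zio.*

// Sketch only: wires a producer and a consumer against the in-memory service,
// mirroring the layer swap above.
object InMemoryWiringSketch extends ZIOAppDefault {

  private val program =
    for {
      producer <- ZIO.service[Producer[String, String]]
      consumer <- ZIO.service[Consumer[String, String]]
      _        <- producer.produce("example-topic", "example-key", "example-value")
      _        <- consumer
                    .consume("example-topic")(msg => ZIO.log(s"received ${msg.key} -> ${msg.value}"))
                    .fork
      _        <- ZIO.sleep(1.second) // crude wait so the consumer fiber can drain the queue
    } yield ()

  def run = program.provide(
    InMemoryMessagingService.messagingServiceLayer,
    InMemoryMessagingService.producerLayer[String, String],
    InMemoryMessagingService.consumerLayer[String, String]("example-group")
  )
}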
@@ -0,0 +1,164 @@
package org.hyperledger.identus.messaging.kafka

import org.hyperledger.identus.messaging._
import zio._
import zio.concurrent.ConcurrentMap
import zio.stream._
import zio.Clock
import zio.Task
import InMemoryMessagingService.*

import java.util.concurrent.TimeUnit

class InMemoryMessagingService(
queueMap: ConcurrentMap[String, (Queue[Message[_, _]], Ref[Offset])],
queueCapacity: Int,
consumerGroups: ConcurrentMap[String, ConcurrentMap[Offset, TimeStamp]] // Track processed messages by groupId
) extends MessagingService {

override def makeConsumer[K, V](groupId: String)(using kSerde: Serde[K], vSerde: Serde[V]): Task[Consumer[K, V]] = {
for {
processedMessages <- consumerGroups.get(groupId).flatMap {
case Some(map) => ZIO.succeed(map)
case None =>
for {
newMap <- ConcurrentMap.empty[Offset, TimeStamp]
_ <- consumerGroups.put(groupId, newMap)
} yield newMap
}
} yield new InMemoryConsumer[K, V](groupId, queueMap, processedMessages)
}

override def makeProducer[K, V]()(using kSerde: Serde[K], vSerde: Serde[V]): Task[Producer[K, V]] =
ZIO.succeed(new InMemoryProducer[K, V](queueMap, queueCapacity))
}

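// Note: every consumer streams from the topic's single shared Queue, and queue takes are
// destructive, so consumers in *different* groups still compete for the same messages
// (unlike Kafka, where each group receives every message). The per-group processedMessages
// map only prevents one group from handling the same offset twice.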
class InMemoryConsumer[K, V](
groupId: String,
queueMap: ConcurrentMap[String, (Queue[Message[_, _]], Ref[Offset])],
processedMessages: ConcurrentMap[Offset, TimeStamp]
) extends Consumer[K, V] {
override def consume[HR](topic: String, topics: String*)(handler: Message[K, V] => URIO[HR, Unit]): RIO[HR, Unit] = {
val allTopics = topic +: topics

def getQueueStream(topic: String): ZStream[Any, Nothing, Message[K, V]] =
ZStream.repeatZIO {
queueMap.get(topic).flatMap {
case Some((queue, _)) =>
ZIO.debug(s"Connected to queue for topic $topic in group $groupId") *>
ZIO.succeed(ZStream.fromQueue(queue).collect { case msg: Message[K, V] => msg })
case None =>
ZIO.debug(s"Waiting to connect to queue for topic $topic in group $groupId, retrying...") *>
ZIO.sleep(1.second) *> ZIO.succeed(ZStream.empty)
}
}.flatten

val streams = allTopics.map(getQueueStream)
ZStream
.mergeAllUnbounded()(streams: _*)
.tap(msg => ZIO.log(s"Processing message in group $groupId: $msg"))
.filterZIO { msg =>
for {
currentTime <- Clock.currentTime(TimeUnit.MILLISECONDS)
isNew <- processedMessages
.putIfAbsent(Offset(msg.offset), TimeStamp(currentTime))
.map(_.isEmpty) // true only if this offset was not yet recorded for the group; putIfAbsent also stores the timestamp
} yield isNew
} // Ensures a message is handled at most once per consumer group (dedup is keyed by offset alone, so equal offsets from different topics collide within a group)
.mapZIO(handler)
.runDrain
}
}

class InMemoryProducer[K, V](
queueMap: ConcurrentMap[String, (Queue[Message[_, _]], Ref[Offset])],
queueCapacity: Int
) extends Producer[K, V] {
override def produce(topic: String, key: K, value: V): Task[Unit] = for {
queueAndIdRef <- queueMap.get(topic).flatMap {
case Some(qAndIdRef) => ZIO.succeed(qAndIdRef)
case None =>
for {
newQueue <- Queue.sliding[Message[_, _]](queueCapacity)
newIdRef <- Ref.make(Offset(0L))
_ <- queueMap.put(topic, (newQueue, newIdRef))
} yield (newQueue, newIdRef)
}
(queue, idRef) = queueAndIdRef
currentTime <- Clock.currentTime(TimeUnit.MILLISECONDS)
messageId <- idRef.updateAndGet(x => Offset(x.value + 1)) // unique atomic id incremented per topic
_ <- queue.offer(Message(key, value, messageId.value, currentTime))
_ <- ZIO.debug(s"Message offered to queue: $topic with ID: $messageId")
} yield ()
}

object InMemoryMessagingService {

opaque type Offset = Long
object Offset:
def apply(value: Long): Offset = value
extension (id: Offset) def value: Long = id

opaque type TimeStamp = Long
object TimeStamp:
def apply(value: Long): TimeStamp = value
extension (ts: TimeStamp) def value: Long = ts

val messagingServiceLayer: ULayer[MessagingService] =
ZLayer.fromZIO {
for {
queueMap <- ConcurrentMap.empty[String, (Queue[Message[_, _]], Ref[Offset])]
consumerGroups <- ConcurrentMap.empty[String, ConcurrentMap[Offset, TimeStamp]]
queueCapacity = 100 // sliding queue capacity per topic
_ <- cleanupTaskForProcessedMessages(consumerGroups)
} yield new InMemoryMessagingService(queueMap, queueCapacity, consumerGroups)
}

def producerLayer[K: EnvironmentTag, V: EnvironmentTag](using
kSerde: Serde[K],
vSerde: Serde[V]
): RLayer[MessagingService, Producer[K, V]] =
ZLayer.fromZIO {
for {
messagingService <- ZIO.service[MessagingService]
producer <- messagingService.makeProducer[K, V]()
} yield producer
}

def consumerLayer[K: EnvironmentTag, V: EnvironmentTag](
groupId: String
)(using kSerde: Serde[K], vSerde: Serde[V]): RLayer[MessagingService, Consumer[K, V]] =
ZLayer.fromZIO {
for {
messagingService <- ZIO.service[MessagingService]
consumer <- messagingService.makeConsumer[K, V](groupId)
} yield consumer
}

private def cleanupTaskForProcessedMessages(
consumerGroups: ConcurrentMap[String, ConcurrentMap[Offset, TimeStamp]],
maxAge: Duration = 60.minutes // Maximum age for entries
): UIO[Unit] = {
def cleanupOldEntries(map: ConcurrentMap[Offset, TimeStamp]): UIO[Unit] = for {
currentTime <- Clock.currentTime(TimeUnit.MILLISECONDS)
entries <- map.toList
_ <- ZIO.foreachDiscard(entries) { case (key, timestamp) =>
if (currentTime - timestamp > maxAge.toMillis)
map.remove(key) *> ZIO.log(s"Removed old entry with key: $key and timestamp: $timestamp")
else
ZIO.unit
}
} yield ()

(for {
entries <- consumerGroups.toList
_ <- ZIO.foreachDiscard(entries) { case (groupId, map) =>
ZIO.log(s"Cleaning up entries for group: $groupId") *> cleanupOldEntries(map)
}
} yield ())
.repeat(Schedule.spaced(10.minutes))
.fork
.unit
}

}
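The ZLayer helpers above are thin wrappers; the service can also be used directly via makeProducer/makeConsumer. A minimal sketch follows, again assuming given Serde[String] instances are available from org.hyperledger.identus.messaging and using illustrative topic and group names.

import org.hyperledger.identus.messaging.*
import org.hyperledger.identus.messaging.kafka.InMemoryMessagingService
import zio.*

// Sketch only: obtains a producer and consumer straight from the MessagingService.
val directUse: RIO[MessagingService, Unit] =
  for {
    svc      <- ZIO.service[MessagingService]
    producer <- svc.makeProducer[String, String]()
    consumer <- svc.makeConsumer[String, String]("sketch-group")
    _        <- producer.produce("sketch-topic", "k1", "v1")
    _        <- consumer
                  .consume("sketch-topic")(msg => ZIO.log(s"offset ${msg.offset}: ${msg.value}"))
                  .fork
  } yield ()

// Providing InMemoryMessagingService.messagingServiceLayer turns directUse into a Task[Unit].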
@@ -0,0 +1,66 @@
package org.hyperledger.identus.messaging.kafka

import org.hyperledger.identus.messaging.*
import zio.*
import zio.test.*
import zio.test.Assertion.*

object InMemoryMessagingServiceSpec extends ZIOSpecDefault {
val testLayer = InMemoryMessagingService.messagingServiceLayer >+>
InMemoryMessagingService.producerLayer[String, String] >+>
InMemoryMessagingService.consumerLayer[String, String]("test-group")

def spec = suite("InMemoryMessagingServiceSpec")(
test("should produce and consume messages") {

val key = "key"
val value = "value"
val topic = "test-topic"
for {
producer <- ZIO.service[Producer[String, String]]
consumer <- ZIO.service[Consumer[String, String]]
promise <- Promise.make[Nothing, Message[String, String]]
_ <- producer.produce(topic, key, value)
_ <- consumer
.consume(topic) { msg =>
promise.succeed(msg).unit
}
.fork
receivedMessage <- promise.await
} yield assert(receivedMessage)(equalTo(Message(key, value, 1L, 0)))
}.provideLayer(testLayer),
test("should produce and consume 5 messages") {
val topic = "test-topic"
val messages = List(
("key1", "value1"),
("key2", "value2"),
("key3", "value3"),
("key4", "value4"),
("key5", "value5")
)

for {
producer <- ZIO.service[Producer[String, String]]
consumer <- ZIO.service[Consumer[String, String]]
promise <- Promise.make[Nothing, List[Message[String, String]]]
ref <- Ref.make(List.empty[Message[String, String]])

_ <- ZIO.foreach(messages) { case (key, value) =>
producer.produce(topic, key, value) *> ZIO.debug(s"Produced message: $key -> $value")
}
_ <- consumer
.consume(topic) { msg =>
ZIO.debug(s"Consumed message: ${msg.key} -> ${msg.value}") *>
ref.update(_ :+ msg) *> ref.get.flatMap { msgs =>
if (msgs.size == messages.size) promise.succeed(msgs).unit else ZIO.unit
}
}
.fork
receivedMessages <- promise.await
_ <- ZIO.debug(s"Received messages: ${receivedMessages.map(m => (m.key, m.value))}")
} yield assert(receivedMessages.map(m => (m.key, m.value)).sorted)(
equalTo(messages.sorted)
)
}.provideLayer(testLayer),
)
}
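A further case that could be added to this suite (a sketch, not part of the commit): the producer assigns offsets per topic, starting at 1 and incrementing by one per message, so they can be asserted directly.

test("should assign sequential offsets per topic") {
  val topic = "offset-topic"
  for {
    producer <- ZIO.service[Producer[String, String]]
    consumer <- ZIO.service[Consumer[String, String]]
    ref      <- Ref.make(List.empty[Message[String, String]])
    promise  <- Promise.make[Nothing, List[Message[String, String]]]
    _        <- ZIO.foreach(List("a", "b", "c"))(v => producer.produce(topic, s"key-$v", v))
    _        <- consumer
                  .consume(topic) { msg =>
                    ref.updateAndGet(_ :+ msg).flatMap { msgs =>
                      if (msgs.size == 3) promise.succeed(msgs).unit else ZIO.unit
                    }
                  }
                  .fork
    received <- promise.await
  } yield assert(received.map(_.offset))(equalTo(List(1L, 2L, 3L)))
}.provideLayer(testLayer)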
100 changes: 50 additions & 50 deletions infrastructure/shared/docker-compose.yml
@@ -117,8 +117,8 @@ services:
condition: service_started
vault-server:
condition: service_healthy
kafka:
condition: service_healthy
# kafka:
# condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://cloud-agent:8085/_system/health"]
interval: 30s
@@ -145,54 +145,54 @@ services:
- cloud-agent
- swagger-ui

zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
# ports:
# - 22181:2181

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
# ports:
# - 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
healthcheck:
test: [ "CMD", "kafka-topics", "--list", "--bootstrap-server", "localhost:9092" ]
interval: 5s
timeout: 10s
retries: 5

init-kafka:
image: confluentinc/cp-kafka:latest
depends_on:
kafka:
condition: service_healthy
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:9092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-3 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-4 --replication-factor 1 --partitions 5
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-DLQ --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:9092 --list
"
# zookeeper:
# image: confluentinc/cp-zookeeper:latest
# environment:
# ZOOKEEPER_CLIENT_PORT: 2181
# ZOOKEEPER_TICK_TIME: 2000
## ports:
## - 22181:2181
#
# kafka:
# image: confluentinc/cp-kafka:latest
# depends_on:
# - zookeeper
## ports:
## - 29092:29092
# environment:
# KAFKA_BROKER_ID: 1
# KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
# KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
# healthcheck:
# test: [ "CMD", "kafka-topics", "--list", "--bootstrap-server", "localhost:9092" ]
# interval: 5s
# timeout: 10s
# retries: 5
#
# init-kafka:
# image: confluentinc/cp-kafka:latest
# depends_on:
# kafka:
# condition: service_healthy
# entrypoint: [ '/bin/sh', '-c' ]
# command: |
# "
# # blocks until kafka is reachable
# kafka-topics --bootstrap-server kafka:9092 --list
# echo -e 'Creating kafka topics'
# kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect --replication-factor 1 --partitions 5
# kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-1 --replication-factor 1 --partitions 5
# kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-2 --replication-factor 1 --partitions 5
# kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-3 --replication-factor 1 --partitions 5
# kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-retry-4 --replication-factor 1 --partitions 5
# kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect-DLQ --replication-factor 1 --partitions 1
# echo -e 'Successfully created the following topics:'
# kafka-topics --bootstrap-server kafka:9092 --list
# "

volumes:
pg_data_db:
