Skip to content

Commit

Permalink
feat(prism-agent): simple event mechanism using webhook (#575)
Browse files Browse the repository at this point in the history
* feat(prism-agent): introduce webhook publisher config

* feat(prism-agent): add necessary event notification interfaces

* feat(prism-agent): add event notification service 'in-memory' implementation

* feat(prism-agent): implement dummy webhook publisher consumer

* feat(prism-agent): add notification service calls for issue credential flow (DIDComm sender side only)

* feat(prism-agent): wire notification service and webhook publisher in app entry point

* feat(prism-agent): add fake webhook URL config param

* feat(prism-agent): notify when credential is generated

* feat(prism-agent): notify issue credential steps occuring on the DIDComm receiver side

* feat(prism-agent): extract event notification classes to a new shared project

* feat(pollux): add credential service subclass that notifies of events

* feat(pollux): use global ZIO http client layer in HttpURIDereferencerImpl

* feat(prism-agent): implement real HTTP calls in Webhook publisher

* feat(prism-agent): include WEBHOOK_API_KEY in request Authorization header when provided

* chore(prism-agent): declare WALLET_SEED and WEBHOOK_API_KEY in docker compose config

* chore(prism-agent): complete main branch merging

* feature(prism-agent): refactor event notification and make it more generic

* feature(prism-agent): add connect & presentation consumers in Webhook publisher

* chore(prism-agent): use postgres secret storage implementation in default docker-compose config

* feat(prism-agent): add event notification sending for connect flow

* chore(pollux): use RecordIdNotFound in error channel instead of an Option return value, in presentation service methods

* feat(prism-agent): implement event notifiation sending in presentation protocol

* test(pollux): fix unit test related to presentation rejection

* chore(prism-agent): get rid of event encoder/decoder (useless for now)

* chore(pollux): rename event notification service implementation

* test(prism-agent): make capacity of event queue configurable and add unit tests

* chore(prism-agent): add event notification service dependency to wallet api

* feat(prism-agent): add new 'DIDState' topic and notify of DID published events

* feat(prism-agent): consume new 'DIDState' topic events (only DID published for now)

* chore(prism-agent): run scalafmt

* chore(prism-agent): scalafmtAll

* test(connect): add unit tests for connection service with event notif

* test(pollux): add unit tests for credential service with event notif

* test(pollux): move presentation service spec utility methods to helper trait

* chore(prism-agent): scalafmtAll

* chore(pollux): fix method typo

* test(pollux): add unit tests for presentation flow notifications, introducing ZIO Mock for PresentationService

* test(connect): refactor notifier unit tests to use ZIO mock for ConnectionService

* test(pollux): refactor credential service notifier unit tests to use ZIO mock for CredentialService

* fix(pollux): fix presentation accepted/rejected event not being sent

* chore(prism-agent): scalafmtAll

* test(pollux): fix unit tests

* feat: add Json serialization for events #1

* feat: add application/json header

* doc(prism-agent): update protocol state enums in OAS

* doc(prism-agent): add tutorial for simple event mechanism webhooks

* doc(prism-agent): update webhook tutorial

* doc(prism-agent): update Docusaurus sidebar for webhook

* feat: add event type field to all events

* chore(prism-agent): use ZIO ConcurrentMap to store notification queues

* chore(prism-agent): fixing last PR comments

* doc(prism-agent): document DID-related events in webhook.md

* fix(prism-agent): use ZIO sliding queue (discarding old messages) instead of a bounded one

* feat(prism-agent): use a 5 seconds request timeout in Webhook publisher

* test(prism-agent): fix unit test for sliding queue

* chore(prism-agent): fix URLs in webhook.md

---------

Co-authored-by: Yurii Shynbuiev <yurii.shynbuiev@iohk.io>
  • Loading branch information
bvoiturier and yshyn-iohk committed Jul 10, 2023
1 parent e036bc8 commit 42cf8c9
Show file tree
Hide file tree
Showing 41 changed files with 2,460 additions and 426 deletions.
40 changes: 34 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ lazy val D_Connect = new {

// Dependency Modules
private lazy val baseDependencies: Seq[ModuleID] =
Seq(D.zio, D.zioTest, D.zioTestSbt, D.zioTestMagnolia, D.testcontainersPostgres, logback)
Seq(D.zio, D.zioTest, D.zioTestSbt, D.zioTestMagnolia, D.zioMock, D.testcontainersPostgres, logback)

// Project Dependencies
lazy val coreDependencies: Seq[ModuleID] =
Expand Down Expand Up @@ -223,6 +223,7 @@ lazy val D_Pollux = new {
D.zioTest,
D.zioTestSbt,
D.zioTestMagnolia,
D.zioMock,
D.munit,
D.munitZio,
prismCrypto,
Expand Down Expand Up @@ -280,6 +281,17 @@ lazy val D_Pollux_VC_JWT = new {
lazy val polluxVcJwtDependencies: Seq[ModuleID] = baseDependencies
}

lazy val D_EventNotification = new {
val zio = "dev.zio" %% "zio" % V.zio
val zioConcurrent = "dev.zio" %% "zio-concurrent" % V.zio
val zioTest = "dev.zio" %% "zio-test" % V.zio % Test
val zioTestSbt = "dev.zio" %% "zio-test-sbt" % V.zio % Test
val zioTestMagnolia = "dev.zio" %% "zio-test-magnolia" % V.zio % Test

val zioDependencies: Seq[ModuleID] = Seq(zio, zioConcurrent, zioTest, zioTestSbt, zioTestMagnolia)
val baseDependencies: Seq[ModuleID] = zioDependencies
}

lazy val D_PrismAgent = new {

// Added here to make prism-crypto works.
Expand Down Expand Up @@ -632,7 +644,7 @@ lazy val polluxCore = project
)
.dependsOn(shared)
.dependsOn(polluxVcJWT)
.dependsOn(protocolIssueCredential, protocolPresentProof, resolver, agentDidcommx)
.dependsOn(protocolIssueCredential, protocolPresentProof, resolver, agentDidcommx, eventNotification)

lazy val polluxDoobie = project
.in(file("pollux/lib/sql-doobie"))
Expand Down Expand Up @@ -687,7 +699,7 @@ lazy val connectCore = project
Test / publishArtifact := true
)
.dependsOn(shared)
.dependsOn(protocolConnection, protocolReportProblem)
.dependsOn(protocolConnection, protocolReportProblem, eventNotification)

lazy val connectDoobie = project
.in(file("connect/lib/sql-doobie"))
Expand All @@ -699,6 +711,17 @@ lazy val connectDoobie = project
.dependsOn(shared)
.dependsOn(connectCore % "compile->compile;test->test")

// ############################
// #### Event Notification ####
// ############################

lazy val eventNotification = project
.in(file("event-notification"))
.settings(
name := "event-notification",
libraryDependencies ++= D_EventNotification.baseDependencies
)

// #####################
// #### Prism Agent ####
// #####################
Expand All @@ -711,8 +734,11 @@ lazy val prismAgentWalletAPI = project
name := "prism-agent-wallet-api",
libraryDependencies ++= D_PrismAgent.keyManagementDependencies
)
.dependsOn(agentDidcommx)
.dependsOn(castorCore)
.dependsOn(
agentDidcommx,
castorCore,
eventNotification
)

lazy val prismAgentServer = project
.in(file("prism-agent/service/server"))
Expand Down Expand Up @@ -741,7 +767,8 @@ lazy val prismAgentServer = project
polluxAnoncreds,
connectCore,
connectDoobie,
castorCore
castorCore,
eventNotification
)

// ##################
Expand Down Expand Up @@ -822,6 +849,7 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq(
prismAgentWalletAPI,
prismAgentServer,
mediator,
eventNotification,
)

lazy val root = project
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package io.iohk.atala.connect.core.service

import io.iohk.atala.connect.core.model.ConnectionRecord
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.event.notification.{Event, EventNotificationService}
import io.iohk.atala.mercury.model.DidId
import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import zio.{IO, URLayer, ZIO, ZLayer}

import java.util.UUID

class ConnectionServiceNotifier(
svc: ConnectionService,
eventNotificationService: EventNotificationService
) extends ConnectionService {

private val connectionUpdatedEvent = "ConnectionUpdated"

override def createConnectionInvitation(
label: Option[String],
pairwiseDID: DidId
): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.createConnectionInvitation(label, pairwiseDID))

override def receiveConnectionInvitation(invitation: String): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.receiveConnectionInvitation(invitation))

override def acceptConnectionInvitation(
recordId: UUID,
pairwiseDid: DidId
): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.acceptConnectionInvitation(recordId, pairwiseDid))

override def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.markConnectionRequestSent(recordId))

override def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.receiveConnectionRequest(request))

override def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.acceptConnectionRequest(recordId))

override def markConnectionResponseSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.markConnectionResponseSent(recordId))

override def receiveConnectionResponse(response: ConnectionResponse): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.receiveConnectionResponse(response))

private[this] def notifyOnSuccess(effect: IO[ConnectionServiceError, ConnectionRecord]) =
for {
record <- effect
_ <- notify(record)
} yield record

private[this] def notify(record: ConnectionRecord) = {
val result = for {
producer <- eventNotificationService.producer[ConnectionRecord]("Connect")
_ <- producer.send(Event(connectionUpdatedEvent, record))
} yield ()
result.catchAll(e => ZIO.logError(s"Notification service error: $e"))
}

override def getConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Option[ConnectionRecord]] =
svc.getConnectionRecord(recordId)

override def getConnectionRecordByThreadId(thid: String): IO[ConnectionServiceError, Option[ConnectionRecord]] =
svc.getConnectionRecordByThreadId(thid)

override def deleteConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Int] =
svc.deleteConnectionRecord(recordId)

override def reportProcessingFailure(recordId: UUID, failReason: Option[String]): IO[ConnectionServiceError, Unit] =
svc.reportProcessingFailure(recordId, failReason)

override def getConnectionRecords(): IO[ConnectionServiceError, Seq[ConnectionRecord]] =
svc.getConnectionRecords()

override def getConnectionRecordsByStates(
ignoreWithZeroRetries: Boolean,
limit: Int,
states: ConnectionRecord.ProtocolState*
): IO[ConnectionServiceError, Seq[ConnectionRecord]] =
svc.getConnectionRecordsByStates(ignoreWithZeroRetries, limit, states: _*)
}

object ConnectionServiceNotifier {
val layer: URLayer[ConnectionService & EventNotificationService, ConnectionService] =
ZLayer.fromFunction(ConnectionServiceNotifier(_, _))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.iohk.atala.connect.core.service

import io.iohk.atala.connect.core.model.ConnectionRecord
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.mercury.model.DidId
import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import zio.mock.{Mock, Proxy}
import zio.{IO, URLayer, ZIO, ZLayer, mock}

import java.util.UUID

object MockConnectionService extends Mock[ConnectionService] {

object CreateConnectionInvitation extends Effect[(Option[String], DidId), ConnectionServiceError, ConnectionRecord]
object ReceiveConnectionInvitation extends Effect[String, ConnectionServiceError, ConnectionRecord]
object AcceptConnectionInvitation extends Effect[(UUID, DidId), ConnectionServiceError, ConnectionRecord]
object MarkConnectionRequestSent extends Effect[UUID, ConnectionServiceError, ConnectionRecord]
object ReceiveConnectionRequest extends Effect[ConnectionRequest, ConnectionServiceError, ConnectionRecord]
object AcceptConnectionRequest extends Effect[UUID, ConnectionServiceError, ConnectionRecord]
object MarkConnectionResponseSent extends Effect[UUID, ConnectionServiceError, ConnectionRecord]
object ReceiveConnectionResponse extends Effect[ConnectionResponse, ConnectionServiceError, ConnectionRecord]

override val compose: URLayer[mock.Proxy, ConnectionService] = ZLayer {
for {
proxy <- ZIO.service[Proxy]
} yield new ConnectionService {
override def createConnectionInvitation(
label: Option[String],
pairwiseDID: DidId
): IO[ConnectionServiceError, ConnectionRecord] =
proxy(CreateConnectionInvitation, label, pairwiseDID)

override def receiveConnectionInvitation(invitation: String): IO[ConnectionServiceError, ConnectionRecord] =
proxy(ReceiveConnectionInvitation, invitation)

override def acceptConnectionInvitation(
recordId: UUID,
pairwiseDid: DidId
): IO[ConnectionServiceError, ConnectionRecord] =
proxy(AcceptConnectionInvitation, recordId, pairwiseDid)

override def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
proxy(MarkConnectionRequestSent, recordId)

override def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] =
proxy(ReceiveConnectionRequest, request)

override def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
proxy(AcceptConnectionRequest, recordId)

override def markConnectionResponseSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
proxy(MarkConnectionResponseSent, recordId)

override def receiveConnectionResponse(
response: ConnectionResponse
): IO[ConnectionServiceError, ConnectionRecord] =
proxy(ReceiveConnectionResponse, response)

override def getConnectionRecords(): IO[ConnectionServiceError, Seq[ConnectionRecord]] = ???

override def getConnectionRecordsByStates(
ignoreWithZeroRetries: Boolean,
limit: Int,
states: ConnectionRecord.ProtocolState*
): IO[ConnectionServiceError, Seq[ConnectionRecord]] = ???

override def getConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Option[ConnectionRecord]] = ???

override def getConnectionRecordByThreadId(thid: String): IO[ConnectionServiceError, Option[ConnectionRecord]] =
???

override def deleteConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Int] = ???

override def reportProcessingFailure(
recordId: UUID,
failReason: Option[String]
): IO[ConnectionServiceError, Unit] = ???
}
}
}

0 comments on commit 42cf8c9

Please sign in to comment.