Skip to content

Commit

Permalink
feat(prism-agent): fix DB connection pool duplication issue by provid…
Browse files Browse the repository at this point in the history
…ing ZLayer globally (#256)
  • Loading branch information
bvoiturier committed Dec 14, 2022
1 parent ea10db5 commit 4424de1
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 83 deletions.
Expand Up @@ -11,26 +11,41 @@ import io.iohk.atala.agent.server.sql.{Migrations => AgentMigrations}
import io.iohk.atala.agent.walletapi.service.ManagedDIDService
import io.iohk.atala.resolvers.DIDResolver
import io.iohk.atala.agent.server.http.ZioHttpClient
import org.flywaydb.core.extensibility.AppliedMigration
import io.iohk.atala.pollux.service.SchemaRegistryServiceInMemory
import io.iohk.atala.pollux.service.VerificationPolicyServiceInMemory

object Main extends ZIOAppDefault {
def agentLayer(peer: PeerDID): ZLayer[Any, Nothing, AgentServiceAny] =
ZLayer.succeed(
io.iohk.atala.mercury.AgentServiceAny(
new DIDComm(UniversalDidResolver, peer.getSecretResolverInMemory),
peer.did
)
)

private def createAndStorePeerDID(serviceEndpoint: String): RIO[ManagedDIDService, PeerDID] = {
def didCommAgentLayer(didCommServiceUrl: String) = ZLayer(
for {
managedDIDService <- ZIO.service[ManagedDIDService]
peerDID <- managedDIDService.createAndStorePeerDID(serviceEndpoint)
peerDID <- managedDIDService.createAndStorePeerDID(didCommServiceUrl)
_ <- ZIO.logInfo(s"New DID: ${peerDID.did}")
} yield peerDID
}

override def run: ZIO[Any, Throwable, Unit] =
for {
} yield io.iohk.atala.mercury.AgentServiceAny(
new DIDComm(UniversalDidResolver, peerDID.getSecretResolverInMemory),
peerDID.did
)
)

val migrations = for {
_ <- ZIO.serviceWithZIO[PolluxMigrations](_.migrate)
_ <- ZIO.serviceWithZIO[ConnectMigrations](_.migrate)
_ <- ZIO.serviceWithZIO[AgentMigrations](_.migrate)
} yield ()

def appComponents(didCommServicePort: Int, restServicePort: Int) = for {
_ <- Modules.didCommExchangesJob.debug.fork
_ <- Modules.presentProofExchangeJob.debug.fork
_ <- Modules.connectDidCommExchangesJob.debug.fork
_ <- Modules.didCommServiceEndpoint(didCommServicePort).debug.fork
_ <- Modules.app(restServicePort).fork
_ <- Modules.zioApp.fork
_ <- ZIO.never
} yield ()

override def run: ZIO[Any, Throwable, Unit] = {
val app = for {
_ <- Console
.printLine("""
|██████╗ ██████╗ ██╗███████╗███╗ ███╗
Expand Down Expand Up @@ -67,56 +82,29 @@ object Main extends ZIOAppDefault {
}
_ <- ZIO.logInfo(s"DIDComm Service port => $didCommServicePort")

// Execute migrations from Castor and Pollux libraries using Flyway
_ <- ZIO
.serviceWithZIO[PolluxMigrations](_.migrate)
.provide(RepoModule.polluxDbConfigLayer >>> PolluxMigrations.layer)
_ <- ZIO
.serviceWithZIO[ConnectMigrations](_.migrate)
.provide(RepoModule.connectDbConfigLayer >>> ConnectMigrations.layer)
_ <- ZIO
.serviceWithZIO[AgentMigrations](_.migrate)
.provide(RepoModule.agentDbConfigLayer >>> AgentMigrations.layer)

peerDID <- createAndStorePeerDID(didCommServiceUrl)
.provide(AppModule.manageDIDServiceLayer)

didCommLayer = agentLayer(peerDID)

didCommExchangesFiber <- Modules.didCommExchangesJob
.provide(didCommLayer, DIDResolver.layer, ZioHttpClient.layer)
.debug
.fork

presentProofDidCommExchangesFiber <- Modules.presentProofExchangeJob
.provide(didCommLayer, DIDResolver.layer, ZioHttpClient.layer)
.debug
.fork

connectDidCommExchangesFiber <- Modules.connectDidCommExchangesJob
.provide(didCommLayer, DIDResolver.layer, ZioHttpClient.layer)
.debug
.fork

didCommServiceFiber <- Modules
.didCommServiceEndpoint(didCommServicePort)
.provide(
didCommLayer,
AppModule.credentialServiceLayer,
AppModule.presentationServiceLayer,
AppModule.connectionServiceLayer,
AppModule.manageDIDServiceLayer
)
.debug
.fork

_ <- Modules
.app(restServicePort)
.provide(didCommLayer, AppModule.manageDIDServiceLayer, SystemModule.configLayer)
.fork
_ <- migrations

app <- appComponents(didCommServicePort, restServicePort).provide(
didCommAgentLayer(didCommServiceUrl),
DIDResolver.layer,
ZioHttpClient.layer,
AppModule.credentialServiceLayer,
AppModule.presentationServiceLayer,
AppModule.connectionServiceLayer,
SystemModule.configLayer,
SystemModule.actorSystemLayer,
HttpModule.layers,
SchemaRegistryServiceInMemory.layer,
VerificationPolicyServiceInMemory.layer,
AppModule.manageDIDServiceLayer
)
} yield app

_ <- Modules.zioApp.fork
_ <- ZIO.never
} yield ()
app.provide(
RepoModule.polluxDbConfigLayer >>> PolluxMigrations.layer,
RepoModule.connectDbConfigLayer >>> ConnectMigrations.layer,
RepoModule.agentDbConfigLayer >>> AgentMigrations.layer,
)
}

}
Expand Up @@ -5,12 +5,16 @@ import io.iohk.atala.mercury.*
import io.iohk.atala.resolvers.UniversalDidResolver
import org.didcommx.didcomm.DIDComm
import zio.*
import io.iohk.atala.pollux.service.SchemaRegistryServiceInMemory
import io.iohk.atala.pollux.service.VerificationPolicyServiceInMemory

object MainInMemory extends ZIOAppDefault {

override def run: ZIO[Any, Throwable, Unit] =
for {
_ <- Modules.zioApp.fork
_ <- Modules.zioApp
.provide(SchemaRegistryServiceInMemory.layer, VerificationPolicyServiceInMemory.layer, SystemModule.configLayer)
.fork
_ <- ZIO.never
} yield ()

Expand Down
Expand Up @@ -93,15 +93,17 @@ import io.iohk.atala.resolvers.DIDResolver

object Modules {

def app(port: Int): RIO[DidComm & ManagedDIDService & AppConfig, Unit] = {
def app(port: Int): RIO[
DidComm & ManagedDIDService & AppConfig & DIDRegistrarApi & IssueCredentialsProtocolApi & ConnectionsManagementApi &
DIDApi & DIDAuthenticationApi & PresentProofApi & ActorSystem[Nothing],
Unit
] = {
val httpServerApp = HttpRoutes.routes.flatMap(HttpServer.start(port, _))

httpServerApp
.provideLayer(SystemModule.actorSystemLayer ++ HttpModule.layers)
.unit
httpServerApp.unit
}

lazy val zioApp = {
lazy val zioApp: RIO[SchemaRegistryServiceInMemory & VerificationPolicyServiceInMemory & AppConfig, Unit] = {
val zioHttpServerApp = for {
allSchemaRegistryEndpoints <- SchemaRegistryServerEndpoints.all
allVerificationPolicyEndpoints <- VerificationPolicyServerEndpoints.all
Expand All @@ -112,11 +114,7 @@ object Modules {
httpServer <- ZHttp4sBlazeServer.start(allEndpoints, port = appConfig.agent.httpEndpoint.http.port)
} yield httpServer

zioHttpServerApp
.provideLayer(
SchemaRegistryServiceInMemory.layer ++ VerificationPolicyServiceInMemory.layer ++ SystemModule.configLayer
)
.unit
zioHttpServerApp.unit
}

def didCommServiceEndpoint(port: Int) = {
Expand Down Expand Up @@ -154,23 +152,21 @@ object Modules {
Server.start(port, app)
}

val didCommExchangesJob: RIO[DidComm & DIDResolver & HttpClient, Unit] =
val didCommExchangesJob: RIO[DidComm & DIDResolver & HttpClient & CredentialService & ManagedDIDService, Unit] =
BackgroundJobs.didCommExchanges
.repeat(Schedule.spaced(10.seconds))
.unit
.provideSomeLayer(AppModule.credentialServiceLayer ++ AppModule.manageDIDServiceLayer)

val presentProofExchangeJob: RIO[DidComm & DIDResolver & HttpClient, Unit] =
val presentProofExchangeJob: RIO[DidComm & DIDResolver & HttpClient & PresentationService, Unit] =
BackgroundJobs.presentProofExchanges
.repeat(Schedule.spaced(10.seconds))
.unit
.provideSomeLayer(AppModule.presentationServiceLayer)

val connectDidCommExchangesJob: RIO[DidComm & DIDResolver & HttpClient, Unit] =
val connectDidCommExchangesJob
: RIO[DidComm & DIDResolver & HttpClient & ConnectionService & ManagedDIDService, Unit] =
ConnectBackgroundJobs.didCommExchanges
.repeat(Schedule.spaced(10.seconds))
.unit
.provideSomeLayer(AppModule.connectionServiceLayer ++ AppModule.manageDIDServiceLayer)

private[this] def extractFirstRecipientDid(jsonMessage: String): IO[ParsingFailure | DecodingFailure, String] = {
import io.circe._, io.circe.parser._
Expand Down Expand Up @@ -369,9 +365,8 @@ object Modules {
}
}

val publishCredentialsToDltJob: RIO[DidComm, Unit] = {
val publishCredentialsToDltJob: RIO[DidComm & CredentialService, Unit] = {
val effect = BackgroundJobs.publishCredentialsToDlt
.provideLayer(AppModule.credentialServiceLayer)
(effect repeat Schedule.spaced(1.seconds)).unit
}

Expand Down

0 comments on commit 4424de1

Please sign in to comment.