Skip to content

Commit

Permalink
Merge branch 'main' into docs/oidvci-contract
Browse files Browse the repository at this point in the history
  • Loading branch information
patlo-iog committed Jul 16, 2024
2 parents d1aa9a1 + d22745f commit 587b322
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object PresentBackgroundJobs extends BackgroundJobsHelper {
private type MESSAGING_RESOURCES = DidOps & DIDResolver & HttpClient

val presentProofExchanges: ZIO[RESOURCES, Throwable, Unit] = {
val presentProofDidComExchange = for {
for {
presentationService <- ZIO.service[PresentationService]
config <- ZIO.service[AppConfig]
records <- presentationService
Expand All @@ -72,7 +72,6 @@ object PresentBackgroundJobs extends BackgroundJobsHelper {
.foreachPar(records)(performPresentProofExchange)
.withParallelism(config.pollux.presentationBgJobProcessingParallelism)
} yield ()
presentProofDidComExchange
}

private def counterMetric(key: String) = Metric
Expand All @@ -81,16 +80,19 @@ object PresentBackgroundJobs extends BackgroundJobsHelper {

private def performPresentProofExchange(record: PresentationRecord): URIO[RESOURCES, Unit] =
aux(record)
.tapError({
(error: PresentationError | DIDSecretStorageError | BackgroundJobError | CredentialServiceError |
CastorDIDResolutionError | GetManagedDIDError | Failure) =>
ZIO.logErrorCause(
s"Present Proof - Error processing record: ${record.id}",
Cause.fail(error)
)
})
.catchAll(e => ZIO.logErrorCause(s"Present Proof - Error processing record: ${record.id} ", Cause.fail(e)))
.catchAllDefect(d => ZIO.logErrorCause(s"Present Proof - Defect processing record: ${record.id}", Cause.fail(d)))
.catchAll {
case ex: Failure =>
ZIO
.service[PresentationService]
.flatMap(_.reportProcessingFailure(record.id, Some(ex)))
.catchAll(ex =>
ZIO.logErrorCause(s"PresentBackgroundJobs - Fail to recover from ${record.id}", Cause.fail(ex))
)
case ex => ZIO.logErrorCause(s"PresentBackgroundJobs - Error processing record: ${record.id}", Cause.fail(ex))
}
.catchAllDefect(d =>
ZIO.logErrorCause(s"PresentBackgroundJobs - Defect processing record: ${record.id}", Cause.fail(d))
)

private def aux(record: PresentationRecord): ZIO[RESOURCES, ERROR, Unit] = {
import org.hyperledger.identus.pollux.core.model.PresentationRecord.ProtocolState.*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ trait PresentationRepository {
def updateAfterFail(
recordId: DidCommID,
failReason: Option[Failure]
): URIO[WalletAccessContext, Unit]
): UIO[Unit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,6 @@ trait PresentationService {
def reportProcessingFailure(
recordId: DidCommID,
failReason: Option[Failure]
): ZIO[WalletAccessContext, PresentationError, Unit]
): IO[PresentationError, Unit]

}
Original file line number Diff line number Diff line change
Expand Up @@ -1098,14 +1098,11 @@ private class PresentationServiceImpl(
} yield result
}

def reportProcessingFailure(
override def reportProcessingFailure(
recordId: DidCommID,
failReason: Option[Failure]
): ZIO[WalletAccessContext, PresentationError, Unit] =
for {
_ <- getRecord(recordId)
result <- presentationRepository.updateAfterFail(recordId, failReason)
} yield result
): IO[PresentationError, Unit] =
presentationRepository.updateAfterFail(recordId, failReason)

private def getRecordFromThreadId(
thid: DidCommID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class PresentationServiceNotifier(
override def reportProcessingFailure(
recordId: DidCommID,
failReason: Option[Failure]
): ZIO[WalletAccessContext, PresentationError, Unit] = svc.reportProcessingFailure(recordId, failReason)
): IO[PresentationError, Unit] = svc.reportProcessingFailure(recordId, failReason)
}

object PresentationServiceNotifier {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ class PresentationRepositoryInMemory(
}(ZIO.succeed)
} yield walletRef

private def anyWalletStoreRefBy(
recordId: DidCommID
): ZIO[Any, Nothing, Option[Ref[Map[DidCommID, PresentationRecord]]]] = {
for {
refs <- walletRefs.get
// walletsNoRef <- ZIO.foreach(refs)({ case (wID, ref) => ref.get.map(r => (wID, r)) })
tmp <- ZIO.foreach(refs)({ case (wID, ref) => ref.get.map(r => (ref, r)) })
walletRef = tmp.find(e => e._2.keySet.contains(recordId)).map(_._1)
} yield walletRef
}

override def createPresentationRecord(record: PresentationRecord): URIO[WalletAccessContext, Unit] = {
val result =
for {
Expand Down Expand Up @@ -324,35 +335,40 @@ class PresentationRepositoryInMemory(
result.ensureOneAffectedRowOrDie
}

// def updateAfterFailX(
// recordId: DidCommID,
// failReason: Option[Failure]
// ): URIO[WalletAccessContext, Unit] = {
override def updateAfterFail(
recordId: DidCommID,
failReason: Option[Failure]
): URIO[WalletAccessContext, Unit] = {
val result =
for {
storeRef <- walletStoreRef
maybeRecord <- findPresentationRecord(recordId)
count <- maybeRecord
.map(record =>
for {
_ <- storeRef.update(r =>
r.updated(
recordId,
record.copy(
metaRetries = math.max(0, record.metaRetries - 1),
metaNextRetry =
if (record.metaRetries - 1 <= 0) None
else Some(Instant.now().plusSeconds(60)), // TODO exponention time
metaLastFailure = failReason
): UIO[Unit] =
anyWalletStoreRefBy(recordId).flatMap { mStoreRef =>
mStoreRef match
case None => ZIO.succeed(0)
case Some(storeRef) =>
for {
maybeRecord <- storeRef.get.map(store => store.get(recordId))
count <- maybeRecord
.map(record =>
for {
_ <- storeRef.update(r =>
r.updated(
recordId,
record.copy(
metaRetries = math.max(0, record.metaRetries - 1),
metaNextRetry =
if (record.metaRetries - 1 <= 0) None
else Some(Instant.now().plusSeconds(60)), // TODO exponention time
metaLastFailure = failReason
)
)
)
)
} yield 1
)
} yield 1
)
.getOrElse(ZIO.succeed(0))
} yield count
result.ensureOneAffectedRowOrDie
}
.getOrElse(ZIO.succeed(0))
} yield count
}.ensureOneAffectedRowOrDie
}

object PresentationRepositoryInMemory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ class JdbcCredentialRepository(xa: Transactor[ContextAwareTask], xb: Transactor[
.ensureOneAffectedRowOrDie
}

def updateAfterFail(
override def updateAfterFail(
recordId: DidCommID,
failReason: Option[Failure]
): URIO[WalletAccessContext, Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,10 @@ class JdbcPresentationRepository(
.ensureOneAffectedRowOrDie
}

def updateAfterFail(
override def updateAfterFail(
recordId: DidCommID,
failReason: Option[Failure]
): URIO[WalletAccessContext, Unit] = {
): UIO[Unit] = {
val cxnIO = sql"""
| UPDATE public.presentation_records
| SET
Expand All @@ -503,7 +503,7 @@ class JdbcPresentationRepository(
| id = $recordId
""".stripMargin.update
cxnIO.run
.transactWallet(xa)
.transact(xb)
.orDie
.ensureOneAffectedRowOrDie
}
Expand Down

0 comments on commit 587b322

Please sign in to comment.