Skip to content

Commit

Permalink
fix(pollux): add pagination at db level for getCredentialRecords (#586)
Browse files Browse the repository at this point in the history
* fix(prism-agent): pagination at controller & service level

* fix(pollux): paginate at service and db level for CredentailRecords

* tests: fix failing tests

* tests: add pagination tests
  • Loading branch information
patlo-iog committed Jul 7, 2023
1 parent 9a97c7a commit c0db5c8
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import io.iohk.atala.pollux.core.model.IssueCredentialRecord.ProtocolState
trait CredentialRepository[F[_]] {
def createIssueCredentialRecord(record: IssueCredentialRecord): F[Int]
def getIssueCredentialRecords(
ignoreWithZeroRetries: Boolean = true
): F[Seq[IssueCredentialRecord]]
ignoreWithZeroRetries: Boolean = true,
offset: Option[Int] = None,
limit: Option[Int] = None
): F[(Seq[IssueCredentialRecord], Int)]
def getIssueCredentialRecord(recordId: DidCommID): F[Option[IssueCredentialRecord]]
def getIssueCredentialRecordsByStates(
ignoreWithZeroRetries: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ class CredentialRepositoryInMemory(

override def getIssueCredentialRecords(
ignoreWithZeroRetries: Boolean = true,
): Task[Seq[IssueCredentialRecord]] = {
offset: Option[Int],
limit: Option[Int]
): Task[(Seq[IssueCredentialRecord], Int)] = {
for {
store <- storeRef.get
} yield store.values.toSeq
paginated = store.values.toSeq.drop(offset.getOrElse(0)).take(limit.getOrElse(Int.MaxValue))
} yield paginated -> store.values.size
}

override def updateCredentialRecordProtocolState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ trait CredentialService {
issuingDID: Option[CanonicalPrismDID]
): IO[CredentialServiceError, IssueCredentialRecord]

def getIssueCredentialRecords: IO[CredentialServiceError, Seq[IssueCredentialRecord]]
/** Return a list of records as well as a count of all filtered items */
def getIssueCredentialRecords(
offset: Option[Int] = None,
limit: Option[Int] = None
): IO[CredentialServiceError, (Seq[IssueCredentialRecord], Int)]

def getIssueCredentialRecordsByStates(
ignoreWithZeroRetries: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ private class CredentialServiceImpl(
override def extractIdFromCredential(credential: W3cCredentialPayload): Option[DidCommID] =
credential.maybeId.map(_.split("/").last).map(DidCommID(_))

override def getIssueCredentialRecords: IO[CredentialServiceError, Seq[IssueCredentialRecord]] = {
override def getIssueCredentialRecords(
offset: Option[Int],
limit: Option[Int]
): IO[CredentialServiceError, (Seq[IssueCredentialRecord], Int)] = {
for {
records <- credentialRepository
.getIssueCredentialRecords()
.getIssueCredentialRecords(offset = offset, limit = limit)
.mapError(RepositoryError.apply)
} yield records
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,54 @@ object CredentialRepositorySpecSuite {
bRecord = issueCredentialRecord
_ <- repo.createIssueCredentialRecord(aRecord)
_ <- repo.createIssueCredentialRecord(bRecord)
records <- repo.getIssueCredentialRecords()
records <- repo.getIssueCredentialRecords().map(_._1)
} yield {
assertTrue(records.size == 2) &&
assertTrue(records.contains(aRecord)) &&
assertTrue(records.contains(bRecord))
}
},
test("getIssuanceCredentialRecord returns records with offset") {
for {
repo <- ZIO.service[CredentialRepository[Task]]
aRecord = issueCredentialRecord
bRecord = issueCredentialRecord
_ <- repo.createIssueCredentialRecord(aRecord)
_ <- repo.createIssueCredentialRecord(bRecord)
records <- repo.getIssueCredentialRecords(offset = Some(1)).map(_._1)
} yield {
assertTrue(records.size == 1) &&
assertTrue(records.contains(bRecord))
}
},
test("getIssuanceCredentialRecord returns records with limit") {
for {
repo <- ZIO.service[CredentialRepository[Task]]
aRecord = issueCredentialRecord
bRecord = issueCredentialRecord
_ <- repo.createIssueCredentialRecord(aRecord)
_ <- repo.createIssueCredentialRecord(bRecord)
records <- repo.getIssueCredentialRecords(limit = Some(1)).map(_._1)
} yield {
assertTrue(records.size == 1) &&
assertTrue(records.contains(aRecord))
}
},
test("getIssuanceCredentialRecord returns records with offset and limit") {
for {
repo <- ZIO.service[CredentialRepository[Task]]
aRecord = issueCredentialRecord
bRecord = issueCredentialRecord
cRecord = issueCredentialRecord
_ <- repo.createIssueCredentialRecord(aRecord)
_ <- repo.createIssueCredentialRecord(bRecord)
_ <- repo.createIssueCredentialRecord(cRecord)
records <- repo.getIssueCredentialRecords(offset = Some(1), limit = Some(1)).map(_._1)
} yield {
assertTrue(records.size == 1) &&
assertTrue(records.contains(bRecord))
}
},
test("deleteIssueCredentialRecord deletes an exsiting record") {
for {
repo <- ZIO.service[CredentialRepository[Task]]
Expand All @@ -116,7 +157,7 @@ object CredentialRepositorySpecSuite {
_ <- repo.createIssueCredentialRecord(aRecord)
_ <- repo.createIssueCredentialRecord(bRecord)
count <- repo.deleteIssueCredentialRecord(aRecord.id)
records <- repo.getIssueCredentialRecords()
records <- repo.getIssueCredentialRecords().map(_._1)
} yield {
assertTrue(count == 1) &&
assertTrue(records.size == 1) &&
Expand All @@ -131,7 +172,7 @@ object CredentialRepositorySpecSuite {
_ <- repo.createIssueCredentialRecord(aRecord)
_ <- repo.createIssueCredentialRecord(bRecord)
count <- repo.deleteIssueCredentialRecord(DidCommID())
records <- repo.getIssueCredentialRecords()
records <- repo.getIssueCredentialRecords().map(_._1)
} yield {
assertTrue(count == 0) &&
assertTrue(records.size == 2) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ object CredentialServiceImplSpec extends ZIOSpecDefault {
svc <- ZIO.service[CredentialService]
aRecord <- svc.createRecord()
bRecord <- svc.createRecord()
records <- svc.getIssueCredentialRecords
records <- svc.getIssueCredentialRecords().map(_._1)
} yield {
assertTrue(records.size == 2) &&
assertTrue(records.contains(aRecord)) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,15 @@ class JdbcCredentialRepository(xa: Transactor[Task], maxRetries: Int) extends Cr
}

override def getIssueCredentialRecords(
ignoreWithZeroRetries: Boolean = true
): Task[Seq[IssueCredentialRecord]] = {
ignoreWithZeroRetries: Boolean = true,
offset: Option[Int],
limit: Option[Int]
): Task[(Seq[IssueCredentialRecord], Int)] = {
val conditionFragment = Fragments.whereAndOpt(
Option.when(ignoreWithZeroRetries)(fr"meta_retries > 0")
)
val cxnIO = sql"""
val baseFragment =
sql"""
| SELECT
| id,
| created_at,
Expand All @@ -153,11 +156,29 @@ class JdbcCredentialRepository(xa: Transactor[Task], maxRetries: Int) extends Cr
| FROM public.issue_credential_records
| $conditionFragment
""".stripMargin
val withOffsetFragment = offset.fold(baseFragment)(offsetValue => baseFragment ++ fr"OFFSET $offsetValue")
val withOffsetAndLimitFragment =
limit.fold(withOffsetFragment)(limitValue => withOffsetFragment ++ fr"LIMIT $limitValue")

val countCxnIO =
sql"""
| SELECT COUNT(*)
| FROM public.issue_credential_records
| $conditionFragment
""".stripMargin
.query[Int]
.unique

val cxnIO = withOffsetAndLimitFragment
.query[IssueCredentialRecord]
.to[Seq]

cxnIO
.transact(xa)
val effect = for {
totalCount <- countCxnIO
records <- cxnIO
} yield (records, totalCount)

effect.transact(xa)
}

override def getIssueCredentialRecordsByStates(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.iohk.atala.issue.controller

import io.iohk.atala.agent.server.ControllerHelper
import io.iohk.atala.api.http.model.CollectionStats
import io.iohk.atala.api.http.model.PaginationInput
import io.iohk.atala.api.http.{ErrorResponse, RequestContext}
import io.iohk.atala.api.util.PaginationUtils
import io.iohk.atala.connect.controller.ConnectionController
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.connect.core.service.ConnectionService
Expand Down Expand Up @@ -48,22 +50,30 @@ class IssueControllerImpl(
mapIssueErrors(result)
}

// TODO - Tech Debt - Do not filter this in memory - need to filter at the database level
// TODO - Tech Debt - Implement pagination
override def getCredentialRecords(paginationInput: PaginationInput, thid: Option[String])(implicit
rc: RequestContext
): IO[ErrorResponse, IssueCredentialRecordPage] = {
val uri = rc.request.uri
val pagination = paginationInput.toPagination
val result = for {
records <- thid match
case None => credentialService.getIssueCredentialRecords
case Some(thid) => credentialService.getIssueCredentialRecordByThreadId(DidCommID(thid)).map(_.toSeq)
pageResult <- thid match
case None =>
credentialService
.getIssueCredentialRecords(offset = Some(pagination.offset), limit = Some(pagination.limit))
case Some(thid) =>
credentialService
.getIssueCredentialRecordByThreadId(DidCommID(thid))
.map(_.toSeq)
.map(records => records -> records.length)
(records, totalCount) = pageResult
stats = CollectionStats(totalCount = totalCount, filteredCount = totalCount)
} yield IssueCredentialRecordPage(
self = "/issue-credentials/records",
self = uri.toString(),
kind = "Collection",
pageOf = "1",
next = None,
previous = None,
contents = (records map IssueCredentialRecord.fromDomain) // TODO - Tech Debt - Optimise this transformation - each time we get a list of things we iterate it once here
pageOf = PaginationUtils.composePageOfUri(uri).toString,
next = PaginationUtils.composeNextUri(uri, records, pagination, stats).map(_.toString),
previous = PaginationUtils.composePreviousUri(uri, records, pagination, stats).map(_.toString),
contents = (records map IssueCredentialRecord.fromDomain)
)
mapIssueErrors(result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,17 @@ class JdbcDIDNonSecretStorage(xa: Transactor[Task]) extends DIDNonSecretStorage
.query[DIDStateRow]
.to[List]

for {
totalCount <- countCxnIO.transact(xa)
dids <- didsCxnIO
.transact(xa)
.map(_.map(row => row.toDomain.map(row.did -> _)))
.flatMap(ls => ZIO.foreach(ls)(ZIO.fromTry[(PrismDID, ManagedDIDState)](_)))
} yield (dids, totalCount)
val effect = for {
totalCount <- countCxnIO
rows <- didsCxnIO
} yield (rows, totalCount)

effect
.transact(xa)
.flatMap { case (rows, totalCount) =>
val results = rows.map(row => row.toDomain.map(row.did -> _))
ZIO.foreach(results)(ZIO.fromTry).map(_ -> totalCount)
}
}

override def insertDIDUpdateLineage(did: PrismDID, updateLineage: DIDUpdateLineage): Task[Unit] = {
Expand Down

0 comments on commit c0db5c8

Please sign in to comment.