Skip to content

Commit

Permalink
streamify (again, thanks @ndr_brt)
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger committed Sep 28, 2022
1 parent 51131ab commit 9868175
Showing 1 changed file with 31 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

/**
Expand All @@ -56,40 +57,41 @@ public ContractOfferServiceImpl(ParticipantAgentService agentService, ContractDe
@NotNull
public Stream<ContractOffer> queryContractOffers(ContractOfferQuery query, Range range) {
var agent = agentService.createFor(query.getClaimToken());
var defs = definitionService.definitionsFor(agent).collect(Collectors.toList());
var skip = range.getFrom();
var numSeenAssets = new AtomicLong(0);
var limit = range.getTo() - range.getFrom();
Stream<ContractOffer> allOffers = Stream.empty();
var numSeenAssets = 0L;
for (int i = 0; i < defs.size() && numSeenAssets < range.getTo(); i++) {
var d = defs.get(i);
var querySpecBuilder = QuerySpec.Builder.newInstance().filter(d.getSelectorExpression().getCriteria());
var skip = new AtomicInteger(range.getFrom());

var querySpec = querySpecBuilder.build();
var numAssets = assetIndex.countAssets(querySpec);
return definitionService.definitionsFor(agent)
.takeWhile(d -> numSeenAssets.get() < range.getTo())
.flatMap(definition -> {
var querySpecBuilder = QuerySpec.Builder.newInstance().filter(definition.getSelectorExpression().getCriteria());

if (skip > 0) {
querySpecBuilder.offset(skip);
}
if (numAssets + numSeenAssets > limit) {
querySpecBuilder.limit(limit);
}
var querySpec = querySpecBuilder.build();
var numAssets = assetIndex.countAssets(querySpec);

if (skip < numAssets) {
var byId = policyStore.findById(d.getContractPolicyId());
if (byId == null) { //policy not found
continue;
}
var assets = assetIndex.queryAssets(querySpecBuilder.build());
allOffers = Stream.concat(allOffers,
assets.map(a -> createContractOffer(d, byId.getPolicy(), a)));
if (skip.get() > 0) {
querySpecBuilder.offset(skip.get());
}
if (numAssets + numSeenAssets.get() > limit) {
querySpecBuilder.limit(limit);
}

}
numSeenAssets += numAssets;
skip -= numAssets;
}
if (skip.get() < numAssets) {
var byId = policyStore.findById(definition.getContractPolicyId());
if (byId == null) { //policy not found
return Stream.empty();
}
var assets = assetIndex.queryAssets(querySpecBuilder.build());
numSeenAssets.addAndGet(numAssets);
skip.addAndGet(Long.valueOf(-numAssets).intValue());
return assets.map(a -> createContractOffer(definition, byId.getPolicy(), a));

return allOffers;
} else {
numSeenAssets.addAndGet(numAssets);
skip.addAndGet(Long.valueOf(-numAssets).intValue());
return Stream.empty();
}
});
}

@NotNull
Expand Down

0 comments on commit 9868175

Please sign in to comment.