Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change subjectId List to Iterable #257

Merged
merged 10 commits into from
Mar 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down Expand Up @@ -159,7 +161,7 @@ public MeasureReport evaluateMeasure(String url, String periodStart, String peri
}

MeasureEvalType measureEvalType = MeasureEvalType.fromCode(reportType);
List<String> subjectIds = this.getSubjects(measureEvalType,
Iterable<String> subjectIds = this.getSubjects(measureEvalType,
subject != null ? subject : practitioner, dataEndpoint, additionalData);

Iterable<IBaseResource> measures = fhirDal.searchByUrl("Measure", url);
Expand All @@ -181,17 +183,17 @@ public MeasureReport evaluateMeasure(String url, String periodStart, String peri
return measureReport;
}

public List<String> getSubjects(String reportType, String subjectId) {
public Iterable<String> getSubjects(String reportType, String subjectId) {
return this.getSubjects(reportType, subjectId, null, null);
}

public List<String> getSubjects(String reportType, String subjectId, Endpoint dataEndpoint,
public Iterable<String> getSubjects(String reportType, String subjectId, Endpoint dataEndpoint,
Bundle additionalData) {
MeasureEvalType measureEvalType = MeasureEvalType.fromCode(reportType);
return getSubjects(measureEvalType, subjectId, dataEndpoint, additionalData);
}

public List<String> getSubjects(MeasureEvalType measureEvalType, String subjectId,
public Iterable<String> getSubjects(MeasureEvalType measureEvalType, String subjectId,
Endpoint dataEndpoint, Bundle additionalData) {
CompositeFhirDal compositeFhirDal;
BundleFhirDal bundleDal = null;
Expand All @@ -211,53 +213,62 @@ public List<String> getSubjects(MeasureEvalType measureEvalType, String subjectI
}

public MeasureReport evaluateMeasure(Measure measure, String periodStart, String periodEnd,
String reportType, List<String> subjectIds, FhirDal fhirDal, Endpoint contentEndpoint,
String reportType, Iterable<String> subjectIds, FhirDal fhirDal, Endpoint contentEndpoint,
Endpoint terminologyEndpoint, Endpoint dataEndpoint, Bundle additionalData) {

if (this.measureEvaluationOptions.isThreadedEnabled()
&& subjectIds.size() > this.measureEvaluationOptions.getThreadedBatchSize()) {
return threadedMeasureEvaluate(measure, periodStart, periodEnd, reportType, subjectIds,
fhirDal, contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData);
} else {
return innerEvaluateMeasure(measure, periodStart, periodEnd, reportType, subjectIds, fhirDal,
contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData);
var subjectIterator = subjectIds.iterator();
int threadBatchSize = Optional.ofNullable(measureEvaluationOptions.getThreadedBatchSize())
.orElse(Integer.MAX_VALUE);

List<CompletableFuture<MeasureReport>> futures = new ArrayList<>();
var ids = new ArrayList<String>();
while (subjectIterator.hasNext()) {
ids.add(subjectIterator.next());
if (ids.size() % threadBatchSize == 0) {
futures.add(runEvaluate(measure, periodStart, periodEnd, reportType, ids, fhirDal,
contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData));
ids = new ArrayList<>();
}
}


// Make sure to run partial batches.
if (!ids.isEmpty()) {
futures.add(runEvaluate(measure, periodStart, periodEnd, reportType, ids, fhirDal,
contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData));
}

var reports = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());

if (reports.size() > 1) {
Dstu3MeasureReportAggregator reportAggregator = new Dstu3MeasureReportAggregator();
return reportAggregator.aggregate(reports);
} else if (reports.size() == 1) {
return reports.get(0);
} else {
throw new AssertionError(
"No reports were generated by Measure evaluation. This should be impossible.");
}
}

protected MeasureReport threadedMeasureEvaluate(Measure measure, String periodStart,
protected CompletableFuture<MeasureReport> runEvaluate(Measure measure, String periodStart,
String periodEnd, String reportType, List<String> subjectIds, FhirDal fhirDal,
Endpoint contentEndpoint, Endpoint terminologyEndpoint, Endpoint dataEndpoint,
Bundle additionalData) {
List<List<String>> batches =
getBatches(subjectIds, this.measureEvaluationOptions.getThreadedBatchSize());
ExecutorService executor =
Executors.newFixedThreadPool(this.measureEvaluationOptions.getNumThreads());
List<CompletableFuture<MeasureReport>> futures = new ArrayList<>();
for (List<String> idBatch : batches) {
futures.add(CompletableFuture.supplyAsync(
() -> this.innerEvaluateMeasure(measure, periodStart, periodEnd, reportType, idBatch,
fhirDal, contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData),
executor));
}

List<MeasureReport> reports = new ArrayList<>();
futures.forEach(x -> reports.add(x.join()));
Dstu3MeasureReportAggregator reportAggregator = new Dstu3MeasureReportAggregator();
return reportAggregator.aggregate(reports);
}

public static <T> List<List<T>> getBatches(List<T> collection, int batchSize) {
int i = 0;
List<List<T>> batches = new ArrayList<>();
while (i < collection.size()) {
int nextInc = Math.min(collection.size() - i, batchSize);
List<T> batch = collection.subList(i, i + nextInc);
batches.add(batch);
i = i + nextInc;
if (measureEvaluationOptions.isThreadedEnabled()) {
ExecutorService executor =
Executors.newFixedThreadPool(this.measureEvaluationOptions.getNumThreads());
return CompletableFuture.supplyAsync(
() -> this.innerEvaluateMeasure(measure, periodStart, periodEnd, reportType, subjectIds,
fhirDal, contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData),
executor);
} else {
return CompletableFuture.completedFuture(
this.innerEvaluateMeasure(measure, periodStart, periodEnd, reportType, subjectIds,
fhirDal, contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData));
}

return batches;
}

protected MeasureReport innerEvaluateMeasure(Measure measure, String periodStart,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package org.opencds.cqf.cql.evaluator.measure.r4;


import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down Expand Up @@ -167,16 +170,17 @@ public MeasureReport evaluateMeasure(String url, String periodStart, String peri
Endpoint contentEndpoint, Endpoint terminologyEndpoint, Endpoint dataEndpoint,
Bundle additionalData) {

List<String> subjectIds = this.getSubjects(reportType, subject != null ? subject : practitioner,
dataEndpoint, additionalData);
Iterable<String> subjectIds = this.getSubjects(reportType,
subject != null ? subject : practitioner, dataEndpoint, additionalData);

return evaluateMeasure(url, periodStart, periodEnd, reportType, subjectIds, lastReceivedOn,
contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData);
}

public MeasureReport evaluateMeasure(String url, String periodStart, String periodEnd,
String reportType, List<String> subjectIds, String lastReceivedOn, Endpoint contentEndpoint,
Endpoint terminologyEndpoint, Endpoint dataEndpoint, Bundle additionalData) {
String reportType, Iterable<String> subjectIds, String lastReceivedOn,
Endpoint contentEndpoint, Endpoint terminologyEndpoint, Endpoint dataEndpoint,
Bundle additionalData) {

// TODO: Need a federated FhirDal..
FhirDal fhirDal = contentEndpoint != null
Expand Down Expand Up @@ -220,36 +224,23 @@ private Measure getMeasure(FhirDal fhirDal, String url) {
return (Measure) measureIter.next();
}

public static <T> List<List<T>> getBatches(List<T> collection, int batchSize) {
int i = 0;
List<List<T>> batches = new ArrayList<>();
while (i < collection.size()) {
int nextInc = Math.min(collection.size() - i, batchSize);
List<T> batch = collection.subList(i, i + nextInc);
batches.add(batch);
i = i + nextInc;
}

return batches;
}

public List<String> getSubjects(String reportType, String subjectId, Endpoint dataEndpoint) {
public Iterable<String> getSubjects(String reportType, String subjectId, Endpoint dataEndpoint) {
MeasureEvalType measureEvalType = MeasureEvalType.fromCode(reportType);
return getSubjects(measureEvalType, subjectId, dataEndpoint, null);
}

public List<String> getSubjects(String reportType, String subjectId, Bundle additionalData) {
public Iterable<String> getSubjects(String reportType, String subjectId, Bundle additionalData) {
MeasureEvalType measureEvalType = MeasureEvalType.fromCode(reportType);
return getSubjects(measureEvalType, subjectId, null, additionalData);
}

public List<String> getSubjects(String reportType, String subjectId, Endpoint dataEndpoint,
public Iterable<String> getSubjects(String reportType, String subjectId, Endpoint dataEndpoint,
Bundle additionalData) {
MeasureEvalType measureEvalType = MeasureEvalType.fromCode(reportType);
return getSubjects(measureEvalType, subjectId, dataEndpoint, additionalData);
}

public List<String> getSubjects(MeasureEvalType measureEvalType, String subjectId,
public Iterable<String> getSubjects(MeasureEvalType measureEvalType, String subjectId,
Endpoint dataEndpoint, Bundle additionalData) {
CompositeFhirDal compositeFhirDal;
BundleFhirDal bundleDal = null;
Expand All @@ -270,7 +261,7 @@ public List<String> getSubjects(MeasureEvalType measureEvalType, String subjectI
}

public MeasureReport evaluateMeasure(Measure measure, String periodStart, String periodEnd,
String reportType, List<String> subjectIds, FhirDal fhirDal, Endpoint contentEndpoint,
String reportType, Iterable<String> subjectIds, FhirDal fhirDal, Endpoint contentEndpoint,
Endpoint terminologyEndpoint, Endpoint dataEndpoint, Bundle additionalData) {
if (Boolean.TRUE.equals(this.measureEvaluationOptions.isValidationEnabled())) {
if (this.validator == null) {
Expand All @@ -281,39 +272,57 @@ public MeasureReport evaluateMeasure(Measure measure, String periodStart, String
this.validator.validate(measure, true);
}
}
var subjectIterator = subjectIds.iterator();

if (this.measureEvaluationOptions.isThreadedEnabled()
&& subjectIds.size() > this.measureEvaluationOptions.getThreadedBatchSize()) {
return threadedMeasureEvaluate(measure, periodStart, periodEnd, reportType, subjectIds,
fhirDal, contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData);
} else {
return innerEvaluateMeasure(measure, periodStart, periodEnd, reportType, subjectIds, fhirDal,
contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData);
int threadBatchSize = Optional.ofNullable(measureEvaluationOptions.getThreadedBatchSize())
.orElse(Integer.MAX_VALUE);

List<CompletableFuture<MeasureReport>> futures = new ArrayList<>();
var ids = new ArrayList<String>();
while (subjectIterator.hasNext()) {
ids.add(subjectIterator.next());
if (ids.size() % threadBatchSize == 0) {
futures.add(runEvaluate(measure, periodStart, periodEnd, reportType, ids, fhirDal,
contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData));
ids = new ArrayList<>();
}
}

// Make sure to run partial batches.
if (!ids.isEmpty()) {
futures.add(runEvaluate(measure, periodStart, periodEnd, reportType, ids, fhirDal,
contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData));
}

var reports = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
if (reports.size() > 1) {
R4MeasureReportAggregator reportAggregator = new R4MeasureReportAggregator();
return reportAggregator.aggregate(reports);
} else if (reports.size() == 1) {
return reports.get(0);
} else {
throw new AssertionError(
"No reports were generated by Measure evaluation. This should be impossible.");
}
}

protected MeasureReport threadedMeasureEvaluate(Measure measure, String periodStart,
protected CompletableFuture<MeasureReport> runEvaluate(Measure measure, String periodStart,
String periodEnd, String reportType, List<String> subjectIds, FhirDal fhirDal,
Endpoint contentEndpoint, Endpoint terminologyEndpoint, Endpoint dataEndpoint,
Bundle additionalData) {
List<List<String>> batches =
getBatches(subjectIds, this.measureEvaluationOptions.getThreadedBatchSize());
ExecutorService executor =
Executors.newFixedThreadPool(this.measureEvaluationOptions.getNumThreads());
List<CompletableFuture<MeasureReport>> futures = new ArrayList<>();
for (List<String> idBatch : batches) {
futures.add(CompletableFuture.supplyAsync(
() -> this.innerEvaluateMeasure(measure, periodStart, periodEnd, reportType, idBatch,

if (measureEvaluationOptions.isThreadedEnabled()) {
ExecutorService executor =
Executors.newFixedThreadPool(this.measureEvaluationOptions.getNumThreads());
return CompletableFuture.supplyAsync(
() -> this.innerEvaluateMeasure(measure, periodStart, periodEnd, reportType, subjectIds,
fhirDal, contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData),
executor));
executor);
} else {
return CompletableFuture.completedFuture(
this.innerEvaluateMeasure(measure, periodStart, periodEnd, reportType, subjectIds,
fhirDal, contentEndpoint, terminologyEndpoint, dataEndpoint, additionalData));
}

List<MeasureReport> reports = new ArrayList<>();
futures.forEach(x -> reports.add(x.join()));
R4MeasureReportAggregator reportAggregator = new R4MeasureReportAggregator();
return reportAggregator.aggregate(reports);
}

protected MeasureReport innerEvaluateMeasure(Measure measure, String periodStart,
Expand Down
Loading