Mapping update processor #32

Merged
merged 25 commits into from
May 17, 2024
25 commits
71188e5
fix(deps): update hapifhirversion to v6.10.5
renovate[bot] Feb 24, 2024
aed30b2
fix(deps): update dependency io.micrometer:micrometer-registry-promet…
renovate[bot] Feb 25, 2024
47eabfb
feat!: add mapping update processor
jabberwoc Mar 7, 2024
367770b
chore(deps): update github-actions
renovate[bot] Mar 10, 2024
d2d2c2b
test: fix lab processor tests
jabberwoc Mar 11, 2024
c55ea42
ci: reenable devskim
jabberwoc Mar 11, 2024
d164de5
fix(deps): update spring boot to v3.2.3
renovate[bot] Mar 11, 2024
2649207
Merge branch 'renovate/github-actions' into add-mapping-processor
jabberwoc Mar 11, 2024
ebc64a6
Merge branch 'renovate/spring-boot' of github.com:diz-unimr/lab-to-fh…
jabberwoc Mar 11, 2024
9d27027
Merge branch 'renovate/hapifhirversion' into add-mapping-processor
jabberwoc Mar 11, 2024
9ea4241
Merge branch 'renovate/io.micrometer-micrometer-registry-prometheus-1…
jabberwoc Mar 11, 2024
9dc323f
ci: use codecov token
jabberwoc Mar 11, 2024
c302dd1
test: configure test reports
jabberwoc Mar 11, 2024
cc49efb
test: add unit tests
jabberwoc Mar 12, 2024
6049e6b
test: add lab runner tests
jabberwoc Mar 13, 2024
c93359f
more tests
jabberwoc Mar 14, 2024
6e4cfce
test: add integration test
jabberwoc Mar 15, 2024
13f97eb
test: add equality tests
jabberwoc Mar 15, 2024
97b8d79
ci(lint): fix line length
jabberwoc Mar 18, 2024
1f1351d
ci(lint): fix line length
jabberwoc Mar 18, 2024
a2506d3
ci(lint): refactor processor test code to base class
jabberwoc Mar 18, 2024
c713d98
test: fix lab processor test
jabberwoc Mar 18, 2024
1ae1bdb
fix: update processor offset handling
jabberwoc May 16, 2024
fc61cd7
docs: add Mapping updates section in the README
jabberwoc May 17, 2024
4f53281
ci: fix markdown linting
jabberwoc May 17, 2024
10 changes: 0 additions & 10 deletions .editorconfig

This file was deleted.

6 changes: 4 additions & 2 deletions .github/workflows/build.yml
@@ -18,10 +18,12 @@ jobs:
java-version: "17"
distribution: "temurin"
- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@v2.1.0
uses: gradle/wrapper-validation-action@v2.1.1
- name: Setup Gradle
uses: gradle/gradle-build-action@v3.0.0
uses: gradle/gradle-build-action@v3.1.0
- name: Execute Gradle build
run: ./gradlew build
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
3 changes: 1 addition & 2 deletions .github/workflows/mega-linter.yml
@@ -44,15 +44,14 @@ jobs:
id: ml
# You can override MegaLinter flavor used to have faster performances
# More info at https://megalinter.io/flavors/
uses: oxsecurity/megalinter/flavors/cupcake@v7.8.0
uses: oxsecurity/megalinter/flavors/cupcake@v7.10.0
env:
# All available variables are described in documentation
# https://megalinter.io/configuration/
VALIDATE_ALL_CODEBASE: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} # Validates all source when push on main, else just the git diff with main. Override with true if you always want to lint all sources
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# ADD YOUR CUSTOM ENV VARIABLES HERE OR DEFINE THEM IN A FILE .mega-linter.yml AT THE ROOT OF YOUR REPOSITORY
DISABLE: SPELL # ignore spell checks
DISABLE_ERRORS_LINTERS: REPOSITORY_DEVSKIM # missing git safe.directory
DISABLE_LINTERS: JAVA_PMD

# Upload MegaLinter artifacts
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -49,7 +49,7 @@ jobs:
uses: actions/checkout@v4

- name: Log in to the Container registry
uses: docker/login-action@3d58c274f17dffee475a5520cbe67f0a882c4dbb
uses: docker/login-action@5139682d94efc37792e6b54386b5b470a68a4737
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
20 changes: 19 additions & 1 deletion README.md
@@ -44,7 +44,7 @@ The following environment variables can be set:
| SSL_TRUST_STORE_PASSWORD | | Truststore password (if using `SECURITY_PROTOCOL=SSL`) |
| INPUT_TOPIC | aim-lab | Topic to read from |
| OUTPUT_TOPIC | lab-fhir | Topic to store result bundles |
| MAPPING_LOINC_VERSION | 2.0.1 | LOINC mapping package version: [Package Registry · mapping / loinc-mapping](https://gitlab.diz.uni-marburg.de/mapping/loinc-mapping/-/packages/) |
| MAPPING_LOINC_VERSION | 3.0.1 | LOINC mapping package version: [Package Registry · mapping / loinc-mapping](https://gitlab.diz.uni-marburg.de/mapping/loinc-mapping/-/packages/) |
| MAPPING_LOINC_CREDENTIALS_USER | | LOINC mapping package registry user |
| MAPPING_LOINC_CREDENTIALS_PASSWORD | | LOINC mapping package registry password |
| MAPPING_LOINC_PROXY | | Proxy server to use when pulling the package |
@@ -53,6 +53,24 @@ The following environment variables can be set:

Additional application properties can be set by overriding values from the [application.yml](src/main/resources/application.yml) using environment variables.

## Mapping updates

In addition to the regular Kafka processor, this application uses a separate
update processor to apply mapping updates to all records up to the
current offset of the regular processor.

The update processor is a separate Kafka consumer and keeps its own offset
state in order to be able to resume unfinished updates. On completion, the
update consumer group is deleted.

On startup, the application checks the configured mapping version and
determines a diff between the mappings of the current and the last used
mapping version. This data is stored in the Kafka topic `mapping` with the key
`lab-update`.

If there are no changes or the mapping versions are equal, the
update processor is not started.
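
As a rough illustration of the startup check described in this section, the
following sketch compares two mapping versions and decides whether the update
processor needs to run. It is not the application's code: the class
`MappingUpdateCheck`, its methods, and the representation of a mapping as a
plain `Map` from lab code to target code are assumptions made for this
example, as is the choice to report only added or changed entries.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/**
 * Illustrative sketch only. Names and data model are assumptions,
 * not the application's actual classes.
 */
final class MappingUpdateCheck {

    private MappingUpdateCheck() {
    }

    /** Returns the entries of current that are new or changed compared to last. */
    static Map<String, String> diff(Map<String, String> last,
        Map<String, String> current) {
        Map<String, String> changes = new TreeMap<>();
        for (Map.Entry<String, String> entry : current.entrySet()) {
            if (!Objects.equals(last.get(entry.getKey()), entry.getValue())) {
                changes.put(entry.getKey(), entry.getValue());
            }
        }
        return changes;
    }

    /** An update is only needed if the version changed and the diff is non-empty. */
    static boolean shouldStartUpdate(String lastVersion, String currentVersion,
        Map<String, String> changes) {
        return !Objects.equals(lastVersion, currentVersion)
            && !changes.isEmpty();
    }

    public static void main(String[] args) {
        // Made-up example data, not real mapping content.
        Map<String, String> last = Map.of("NA", "code-1", "K", "code-2");
        Map<String, String> current = Map.of("NA", "code-1", "K", "code-3");
        Map<String, String> changes = diff(last, current);
        System.out.println("changes: " + changes);
        System.out.println("start update: "
            + shouldStartUpdate("2.0.1", "3.0.1", changes));
    }
}
```

In the application itself, the result of this comparison is what gets stored
in the `mapping` topic under the `lab-update` key. The sketch keeps it in
memory only.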

## Tests

This project includes unit and integration tests.
24 changes: 15 additions & 9 deletions build.gradle
@@ -1,7 +1,7 @@
plugins {
id 'java'
id 'jacoco'
id 'org.springframework.boot' version '3.2.2'
id 'org.springframework.boot' version '3.2.3'
id 'io.spring.dependency-management' version '1.1.4'
id 'checkstyle'
}
@@ -18,23 +18,28 @@ repositories {
}

ext {
set('springBootVersion', "3.2.2")
set('springBootVersion', "3.2.3")
set('springCloudDepsVersion', "2023.0.0")
set('springCloudStreamVersion', "4.1.0")
set('springKafkaVersion', "3.1.1")
set("hapiFhirVersion", "6.8.5")
set("hapiFhirVersion", "6.10.5")
}

dependencies {
// spring cloud stream kafka
implementation "org.springframework.cloud:spring-cloud-stream:$springCloudStreamVersion"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka:$springCloudStreamVersion"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:$springCloudStreamVersion"
implementation "org.springframework.cloud:spring-cloud-starter-stream-kafka:$springCloudStreamVersion"
implementation "org.springframework.kafka:spring-kafka:$springKafkaVersion"

// retry
implementation 'org.springframework.retry:spring-retry:2.0.3'
implementation 'org.springframework:spring-aspects:6.0.11'

// metrics
implementation "org.springframework.boot:spring-boot-starter-web:$springBootVersion"
implementation "org.springframework.boot:spring-boot-starter-actuator:$springBootVersion"
implementation 'io.micrometer:micrometer-registry-prometheus:1.12.1'
implementation 'io.micrometer:micrometer-registry-prometheus:1.12.3'

// hapi fhir
implementation "ca.uhn.hapi.fhir:hapi-fhir-base:$hapiFhirVersion"
@@ -47,8 +52,8 @@ dependencies {
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.16.1'

// unit tests
testImplementation "org.springframework.cloud:spring-cloud-stream:$springCloudStreamVersion"
testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.6.1'
testImplementation 'nl.jqno.equalsverifier:equalsverifier:3.15.8'

testImplementation("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
@@ -66,16 +71,17 @@ dependencyManagement {

test {
useJUnitPlatform()
finalizedBy jacocoTestReport
}

jacocoTestReport {
reports {
xml.required.set(false)
xml.required = true
html.required = false
}
dependsOn test
}

check.dependsOn jacocoTestReport


jar {
enabled = false
4 changes: 3 additions & 1 deletion dev/compose-data.yaml
@@ -1,11 +1,13 @@
version: "3.7"

name: lab-to-fhir

services:
lab-data-loader:
image: confluentinc/cp-kafkacat:7.0.1
entrypoint: ["/bin/bash", "-c"]
command: >
"kafkacat -b localhost:9092 -K: -t laboratory -P -l /data/lab-data.ndjson"
volumes:
- ./lab-data-single.ndjson:/data/lab-data.ndjson:ro
- ./lab-data.ndjson:/data/lab-data.ndjson:ro
network_mode: host
2 changes: 2 additions & 0 deletions dev/compose.yaml
@@ -1,5 +1,7 @@
version: "3.7"

name: lab-to-fhir

services:
zoo:
image: zookeeper:3.6.1
1 change: 0 additions & 1 deletion dev/lab-data-single.ndjson

This file was deleted.

4 changes: 3 additions & 1 deletion dev/lab-data.ndjson

Large diffs are not rendered by default.

140 changes: 140 additions & 0 deletions src/main/java/de/unimarburg/diz/labtofhir/LabRunner.java
@@ -0,0 +1,140 @@
package de.unimarburg.diz.labtofhir;

import de.unimarburg.diz.labtofhir.configuration.AdminClientProvider;
import de.unimarburg.diz.labtofhir.model.MappingInfo;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.Admin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.cloud.stream.binding.BindingsLifecycleController.State;
import org.springframework.cloud.stream.endpoint.BindingsEndpoint;
import org.springframework.context.event.EventListener;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.annotation.Retryable;
import org.springframework.retry.policy.AlwaysRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Retryable
public class LabRunner implements ApplicationRunner {

static final long RETRY_BACKOFF_PERIOD = 2_000L;
private static final Logger LOG = LoggerFactory.getLogger(LabRunner.class);
private final BindingsEndpoint endpoint;
private final AdminClientProvider kafkaAdmin;
private final MappingInfo mappingInfo;
private final String updateGroup;
private final RetryTemplate retryTemplate;

@SuppressWarnings("checkstyle:LineLength")
public LabRunner(BindingsEndpoint endpoint, AdminClientProvider kafkaAdmin,
@Nullable MappingInfo mappingInfo, @Value(
"${spring.cloud.stream.kafka.streams.binder.functions.update"
+ ".applicationId}") String updateGroup) {
this.endpoint = endpoint;
this.kafkaAdmin = kafkaAdmin;
this.mappingInfo = mappingInfo;
this.updateGroup = updateGroup;
this.retryTemplate = setupRetryTemplate();
}

@Override
public void run(ApplicationArguments args) throws Exception {

if (mappingInfo != null) {

// reset only on new update version
if (!mappingInfo.resume()) {
// reset update consumer

try (var client = createAdminClient()) {
var offsets = client.listConsumerGroupOffsets(updateGroup)
.partitionsToOffsetAndMetadata().get();

if (!offsets.isEmpty()) {
LOG.info("Starting mapping update from {} to {}",
mappingInfo.update().getOldVersion(),
mappingInfo.update().getVersion());

// start update at the beginning
// delete consumer group
deleteUpdateConsumerGroup();
}
}
}

// start update processor
endpoint.changeState("update-in-0", State.STARTED);
}

// start regular lab processor
endpoint.changeState("process-in-0", State.STARTED);
}

private Admin createAdminClient() {
return kafkaAdmin.createClient();
}

RetryTemplate setupRetryTemplate() {

return RetryTemplate.builder().customPolicy(new AlwaysRetryPolicy())
.fixedBackoff(RETRY_BACKOFF_PERIOD)
.withListener(new RetryListener() {
@Override
public <T, E extends Throwable> void onError(
RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
LOG.debug(
"Delete Consumer group failed: {}. Retrying {}",
Optional.ofNullable(throwable.getCause())
.orElse(throwable).getMessage(),
context.getRetryCount());
}
}).retryOn(ExecutionException.class).build();
}

public void stopAndDeleteUpdateConsumer() throws Exception {

// stop update processor
endpoint.changeState("update-in-0", State.STOPPED);

LOG.info("Update consumer stopped");

// delete consumer group
LOG.info("Deleting update consumer group...");
deleteUpdateConsumerGroup();
}


private void deleteUpdateConsumerGroup() throws Exception {
try (var client = createAdminClient()) {
retryTemplate.execute(ctx -> client
.deleteConsumerGroups(Collections.singleton(updateGroup)).all()
.get());
}
}


@EventListener
@Async
public void onApplicationEvent(UpdateCompleted event) {
try {
LOG.info("Update process complete");
stopAndDeleteUpdateConsumer();
} catch (Exception e) {
LOG.error("stopAndDeleteUpdateConsumer", e);
throw new RuntimeException("Failed to delete update consumer group",
e);
}
}
}
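
The retry behaviour configured in `setupRetryTemplate()` appears to amount to
retrying on `ExecutionException` with a fixed backoff and no attempt limit.
The sketch below expresses that idea as a plain loop without Spring Retry. It
is only an illustration under assumptions: `FixedBackoffRetry` and `execute`
are hypothetical names, and the sketch does not reproduce `RetryTemplate`
internals or its listener API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/**
 * Illustrative sketch only: retries an action until it succeeds,
 * waiting a fixed time between attempts. Hypothetical helper, not
 * part of the application or of Spring Retry.
 */
final class FixedBackoffRetry {

    private FixedBackoffRetry() {
    }

    static <T> T execute(Callable<T> action, long backoffMillis)
        throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return action.call();
            } catch (ExecutionException e) {
                // Only ExecutionException is retried; anything else propagates.
                retryCount++;
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                System.out.printf("Action failed: %s. Retrying %d%n",
                    cause.getMessage(), retryCount);
                Thread.sleep(backoffMillis);
            }
        }
    }
}
```

Because there is no attempt limit, a permanently failing action keeps the
caller waiting. In `LabRunner` this would mean the consumer group deletion is
retried every two seconds until it succeeds.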
@@ -4,8 +4,10 @@
import java.util.TimeZone;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class LabToFhirApplication {

public static void main(String[] args) {
10 changes: 10 additions & 0 deletions src/main/java/de/unimarburg/diz/labtofhir/UpdateCompleted.java
@@ -0,0 +1,10 @@
package de.unimarburg.diz.labtofhir;

import org.springframework.context.ApplicationEvent;

public class UpdateCompleted extends ApplicationEvent {

public UpdateCompleted(Object source) {
super(source);
}
}
@@ -0,0 +1,9 @@
package de.unimarburg.diz.labtofhir.configuration;

import org.apache.kafka.clients.admin.Admin;

@FunctionalInterface
public interface AdminClientProvider {

Admin createClient();
}
@@ -4,7 +4,6 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;


@ConfigurationProperties(prefix = "fhir")
@Validated
public class FhirProperties {