Skip to content

Commit

Permalink
fix: Use logical references (#41)
Browse files Browse the repository at this point in the history
* update consumer id and topic names

* fix: use logical references

* docs: update README

* remove unused narrative feature

* ci: fix linting
  • Loading branch information
jabberwoc committed Jul 30, 2024
1 parent fb26e7d commit 40d1380
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 277 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ update consumer group is deleted.
On startup, the application checks the configured mapping version and
determines a diff between the mappings of the current and the last used
mapping version. This data is stored in the Kafka topic `mapping` with the key
`lab-update`.
`aim-lab-update`.

In case there are no changes or the mapping versions used are equal, the
update processor is not started.
Expand Down Expand Up @@ -104,8 +104,8 @@ tags can be found at the [Container Registry](https://github.com/orgs/diz-unimr/

## Development

A [test setup](dev/compose.yaml) and [test data provider](dev/compose-data.yaml)
is available for development purposes.
A [test setup](dev/compose.yaml) with test data is available for development
purposes.

### Builds

Expand Down
13 changes: 0 additions & 13 deletions dev/compose-data.yaml

This file was deleted.

18 changes: 16 additions & 2 deletions dev/compose.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3.7"

name: lab-to-fhir

services:
Expand Down Expand Up @@ -30,6 +28,11 @@ services:
KAFKA_COMPRESSION_TYPE: gzip
depends_on:
- zoo
healthcheck:
test: kafka-topics --bootstrap-server kafka:9092 --list
interval: 30s
timeout: 10s
retries: 3

akhq:
image: tchiotludo/akhq:0.19.0
Expand All @@ -44,3 +47,14 @@ services:
- "9000:8080"
depends_on:
- kafka

lab-data-loader:
image: confluentinc/cp-kafkacat:7.0.1
entrypoint: ["/bin/bash", "-c"]
command: >
"kafkacat -b kafka:19092 -K: -t laboratory -P -l /data/lab-data.ndjson"
volumes:
- ./lab-data.ndjson:/data/lab-data.ndjson:ro
depends_on:
kafka:
condition: service_healthy
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,11 @@
public class FhirProperties {

private final Systems systems = new Systems();
@NotNull
private Boolean generateNarrative;

public Systems getSystems() {
return systems;
}

public Boolean getGenerateNarrative() {
return generateNarrative;
}

public void setGenerateNarrative(Boolean generateNarrative) {
this.generateNarrative = generateNarrative;
}

public static class Systems {


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
@Configuration
public class MappingUpdateConfiguration {

private static final Logger LOG = LoggerFactory.getLogger(
MappingUpdateConfiguration.class);
private static final Logger LOG =
LoggerFactory.getLogger(MappingUpdateConfiguration.class);
private static final long MAX_CONSUMER_POLL_DURATION_SECONDS = 10L;

@Bean("mappingInfo")
Expand All @@ -33,10 +33,7 @@ public MappingInfo buildMappingUpdate(LoincMapper loincMapper,
Producer<String, MappingUpdate> producer)
throws ExecutionException, InterruptedException, IOException {
// check versions
var configuredVersion = loincMapper
.getMap()
.getMetadata()
.getVersion();
var configuredVersion = loincMapper.getMap().getMetadata().getVersion();

// 1. consume latest from (mapping) update topic
var lastUpdate = getLastMappingUpdate(consumer);
Expand All @@ -54,13 +51,12 @@ public MappingInfo buildMappingUpdate(LoincMapper loincMapper,

// 2. check if already on latest
if (Objects.equals(configuredVersion, lastUpdate.getVersion())) {
LOG.info("Configured mapping version ({}) matches last version "
+ "({}). " + "No update necesssary", configuredVersion,
LOG.info(
"Configured mapping version ({}) matches last version ({}). "
+ "No update necesssary", configuredVersion,
lastUpdate.getVersion());

if (!labOffsets
.updateOffsets()
.isEmpty()) {
if (!labOffsets.updateOffsets().isEmpty()) {
// update in progress, continue
return new MappingInfo(lastUpdate, true);
}
Expand All @@ -72,36 +68,29 @@ public MappingInfo buildMappingUpdate(LoincMapper loincMapper,
// get last version's mapping
var lastMap = LoincMapper.getSwlLoincMapping(
ResourceHelper.getMappingFile(lastUpdate.getVersion(),
mappingProperties
.getLoinc()
.getCredentials()
.getUser(), mappingProperties
.getLoinc()
.getCredentials()
.getPassword(), mappingProperties
.getLoinc()
.getProxy(), mappingProperties
.getLoinc()
.getLocal()));
mappingProperties.getLoinc().getCredentials().getUser(),
mappingProperties.getLoinc().getCredentials().getPassword(),
mappingProperties.getLoinc().getProxy(),
mappingProperties.getLoinc().getLocal()));
// ceate diff
var updates = loincMapper
.getMap()
.diff(lastMap);
var update = new MappingUpdate(configuredVersion,
lastUpdate.getVersion(), updates);
var updates = loincMapper.getMap().diff(lastMap);
var update =
new MappingUpdate(configuredVersion, lastUpdate.getVersion(),
updates);

// save new mapping update
saveMappingUpdate(producer, update);

return new MappingInfo(update, false);
}

@SuppressWarnings("checkstyle:LineLength")
private void saveMappingUpdate(Producer<String, MappingUpdate> producer,
MappingUpdate mappingUpdate)
throws ExecutionException, InterruptedException {

producer
.send(new ProducerRecord<>("mapping", "lab-update", mappingUpdate))
producer.send(
new ProducerRecord<>("mapping", "aim-lab-update", mappingUpdate))
.get();
}

Expand All @@ -124,8 +113,7 @@ private MappingUpdate getLastMappingUpdate(

var record = consumer
.poll(Duration.ofSeconds(MAX_CONSUMER_POLL_DURATION_SECONDS))
.iterator()
.next();
.iterator().next();

consumer.unsubscribe();
return record.value();
Expand Down
Loading

0 comments on commit 40d1380

Please sign in to comment.