Replay previous tagging request (fix #62)
alainbodiguel committed Nov 25, 2019
1 parent 88eb64c commit 8099f37
Showing 13 changed files with 114 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -12,3 +12,4 @@ dependency-reduced-pom.xml
*.versionsBackup
*.bak
tmp/
release_test_logs.txt
@@ -22,6 +22,8 @@
public class Documentation {

public static final String TAG_OPERATION = "Search and tag the elements found in the collection, given the filters";
public static final String TAG_REPLAY = "Scan the tagref topic and replay tagging operations from the given ID";
public static final String TAG_REPLAY_PARAM_OFFSET = "The offset from which the replay must be done.";
public static final String UNTAG_OPERATION = "Search and untag the elements found in the collection, given the filters";
public static final String TAGSTATUS_OPERATION = "Get the status of the (un)tagging operation, given the id of a previously requested operation";
public static final String TAGLIST_OPERATION = "Get the list of previously submitted tag requests";
@@ -31,6 +31,8 @@ public class TaggingStatus {

private SelfExpiringMap<String, UpdateResponse> statusMap;

private volatile boolean doReset;

private TaggingStatus() {
statusMap = new SelfExpiringHashMap<>();
}
@@ -48,6 +50,10 @@ public void initStatus(String id, UpdateResponse status, long timeout) {
}

public synchronized UpdateResponse updateStatus(TagRefRequest tagRequest, UpdateResponse updResp, boolean incrNbRequest, long statusTimeout) {
if (doReset) {
statusMap.clear();
doReset = false;
}
UpdateResponse updateResponse = getStatus(tagRequest.id).orElse(new UpdateResponse());
updateResponse.id = tagRequest.id;
updateResponse.label = tagRequest.label;
@@ -57,4 +63,8 @@ public synchronized UpdateResponse updateStatus(TagRefRequest tagRequest, Update
statusMap.put(tagRequest.id, updateResponse, statusTimeout);
return updateResponse;
}

public void reset() {
doReset = true;
}
}
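A note on the reset mechanism above: `reset()` only raises a volatile flag, and the map is actually cleared inside the synchronized `updateStatus(...)`, so the clear can never race with an in-flight status update. A minimal self-contained sketch of this deferred-reset pattern (class and field names below are illustrative, not from the repository):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the deferred-reset pattern used by TaggingStatus:
// reset() is cheap and lock-free, while the actual clear happens lazily
// inside the synchronized update method, so it cannot race with an update.
class DeferredResetStatusMap {
    private final Map<String, String> statusMap = new ConcurrentHashMap<>();
    private volatile boolean doReset;

    void reset() {
        doReset = true; // just flips the flag; no lock is taken here
    }

    synchronized String update(String id, String status) {
        if (doReset) { // the first update after reset() wipes all stale entries
            statusMap.clear();
            doReset = false;
        }
        statusMap.put(id, status);
        return status;
    }
}
```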
@@ -32,6 +32,7 @@ public class TagRefRequest extends TagRequest {
public String partitionFilter;
public long propagated = -1L; // initial value indicates the propagation has not been evaluated yet
public long creationTime;
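// Kafka offset of the tagref record this request was read from; exposed so a replay can start from it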
public long offset;

public static TagRefRequest fromTagRequest(TagRequest t, String collection, String partitionFilter, Action action) {
TagRefRequest tagRefRequest = new TagRefRequest();
@@ -22,27 +22,31 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.arlas.tagger.app.ArlasTaggerConfiguration;
import io.arlas.tagger.kafka.TagKafkaConsumer;
import io.arlas.tagger.model.TaggingStatus;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class KafkaConsumerRunner implements Runnable {
Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerRunner.class);

private final ArlasTaggerConfiguration configuration;
private final String topic;
protected final String topic;
private final String consumerGroupId;
private final Integer batchSize;
private final int nbThread;
private final AtomicBoolean closed = new AtomicBoolean(false);
private KafkaConsumer consumer;
protected static ObjectMapper MAPPER = new ObjectMapper();
protected volatile long replayFromOffset = -1L;

public KafkaConsumerRunner(int nbThread, ArlasTaggerConfiguration configuration, String topic, String consumerGroupId, Integer batchSize) {
this.configuration = configuration;
@@ -54,17 +58,31 @@ public KafkaConsumerRunner(int nbThread, ArlasTaggerConfiguration configuration,

public abstract void processRecords(ConsumerRecords<String, String> records);

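// Subclasses either honour a replay request (TagRefService) or reject it (TagExecService).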
public abstract void setReplayFromOffset(long replayFromOffset);

@Override
public void run() {
try {
LOGGER.info("[{}-{}] Starting consumer", topic, nbThread);
consumer = TagKafkaConsumer.build(configuration, topic, consumerGroupId, batchSize);
long start = System.currentTimeMillis();
long duration = System.currentTimeMillis();
long duration;
int nbFailure = 0;

while (true) {
try {
if (replayFromOffset != -1L) {
// replay is only possible when the topic has a single partition
Long maxOffset = (Long) consumer.endOffsets(consumer.assignment()).values().toArray()[0];
if (replayFromOffset <= maxOffset) {
consumer.seek((TopicPartition) (consumer.assignment().toArray()[0]), replayFromOffset);
// reset all past status information, otherwise the status of the replayed operations cannot be tracked anew
TaggingStatus.getInstance().reset();
} else {
LOGGER.warn("Ignoring replay attempt from offset " + replayFromOffset + " because it is larger than max offset " + maxOffset);
}
replayFromOffset = -1L;
}
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(configuration.kafkaConfiguration.consumerPollTimeout));
if (records.count() > 0) {
LOGGER.debug("[{}-{}] Nb records polled={}", topic, nbThread, records.count());
@@ -31,9 +31,11 @@ public class ManagedKafkaConsumers implements Managed {
private TagRefService tagRefService;
private List<TagExecService> tagExecServices;
private UpdateServices updateServices;

private TagKafkaProducer tagKafkaProducer;

public ManagedKafkaConsumers(ArlasTaggerConfiguration configuration, TagKafkaProducer tagKafkaProducer, UpdateServices updateServices) {
this.configuration = configuration;
this.tagKafkaProducer = tagKafkaProducer;
this.tagRefService = new TagRefService(configuration,
configuration.kafkaConfiguration.tagRefLogTopic,
configuration.kafkaConfiguration.tagRefLogConsumerGroupId,
@@ -43,6 +45,10 @@ public ManagedKafkaConsumers(ArlasTaggerConfiguration configuration, TagKafkaPro
this.updateServices = updateServices;
}

public TagKafkaProducer getTagKafkaProducer() {
return tagKafkaProducer;
}

@Override
public void start() throws Exception {
new Thread(tagRefService).start();
@@ -63,4 +69,8 @@ public void stop() throws Exception {
try { this.tagExecServices.get(i).stop(); } catch (Exception e) {};
}
}

public void replayFrom(long offset) {
tagRefService.setReplayFromOffset(offset);
}
}
@@ -51,6 +51,11 @@ public TagExecService(int nbThread, ArlasTaggerConfiguration configuration, Stri
this.statusTimeout = configuration.statusTimeout;
}

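// Replay is only supported on the tagref reference log consumed by TagRefService.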
@Override
public void setReplayFromOffset(long replayFromOffset) {
throw new UnsupportedOperationException("Replay is not available on topic '" + topic + "' ");
}

@Override
public void processRecords(ConsumerRecords<String, String> records) {
long start = System.currentTimeMillis();
@@ -59,7 +59,9 @@ public List<TagRefRequest> getTagRefList() {
for (ConsumerRecord<String, String> record : records) {

try {
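// keep each record's Kafka offset so that callers can pass it to the _tagreplay endpoint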
results.add(MAPPER.readValue(record.value(), TagRefRequest.class));
TagRefRequest tr = MAPPER.readValue(record.value(), TagRefRequest.class);
tr.offset = record.offset();
results.add(tr);
} catch (IOException e) {
LOGGER.warn("Could not parse record (ignored) " + record.value());
}
@@ -47,7 +47,9 @@
import java.util.Collections;
import java.util.Optional;


/**
* /!\ With the current implementation, the topic read by TagRefService *MUST* have only 1 partition.
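* Replay relies on this constraint: the consumer seeks on its single assigned partition (assignment().toArray()[0]), so any additional partition would be silently ignored.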
*/
public class TagRefService extends KafkaConsumerRunner {
private Logger LOGGER = LoggerFactory.getLogger(TagRefService.class);
private final TagKafkaProducer tagKafkaProducer;
@@ -62,6 +64,11 @@ public TagRefService(ArlasTaggerConfiguration configuration, String topic, Strin
this.statusTimeout = configuration.statusTimeout;
}

@Override
public void setReplayFromOffset(long replayFromOffset) {
this.replayFromOffset = replayFromOffset;
}

@Override
public void processRecords(ConsumerRecords<String, String> records) {
long start = System.currentTimeMillis();
@@ -28,6 +28,7 @@
import io.arlas.tagger.model.request.TagRefRequest;
import io.arlas.tagger.model.request.TagRequest;
import io.arlas.tagger.model.response.UpdateResponse;
import io.arlas.tagger.service.ManagedKafkaConsumers;
import io.swagger.annotations.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,11 +51,11 @@ public class TagRESTService {
protected static Logger LOGGER = LoggerFactory.getLogger(TagRESTService.class);
public static final String UTF8JSON = MediaType.APPLICATION_JSON + ";charset=utf-8";

private TagKafkaProducer tagKafkaProducer;
private ManagedKafkaConsumers consumersManager;
private Long statusTimeout;

public TagRESTService(TagKafkaProducer tagKafkaProducer, Long statusTimeout) {
this.tagKafkaProducer = tagKafkaProducer;
public TagRESTService(ManagedKafkaConsumers consumersManager, Long statusTimeout) {
this.consumersManager = consumersManager;
this.statusTimeout = statusTimeout;
}

@@ -106,6 +107,45 @@ public Response tagPost(
}
}

@Timed
@Path("/{collection}/_tagreplay")
@POST
@Produces(UTF8JSON)
@Consumes(UTF8JSON)
@ApiOperation(value = "TagReplay", produces = UTF8JSON, notes = Documentation.TAG_REPLAY, consumes = UTF8JSON, response = Long.class)
@ApiResponses(value = { @ApiResponse(code = 200, message = "Successful operation", response = Long.class),
@ApiResponse(code = 500, message = "Arlas Server Error.", response = Error.class), @ApiResponse(code = 400, message = "Bad request.", response = Error.class) })
public Response tagReplay(
// --------------------------------------------------------
// ----------------------- PATH -----------------------
// --------------------------------------------------------
@ApiParam(
name = "collection",
value = "collection",
allowMultiple = false,
required = true)
@PathParam(value = "collection") String collection,
// --------------------------------------------------------
// ----------------------- SEARCH -----------------------
// --------------------------------------------------------
@ApiParam(name = "offset", value = Documentation.TAG_REPLAY_PARAM_OFFSET,
allowMultiple = false,
required = true)
@QueryParam(value = "offset") Long offset,

// --------------------------------------------------------
// ----------------------- FORM -----------------------
// --------------------------------------------------------
@ApiParam(name ="pretty", value=Documentation.FORM_PRETTY,
allowMultiple = false,
defaultValue = "false",
required=false)
@QueryParam(value="pretty") Boolean pretty
) {

consumersManager.replayFrom(offset);
return Response.ok(offset).build();
}

@Timed
@Path("/{collection}/_untag")
@@ -157,7 +197,7 @@ public Response untagPost(
}

private Response doAction(TagRefRequest tagRefRequest) {
tagKafkaProducer.sendToTagRefLog(tagRefRequest);
consumersManager.getTagKafkaProducer().sendToTagRefLog(tagRefRequest);

UpdateResponse updateResponse = new UpdateResponse();
updateResponse.id = tagRefRequest.id;
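A minimal client-side sketch of calling the new endpoint. Host, port, collection and offset are placeholder values, and `java.net.http` is used purely for illustration; only the POST verb, the `_tagreplay` path and the `offset` query parameter come from the code above:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TagReplayClient {
    public static void main(String[] args) throws Exception {
        // Hypothetical host/port/collection; on success the endpoint echoes the offset back.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9998/arlas/write/geodata/_tagreplay?offset=42"))
                .header("Accept", "application/json;charset=utf-8")
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```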
@@ -108,7 +108,7 @@ public void run(ArlasTaggerConfiguration configuration, Environment environment)
environment.jersey().register(new JsonProcessingExceptionMapper());
environment.jersey().register(new ConstraintViolationExceptionMapper());
environment.jersey().register(new ElasticsearchExceptionMapper());
environment.jersey().register(new TagRESTService(tagKafkaProducer, configuration.statusTimeout));
environment.jersey().register(new TagRESTService(consumersManager, configuration.statusTimeout));
environment.jersey().register(new TagStatusRESTService(tagExploreService));

//filters
8 changes: 7 additions & 1 deletion docs/arlas-api-tagging.md
@@ -48,10 +48,16 @@ that can be used to request the status of the tagging process itself.
```shell
curl -X GET \
--header 'Accept: application/json;charset=utf-8' \
'http://<arlas-tagger-host>:<arlas-tagger-port>/arlas/status/geodata/_tag?id=...'
```

Tagging can be replayed: the Kafka reference log is read again from the given offset and all tagging requests
are executed again, in the same order as they were initially received.
```shell
curl -X POST \
--header 'Accept: application/json;charset=utf-8' \
'http://<arlas-tagger-host>:<arlas-tagger-port>/arlas/write/geodata/_tagreplay?offset=...'
```

!!! warning
Only taggable fields can be tagged. In order to be taggable, a field must have its path provided in the `CollectionReference`, more specifically in `params.taggable_fields`.
2 changes: 1 addition & 1 deletion docs/arlas-tagger-overview.md
@@ -3,7 +3,7 @@
The ARLAS Tagger offers 3 APIs:

- a `write` API for [tagging](arlas-api-tagging.md), meaning adding (or removing) a value to a field in ARLAS `collections` (`http://.../arlas_tagger/write/`)
- a `status` API for monitoring the `tagging` operation status (`http://.../arlas_tagger/status/`
- a `status` API for monitoring the `tagging` operation status (`http://.../arlas_tagger/status/`)
- an API for monitoring the server health and performances
- endpoints for testing the write API and the status API with swagger

