Skip to content

Commit

Permalink
fix(post-importer): refactor incident processing (#4567)
Browse files Browse the repository at this point in the history
* create new index `post-importer-queue`
* store last processed post import position in `import-position` index
  (new version)
* we don't use `pendingIncident` flag in `list-view` index anymore, instead
  we iterate over `post-importer-queue` records
* we avoid updating incident entities from different thread:
  * importer only inserts data
  * post importer updates
* implement migration to fill in `post-importer-queue`
* unignore flaky tests related with incidents

closes #4524

* fix(post-importer): fix migration...

...to not be applied on every restart

* fix(post-importer): fix migration step equals method
  • Loading branch information
sdorokhova committed Jun 7, 2023
1 parent fb591dd commit 2466a0b
Show file tree
Hide file tree
Showing 50 changed files with 1,444 additions and 543 deletions.
Expand Up @@ -6,6 +6,8 @@
*/
package io.camunda.operate.entities;

import com.fasterxml.jackson.annotation.JsonIgnore;

import java.time.OffsetDateTime;
import java.util.Objects;

Expand All @@ -14,7 +16,7 @@ public class IncidentEntity extends OperateZeebeEntity<IncidentEntity> {
private ErrorType errorType;

private String errorMessage;

// Is only used by binding to ES results
private Integer errorMessageHash;

Expand All @@ -36,6 +38,8 @@ public class IncidentEntity extends OperateZeebeEntity<IncidentEntity> {

private String treePath;

@Deprecated
@JsonIgnore
private boolean pending = true;

public ErrorType getErrorType() {
Expand All @@ -56,7 +60,7 @@ public IncidentEntity setErrorMessage(String errorMessage) {
setErrorMessageHash(errorMessage.hashCode());
return this;
}

public void setErrorMessageHash(Integer errorMessageHash) {
this.errorMessageHash = errorMessageHash;
}
Expand Down
Expand Up @@ -6,9 +6,31 @@
*/
package io.camunda.operate.entities;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public enum IncidentState {

ACTIVE,
RESOLVED
ACTIVE("CREATED"), RESOLVED("RESOLVED"), PENDING(null);

private static Map<String, IncidentState> intentMap = new HashMap<>();

static {
Arrays.stream(IncidentState.values()).forEach(is -> intentMap.put(is.getZeebeIntent(), is));
}

private String zeebeIntent;

IncidentState(String zeebeIntent) {
this.zeebeIntent = zeebeIntent;
}

public String getZeebeIntent() {
return zeebeIntent;
}

public static IncidentState createFrom(String zeebeIntent) {
return intentMap.get(zeebeIntent);
}
}
Expand Up @@ -6,6 +6,7 @@
*/
package io.camunda.operate.entities.listview;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.camunda.operate.entities.FlowNodeState;
import io.camunda.operate.entities.FlowNodeType;
import io.camunda.operate.entities.OperateZeebeEntity;
Expand All @@ -21,10 +22,14 @@ public class FlowNodeInstanceForListViewEntity extends OperateZeebeEntity<FlowNo
private String activityId;
private FlowNodeState activityState;
private FlowNodeType activityType;
@Deprecated
@JsonIgnore
private List<Long> incidentKeys = new ArrayList<>();
private String errorMessage;
private boolean incident;

@Deprecated
@JsonIgnore
private boolean pendingIncident;

private ListViewJoinRelation joinRelation = new ListViewJoinRelation(ListViewTemplate.ACTIVITIES_JOIN_RELATION);
Expand Down Expand Up @@ -117,14 +122,15 @@ public void setJoinRelation(ListViewJoinRelation joinRelation) {
if (!super.equals(o))
return false;
FlowNodeInstanceForListViewEntity that = (FlowNodeInstanceForListViewEntity) o;
return incident == that.incident && pendingIncident == that.pendingIncident && Objects.equals(processInstanceKey,
that.processInstanceKey) && Objects.equals(activityId, that.activityId) && activityState == that.activityState
&& activityType == that.activityType && Objects.equals(incidentKeys, that.incidentKeys) && Objects.equals(
return incident == that.incident && Objects.equals(processInstanceKey, that.processInstanceKey) && Objects.equals(
activityId,
that.activityId) && activityState == that.activityState && activityType == that.activityType && Objects.equals(
errorMessage, that.errorMessage) && Objects.equals(joinRelation, that.joinRelation);
}

@Override public int hashCode() {
return Objects.hash(super.hashCode(), processInstanceKey, activityId, activityState, activityType, incidentKeys,
errorMessage, incident, pendingIncident, joinRelation);
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), processInstanceKey, activityId, activityState, activityType, errorMessage,
incident, joinRelation);
}
}
Expand Up @@ -20,6 +20,8 @@ public class ImportPositionEntity extends OperateEntity<ImportPositionEntity> {

private long sequence;

private Long postImporterPosition;

private String indexName;

public String getAliasName() {
Expand Down Expand Up @@ -58,6 +60,15 @@ public ImportPositionEntity setSequence(final long sequence){
return this;
}

public Long getPostImporterPosition() {
return postImporterPosition;
}

public ImportPositionEntity setPostImporterPosition(Long postImporterPosition) {
this.postImporterPosition = postImporterPosition;
return this;
}

public String getIndexName() {
return indexName;
}
Expand Down Expand Up @@ -90,17 +101,17 @@ public boolean equals(Object o) {
return false;
ImportPositionEntity that = (ImportPositionEntity) o;
return partitionId == that.partitionId && position == that.position && sequence == that.sequence && Objects.equals(
aliasName, that.aliasName) && Objects.equals(indexName, that.indexName);
aliasName, that.aliasName) && Objects.equals(postImporterPosition, that.postImporterPosition) && Objects.equals(
indexName, that.indexName);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), aliasName, partitionId, position, sequence, indexName);
return Objects.hash(super.hashCode(), aliasName, partitionId, position, sequence, postImporterPosition, indexName);
}

@Override
public String toString() {
return "ImportPositionEntity{" + "aliasName='" + aliasName + '\''+ ", sequence=" + sequence + ", partitionId=" + partitionId + ", position=" + position + ", indexName='" + indexName
+ '\'' + '}';
return "ImportPositionEntity{" + "aliasName='" + aliasName + '\'' + ", partitionId=" + partitionId + ", position=" + position + ", sequence=" + sequence + ", postImporterSequence=" + postImporterPosition + ", indexName='" + indexName + '\'' + '}';
}
}
@@ -0,0 +1,13 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.operate.entities.post;

public enum PostImporterActionType {

INCIDENT

}
@@ -0,0 +1,113 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.operate.entities.post;

import io.camunda.operate.entities.OperateEntity;

import java.time.OffsetDateTime;
import java.util.Objects;

public class PostImporterQueueEntity extends OperateEntity<PostImporterQueueEntity> {

private Long key;

private PostImporterActionType actionType;

private String intent;

private OffsetDateTime creationTime;

private Integer partitionId;

private Long processInstanceKey;

private Long position;

public Long getKey() {
return key;
}

public PostImporterQueueEntity setKey(Long key) {
this.key = key;
return this;
}

public PostImporterActionType getActionType() {
return actionType;
}

public PostImporterQueueEntity setActionType(PostImporterActionType actionType) {
this.actionType = actionType;
return this;
}

public String getIntent() {
return intent;
}

public PostImporterQueueEntity setIntent(String intent) {
this.intent = intent;
return this;
}

public OffsetDateTime getCreationTime() {
return creationTime;
}

public PostImporterQueueEntity setCreationTime(OffsetDateTime creationTime) {
this.creationTime = creationTime;
return this;
}

public Integer getPartitionId() {
return partitionId;
}

public PostImporterQueueEntity setPartitionId(Integer partitionId) {
this.partitionId = partitionId;
return this;
}

public Long getProcessInstanceKey() {
return processInstanceKey;
}

public PostImporterQueueEntity setProcessInstanceKey(Long processInstanceKey) {
this.processInstanceKey = processInstanceKey;
return this;
}

public Long getPosition() {
return position;
}

public PostImporterQueueEntity setPosition(Long position) {
this.position = position;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
if (!super.equals(o))
return false;
PostImporterQueueEntity that = (PostImporterQueueEntity) o;
return Objects.equals(key, that.key) && actionType == that.actionType && Objects.equals(intent,
that.intent) && Objects.equals(creationTime, that.creationTime) && Objects.equals(partitionId,
that.partitionId) && Objects.equals(processInstanceKey, that.processInstanceKey) && Objects.equals(position,
that.position);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), key, actionType, intent, creationTime, partitionId, processInstanceKey,
position);
}
}
Expand Up @@ -48,7 +48,12 @@ public void initializeSchema() throws MigrationException {
}
if (migrationProperties.isMigrationEnabled()) {
LOGGER.info("SchemaStartup: migrate schema.");
migrator.migrate();
try {
migrator.migrate();
} catch (Exception ex) {
LOGGER.error("Exception occured during migration: " + ex.getMessage(), ex);
throw ex;
}
}
LOGGER.info("SchemaStartup finished.");
}
Expand Down
Expand Up @@ -17,6 +17,8 @@ public class ImportPositionIndex extends AbstractIndexDescriptor implements Prio
public static final String ID = "id";
public static final String POSITION = "position";
public static final String SEQUENCE = "sequence";

public static final String POST_IMPORTER_POSITION = "postImporterPosition";
public static final String FIELD_INDEX_NAME = "indexName";

@Override
Expand All @@ -26,7 +28,7 @@ public String getIndexName() {

@Override
public String getVersion() {
return "8.2.0";
return "8.3.0";
}

}
Expand Up @@ -88,15 +88,13 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass())
return false;
AbstractStep that = (AbstractStep) o;
return isApplied == that.isApplied && Objects.equals(content, that.content) && Objects.equals(description,
that.description) && Objects.equals(createdDate, that.createdDate) && Objects.equals(appliedDate,
that.appliedDate) && Objects.equals(indexName, that.indexName) && Objects.equals(version,
that.version) && Objects.equals(order, that.order);
return Objects.equals(indexName, that.indexName) && Objects.equals(version, that.version) && Objects.equals(order,
that.order);
}

@Override
public int hashCode() {
return Objects.hash(content, description, createdDate, appliedDate, indexName, isApplied, version, order);
return Objects.hash(indexName, version, order);
}

@Override
Expand Down
@@ -0,0 +1,13 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.operate.schema.migration;

/**
* This migration step fills the new index with data. Does not require index of older version.
*/
public interface DataInitializerStep {
}

0 comments on commit 2466a0b

Please sign in to comment.