Skip to content

Commit

Permalink
Add an extra delete mutation for CDC
Browse files Browse the repository at this point in the history
  • Loading branch information
kadirozde committed Jan 22, 2024
1 parent 7420443 commit da6ddad
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,13 @@ private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c
}
}
}
private Mutation getDeleteIndexMutation(Put dataRowState, IndexMaintainer indexMaintainer,
long ts, ImmutableBytesPtr rowKeyPtr) {
ValueGetter cdcDataRowVG = new IndexUtil.SimpleValueGetter(dataRowState);
byte[] indexRowKey = indexMaintainer.buildRowKey(cdcDataRowVG, rowKeyPtr, null, null, ts);
return indexMaintainer.buildRowDeleteMutation(indexRowKey,
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
}

/**
* Generate the index update for a data row from the mutation that are obtained by merging the previous data row
Expand Down Expand Up @@ -960,13 +967,19 @@ private void prepareIndexMutations(BatchMutateContext context, List<IndexMaintai
}
} else if (currentDataRowState != null
&& indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)) {
ValueGetter currentDataRowVG = new IndexUtil.SimpleValueGetter(currentDataRowState);
byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
null, null, ts);
Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
context.indexUpdates.put(hTableInterfaceReference,
new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
new Pair<Mutation, byte[]>(getDeleteIndexMutation(currentDataRowState,
indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
if (indexMaintainer.isCDCIndex()) {
Put cdcDataRowState = new Put(currentDataRowState.getRow());
cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
ByteUtil.EMPTY_BYTE_ARRAY);
context.indexUpdates.put(hTableInterfaceReference,
new Pair<Mutation, byte[]>(getDeleteIndexMutation(cdcDataRowState,
indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));

}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.util.BitSet;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ExpressionUtil;
Expand Down Expand Up @@ -437,6 +438,7 @@ public static IndexMaintainer getIndexMaintainer(List<IndexMaintainer> maintaine
private boolean isUncovered;
private Expression indexWhere;
private Set<ColumnReference> indexWhereColumns;
private boolean isCDCIndex;

protected IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
this.dataRowKeySchema = dataRowKeySchema;
Expand Down Expand Up @@ -674,6 +676,7 @@ private Void addDataColInfo(final PTable dataTable, Expression expression) {
this.indexWhere = index.getIndexWhereExpression(connection);
this.indexWhereColumns = index.getIndexWhereColumns(connection);
}
this.isCDCIndex = CDCUtil.isACDCIndex(index);

initCachedState();
}
Expand Down Expand Up @@ -1773,6 +1776,11 @@ public static IndexMaintainer fromProto(ServerCachingProtos.IndexMaintainer prot
maintainer.indexWhere = null;
maintainer.indexWhereColumns = null;
}
if (proto.hasIsCDCIndex()) {
maintainer.isCDCIndex = proto.getIsCDCIndex();
} else {
maintainer.isCDCIndex = false;
}
maintainer.initCachedState();
return maintainer;
}
Expand Down Expand Up @@ -1916,6 +1924,7 @@ public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer mainta
}
}
}
builder.setIsCDCIndex(maintainer.isCDCIndex);
return builder.build();
}

Expand Down Expand Up @@ -2239,6 +2248,9 @@ public boolean isLocalIndex() {
public boolean isUncovered() {
return isUncovered;
}
public boolean isCDCIndex() {
return isCDCIndex;
}

public boolean isImmutableRows() {
return immutableRows;
Expand Down
1 change: 1 addition & 0 deletions phoenix-core/src/main/protobuf/ServerCachingService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ message IndexMaintainer {
optional bool isUncovered = 28;
optional bytes indexWhere = 29;
repeated ColumnReference indexWhereColumns = 30;
optional bool isCDCIndex = 31;
}

message TransformMaintainer {
Expand Down

0 comments on commit da6ddad

Please sign in to comment.