Skip to content

Commit

Permalink
[ARCTIC-1295] fix writing into an empty KeyedTable with flink/spark 0…
Browse files Browse the repository at this point in the history
….3.x/0.4.0 and AMS 0.4.1 (#1296)

* table with currentTxId == 0 shoule be valid for allocate TxId from AMS

* valid/invalid KeyedTable for allocating transaction id after table created/dropped
  • Loading branch information
wangtaohz committed Mar 29, 2023
1 parent 00c2d5b commit 7b11dc9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,14 @@ public long allocateTransactionId(TableIdentifier tableIdentifier, String signat
throw new TException(tableIdentifier + " is not keyed table");
}
}

public void validTable(TableIdentifier tableIdentifier) {
validTableIdentifiers.add(tableIdentifier);
LOG.info("{} is now valid for allocating transaction id", tableIdentifier);
}

public void inValidTable(TableIdentifier tableIdentifier) {
validTableIdentifiers.remove(tableIdentifier);
LOG.info("{} is now invalid for allocating transaction id", tableIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ public void createTable(TableMetadata tableMetadata) {

TABLE_META_STORE_CACHE.put(new Key(tableMetadata.getTableIdentifier(), tableMetadata.getMetaStore()),
tableMetadata.getMetaStore());

try {
if (StringUtils.isNotBlank(tableMetadata.getPrimaryKey())) {
ServiceContainer.getArcticTransactionService()
.validTable(tableMetadata.getTableIdentifier().buildTableIdentifier());
}
} catch (Exception e) {
LOG.warn("createTable success but failed to valid for allocating transaction id", e);
}

try {
ServiceContainer.getOptimizeService().addNewTable(tableMetadata.getTableIdentifier());
} catch (Exception e) {
Expand Down Expand Up @@ -140,6 +150,16 @@ public void dropTableMetadata(TableIdentifier tableIdentifier,
}

TABLE_META_STORE_CACHE.remove(new Key(tableMetadata.getTableIdentifier(), tableMetadata.getMetaStore()));

try {
if (StringUtils.isNotBlank(tableMetadata.getPrimaryKey())) {
ServiceContainer.getArcticTransactionService()
.inValidTable(tableMetadata.getTableIdentifier().buildTableIdentifier());
}
} catch (Exception e) {
LOG.warn("dropTable success but failed to invalid for allocating transaction id", e);
}

try {
ServiceContainer.getOptimizeService().clearRemovedTable(tableMetadata.getTableIdentifier());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ private void updateTransactionIdOfAllKeyedTable() {
TableCommitMeta tableCommitMeta = getTableCommitMeta(tableIdentifier, createSnapshot);
ServiceContainer.getFileInfoCacheService().commitCacheFileInfo(tableCommitMeta);
}
} else {
ServiceContainer.getArcticTransactionService().validTable(tableIdentifier.buildTableIdentifier());
}
}
} catch (Throwable t) {
Expand Down

0 comments on commit 7b11dc9

Please sign in to comment.