Skip to content

Commit

Permalink
[Hotfix]Delete sync tables when drop external catalogs (apache#2235)
Browse files Browse the repository at this point in the history
  • Loading branch information
shidayang authored and ShawHee committed Dec 29, 2023
1 parent 2c931c9 commit 830fea6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.netease.arctic.AmoroTable;
import com.netease.arctic.TableIDWithFormat;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.server.exception.IllegalMetadataException;
import com.netease.arctic.server.persistence.PersistentBase;
import com.netease.arctic.server.persistence.mapper.CatalogMetaMapper;

Expand Down Expand Up @@ -40,4 +41,15 @@ public void updateMetadata(CatalogMeta metadata) {
public abstract List<TableIDWithFormat> listTables(String database);

public abstract AmoroTable<?> loadTable(String database, String tableName);

public void dispose() {
doAsTransaction(
() ->
doAsExisted(
CatalogMetaMapper.class,
mapper -> mapper.deleteCatalog(name()),
() ->
new IllegalMetadataException(
"Catalog " + name() + " has more than one database or table")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,13 @@ private void initServerCatalog(CatalogMeta catalogMeta) {
@Override
public void dropCatalog(String catalogName) {
checkStarted();
doAsExisted(
CatalogMetaMapper.class,
mapper -> mapper.deleteCatalog(catalogName),
() ->
new IllegalMetadataException(
"Catalog " + catalogName + " has more than one database or table"));
ServerCatalog serverCatalog = getServerCatalog(catalogName);
if (serverCatalog == null) {
throw new ObjectNotExistsException("Catalog " + catalogName);
}

// TableRuntime cleanup is responsibility by exploreExternalCatalog method
serverCatalog.dispose();
internalCatalogMap.remove(catalogName);
externalCatalogMap.remove(catalogName);
}
Expand Down Expand Up @@ -491,8 +492,7 @@ void exploreExternalCatalog() {
CompletableFuture.runAsync(
() -> {
try {
disposeTable(
externalCatalog, serverTableIdentifiers.get(tableIdentity));
disposeTable(serverTableIdentifiers.get(tableIdentity));
} catch (Exception e) {
LOG.error(
"TableExplorer dispose table {} error",
Expand All @@ -511,6 +511,20 @@ void exploreExternalCatalog() {
LOG.error("TableExplorer error", e);
}
}

// Clear TableRuntime objects that do not correspond to a catalog.
// This scenario is mainly due to the fact that TableRuntime objects were not cleaned up in a
// timely manner during the process of dropping the catalog due to concurrency considerations.
// It is permissible to have some erroneous states in the middle, as long as the final data is
// consistent.
Set<String> catalogNames =
listCatalogMetas().stream().map(CatalogMeta::getCatalogName).collect(Collectors.toSet());
for (TableRuntime tableRuntime : tableRuntimeMap.values()) {
if (!catalogNames.contains(tableRuntime.getTableIdentifier().getCatalog())) {
disposeTable(tableRuntime.getTableIdentifier());
}
}

long end = System.currentTimeMillis();
LOG.info("Syncing external catalogs took {} ms.", end - start);
}
Expand Down Expand Up @@ -600,9 +614,14 @@ private void revertTableRuntimeAdded(
}
}

private void disposeTable(
ExternalCatalog externalCatalog, ServerTableIdentifier tableIdentifier) {
externalCatalog.disposeTable(tableIdentifier.getDatabase(), tableIdentifier.getTableName());
private void disposeTable(ServerTableIdentifier tableIdentifier) {
doAs(
TableMetaMapper.class,
mapper ->
mapper.deleteTableIdByName(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName()));
Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier))
.ifPresent(
tableRuntime -> {
Expand Down

0 comments on commit 830fea6

Please sign in to comment.