Skip to content

Commit

Permalink
Merge branch 'master' into issue-2423
Browse files Browse the repository at this point in the history
  • Loading branch information
baiyangtx authored Dec 13, 2023
2 parents 0a5a292 + cf9c193 commit a00948e
Show file tree
Hide file tree
Showing 146 changed files with 2,849 additions and 4,386 deletions.
4 changes: 4 additions & 0 deletions ams/dist/src/main/arctic-bin/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ ams:
enabled: true
thread-count: 10

clean-dangling-delete-files:
enabled: true
thread-count: 10

sync-hive-tables:
enabled: true
thread-count: 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ public class ArcticManagementConf {
.defaultValue(10)
.withDescription("The number of threads used for orphan files cleaning.");

public static final ConfigOption<Boolean> CLEAN_DANGLING_DELETE_FILES_ENABLED =
ConfigOptions.key("clean-dangling-delete-files.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Enable dangling delete files cleaning.");

public static final ConfigOption<Integer> CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT =
ConfigOptions.key("clean-dangling-delete-files.thread-count")
.intType()
.defaultValue(10)
.withDescription("The number of threads used for dangling delete files cleaning.");

public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
ConfigOptions.key("sync-hive-tables.enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public void startService() throws Exception {
addHandlerChain(AsyncTableExecutors.getInstance().getDataExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getSnapshotsExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getBlockerExpiringExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,16 @@ public void cleanOrphanFiles(TableRuntime tableRuntime) {

// clear metadata files
cleanMetadata(System.currentTimeMillis() - keepTime);
}

@Override
public void cleanDanglingDeleteFiles(TableRuntime tableRuntime) {
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();

if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) {
return;
}

// refresh
table.refresh();
Snapshot currentSnapshot = table.currentSnapshot();
java.util.Optional<String> totalDeleteFiles =
java.util.Optional.ofNullable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@
// TODO TableMaintainer should not be in this optimizing.xxx package.
public interface TableMaintainer {

/** Clean table orphan files. Includes: data files, metadata files, dangling delete files. */
/** Clean table orphan files. Includes: data files, metadata files. */
void cleanOrphanFiles(TableRuntime tableRuntime);

/** Clean table dangling delete files. */
default void cleanDanglingDeleteFiles(TableRuntime tableRuntime) {
    // Do nothing by default.
}
/**
* Expire snapshots. The optimizing based on the snapshot that the current table relies on will
* not expire according to TableRuntime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class AsyncTableExecutors {
private SnapshotsExpiringExecutor snapshotsExpiringExecutor;
private TableRuntimeRefreshExecutor tableRefreshingExecutor;
private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor;
private DanglingDeleteFilesCleaningExecutor danglingDeleteFilesCleaningExecutor;
private BlockerExpiringExecutor blockerExpiringExecutor;
private OptimizingCommitExecutor optimizingCommitExecutor;
private OptimizingExpiringExecutor optimizingExpiringExecutor;
Expand All @@ -32,6 +33,12 @@ public void setup(TableManager tableManager, Configurations conf) {
new OrphanFilesCleaningExecutor(
tableManager, conf.getInteger(ArcticManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT));
}
if (conf.getBoolean(ArcticManagementConf.CLEAN_DANGLING_DELETE_FILES_ENABLED)) {
this.danglingDeleteFilesCleaningExecutor =
new DanglingDeleteFilesCleaningExecutor(
tableManager,
conf.getInteger(ArcticManagementConf.CLEAN_DANGLING_DELETE_FILES_THREAD_COUNT));
}
this.optimizingCommitExecutor =
new OptimizingCommitExecutor(
tableManager, conf.getInteger(ArcticManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT));
Expand Down Expand Up @@ -75,6 +82,10 @@ public OrphanFilesCleaningExecutor getOrphanFilesCleaningExecutor() {
return orphanFilesCleaningExecutor;
}

public DanglingDeleteFilesCleaningExecutor getDanglingDeleteFilesCleaningExecutor() {
return danglingDeleteFilesCleaningExecutor;
}

public BlockerExpiringExecutor getBlockerExpiringExecutor() {
return blockerExpiringExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.server.table.executor;

import static com.netease.arctic.server.optimizing.maintainer.TableMaintainer.ofTable;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.server.optimizing.maintainer.TableMaintainer;
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableManager;
import com.netease.arctic.server.table.TableRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic executor that cleans dangling delete files for tables.
 *
 * <p>Runs once per table every {@link #INTERVAL} milliseconds when the table's configuration
 * enables dangling-delete-file cleaning, delegating the actual work to
 * {@link TableMaintainer#cleanDanglingDeleteFiles(TableRuntime)}.
 */
public class DanglingDeleteFilesCleaningExecutor extends BaseTableExecutor {

  // Fixed: logger was previously created with OrphanFilesCleaningExecutor.class,
  // which mislabeled every log line emitted by this executor.
  private static final Logger LOG =
      LoggerFactory.getLogger(DanglingDeleteFilesCleaningExecutor.class);

  /** Fixed scheduling interval: once every 24 hours, in milliseconds. */
  private static final long INTERVAL = 24 * 60 * 60 * 1000L;

  protected DanglingDeleteFilesCleaningExecutor(TableManager tableManager, int poolSize) {
    super(tableManager, poolSize);
  }

  /** Next run is always one full interval away, regardless of table state. */
  @Override
  protected long getNextExecutingTime(TableRuntime tableRuntime) {
    return INTERVAL;
  }

  /** Enabled only when the table-level configuration turns the cleaning on. */
  @Override
  protected boolean enabled(TableRuntime tableRuntime) {
    return tableRuntime.getTableConfiguration().isDeleteDanglingDeleteFilesEnabled();
  }

  /** Re-evaluate scheduling when the table configuration changes. */
  @Override
  public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
    scheduleIfNecessary(tableRuntime, getStartDelay());
  }

  /**
   * Load the table and run the dangling-delete-file cleaning once.
   *
   * <p>Any failure is logged and swallowed so one table's error does not stop the executor
   * from processing other tables on later ticks.
   */
  @Override
  protected void execute(TableRuntime tableRuntime) {
    try {
      LOG.info("{} start cleaning dangling delete files", tableRuntime.getTableIdentifier());
      AmoroTable<?> amoroTable = loadTable(tableRuntime);
      TableMaintainer tableMaintainer = ofTable(amoroTable);
      tableMaintainer.cleanDanglingDeleteFiles(tableRuntime);
    } catch (Throwable t) {
      LOG.error("{} failed to clean dangling delete file", tableRuntime.getTableIdentifier(), t);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void cleanDanglingDeleteFiles() throws IOException {
.commit();
assertDanglingDeleteFiles(testTable, 1);

MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(testTable);
IcebergTableMaintainer tableMaintainer = new IcebergTableMaintainer(testTable);
tableMaintainer.cleanDanglingDeleteFiles();

assertDanglingDeleteFiles(testTable, 0);
Expand Down
24 changes: 23 additions & 1 deletion core/src/main/java/com/netease/arctic/CommonUnifiedCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,29 @@ public class CommonUnifiedCatalog implements UnifiedCatalog {
private Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newHashMap();
private final Map<String, String> properties = Maps.newHashMap();

private TableMetaStore tableMetaStore;

public CommonUnifiedCatalog(
Supplier<CatalogMeta> catalogMetaSupplier, Map<String, String> properties) {
CatalogMeta catalogMeta = catalogMetaSupplier.get();
CatalogUtil.mergeCatalogProperties(catalogMeta, properties);
this.meta = catalogMeta;
this.tableMetaStore = CatalogUtil.buildMetaStore(catalogMeta);
this.properties.putAll(properties);
this.metaSupplier = catalogMetaSupplier;
initializeFormatCatalogs();
}

@Override
public String metastoreType() {
return meta.getCatalogType();
}

@Override
public TableMetaStore authenticationContext() {
return this.tableMetaStore;
}

@Override
public List<String> listDatabases() {
return findFirstFormatCatalog(TableFormat.values()).listDatabases();
Expand Down Expand Up @@ -163,19 +176,28 @@ public synchronized void refresh() {
if (newMeta.equals(this.meta)) {
return;
}
this.tableMetaStore = CatalogUtil.buildMetaStore(newMeta);
this.meta = newMeta;
this.initializeFormatCatalogs();
}

@Override
public Map<String, String> properties() {
return this.meta.getCatalogProperties();
}

protected void initializeFormatCatalogs() {
ServiceLoader<FormatCatalogFactory> loader = ServiceLoader.load(FormatCatalogFactory.class);
Set<TableFormat> formats = CatalogUtil.tableFormats(this.meta);
TableMetaStore store = CatalogUtil.buildMetaStore(this.meta);
Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newConcurrentMap();
for (FormatCatalogFactory factory : loader) {
if (formats.contains(factory.format())) {
Map<String, String> catalogProperties =
factory.convertCatalogProperties(
name(), meta.getCatalogType(), meta.getCatalogProperties());
FormatCatalog catalog =
factory.create(name(), meta.getCatalogType(), meta.getCatalogProperties(), store);
factory.create(name(), meta.getCatalogType(), catalogProperties, store);
formatCatalogs.put(factory.format(), catalog);
}
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/com/netease/arctic/FormatCatalogFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,16 @@ FormatCatalog create(

/** format of this catalog factory */
TableFormat format();

/**
* Convert UnifiedCatalog Properties to corresponding format Properties and use them to initialize
* the corresponding Catalog.
*
* @param catalogName register in AMS
* @param metastoreType metastore type
* @param unifiedCatalogProperties properties of unified catalog.
* @return properties of the target format.
*/
Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties);
}
12 changes: 12 additions & 0 deletions core/src/main/java/com/netease/arctic/TableIDWithFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,16 @@ public TableIdentifier getIdentifier() {
public TableFormat getTableFormat() {
return tableFormat;
}

public String catalog() {
return this.identifier.getCatalog();
}

public String database() {
return this.identifier.getDatabase();
}

public String table() {
return this.identifier.getTableName();
}
}
24 changes: 22 additions & 2 deletions core/src/main/java/com/netease/arctic/UnifiedCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,29 @@

package com.netease.arctic;

import com.netease.arctic.table.TableMetaStore;

import java.util.List;
import java.util.Map;

/** UnifiedCatalog is a catalog that can visit tables with all types of formats. */
public interface UnifiedCatalog extends AmoroCatalog {

/** name of this catalog */
/** Name of this catalog */
String name();

/** Metastore type */
String metastoreType();

/**
* Get authentication context of this catalog.
*
* @return table metastore.
*/
TableMetaStore authenticationContext();

/**
* list tables with format
* List tables with format
*
* @param database given database
* @return identifier and format list
Expand All @@ -36,4 +49,11 @@ public interface UnifiedCatalog extends AmoroCatalog {

/** Refresh catalog meta */
void refresh();

/**
* Get catalog properties
*
* @return catalog properties
*/
Map<String, String> properties();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,11 @@ public FormatCatalog create(
public TableFormat format() {
return TableFormat.ICEBERG;
}

@Override
public Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
return com.netease.arctic.utils.CatalogUtil.withIcebergCatalogInitializeProperties(
catalogName, metastoreType, unifiedCatalogProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import com.netease.arctic.FormatCatalog;
import com.netease.arctic.FormatCatalogFactory;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.ams.api.properties.CatalogMetaProperties;
import com.netease.arctic.catalog.ArcticCatalog;
import com.netease.arctic.catalog.CatalogLoader;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;

import java.util.Map;

Expand All @@ -43,4 +45,14 @@ public FormatCatalog create(
public TableFormat format() {
return TableFormat.MIXED_ICEBERG;
}

@Override
public Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
Map<String, String> properties =
CatalogUtil.withIcebergCatalogInitializeProperties(
catalogName, metastoreType, unifiedCatalogProperties);
properties.put(CatalogMetaProperties.TABLE_FORMATS, format().name());
return properties;
}
}
Loading

0 comments on commit a00948e

Please sign in to comment.