Skip to content

Commit

Permalink
[Refactor] Synchronize OLAP external table metadata when loading data…
Browse files Browse the repository at this point in the history
…(cherry-pick 3.1) (#31651)

backport #24739 #30124

Signed-off-by: gengjun-git <gengjun@starrocks.com>
  • Loading branch information
gengjun-git committed Sep 25, 2023
1 parent b1456c5 commit 587f4ce
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 357 deletions.
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ public void readUnlock() {
this.rwLock.readLock().unlock();
}

public boolean isReadLockHeldByCurrentThread() {
return this.rwLock.getReadHoldCount() > 0;
}

public void writeLock() {
long startMs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
String threadDump = getOwnerInfo(rwLock.getOwner());
Expand Down
427 changes: 212 additions & 215 deletions fe/fe-core/src/main/java/com/starrocks/catalog/ExternalOlapTable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,7 @@ public OlapTable(long id, String tableName, List<Column> baseSchema, KeysType ke

// Only Copy necessary metadata for query.
// We don't do deep copy, because which is very expensive;
public OlapTable copyOnlyForQuery() {
OlapTable olapTable = new OlapTable();
public void copyOnlyForQuery(OlapTable olapTable) {
olapTable.id = this.id;
olapTable.name = this.name;
olapTable.fullSchema = Lists.newArrayList(this.fullSchema);
Expand Down Expand Up @@ -321,7 +320,6 @@ public OlapTable copyOnlyForQuery() {
if (this.tableProperty != null) {
olapTable.tableProperty = this.tableProperty.copy();
}
return olapTable;
}

public BinlogConfig getCurBinlogConfig() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.starrocks.catalog.ExternalOlapTable;
import com.starrocks.common.Config;
import com.starrocks.rpc.FrontendServiceProxy;
import com.starrocks.sql.common.MetaNotFoundException;
import com.starrocks.thrift.TGetTableMetaRequest;
import com.starrocks.thrift.TGetTableMetaResponse;
import com.starrocks.thrift.TNetworkAddress;
Expand All @@ -30,7 +31,7 @@
public class TableMetaSyncer {
private static final Logger LOG = LogManager.getLogger(TableMetaSyncer.class);

public void syncTable(ExternalOlapTable table) {
public void syncTable(ExternalOlapTable table) throws MetaNotFoundException {
String host = table.getSourceTableHost();
int port = table.getSourceTablePort();
TNetworkAddress addr = new TNetworkAddress(host, port);
Expand All @@ -50,11 +51,13 @@ public void syncTable(ExternalOlapTable table) {
errMsg = "";
}
LOG.warn("get TableMeta failed: {}", errMsg);
throw new MetaNotFoundException(errMsg);
} else {
table.updateMeta(request.getDb_name(), response.getTable_meta(), response.getBackends());
}
} catch (Exception e) {
LOG.warn("call fe {} refreshTable rpc method failed", addr, e);
throw new MetaNotFoundException("get TableMeta failed from " + addr + ", error: " + e.getMessage());
}
}
};
}
9 changes: 5 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1559,10 +1559,11 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
String dbName = stmt.getTableName().getDb();
String tableName = stmt.getTableName().getTbl();
Database database = GlobalStateMgr.getCurrentState().getMetadataMgr().getDb(catalogName, dbName);
Table targetTable = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(catalogName, dbName, tableName);
if (isExplainAnalyze) {
Preconditions.checkState(targetTable instanceof OlapTable,
"explain analyze only supports insert into olap native table");
Table targetTable;
if (stmt instanceof InsertStmt && ((InsertStmt) stmt).getTargetTable() != null) {
targetTable = ((InsertStmt) stmt).getTargetTable();
} else {
targetTable = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(catalogName, dbName, tableName);
}

if (parsedStmt instanceof InsertStmt && ((InsertStmt) parsedStmt).isOverwrite() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@
import com.starrocks.connector.hive.events.MetastoreEventsProcessor;
import com.starrocks.consistency.ConsistencyChecker;
import com.starrocks.credential.CredentialUtil;
import com.starrocks.external.starrocks.StarRocksRepository;
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.ha.HAProtocol;
import com.starrocks.ha.LeaderInfo;
Expand Down Expand Up @@ -388,7 +387,6 @@ public class GlobalStateMgr {
private Daemon replayer;
private Daemon timePrinter;
private EsRepository esRepository; // it is a daemon, so add it here
private StarRocksRepository starRocksRepository;
private MetastoreEventsProcessor metastoreEventsProcessor;
private ConnectorTableMetadataProcessor connectorTableMetadataProcessor;

Expand Down Expand Up @@ -693,7 +691,6 @@ private GlobalStateMgr(boolean isCkptGlobalState) {
this.resourceGroupMgr = new ResourceGroupMgr();

this.esRepository = new EsRepository();
this.starRocksRepository = new StarRocksRepository();
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.connectorTableMetadataProcessor = new ConnectorTableMetadataProcessor();

Expand Down Expand Up @@ -1383,7 +1380,6 @@ private void startAllNodeTypeDaemonThreads() {
labelCleaner.start();
// ES state store
esRepository.start();
starRocksRepository.start();

if (Config.enable_hms_events_incremental_sync) {
metastoreEventsProcessor.start();
Expand Down Expand Up @@ -1554,7 +1550,6 @@ public void loadImage(String imageDir) throws IOException, DdlException {
localMetastore.recreateTabletInvertIndex();
// rebuild es state state
esRepository.loadTableFromCatalog();
starRocksRepository.loadTableFromCatalog();

checksum = load.loadLoadJob(dis, checksum);
checksum = loadAlterJob(dis, checksum);
Expand Down Expand Up @@ -3216,10 +3211,6 @@ public EsRepository getEsRepository() {
return this.esRepository;
}

public StarRocksRepository getStarRocksRepository() {
return this.starRocksRepository;
}

public MetastoreEventsProcessor getMetastoreEventsProcessor() {
return this.metastoreEventsProcessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4840,7 +4840,6 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept

recreateTabletInvertIndex();
GlobalStateMgr.getCurrentState().getEsRepository().loadTableFromCatalog();
GlobalStateMgr.getCurrentState().getStarRocksRepository().loadTableFromCatalog();

/*
* defaultCluster has no meaning, it is only for compatibility with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,8 @@ public Void visitTable(TableRelation node, Void context) {
OlapTable table = (OlapTable) node.getTable();
olapTables.add(table);
// Only copy the necessary olap table meta to avoid the lock when plan query
OlapTable copied = table.copyOnlyForQuery();
OlapTable copied = new OlapTable();
table.copyOnlyForQuery(copied);
node.setTable(copied);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.analysis.NullLiteral;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.ExternalOlapTable;
import com.starrocks.catalog.HiveTable;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.MaterializedView;
Expand All @@ -32,6 +33,7 @@
import com.starrocks.common.AnalysisException;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.external.starrocks.TableMetaSyncer;
import com.starrocks.planner.IcebergTableSink;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.CatalogMgr;
Expand Down Expand Up @@ -80,6 +82,10 @@ public static void analyze(InsertStmt insertStmt, ConnectContext session) {
Database database = MetaUtils.getDatabase(catalogName, dbName);
Table table = MetaUtils.getTable(catalogName, dbName, tableName);

if (table instanceof ExternalOlapTable) {
table = getOLAPExternalTableMeta(database, (ExternalOlapTable) table);
}

if (table instanceof MaterializedView && !insertStmt.isSystem()) {
throw new SemanticException(
"The data of '%s' cannot be inserted because '%s' is a materialized view," +
Expand Down Expand Up @@ -310,4 +316,23 @@ private static void checkStaticKeyPartitionInsert(InsertStmt insertStmt, Table t
}
}
}

private static ExternalOlapTable getOLAPExternalTableMeta(Database database, ExternalOlapTable externalOlapTable) {
// copy the table, and release database lock when synchronize table meta
ExternalOlapTable copiedTable = new ExternalOlapTable();
externalOlapTable.copyOnlyForQuery(copiedTable);
int lockTimes = 0;
while (database.isReadLockHeldByCurrentThread()) {
database.readUnlock();
lockTimes++;
}
try {
new TableMetaSyncer().syncTable(copiedTable);
} finally {
while (lockTimes-- > 0) {
database.readLock();
}
}
return copiedTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,8 @@ private Table resolveTable(TableRelation tableRelation) {
try {
// Add read lock to avoid concurrent problems.
database.readLock();
OlapTable mvOlapTable = ((OlapTable) mvTable).copyOnlyForQuery();
OlapTable mvOlapTable = new OlapTable();
((OlapTable) mvTable).copyOnlyForQuery(mvOlapTable);
// Copy the necessary olap table meta to avoid changing original meta;
mvOlapTable.setBaseIndexId(materializedIndex.second.getIndexId());
table = mvOlapTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public void testCopyOnlyForQuery() {
OlapTable olapTable = new OlapTable();
olapTable.setHasDelete();

OlapTable copied = olapTable.copyOnlyForQuery();
OlapTable copied = new OlapTable();
olapTable.copyOnlyForQuery(copied);

Assert.assertEquals(olapTable.hasDelete(), copied.hasDelete());
Assert.assertEquals(olapTable.hasForbitGlobalDict(), copied.hasForbitGlobalDict());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import com.starrocks.catalog.ExternalOlapTable;
import com.starrocks.catalog.Table;
import com.starrocks.leader.LeaderImpl;
import com.starrocks.meta.MetaContext;
import com.starrocks.qe.DDLStmtExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.AnalyzeTestUtil;
import com.starrocks.thrift.TGetTableMetaRequest;
import com.starrocks.thrift.TGetTableMetaResponse;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -108,13 +110,6 @@ public void syncTableMeta() throws Exception {
"is_robot," +
"is_unpatrolled" +
")" +
"PARTITION BY RANGE(event_time)" +
"(" +
"PARTITION p06 VALUES LESS THAN ('2015-09-12 06:00:00')," +
"PARTITION p12 VALUES LESS THAN ('2015-09-12 12:00:00')," +
"PARTITION p18 VALUES LESS THAN ('2015-09-12 18:00:00')," +
"PARTITION p24 VALUES LESS THAN ('2015-09-13 00:00:00')" +
")" +
"DISTRIBUTED BY HASH(user)" +
"properties (" +
"\"host\" = \"127.0.0.2\"," +
Expand All @@ -134,6 +129,10 @@ public void syncTableMeta() throws Exception {

Table table = GlobalStateMgr.getCurrentState().getDb("test_db").getTable("test_ext_table");
ExternalOlapTable extTable = (ExternalOlapTable) table;
// remove the thread local meta context
MetaContext.remove();
extTable.updateMeta(request.getDb_name(), response.getTable_meta(), response.getBackends());
Assert.assertNull(MetaContext.get());
Assert.assertEquals(4, extTable.getPartitions().size());
}
}

0 comments on commit 587f4ce

Please sign in to comment.