Skip to content

Commit

Permalink
[BugFix] Table UUID should only be related with original information …
Browse files Browse the repository at this point in the history
…of the table in connector mode (StarRocks#22244)

Signed-off-by: zombee0 <flylucas_10@163.com>

we use uuid to identify the external table. in catalog scenario,
tableid and table createtime changed offenly in deferent queries,
we should use external table's catalog, remote dbname, remote
tablename to identify the external table. To deal with that table
created after deleted, we also use the original table's createtime.
for iceberg table, we can use native uuid instread of createtime.

TODO: for jdbctable estable we can't identify the table created
after deleted.
  • Loading branch information
zombee0 committed Apr 24, 2023
1 parent 0fb6817 commit 976909e
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ public class DeltaLakeTable extends Table {
public static final String PARTITION_NULL_VALUE = "null";

public DeltaLakeTable(long id, String catalogName, String dbName, String tableName, List<Column> schema,
List<String> partitionNames, DeltaLog deltaLog) {
List<String> partitionNames, DeltaLog deltaLog, long createTime) {
super(id, tableName, TableType.DELTALAKE, schema);
this.catalogName = catalogName;
this.dbName = dbName;
this.tableName = tableName;
this.partColumnNames = partitionNames;
this.deltaLog = deltaLog;
this.createTime = createTime;
}

@Override
Expand Down
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/EsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ public class EsTable extends Table {

// record the latest and recently exception when sync ES table metadata (mapping, shard location)
private Throwable lastMetaDataSyncException = null;
// used for catalog to identify the remote table.
private String catalogName = null;
private String dbName = null;

public EsTable() {
super(TableType.ELASTICSEARCH);
Expand All @@ -142,6 +145,15 @@ public EsTable(long id, String name, List<Column> schema, Map<String, String> pr
validate(properties);
}

public EsTable(long id, String catalogName, String dbName, String name, List<Column> schema, Map<String, String> properties,
PartitionInfo partitionInfo) throws DdlException {
super(id, name, TableType.ELASTICSEARCH, schema);
this.partitionInfo = partitionInfo;
this.catalogName = catalogName;
this.dbName = dbName;
validate(properties);
}

public Map<String, String> fieldsContext() {
return esMetaStateTracker.searchContext().fetchFieldsContext();
}
Expand Down Expand Up @@ -318,6 +330,16 @@ public TTableDescriptor toThrift(List<ReferencedPartitionInfo> partitions) {
return tTableDescriptor;
}

// TODO, identify the remote table that created after deleted
@Override
public String getUUID() {
if (!Strings.isNullOrEmpty(catalogName)) {
return String.join(".", catalogName, dbName, name);
} else {
return Long.toString(id);
}
}

@Override
public int getSignature(int signatureVersion) {
Adler32 adler32 = new Adler32();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public String getRemoteTableName() {
@Override
public String getUUID() {
if (CatalogMgr.isExternalCatalog(catalogName)) {
return String.join(".", catalogName, remoteDbName, remoteTableName, Long.toString(createTime));
return String.join(".", catalogName, remoteDbName, remoteTableName,
((BaseTable) getNativeTable()).operations().current().uuid());
} else {
return Long.toString(id);
}
Expand Down
14 changes: 13 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class JDBCTable extends Table {
private String jdbcTable;
private Map<String, String> properties;
private String dbName;
private String catalogName;

public JDBCTable() {
super(TableType.JDBC);
Expand All @@ -55,10 +56,11 @@ public JDBCTable(long id, String name, List<Column> schema, Map<String, String>
validate(properties);
}

public JDBCTable(long id, String name, List<Column> schema, String dbName,
public JDBCTable(long id, String name, List<Column> schema, String dbName, String catalogName,
Map<String, String> properties) throws DdlException {
super(id, name, TableType.JDBC, schema);
this.dbName = dbName;
this.catalogName = catalogName;
validate(properties);
}

Expand Down Expand Up @@ -108,6 +110,16 @@ private void validate(Map<String, String> properties) throws DdlException {
}
}

// TODO, identify the remote table that created after deleted
@Override
public String getUUID() {
if (!Strings.isNullOrEmpty(catalogName)) {
return String.join(".", catalogName, dbName, name);
} else {
return Long.toString(id);
}
}

@Override
public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> partitions) {
TJDBCTable tJDBCTable = new TJDBCTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public Table getTable(String dbName, String tblName) {
}
HiveTable hiveTable = (HiveTable) table;
String path = hiveTable.getTableLocation();
return DeltaUtils.convertDeltaToSRTable(catalogName, dbName, tblName, path, configuration);
long createTime = table.getCreateTime();
return DeltaUtils.convertDeltaToSRTable(catalogName, dbName, tblName, path, configuration, createTime);
} catch (Exception e) {
LOG.warn(e.getMessage());
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class DeltaUtils {
private static final Logger LOG = LogManager.getLogger(DeltaUtils.class);

public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path,
Configuration configuration) {
Configuration configuration, long createTime) {
DeltaLog deltaLog = DeltaLog.forTable(configuration, path);

if (!deltaLog.tableExists()) {
Expand Down Expand Up @@ -70,7 +70,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName
}

return new DeltaLakeTable(CONNECTOR_ID_GENERATOR.getNextId().asInt(), catalog, dbName, tblName,
fullSchema, metadata.getPartitionColumns(), deltaLog);
fullSchema, metadata.getPartitionColumns(), deltaLog, createTime);
}

public static RemoteFileInputFormat getRemoteFileFormat(String format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ElasticsearchConnector(ConnectorContext contex) {
public ConnectorMetadata getMetadata() {
if (metadata == null) {
try {
metadata = new ElasticsearchMetadata(esRestClient, esConfig.getProperties());
metadata = new ElasticsearchMetadata(esRestClient, esConfig.getProperties(), catalogName);
} catch (StarRocksConnectorException e) {
LOG.error("Failed to create elasticsearch metadata on [catalog : {}]", catalogName, e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ public class ElasticsearchMetadata

private final EsRestClient esRestClient;
private final Map<String, String> properties;
private final String catalogName;
public static final String DEFAULT_DB = "default_db";
public static final long DEFAULT_DB_ID = 1L;


public ElasticsearchMetadata(EsRestClient esRestClient, Map<String, String> properties) {
public ElasticsearchMetadata(EsRestClient esRestClient, Map<String, String> properties, String catalogName) {
this.esRestClient = esRestClient;
this.properties = properties;
this.catalogName = catalogName;
}

@Override
Expand All @@ -67,17 +69,17 @@ public Table getTable(String dbName, String tblName) {
if (!DEFAULT_DB.equalsIgnoreCase(dbName)) {
return null;
}
return toEsTable(esRestClient, properties, tblName);
return toEsTable(esRestClient, properties, tblName, dbName, catalogName);
}

public static EsTable toEsTable(EsRestClient esRestClient,
Map<String, String> properties,
String tableName) {
String tableName, String dbName, String catalogName) {
try {
List<Column> columns = EsUtil.convertColumnSchema(esRestClient, tableName);
properties.put(EsTable.INDEX, tableName);
EsTable esTable = new EsTable(CONNECTOR_ID_GENERATOR.getNextId().asInt(),
tableName, columns, properties, new SinglePartitionInfo());
catalogName, dbName, tableName, columns, properties, new SinglePartitionInfo());
esTable.setComment("created by external es catalog");
esTable.syncTableMetaData(esRestClient);
return esTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void computeDriverChecksum() {
public ConnectorMetadata getMetadata() {
if (metadata == null) {
try {
metadata = new JDBCMetadata(properties);
metadata = new JDBCMetadata(properties, catalogName);
} catch (StarRocksConnectorException e) {
LOG.error("Failed to create jdbc metadata on [catalog : {}]", catalogName, e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ public class JDBCMetadata implements ConnectorMetadata {
private static Logger LOG = LogManager.getLogger(JDBCMetadata.class);

private Map<String, String> properties;
private String catalogName;
private JDBCSchemaResolver schemaResolver;

public JDBCMetadata(Map<String, String> properties) {
public JDBCMetadata(Map<String, String> properties, String catalogName) {
this.properties = properties;
this.catalogName = catalogName;
try {
Class.forName(properties.get(JDBCResource.DRIVER_CLASS));
} catch (ClassNotFoundException e) {
Expand Down Expand Up @@ -112,7 +114,7 @@ public Table getTable(String dbName, String tblName) {
return null;
}
return schemaResolver.getTable(ConnectorTableId.CONNECTOR_ID_GENERATOR.getNextId().asInt(),
tblName, fullSchema, dbName, properties);
tblName, fullSchema, dbName, catalogName, properties);
} catch (SQLException | DdlException e) {
LOG.warn(e.getMessage());
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public ResultSet getColumns(Connection connection, String dbName, String tblName
return connection.getMetaData().getColumns(dbName, null, tblName, "%");
}

public Table getTable(long id, String name, List<Column> schema, String dbName,
public Table getTable(long id, String name, List<Column> schema, String dbName, String catalogName,
Map<String, String> properties) throws DdlException {
return new JDBCTable(id, name, schema, dbName, properties);
return new JDBCTable(id, name, schema, dbName, catalogName, properties);
}

public List<Column> convertToSRTable(ResultSet columnSet) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public ResultSet getColumns(Connection connection, String dbName, String tblName
}

@Override
public Table getTable(long id, String name, List<Column> schema, String dbName,
public Table getTable(long id, String name, List<Column> schema, String dbName, String catalogName,
Map<String, String> properties) throws DdlException {
return new JDBCTable(id, dbName + "." + name, schema, "", properties);
return new JDBCTable(id, dbName + "." + name, schema, "", catalogName, properties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testGetTable(@Mocked EsRestClient client) {
}
};

ElasticsearchMetadata metadata = new ElasticsearchMetadata(client, new HashMap<>());
ElasticsearchMetadata metadata = new ElasticsearchMetadata(client, new HashMap<>(), "catalog");
ExceptionChecker.expectThrowsWithMsg(StarRocksConnectorException.class,
"Unknown index not_exist_index",
() -> metadata.getTable("default_db", "not_exist_index"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void setUp() throws SQLException {
@Test
public void testListDatabaseNames() {
try {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties);
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
List<String> result = jdbcMetadata.listDbNames();
List<String> expectResult = Lists.newArrayList("test");
Assert.assertEquals(expectResult, result);
Expand All @@ -111,7 +111,7 @@ public void testListDatabaseNames() {
@Test
public void testGetDb() {
try {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties);
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
Database db = jdbcMetadata.getDb("test");
Assert.assertEquals("test", db.getOriginName());
} catch (Exception e) {
Expand All @@ -122,7 +122,7 @@ public void testGetDb() {
@Test
public void testListTableNames() {
try {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties);
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
List<String> result = jdbcMetadata.listTableNames("test");
List<String> expectResult = Lists.newArrayList("tbl1", "tbl2", "tbl3");
Assert.assertEquals(expectResult, result);
Expand All @@ -134,7 +134,7 @@ public void testListTableNames() {
@Test
public void testGetTable() {
try {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties);
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
Table table = jdbcMetadata.getTable("test", "tbl1");
Assert.assertTrue(table instanceof JDBCTable);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void testListDatabaseNames() throws SQLException {
}
};
try {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties);
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
List<String> result = jdbcMetadata.listDbNames();
List<String> expectResult = Lists.newArrayList("postgres", "template1", "test");
Assert.assertEquals(expectResult, result);
Expand All @@ -109,7 +109,7 @@ public void testGetDb() throws SQLException {
}
};
try {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties);
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
Database db = jdbcMetadata.getDb("test");
Assert.assertEquals("test", db.getOriginName());
} catch (Exception e) {
Expand All @@ -136,7 +136,7 @@ public void testListTableNames() throws SQLException {
}
};
try {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties);
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
List<String> result = jdbcMetadata.listTableNames("test");
List<String> expectResult = Lists.newArrayList("tbl1", "tbl2", "tbl3");
Assert.assertEquals(expectResult, result);
Expand All @@ -163,7 +163,7 @@ public void testGetTable() throws SQLException {
}
};
try {
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties);
JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog");
Table table = jdbcMetadata.getTable("test", "tbl1");
Assert.assertTrue(table instanceof JDBCTable);
} catch (Exception e) {
Expand Down

0 comments on commit 976909e

Please sign in to comment.