Skip to content

Commit b0a701c

Browse files
authored
[Improve][Catalog] refactor catalog (#4540)
1 parent 93471c9 commit b0a701c

File tree

11 files changed

+365
-605
lines changed

11 files changed

+365
-605
lines changed

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java

Lines changed: 226 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,20 @@
2121
import org.apache.seatunnel.api.table.catalog.Catalog;
2222
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2323
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
24+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
2425
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
26+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2527
import org.apache.seatunnel.api.table.catalog.TablePath;
28+
import org.apache.seatunnel.api.table.catalog.TableSchema;
2629
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
2730
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
2831
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
2932
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
3033
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
34+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3135
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
36+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
37+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
3238

3339
import org.apache.commons.collections4.CollectionUtils;
3440
import org.apache.commons.lang3.StringUtils;
@@ -40,29 +46,35 @@
4046
import java.sql.Connection;
4147
import java.sql.DatabaseMetaData;
4248
import java.sql.DriverManager;
49+
import java.sql.PreparedStatement;
4350
import java.sql.ResultSet;
51+
import java.sql.ResultSetMetaData;
4452
import java.sql.SQLException;
4553
import java.util.ArrayList;
54+
import java.util.Collections;
4655
import java.util.Comparator;
4756
import java.util.HashMap;
57+
import java.util.HashSet;
4858
import java.util.List;
4959
import java.util.Map;
5060
import java.util.Optional;
61+
import java.util.Set;
5162
import java.util.stream.Collectors;
5263

5364
import static com.google.common.base.Preconditions.checkArgument;
5465
import static com.google.common.base.Preconditions.checkNotNull;
5566

5667
public abstract class AbstractJdbcCatalog implements Catalog {
5768
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
58-
5969
protected final String catalogName;
6070
protected final String defaultDatabase;
6171
protected final String username;
6272
protected final String pwd;
6373
protected final String baseUrl;
6474
protected final String suffix;
6575
protected final String defaultUrl;
76+
protected final JdbcDialect jdbcDialect;
77+
protected static final Set<String> SYS_DATABASES = new HashSet<>();
6678

6779
public AbstractJdbcCatalog(
6880
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
@@ -80,6 +92,7 @@ public AbstractJdbcCatalog(
8092
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
8193
this.defaultUrl = urlInfo.getOrigin();
8294
this.suffix = urlInfo.getSuffix();
95+
this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl);
8396
}
8497

8598
@Override
@@ -107,6 +120,7 @@ public String getBaseUrl() {
107120
public void open() throws CatalogException {
108121
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
109122
// test connection, fail early if we cannot connect to database
123+
conn.getCatalog();
110124
} catch (SQLException e) {
111125
throw new CatalogException(
112126
String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
@@ -120,6 +134,56 @@ public void close() throws CatalogException {
120134
LOG.info("Catalog {} closing", catalogName);
121135
}
122136

137+
@Override
138+
public List<String> listDatabases() throws CatalogException {
139+
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
140+
141+
PreparedStatement ps = conn.prepareStatement(jdbcDialect.listDatabases());
142+
143+
List<String> databases = new ArrayList<>();
144+
ResultSet rs = ps.executeQuery();
145+
146+
while (rs.next()) {
147+
String databaseName = rs.getString(1);
148+
if (!getSysDatabases().contains(databaseName)) {
149+
databases.add(rs.getString(1));
150+
}
151+
}
152+
153+
return databases;
154+
} catch (Exception e) {
155+
throw new CatalogException(
156+
String.format("Failed listing database in catalog %s", this.catalogName), e);
157+
}
158+
}
159+
160+
@Override
161+
public List<String> listTables(String databaseName)
162+
throws CatalogException, DatabaseNotExistException {
163+
if (!databaseExists(databaseName)) {
164+
throw new DatabaseNotExistException(this.catalogName, databaseName);
165+
}
166+
167+
String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix);
168+
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
169+
PreparedStatement ps =
170+
conn.prepareStatement(jdbcDialect.listTableSql(databaseName))) {
171+
172+
ResultSet rs = ps.executeQuery();
173+
174+
List<String> tables = new ArrayList<>();
175+
176+
while (rs.next()) {
177+
tables.add(jdbcDialect.getTableName(rs));
178+
}
179+
180+
return tables;
181+
} catch (Exception e) {
182+
throw new CatalogException(
183+
String.format("Failed listing database in catalog %s", catalogName), e);
184+
}
185+
}
186+
123187
protected Optional<PrimaryKey> getPrimaryKey(
124188
DatabaseMetaData metaData, String database, String table) throws SQLException {
125189
return getPrimaryKey(metaData, database, table, table);
@@ -226,7 +290,8 @@ public boolean databaseExists(String databaseName) throws CatalogException {
226290
public boolean tableExists(TablePath tablePath) throws CatalogException {
227291
try {
228292
return databaseExists(tablePath.getDatabaseName())
229-
&& listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
293+
&& listTables(tablePath.getDatabaseName())
294+
.contains(jdbcDialect.getTableName(tablePath));
230295
} catch (DatabaseNotExistException e) {
231296
return false;
232297
}
@@ -245,8 +310,86 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
245310
}
246311
}
247312

248-
protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table)
249-
throws CatalogException;
313+
public CatalogTable getTable(TablePath tablePath)
314+
throws CatalogException, TableNotExistException {
315+
if (!tableExists(tablePath)) {
316+
throw new TableNotExistException(catalogName, tablePath);
317+
}
318+
319+
String dbUrl =
320+
jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
321+
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
322+
DatabaseMetaData metaData = conn.getMetaData();
323+
Optional<PrimaryKey> primaryKey =
324+
getPrimaryKey(
325+
metaData,
326+
tablePath.getDatabaseName(),
327+
tablePath.getSchemaName(),
328+
tablePath.getTableName());
329+
List<ConstraintKey> constraintKeys =
330+
getConstraintKeys(
331+
metaData,
332+
tablePath.getDatabaseName(),
333+
tablePath.getSchemaName(),
334+
tablePath.getTableName());
335+
336+
try (PreparedStatement ps =
337+
conn.prepareStatement(
338+
String.format(
339+
"SELECT * FROM %s WHERE 1 = 0;",
340+
tablePath.getFullNameWithQuoted("\"")))) {
341+
ResultSetMetaData tableMetaData = ps.getMetaData();
342+
TableSchema.Builder builder = TableSchema.builder();
343+
// add column
344+
for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
345+
String columnName = tableMetaData.getColumnName(i);
346+
SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
347+
int columnDisplaySize = tableMetaData.getColumnDisplaySize(i);
348+
String comment = tableMetaData.getColumnLabel(i);
349+
boolean isNullable =
350+
tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable;
351+
Object defaultValue =
352+
getColumnDefaultValue(
353+
metaData,
354+
tablePath.getDatabaseName(),
355+
tablePath.getSchemaName(),
356+
tablePath.getTableName(),
357+
columnName)
358+
.orElse(null);
359+
360+
PhysicalColumn physicalColumn =
361+
PhysicalColumn.of(
362+
columnName,
363+
type,
364+
columnDisplaySize,
365+
isNullable,
366+
defaultValue,
367+
comment);
368+
builder.column(physicalColumn);
369+
}
370+
// add primary key
371+
primaryKey.ifPresent(builder::primaryKey);
372+
// add constraint key
373+
constraintKeys.forEach(builder::constraintKey);
374+
TableIdentifier tableIdentifier =
375+
TableIdentifier.of(
376+
catalogName,
377+
tablePath.getDatabaseName(),
378+
tablePath.getSchemaName(),
379+
tablePath.getTableName());
380+
return CatalogTable.of(
381+
tableIdentifier,
382+
builder.build(),
383+
buildConnectorOptions(tablePath),
384+
Collections.emptyList(),
385+
"");
386+
}
387+
388+
} catch (Exception e) {
389+
throw new CatalogException(
390+
String.format("Failed getting table %s", tablePath.getFullName()), e);
391+
}
392+
}
250393

251394
@Override
252395
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@@ -257,7 +400,20 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
257400
}
258401
}
259402

260-
protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException;
403+
protected boolean dropTableInternal(TablePath tablePath) throws CatalogException {
404+
String dbUrl =
405+
jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
406+
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
407+
PreparedStatement ps =
408+
conn.prepareStatement(
409+
jdbcDialect.getDropTableSql(tablePath.getFullName()))) {
410+
// Will there exist concurrent drop for one table?
411+
return ps.execute();
412+
} catch (SQLException e) {
413+
throw new CatalogException(
414+
String.format("Failed dropping table %s", tablePath.getFullName()), e);
415+
}
416+
}
261417

262418
@Override
263419
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@@ -273,8 +429,6 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
273429
}
274430
}
275431

276-
protected abstract boolean createDatabaseInternal(String databaseName);
277-
278432
@Override
279433
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
280434
throws DatabaseNotExistException, CatalogException {
@@ -286,5 +440,69 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
286440
}
287441
}
288442

289-
protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException;
443+
protected SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex)
444+
throws SQLException {
445+
return null;
446+
}
447+
448+
protected Set<String> getSysDatabases() {
449+
return SYS_DATABASES;
450+
}
451+
452+
protected Map<String, String> buildConnectorOptions(TablePath tablePath) {
453+
Map<String, String> options = new HashMap<>(8);
454+
options.put("connector", "jdbc");
455+
options.put(
456+
"url",
457+
jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix));
458+
options.put("table-name", tablePath.getFullName());
459+
options.put("username", username);
460+
options.put("password", pwd);
461+
return options;
462+
}
463+
464+
protected boolean createDatabaseInternal(String databaseName) {
465+
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
466+
PreparedStatement ps =
467+
conn.prepareStatement(
468+
String.format(jdbcDialect.createDatabaseSql(databaseName)))) {
469+
return ps.execute();
470+
} catch (Exception e) {
471+
throw new CatalogException(
472+
String.format(
473+
"Failed creating database %s in catalog %s",
474+
databaseName, this.catalogName),
475+
e);
476+
}
477+
}
478+
479+
protected boolean dropDatabaseInternal(String databaseName) throws CatalogException {
480+
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
481+
PreparedStatement ps =
482+
conn.prepareStatement(jdbcDialect.dropDatabaseSql(databaseName))) {
483+
return ps.execute();
484+
} catch (Exception e) {
485+
throw new CatalogException(
486+
String.format(
487+
"Failed dropping database %s in catalog %s",
488+
databaseName, this.catalogName),
489+
e);
490+
}
491+
}
492+
493+
// todo: If the origin source is mysql, we can directly use create table like to create the
494+
// target table?
495+
protected boolean createTableInternal(TablePath tablePath, CatalogTable table)
496+
throws CatalogException {
497+
String dbUrl =
498+
jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix);
499+
String createTableSql = jdbcDialect.createTableSql(tablePath, table);
500+
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd);
501+
PreparedStatement ps = conn.prepareStatement(createTableSql)) {
502+
return ps.execute();
503+
} catch (Exception e) {
504+
throw new CatalogException(
505+
String.format("Failed creating table %s", tablePath.getFullName()), e);
506+
}
507+
}
290508
}

0 commit comments

Comments
 (0)