Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1222,14 +1222,14 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException {
if (jdbcConnectionHolder != null) {

try (Statement s = getJdbcConnection().createStatement()) {
s.execute("USE `" + key.getDatabaseName() + "`");
ResultSet rs = s.executeQuery("SELECT * FROM `" + key.getTableName() + "` LIMIT 0");
ResultSetMetaData rsmd = rs.getMetaData();
int numCols = rsmd.getColumnCount();
List<ColumnDefinition> columnDefinitions = new ArrayList<>();
for (int i = 1; i <= numCols; i++) {
final String tableInfoQuery = getTableInfoQuery(s, key);
final ResultSet rs = s.executeQuery(tableInfoQuery);
final ResultSetMetaData rsmd = rs.getMetaData();
final int columnCount = rsmd.getColumnCount();
final List<ColumnDefinition> columnDefinitions = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
// Use the column label if it exists, otherwise use the column name. We're not doing aliasing here, but it's better practice.
String columnLabel = rsmd.getColumnLabel(i);
final String columnLabel = rsmd.getColumnLabel(i);
columnDefinitions.add(new ColumnDefinition(rsmd.getColumnType(i), columnLabel != null ? columnLabel : rsmd.getColumnName(i)));
}

Expand All @@ -1240,6 +1240,12 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException {
return tableInfo;
}

protected String getTableInfoQuery(final Statement statement, final TableInfoCacheKey tableInfoCacheKey) throws SQLException {
final String databaseNameQuoted = statement.enquoteIdentifier(tableInfoCacheKey.getDatabaseName(), true);
final String tableNameQuoted = statement.enquoteIdentifier(tableInfoCacheKey.getTableName(), true);
return "SELECT * FROM %s.%s LIMIT 0".formatted(databaseNameQuoted, tableNameQuoted);
}

protected Connection getJdbcConnection() throws SQLException {
return jdbcConnectionHolder.getConnection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import java.io.Serializable;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
Expand All @@ -71,6 +73,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import javax.net.ssl.SSLContext;
Expand All @@ -80,7 +83,9 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -122,7 +127,7 @@ public class CaptureChangeMySQLTest {
private static final String TEN = "10";
private static final ObjectMapper MAPPER = new ObjectMapper();

private CaptureChangeMySQL processor;
private MockCaptureChangeMySQL processor;
private TestRunner testRunner;
private MockBinlogClient client;

Expand Down Expand Up @@ -1295,6 +1300,22 @@ public void testNormalizeQuery() {
assertEquals("alter table", processor.normalizeQuery(" /* This is a \n multiline comment test */ alter table"));
}

@Test
public void testGetTableInfoQuery() throws SQLException {
final Statement statement = mock(Statement.class, CALLS_REAL_METHODS);

final String prefix = UUID.randomUUID().toString();
final long tableId = 0;

final String databaseName = "NiFi 'Quoted' Repository";
final String tableName = "FlowFile";

final TableInfoCacheKey cacheKey = new TableInfoCacheKey(prefix, databaseName, tableName, tableId);
final String tableInfoQuery = processor.getTableInfoQuery(statement, cacheKey);

assertEquals("SELECT * FROM \"NiFi 'Quoted' Repository\".\"FlowFile\" LIMIT 0", tableInfoQuery);
}

@Test
void testMigration() {
final Map<String, String> expectedRenamed = Map.ofEntries(
Expand Down Expand Up @@ -1349,9 +1370,8 @@ protected BinaryLogClient createBinlogClient(String hostname, int port, String u

@Override
protected TableInfo loadTableInfo(TableInfoCacheKey key) {
TableInfo tableInfo = cache.computeIfAbsent(key, k -> new TableInfo(k.getDatabaseName(), k.getTableName(), k.getTableId(),
return cache.computeIfAbsent(key, k -> new TableInfo(k.getDatabaseName(), k.getTableName(), k.getTableId(),
Collections.singletonList(new ColumnDefinition((byte) -4, "string1"))));
return tableInfo;
}

@Override
Expand Down
Loading