From c896909af946111cf5a5d9d09fdd1c519183e1c0 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Mon, 4 May 2026 20:35:47 -0500 Subject: [PATCH] NIFI-15905 Switched to qualified Table Names in CaptureChangeMySQL --- .../mysql/processors/CaptureChangeMySQL.java | 20 +++++++++----- .../processors/CaptureChangeMySQLTest.java | 26 ++++++++++++++++--- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 75aa5acaa363..ac4dca7bd828 100644 --- a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -1222,14 +1222,14 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException { if (jdbcConnectionHolder != null) { try (Statement s = getJdbcConnection().createStatement()) { - s.execute("USE `" + key.getDatabaseName() + "`"); - ResultSet rs = s.executeQuery("SELECT * FROM `" + key.getTableName() + "` LIMIT 0"); - ResultSetMetaData rsmd = rs.getMetaData(); - int numCols = rsmd.getColumnCount(); - List columnDefinitions = new ArrayList<>(); - for (int i = 1; i <= numCols; i++) { + final String tableInfoQuery = getTableInfoQuery(s, key); + final ResultSet rs = s.executeQuery(tableInfoQuery); + final ResultSetMetaData rsmd = rs.getMetaData(); + final int columnCount = rsmd.getColumnCount(); + final List columnDefinitions = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { // Use the column label if it exists, otherwise use the column name. We're not doing aliasing here, but it's better practice. - String columnLabel = rsmd.getColumnLabel(i); + final String columnLabel = rsmd.getColumnLabel(i); columnDefinitions.add(new ColumnDefinition(rsmd.getColumnType(i), columnLabel != null ? columnLabel : rsmd.getColumnName(i))); } @@ -1240,6 +1240,12 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException { return tableInfo; } + protected String getTableInfoQuery(final Statement statement, final TableInfoCacheKey tableInfoCacheKey) throws SQLException { + final String databaseNameQuoted = statement.enquoteIdentifier(tableInfoCacheKey.getDatabaseName(), true); + final String tableNameQuoted = statement.enquoteIdentifier(tableInfoCacheKey.getTableName(), true); + return "SELECT * FROM %s.%s LIMIT 0".formatted(databaseNameQuoted, tableNameQuoted); + } + protected Connection getJdbcConnection() throws SQLException { return jdbcConnectionHolder.getConnection(); } diff --git a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.java b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.java index 08773db99e14..a245243a0c24 100644 --- a/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.java +++ b/nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.java @@ -62,6 +62,8 @@ import java.io.Serializable; import java.security.NoSuchAlgorithmException; import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -71,6 +73,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeoutException; import java.util.stream.IntStream; import javax.net.ssl.SSLContext; @@ -80,7 +83,9 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** @@ -122,7 +127,7 @@ public class CaptureChangeMySQLTest { private static final String TEN = "10"; private static final ObjectMapper MAPPER = new ObjectMapper(); - private CaptureChangeMySQL processor; + private MockCaptureChangeMySQL processor; private TestRunner testRunner; private MockBinlogClient client; @@ -1295,6 +1300,22 @@ public void testNormalizeQuery() { assertEquals("alter table", processor.normalizeQuery(" /* This is a \n multiline comment test */ alter table")); } + @Test + public void testGetTableInfoQuery() throws SQLException { + final Statement statement = mock(Statement.class, CALLS_REAL_METHODS); + + final String prefix = UUID.randomUUID().toString(); + final long tableId = 0; + + final String databaseName = "NiFi 'Quoted' Repository"; + final String tableName = "FlowFile"; + + final TableInfoCacheKey cacheKey = new TableInfoCacheKey(prefix, databaseName, tableName, tableId); + final String tableInfoQuery = processor.getTableInfoQuery(statement, cacheKey); + + assertEquals("SELECT * FROM \"NiFi 'Quoted' Repository\".\"FlowFile\" LIMIT 0", tableInfoQuery); + } + @Test void testMigration() { final Map expectedRenamed = Map.ofEntries( @@ -1349,9 +1370,8 @@ protected BinaryLogClient createBinlogClient(String hostname, int port, String u @Override protected TableInfo loadTableInfo(TableInfoCacheKey key) { - TableInfo tableInfo = cache.computeIfAbsent(key, k -> new TableInfo(k.getDatabaseName(), k.getTableName(), k.getTableId(), + return cache.computeIfAbsent(key, k -> new TableInfo(k.getDatabaseName(), k.getTableName(), k.getTableId(), Collections.singletonList(new ColumnDefinition((byte) -4, "string1")))); - return tableInfo; } @Override