From 85eb45c8b4756e11db0756256810812842617ef3 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Mon, 24 Jul 2017 11:14:03 -0400 Subject: [PATCH] NIFI-4217: Retain original DDL in CaptureChangeMySQL --- .../apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java | 2 +- .../nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 037882d90502..f7f075e9a517 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -813,7 +813,7 @@ public void outputEvents(ProcessSession session, StateManager stateManager, Comp if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) { // If we don't have table information, we can still use the database name TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null); - DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery); + DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql); currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS)); } // Remove all the keys from the cache that this processor added diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index 3eb1f176ea86..f876b6ae93b3 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -446,6 +446,10 @@ class CaptureChangeMySQLTest { assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY)) assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L) assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type')) + // Check that DDL didn't change + if (e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY) == "32") { + assertEquals('ALTER TABLE myTable add column col1 int', new JsonSlurper().parse(testRunner.getContentAsByteArray(e)).query?.toString()) + } } assertEquals(13, resultFiles.size()) assertEquals(13, testRunner.provenanceEvents.size())