Skip to content

Commit

Permalink
NIFI-11279: Allow event stream processing to continue in CaptureChangeMySQL after sync issue
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Mar 14, 2023
1 parent 1c883ad commit 836dd49
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 42 deletions.
Expand Up @@ -739,6 +739,11 @@ public synchronized void onTrigger(ProcessContext context, ProcessSessionFactory
setup(context);
}

// If no client could be created, try again
if (binlogClient == null) {
return;
}

// If the client has been disconnected, try to reconnect
if (!binlogClient.isConnected()) {
Exception e = lifecycleListener.getException();
Expand All @@ -764,7 +769,8 @@ public synchronized void onTrigger(ProcessContext context, ProcessSessionFactory

try {
outputEvents(currentSession, log);
} catch (IOException ioe) {
} catch (Exception eventException) {
getLogger().error("Exception during event processing at file={} pos={}", currentBinlogFile, currentBinlogPosition, eventException);
try {
// Perform some processor-level "rollback", then rollback the session
currentBinlogFile = xactBinlogFile == null ? "" : xactBinlogFile;
Expand All @@ -773,13 +779,14 @@ public synchronized void onTrigger(ProcessContext context, ProcessSessionFactory
currentGtidSet = xactGtidSet;
inTransaction = false;
stop();
queue.clear();
currentSession.rollback();
} catch (Exception e) {
// Not much we can recover from here
log.warn("Error occurred during rollback", e);
log.error("Error stopping CDC client", e);
} finally {
queue.clear();
currentSession.rollback();
}
throw new ProcessException(ioe);
context.yield();
}
}

Expand Down Expand Up @@ -936,7 +943,7 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
currentBinlogPosition = header.getPosition();
}
log.debug("Got message event type: {} ", header.getEventType().toString());
log.debug("Message event, type={} pos={} file={}", eventType, currentBinlogPosition, currentBinlogFile);
switch (eventType) {
case TABLE_MAP:
// This is sent to inform which table is about to be changed by subsequent events
Expand Down Expand Up @@ -988,7 +995,8 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
if ("BEGIN".equals(sql)) {
// If we're already in a transaction, something bad happened, alert the user
if (inTransaction) {
throw new IOException("BEGIN event received while already processing a transaction. This could indicate that your binlog position is invalid.");
getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid "
+ "or the event stream is out of sync or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile);
}
// Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.)
xactBinlogFile = currentBinlogFile;
Expand All @@ -1010,8 +1018,9 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
updateState(session);
} else if ("COMMIT".equals(sql)) {
if (!inTransaction) {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
getLogger().debug("COMMIT event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state "
+ "or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile);
}
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
if (includeBeginCommit) {
Expand Down Expand Up @@ -1093,8 +1102,9 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce

case XID:
if (!inTransaction) {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
getLogger().debug("COMMIT (XID) event received at pos={} file={} /while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.",
currentBinlogPosition, currentBinlogFile);
}
if (includeBeginCommit) {
if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
Expand All @@ -1113,7 +1123,6 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
// Flush the events to the FlowFile when the processor is stopped
currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
}
currentSession.commitAsync();
}
}
// update inTransaction value and save next position
Expand Down
Expand Up @@ -60,7 +60,6 @@ import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable

import javax.net.ssl.SSLContext
import java.sql.Connection
Expand All @@ -74,7 +73,6 @@ import java.util.regex.Pattern
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertNotNull
import static org.junit.jupiter.api.Assertions.assertTrue
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.mockito.ArgumentMatchers.anyString
import static org.mockito.Mockito.doReturn
import static org.mockito.Mockito.mock
Expand Down Expand Up @@ -364,7 +362,8 @@ class CaptureChangeMySQLTest {
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
{} as EventData
))
assertThrows(AssertionError.class, { testRunner.run(1, true, false) } as Executable)
// This should not throw an exception, rather warn that a COMMIT event was sent out-of-sync
testRunner.run(1, true, false)
}

@Test
Expand Down Expand Up @@ -634,7 +633,8 @@ class CaptureChangeMySQLTest {
{} as EventData
))

assertThrows(AssertionError.class, { testRunner.run(1, true, false) } as Executable)
// Should not throw an exception
testRunner.run(1, true, false)
}

@Test
Expand Down Expand Up @@ -1445,10 +1445,10 @@ class CaptureChangeMySQLTest {
header2.setTimestamp(new Date().getTime())
EventData eventData = new EventData() {
};
client.sendEvent(new Event(header2, eventData));
client.sendEvent(new Event(header2, eventData))

// when we ge a xid event without having got a 'begin' event ,throw an exception
assertThrows(AssertionError.class, () -> testRunner.run(1, false, false))
// when we get an XID event without having received a 'BEGIN' event, don't throw an exception, just warn the user
testRunner.run(1, false, false)
}

@Test
Expand Down Expand Up @@ -1499,7 +1499,6 @@ class CaptureChangeMySQLTest {

}


static DistributedMapCacheClientImpl createCacheClient() throws InitializationException {

final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl()
Expand Down
Expand Up @@ -687,37 +687,35 @@ protected void commit(final Checkpoint checkpoint, final boolean asynchronous) {
// Update local state
final StateManager stateManager = context.getStateManager();
if (checkpoint.localState != null) {
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
final Optional<String> stateVersion = stateMap.getStateVersion();
if (!stateVersion.equals(checkpoint.localState.getStateVersion())) {
LOG.debug("Updating State Manager's Local State");

try {
try {
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
final Optional<String> stateVersion = stateMap.getStateVersion();
if (!stateVersion.equals(checkpoint.localState.getStateVersion())) {
LOG.debug("Updating State Manager's Local State");
stateManager.setState(checkpoint.localState.toMap(), Scope.LOCAL);
} catch (final Exception e) {
LOG.warn("Failed to update Local State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
} else {
LOG.debug("Will not update State Manager's Local State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.",
stateVersion, checkpoint.localState.getStateVersion());
}
} else {
LOG.debug("Will not update State Manager's Local State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.",
stateVersion, checkpoint.localState.getStateVersion());
} catch (final Exception e) {
LOG.warn("Failed to update Local State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
}
}

// Update cluster state
if (checkpoint.clusterState != null) {
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
final Optional<String> stateVersion = stateMap.getStateVersion();
if (!stateVersion.equals(checkpoint.clusterState.getStateVersion())) {
LOG.debug("Updating State Manager's Cluster State");

try {
try {
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
final Optional<String> stateVersion = stateMap.getStateVersion();
if (!stateVersion.equals(checkpoint.clusterState.getStateVersion())) {
LOG.debug("Updating State Manager's Cluster State");
stateManager.setState(checkpoint.clusterState.toMap(), Scope.CLUSTER);
} catch (final Exception e) {
LOG.warn("Failed to update Cluster State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
} else {
LOG.debug("Will not update State Manager's Cluster State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.",
stateVersion, checkpoint.clusterState.getStateVersion());
}
} else {
LOG.debug("Will not update State Manager's Cluster State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.",
stateVersion, checkpoint.clusterState.getStateVersion());
} catch (final Exception e) {
LOG.warn("Failed to update Cluster State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
}
}

Expand Down

0 comments on commit 836dd49

Please sign in to comment.