From ccaa12b4b6e4d648c63a2e1b122a3e37b8f2b505 Mon Sep 17 00:00:00 2001 From: Randy Gelhausen Date: Wed, 7 Dec 2016 02:18:09 -0500 Subject: [PATCH] NIFI-2585: Add attributes to track where a flow file came from when receiving over site-to-site --- .../remote/protocol/AbstractFlowFileServerProtocol.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java index 2ba87a279ed2..e149481dbec7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java @@ -506,6 +506,12 @@ protected int commitReceiveTransaction(Peer peer, FlowFileTransaction transactio throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); } + // For routing purposes, downstream consumers often need to reference Flowfile's originating system + for (FlowFile flowFile : transaction.getFlowFilesSent()){ + flowFile = session.putAttribute(flowFile, "remote.host", peer.getHost()); + flowFile = session.putAttribute(flowFile, "remote.address", peer.getHost() + ":" + peer.getPort()); + } + // Commit the session so that we have persisted the data session.commit();