From c87ff2e3e204c99f27507f8ec297db233a874c6b Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 18 Apr 2017 13:31:27 +0900 Subject: [PATCH 1/2] NIFI-1202: Site-to-Site batch settings. - Added batchCount, batchSize, batchDuration to limit flow files to be included in a single Site-to-Site transaction. - Added batch throttling logic when StandardRemoteGroupPort transfers flow files to a remote input port using the batch limit configurations, so that users can limit batch not only for pulling data, but also pushing data. - Added destination list shuffle to provide better load distribution. Previously, the load distribution algorithm produced the same host consecutively. - Added new batch settings to FlowConfiguration.xsd. - Added new batch settings to Flow Fingerprint. - Added new batch settings to Audit. - Sort ports by name at 'Remote Process Group Ports' dialog. --- .../nifi/remote/client/PeerSelector.java | 6 + .../nifi/remote/client/TestPeerSelector.java | 35 ++++ .../api/dto/RemoteProcessGroupPortDTO.java | 46 ++++++ .../RemoteProcessGroupPortDescriptor.java | 15 ++ .../apache/nifi/remote/RemoteGroupPort.java | 12 ++ .../nifi/controller/FlowController.java | 3 + .../serialization/FlowFromDOMFactory.java | 3 + .../serialization/StandardFlowSerializer.java | 12 ++ .../nifi/fingerprint/FingerprintFactory.java | 2 +- .../remote/StandardRemoteProcessGroup.java | 19 +++ ...ndardRemoteProcessGroupPortDescriptor.java | 30 ++++ .../src/main/resources/FlowConfiguration.xsd | 3 + .../fingerprint/FingerprintFactoryTest.java | 41 +++++ .../nifi/remote/StandardRemoteGroupPort.java | 76 ++++++++- .../remote/TestStandardRemoteGroupPort.java | 156 +++++++++++++++++- .../nifi/audit/RemoteProcessGroupAuditor.java | 8 +- .../apache/nifi/web/api/dto/DtoFactory.java | 6 + .../impl/StandardRemoteProcessGroupDAO.java | 69 +++++--- .../audit/TestRemoteProcessGroupAuditor.java | 32 ++++ .../TestStandardRemoteProcessGroupDAO.java | 124 ++++++++++++++ .../canvas/remote-port-configuration.jsp | 34 ++++ .../remote-process-group-configuration.css | 17 +- .../canvas/nf-remote-process-group-ports.js | 102 +++++++++++- 23 files changed, 811 insertions(+), 40 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index 0ec8951a43f7..a7bd0945f02e 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -41,6 +41,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -202,6 +203,11 @@ List formulateDestinationList(final Set statuses, final } } + // Shuffle destinations to provide better distribution. + // Without this, same host will be used continuously, especially when remote peers have the same number of queued files. + // Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations. + Collections.shuffle(destinations, new Random(0)); + final StringBuilder distributionDescription = new StringBuilder(); distributionDescription.append("New Weighted Distribution of Nodes:"); for (final Map.Entry entry : entryCountMap.entrySet()) { diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java index c434c7b4cd96..6a69fee656fb 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java @@ -39,6 +39,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -59,6 +60,40 @@ private Map calculateAverageSelectedCount(Set colle })); } + @Test + public void testFormulateDestinationListForOutputEven() throws IOException { + final Set collection = new HashSet<>(); + collection.add(new PeerStatus(new PeerDescription("Node1", 1111, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node2", 2222, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node3", 3333, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node4", 4444, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node5", 5555, true), 4096, true)); + + PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class); + PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null); + + final List destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE); + final Map selectedCounts = calculateAverageSelectedCount(collection, destinations); + + logger.info("selectedCounts={}", selectedCounts); + + int consecutiveSamePeerCount = 0; + PeerStatus previousPeer = null; + for (PeerStatus peer : destinations) { + if (previousPeer != null && peer.getPeerDescription().equals(previousPeer.getPeerDescription())) { + consecutiveSamePeerCount++; + // The same peer shouldn't be used consecutively (number of nodes - 1) times or more. + if (consecutiveSamePeerCount >= (collection.size() - 1)) { + fail("The same peer is returned consecutively too frequently."); + } + } else { + consecutiveSamePeerCount = 0; + } + previousPeer = peer; + } + + } + @Test public void testFormulateDestinationListForOutput() throws IOException { final Set collection = new HashSet<>(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java index e4a813132353..70bbf717150b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java @@ -35,6 +35,9 @@ public class RemoteProcessGroupPortDTO { private Boolean exists; private Boolean targetRunning; private Boolean connected; + private Integer batchCount; + private String batchSize; + private String batchDuration; /** * @return comments as configured in the target port @@ -176,6 +179,49 @@ public void setConnected(Boolean connected) { this.connected = connected; } + /** + * @return preferred number of flow files to include in a transaction + */ + @ApiModelProperty( + value = "Preferred number of bytes to include in a transaction." + ) + public Integer getBatchCount() { + return batchCount; + } + + public void setBatchCount(Integer batchCount) { + this.batchCount = batchCount; + } + + /** + * @return preferred number of bytes to include in a transaction + */ + @ApiModelProperty( + value = "Preferred number of bytes to include in a transaction." + ) + public String getBatchSize() { + return batchSize; + } + + public void setBatchSize(String batchSize) { + this.batchSize = batchSize; + } + + /** + * @return preferred amount of time that a transaction should span + */ + @ApiModelProperty( + value = "Preferred amount of time that a transaction should span." + ) + public String getBatchDuration() { + return batchDuration; + } + + public void setBatchDuration(String batchDuration) { + this.batchDuration = batchDuration; + } + + @Override public int hashCode() { return 923847 + String.valueOf(name).hashCode(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java index 4d7f774432e2..c330c1342d93 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java @@ -54,6 +54,21 @@ public interface RemoteProcessGroupPortDescriptor { */ Boolean getUseCompression(); + /** + * @return Preferred number of flow files to include in a transaction + */ + Integer getBatchCount(); + + /** + * @return Preferred number of bytes to include in a transaction + */ + String getBatchSize(); + + /** + * @return Preferred amount of for a transaction to span + */ + String getBatchDuration(); + /** * @return Whether or not the target port exists */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java index f8f4b20922cf..07faf4203698 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java @@ -41,4 +41,16 @@ public RemoteGroupPort(String id, String name, ProcessGroup processGroup, Connec public abstract boolean getTargetExists(); public abstract boolean isTargetRunning(); + + public abstract Integer getBatchCount(); + + public abstract void setBatchCount(Integer batchCount); + + public abstract String getBatchSize(); + + public abstract void setBatchSize(String batchSize); + + public abstract String getBatchDuration(); + + public abstract void setBatchDuration(String batchDuration); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 151640e40f6e..763b620287e4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2011,6 +2011,9 @@ private Set convertRemotePort(final Set 0) { + addTextElement(element, "batchCount", batchCount); + } + final String batchSize = port.getBatchSize(); + if (batchSize != null && batchSize.length() > 0) { + addTextElement(element, "batchSize", batchSize); + } + final String batchDuration = port.getBatchDuration(); + if (batchDuration != null && batchDuration.length() > 0) { + addTextElement(element, "batchDuration", batchDuration); + } parentElement.appendChild(element); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 1db80fc82f39..d9e048e71303 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -511,7 +511,7 @@ public int compare(final Element o1, final Element o2) { } private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) { - for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression"}) { + for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName)); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 7aee480da730..0cc84337247c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -45,6 +45,7 @@ import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -632,6 +633,15 @@ private void addOutputPort(final RemoteProcessGroupPortDescriptor descriptor) { if (descriptor.getUseCompression() != null) { port.setUseCompression(descriptor.getUseCompression()); } + if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) { + port.setBatchCount(descriptor.getBatchCount()); + } + if (!StringUtils.isBlank(descriptor.getBatchSize())) { + port.setBatchSize(descriptor.getBatchSize()); + } + if (!StringUtils.isBlank(descriptor.getBatchDuration())) { + port.setBatchDuration(descriptor.getBatchDuration()); + } } finally { writeLock.unlock(); } @@ -697,6 +707,15 @@ private void addInputPort(final RemoteProcessGroupPortDescriptor descriptor) { if (descriptor.getUseCompression() != null) { port.setUseCompression(descriptor.getUseCompression()); } + if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) { + port.setBatchCount(descriptor.getBatchCount()); + } + if (!StringUtils.isBlank(descriptor.getBatchSize())) { + port.setBatchSize(descriptor.getBatchSize()); + } + if (!StringUtils.isBlank(descriptor.getBatchDuration())) { + port.setBatchDuration(descriptor.getBatchDuration()); + } inputPorts.put(descriptor.getId(), port); } finally { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java index ed901863db0c..c3a8f5e19554 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java @@ -27,6 +27,9 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr private Integer concurrentlySchedulableTaskCount; private Boolean transmitting; private Boolean useCompression; + private Integer batchCount; + private String batchSize; + private String batchDuration; private Boolean exists; private Boolean targetRunning; private Boolean connected; @@ -94,6 +97,33 @@ public void setUseCompression(Boolean useCompression) { this.useCompression = useCompression; } + @Override + public Integer getBatchCount() { + return batchCount; + } + + public void setBatchCount(Integer batchCount) { + this.batchCount = batchCount; + } + + @Override + public String getBatchSize() { + return batchSize; + } + + public void setBatchSize(String batchSize) { + this.batchSize = batchSize; + } + + @Override + public String getBatchDuration() { + return batchDuration; + } + + public void setBatchDuration(String batchDuration) { + this.batchDuration = batchDuration; + } + @Override public Boolean getExists() { return exists; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 8cf2ad87a7af..30fff1ff41f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -281,6 +281,9 @@ + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java index 87a372d1d8ed..67f1ad4e855a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java @@ -26,14 +26,17 @@ import java.io.IOException; import java.lang.reflect.Method; +import java.util.Collections; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.util.NiFiProperties; import org.junit.Before; @@ -273,4 +276,42 @@ public void testRemoteProcessGroupFingerprintWithProxy() throws Exception { assertEquals(expected.toString(), fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement)); } + @Test + public void testRemotePortFingerprint() throws Exception { + + // Fill out every configuration. + final RemoteProcessGroup groupComponent = mock(RemoteProcessGroup.class); + when(groupComponent.getName()).thenReturn("name"); + when(groupComponent.getIdentifier()).thenReturn("id"); + when(groupComponent.getPosition()).thenReturn(new Position(10.5, 20.3)); + when(groupComponent.getTargetUri()).thenReturn("http://node1:8080/nifi"); + when(groupComponent.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW); + + final RemoteGroupPort portComponent = mock(RemoteGroupPort.class); + when(groupComponent.getInputPorts()).thenReturn(Collections.singleton(portComponent)); + when(portComponent.getName()).thenReturn("portName"); + when(portComponent.getIdentifier()).thenReturn("portId"); + when(portComponent.getPosition()).thenReturn(new Position(10.5, 20.3)); + when(portComponent.getComments()).thenReturn("portComment"); + when(portComponent.getScheduledState()).thenReturn(ScheduledState.RUNNING); + when(portComponent.getMaxConcurrentTasks()).thenReturn(3); + when(portComponent.isUseCompression()).thenReturn(true); + when(portComponent.getBatchCount()).thenReturn(1234); + when(portComponent.getBatchSize()).thenReturn("64KB"); + when(portComponent.getBatchDuration()).thenReturn("10sec"); + // Serializer doesn't serialize if a port doesn't have any connection. + when(portComponent.hasIncomingConnection()).thenReturn(true); + + // Assert fingerprints with expected one. + final String expected = "portId" + + "3" + + "true" + + "1234" + + "64KB" + + "10sec"; + + final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, groupComponent, "addRemoteProcessGroup"); + final Element componentElement = (Element) rootElement.getElementsByTagName("inputPort").item(0); + assertEquals(expected.toString(), fingerprint("addRemoteGroupPortFingerprint", Element.class, componentElement)); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 92931f28596d..b1288f3d362f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -46,11 +46,13 @@ import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -80,6 +82,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class); private final RemoteProcessGroup remoteGroup; private final AtomicBoolean useCompression = new AtomicBoolean(false); + private final AtomicReference batchCount = new AtomicReference<>(); + private final AtomicReference batchSize = new AtomicReference<>(); + private final AtomicReference batchDuration = new AtomicReference<>(); private final AtomicBoolean targetExists = new AtomicBoolean(true); private final AtomicBoolean targetRunning = new AtomicBoolean(true); private final SSLContext sslContext; @@ -157,7 +162,7 @@ public void onSchedulingStart() { final long penalizationMillis = FormatUtils.getTimeDuration(remoteGroup.getYieldDuration(), TimeUnit.MILLISECONDS); - final SiteToSiteClient client = new SiteToSiteClient.Builder() + final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder() .urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris())) .portIdentifier(getIdentifier()) .sslContext(sslContext) @@ -168,9 +173,24 @@ public void onSchedulingStart() { .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) .transportProtocol(remoteGroup.getTransportProtocol()) .httpProxy(new HttpProxy(remoteGroup.getProxyHost(), remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), remoteGroup.getProxyPassword())) - .localAddress(remoteGroup.getLocalAddress()) - .build(); - clientRef.set(client); + .localAddress(remoteGroup.getLocalAddress()); + + final Integer batchCount = getBatchCount(); + if (batchCount != null) { + clientBuilder.requestBatchCount(batchCount); + } + + final String batchSize = getBatchSize(); + if (batchSize != null && batchSize.length() > 0) { + clientBuilder.requestBatchSize(DataUnit.parseDataSize(batchSize.trim(), DataUnit.B).intValue()); + } + + final String batchDuration = getBatchDuration(); + if (batchDuration != null && batchDuration.length() > 0) { + clientBuilder.requestBatchDuration(FormatUtils.getTimeDuration(batchDuration.trim(), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } + + clientRef.set(clientBuilder.build()); } @Override @@ -278,6 +298,13 @@ private int transferFlowFiles(final Transaction transaction, final ProcessContex final StopWatch stopWatch = new StopWatch(true); long bytesSent = 0L; + final SiteToSiteClientConfig siteToSiteClientConfig = getSiteToSiteClient().getConfig(); + final long maxBatchBytes = siteToSiteClientConfig.getPreferredBatchSize(); + final int maxBatchCount = siteToSiteClientConfig.getPreferredBatchCount(); + final long preferredBatchDuration = siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + final long maxBatchDuration = preferredBatchDuration > 0 ? preferredBatchDuration : BATCH_SEND_NANOS; + + final Set flowFilesSent = new HashSet<>(); boolean continueTransaction = true; while (continueTransaction) { @@ -304,10 +331,15 @@ public void process(final InputStream in) throws IOException { session.remove(flowFile); final long sendingNanos = System.nanoTime() - startSendingNanos; - if (sendingNanos < BATCH_SEND_NANOS) { - flowFile = session.get(); - } else { + + if (maxBatchCount > 0 && flowFilesSent.size() >= maxBatchCount) { + flowFile = null; + } else if (maxBatchBytes > 0 && bytesSent >= maxBatchBytes) { flowFile = null; + } else if (sendingNanos >= maxBatchDuration) { + flowFile = null; + } else { + flowFile = session.get(); } continueTransaction = (flowFile != null); @@ -477,6 +509,36 @@ public boolean isUseCompression() { return useCompression.get(); } + @Override + public Integer getBatchCount() { + return batchCount.get(); + } + + @Override + public void setBatchCount(Integer batchCount) { + this.batchCount.set(batchCount); + } + + @Override + public String getBatchSize() { + return batchSize.get(); + } + + @Override + public void setBatchSize(String batchSize) { + this.batchSize.set(batchSize); + } + + @Override + public String getBatchDuration() { + return batchDuration.get(); + } + + @Override + public void setBatchDuration(String batchDuration) { + this.batchDuration.set(batchDuration); + } + @Override public String toString() { return "RemoteGroupPort[name=" + getName() + ",targets=" + remoteGroup.getTargetUris() + "]"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index 31cd1549c549..f677b88db1eb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -19,6 +19,7 @@ import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.groups.ProcessGroup; @@ -28,6 +29,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.io.http.HttpCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.protocol.CommunicationsSession; @@ -43,17 +45,23 @@ import java.io.ByteArrayInputStream; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; import org.apache.nifi.util.NiFiProperties; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -70,7 +78,7 @@ public class TestStandardRemoteGroupPort { private Transaction transaction; private EventReporter eventReporter; private ProcessGroup processGroup; - public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi"; + private static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi"; private StandardRemoteGroupPort port; private SharedSessionState sessionState; private MockProcessSession processSession; @@ -84,17 +92,18 @@ public static void setup() throws Exception { private void setupMock(final SiteToSiteTransportProtocol protocol, final TransferDirection direction) throws Exception { - setupMock(protocol, direction, mock(Transaction.class)); + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder().buildConfig(); + setupMock(protocol, direction, siteToSiteClientConfig); } private void setupMock(final SiteToSiteTransportProtocol protocol, final TransferDirection direction, - final Transaction transaction) throws Exception { + final SiteToSiteClientConfig siteToSiteClientConfig) throws Exception { processGroup = null; remoteGroup = mock(RemoteProcessGroup.class); scheduler = null; siteToSiteClient = mock(SiteToSiteClient.class); - this.transaction = transaction; + this.transaction = mock(Transaction.class); eventReporter = mock(EventReporter.class); @@ -119,6 +128,7 @@ private void setupMock(final SiteToSiteTransportProtocol protocol, doReturn(REMOTE_CLUSTER_URL).when(remoteGroup).getTargetUri(); doReturn(siteToSiteClient).when(port).getSiteToSiteClient(); doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction)); + doReturn(siteToSiteClientConfig).when(siteToSiteClient).getConfig(); doReturn(eventReporter).when(remoteGroup).getEventReporter(); } @@ -245,6 +255,144 @@ public void testSendHttp() throws Exception { assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails()); } + @Test + public void testSendBatchByCount() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchCount(2) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {0, 1}, t2 = {2, 3}, t3 = {4} + final int[] expectedNumberOfPackets = {2, 2, 1}; + testSendBatch(expectedNumberOfPackets); + + } + + @Test + public void testSendBatchBySize() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchSize(30) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {10, 11, 12}, t2 = {13, 14} + final int[] expectedNumberOfPackets = {3, 2}; + testSendBatch(expectedNumberOfPackets); + + } + + @Test + public void testSendBatchByDuration() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchDuration(1, TimeUnit.NANOSECONDS) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {1}, t2 = {2} .. and so on. + final int[] expectedNumberOfPackets = {1, 1, 1, 1, 1}; + testSendBatch(expectedNumberOfPackets); + + } + + /** + * Generate flow files to be sent, and execute port's onTrigger method. + * Finally, this method verifies whether packets are sent as expected. + * @param expectedNumberOfPackets Specify how many packets should be sent by each transaction. + * E.g. passing {2, 2, 1}, would generate 5 flow files in total. + * Based on the siteToSiteClientConfig batch parameters, + * it's expected to be sent via 3 transactions, + * transaction 0 will send flow file 0 and 1, + * transaction 1 will send flow file 2 and 3, + * and transaction 2 will send flow file 4. + * Each flow file has different content size generated automatically. + * The content size starts with 10, and increases as more flow files are generated. + * E.g. flow file 1 will have 10 bytes, flow file 2 has 11 bytes, f3 has 12 and so on. + * + */ + private void testSendBatch(final int[] expectedNumberOfPackets) throws Exception { + + setupMockProcessSession(); + + final String peerUrl = "http://node1.example.com:8080/nifi"; + final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false); + final HttpCommunicationsSession commsSession = new HttpCommunicationsSession(); + final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); + + final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files"; + + doReturn(peer).when(transaction).getCommunicant(); + commsSession.setDataTransferUrl(flowFileEndpointUri); + + // Capture packets being sent to the remote peer + final AtomicInteger totalPacketsSent = new AtomicInteger(0); + final List> sentPackets = new ArrayList<>(expectedNumberOfPackets.length); + final List sentPacketsPerTransaction = new ArrayList<>(); + doAnswer(invocation -> { + sentPacketsPerTransaction.add((DataPacket)invocation.getArguments()[0]); + totalPacketsSent.incrementAndGet(); + return null; + }).when(transaction).send(any(DataPacket.class)); + doAnswer(invocation -> { + sentPackets.add(new ArrayList<>(sentPacketsPerTransaction)); + sentPacketsPerTransaction.clear(); + return null; + }).when(transaction).confirm(); + + + // Execute onTrigger while offering new flow files. + final List flowFiles = new ArrayList<>(); + for (int i = 0; i < expectedNumberOfPackets.length; i++) { + int numOfPackets = expectedNumberOfPackets[i]; + int startF = flowFiles.size(); + int endF = startF + numOfPackets; + IntStream.range(startF, endF).forEach(f -> { + final StringBuilder flowFileContents = new StringBuilder("0123456789"); + for (int c = 0; c < f; c++) { + flowFileContents.append(c); + } + final byte[] bytes = flowFileContents.toString().getBytes(); + final MockFlowFile flowFile = spy(processSession.createFlowFile(bytes)); + when(flowFile.getSize()).then(invocation -> { + Thread.sleep(1); // For testSendBatchByDuration + return bytes.length; + }); + sessionState.getFlowFileQueue().offer(flowFile); + flowFiles.add(flowFile); + }); + port.onTrigger(processContext, processSession); + } + + // Verify transactions, sent packets, and provenance events. + assertEquals(flowFiles.size(), totalPacketsSent.get()); + assertEquals("The number of transactions should match as expected.", expectedNumberOfPackets.length, sentPackets.size()); + final List provenanceEvents = sessionState.getProvenanceEvents(); + assertEquals(flowFiles.size(), provenanceEvents.size()); + + int f = 0; + for (int i = 0; i < expectedNumberOfPackets.length; i++) { + final List dataPackets = sentPackets.get(i); + assertEquals(expectedNumberOfPackets[i], dataPackets.size()); + + for (int p = 0; p < dataPackets.size(); p++) { + final FlowFile flowFile = flowFiles.get(f); + + // Assert sent packet + final DataPacket dataPacket = dataPackets.get(p); + assertEquals(flowFile.getSize(), dataPacket.getSize()); + + // Assert provenance event + final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(f); + assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); + assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri()); + + f++; + } + } + } + @Test public void testReceiveHttp() throws Exception { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java index 5a69cfe49e8d..a99aa1ed2b29 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java @@ -93,7 +93,13 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { .setConvertName(PORT_NAME_CONVERT), new ConfigurationRecorder("Compressed", dto -> dto.getUseCompression() != null, RemoteGroupPort::isUseCompression) - .setConvertName(PORT_NAME_CONVERT) + .setConvertName(PORT_NAME_CONVERT), + new ConfigurationRecorder("Batch Count", + dto -> dto.getBatchCount() != null, RemoteGroupPort::getBatchCount), + new ConfigurationRecorder("Batch Size", + dto -> dto.getBatchSize() != null, RemoteGroupPort::getBatchSize), + new ConfigurationRecorder("Batch Duration", + dto -> dto.getBatchDuration() != null, RemoteGroupPort::getBatchDuration) ); /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index aaa33d0f9f9e..fa1de06826e6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1492,6 +1492,9 @@ public RemoteProcessGroupPortDTO createRemoteProcessGroupPortDto(final RemoteGro dto.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks()); dto.setUseCompression(port.isUseCompression()); dto.setExists(port.getTargetExists()); + dto.setBatchCount(port.getBatchCount()); + dto.setBatchSize(port.getBatchSize()); + dto.setBatchDuration(port.getBatchDuration()); // determine if this port is currently connected to another component locally if (ConnectableType.REMOTE_OUTPUT_PORT.equals(port.getConnectableType())) { @@ -2962,6 +2965,9 @@ public RemoteProcessGroupPortDTO copy(final RemoteProcessGroupPortDTO original) copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount()); copy.setUseCompression(original.getUseCompression()); copy.setExists(original.getExists()); + copy.setBatchCount(original.getBatchCount()); + copy.setBatchSize(original.getBatchSize()); + copy.setBatchDuration(original.getBatchDuration()); return copy; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index a93c41084fc5..e5f4900061f4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -29,6 +29,7 @@ import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.util.FormatUtils; @@ -203,7 +204,11 @@ private void verifyUpdatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO re // verify update when appropriate - if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), remoteProcessGroupPortDto.getUseCompression())) { + if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), + remoteProcessGroupPortDto.getUseCompression(), + remoteProcessGroupPortDto.getBatchCount(), + remoteProcessGroupPortDto.getBatchSize(), + remoteProcessGroupPortDto.getBatchDuration())) { port.verifyCanUpdate(); } } @@ -219,6 +224,27 @@ private List validateProposedRemoteProcessGroupPortConfiguration(RemoteG validationErrors.add(String.format("Concurrent tasks for port '%s' must be a positive integer.", remoteGroupPort.getName())); } + final Integer batchCount = remoteProcessGroupPortDTO.getBatchCount(); + if (isNotNull(batchCount) && batchCount < 0) { + validationErrors.add(String.format("Batch count for port '%s' must be a positive integer.", remoteGroupPort.getName())); + } + + final String batchSize = remoteProcessGroupPortDTO.getBatchSize(); + if (isNotNull(batchSize) && batchSize.length() > 0 + && !DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase()).matches()) { + validationErrors.add(String.format("Batch size for port '%s' must be of format " + + " where is a non-negative integer and is a supported Data" + + " Unit, such as: B, KB, MB, GB, TB", remoteGroupPort.getName())); + } + + final String batchDuration = remoteProcessGroupPortDTO.getBatchDuration(); + if (isNotNull(batchDuration) && batchDuration.length() > 0 + && !FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase()).matches()) { + validationErrors.add(String.format("Batch duration for port '%s' must be of format " + + " where is a non-negative integer and TimeUnit is a supported Time Unit, such " + + "as: nanos, millis, secs, mins, hrs, days", remoteGroupPort.getName())); + } + return validationErrors; } @@ -284,22 +310,7 @@ public RemoteGroupPort updateRemoteProcessGroupInputPort(String remoteProcessGro verifyUpdatePort(port, remoteProcessGroupPortDto); // perform the update - if (isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) { - port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount()); - } - if (isNotNull(remoteProcessGroupPortDto.getUseCompression())) { - port.setUseCompression(remoteProcessGroupPortDto.getUseCompression()); - } - - final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting(); - if (isNotNull(isTransmitting)) { - // start or stop as necessary - if (!port.isRunning() && isTransmitting) { - remoteProcessGroup.startTransmitting(port); - } else if (port.isRunning() && !isTransmitting) { - remoteProcessGroup.stopTransmitting(port); - } - } + updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); return port; } @@ -318,6 +329,19 @@ public RemoteGroupPort updateRemoteProcessGroupOutputPort(String remoteProcessGr verifyUpdatePort(port, remoteProcessGroupPortDto); // perform the update + updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); + + return port; + } + + /** + * + * @param port Port instance to be updated. + * @param remoteProcessGroupPortDto DTO containing updated remote process group port settings. + * @param remoteProcessGroup If remoteProcessGroupPortDto has updated isTransmitting input, + * this method will start or stop the port in this remoteProcessGroup as necessary. + */ + private void updatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup) { if (isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) { port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount()); } @@ -325,6 +349,15 @@ public RemoteGroupPort updateRemoteProcessGroupOutputPort(String remoteProcessGr port.setUseCompression(remoteProcessGroupPortDto.getUseCompression()); } + final Integer batchCount = remoteProcessGroupPortDto.getBatchCount(); + final String batchSize = remoteProcessGroupPortDto.getBatchSize(); + final String batchDuration = remoteProcessGroupPortDto.getBatchDuration(); + if (isAnyNotNull(batchCount, batchSize, batchDuration)) { + port.setBatchCount(batchCount); + port.setBatchSize(batchSize); + port.setBatchDuration(batchDuration); + } + final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting(); if (isNotNull(isTransmitting)) { // start or stop as necessary @@ -334,8 +367,6 @@ public RemoteGroupPort updateRemoteProcessGroupOutputPort(String remoteProcessGr remoteProcessGroup.stopTransmitting(port); } } - - return port; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java index 725e4d4fc4b2..d0aeef612ba2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java @@ -40,6 +40,7 @@ import org.springframework.security.core.context.SecurityContextHolder; import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicReference; import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK; @@ -433,6 +434,9 @@ private Collection updateProcessGroupInputPortConfiguration(RemoteProces } when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount()); when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression()); + when(updatedRPGPort.getBatchCount()).thenReturn(inputRPGPortDTO.getBatchCount()); + when(updatedRPGPort.getBatchSize()).thenReturn(inputRPGPortDTO.getBatchSize()); + when(updatedRPGPort.getBatchDuration()).thenReturn(inputRPGPortDTO.getBatchDuration()); when(joinPoint.proceed()).thenReturn(updatedRPGPort); @@ -553,4 +557,32 @@ public void testConfigurePortCompression() throws Throwable { assertConfigureDetails(action.getActionDetails(), "input-port-1.Compressed", "false", "true"); } + + @Test + public void testConfigurePortBatchSettings() throws Throwable { + + final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort(); + when(existingRPGPort.getName()).thenReturn("input-port-1"); + + final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO(); + inputRPGPortDTO.setBatchCount(1234); + inputRPGPortDTO.setBatchSize("64KB"); + inputRPGPortDTO.setBatchDuration("10sec"); + + final Collection actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort); + + assertEquals(3, actions.size()); + final Iterator iterator = actions.iterator(); + Action action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Batch Count", "0", "1234"); + + action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Batch Size", "", "64KB"); + + action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "Batch Duration", "", "10sec"); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java new file mode 100644 index 000000000000..60fb050bbbb9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.dao.impl; + +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.exception.ValidationException; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestStandardRemoteProcessGroupDAO { + + private void validate(final StandardRemoteProcessGroupDAO dao, final RemoteProcessGroupPortDTO dto, final String ... errMessageKeywords) { + try { + dao.verifyUpdateInputPort(dto.getGroupId(), dto); + if (errMessageKeywords.length > 0) { + fail("Validation should fail with keywords: " + Arrays.asList(errMessageKeywords)); + } + } catch (ValidationException e) { + if (errMessageKeywords.length == 0) { + fail("Validation should pass, but failed with: " + e); + } + final List validationErrors = e.getValidationErrors(); + assertEquals("Validation should return one validationErrors", 1, validationErrors.size()); + final String validationError = validationErrors.get(0); + for (String errMessageKeyword : errMessageKeywords) { + assertTrue("validation error message should contain " + errMessageKeyword + ", but was: " + validationError, + validationError.contains(errMessageKeyword)); + } + } + } + + @Test + public void testVerifyUpdateInputPort() { + final StandardRemoteProcessGroupDAO dao = new StandardRemoteProcessGroupDAO(); + + final String remoteProcessGroupId = "remote-process-group-id"; + final String remoteProcessGroupInputPortId = "remote-process-group-input-port-id"; + + final FlowController flowController = mock(FlowController.class); + final ProcessGroup processGroup = mock(ProcessGroup.class); + final RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class); + final RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class); + + dao.setFlowController(flowController); + when(flowController.getGroup(any())).thenReturn(processGroup); + when(processGroup.findRemoteProcessGroup(eq(remoteProcessGroupId))).thenReturn(remoteProcessGroup); + when(remoteProcessGroup.getInputPort(remoteProcessGroupInputPortId)).thenReturn(remoteGroupPort); + when(remoteGroupPort.getName()).thenReturn("remote-group-port"); + + final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO(); + dto.setGroupId(remoteProcessGroupId); + dto.setId(remoteProcessGroupInputPortId); + + // Empty input values should pass validation. + dao.verifyUpdateInputPort(remoteProcessGroupId, dto); + + // Concurrent tasks + dto.setConcurrentlySchedulableTaskCount(0); + validate(dao, dto, "Concurrent tasks", "positive integer"); + + dto.setConcurrentlySchedulableTaskCount(2); + validate(dao, dto); + + // Batch count + dto.setBatchCount(-1); + validate(dao, dto, "Batch count", "positive integer"); + + dto.setBatchCount(0); + validate(dao, dto); + + dto.setBatchCount(1000); + validate(dao, dto); + + // Batch size + dto.setBatchSize("AB"); + validate(dao, dto, "Batch size", "Data Size"); + + dto.setBatchSize("10 days"); + validate(dao, dto, "Batch size", "Data Size"); + + dto.setBatchSize("300MB"); + validate(dao, dto); + + // Batch duration + dto.setBatchDuration("AB"); + validate(dao, dto, "Batch duration", "Time Unit"); + + dto.setBatchDuration("10 KB"); + validate(dao, dto, "Batch duration", "Time Unit"); + + dto.setBatchDuration("10 secs"); + validate(dao, dto); + + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp index 8f383692830f..a62700c84c73 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp @@ -37,6 +37,40 @@ Compressed +
+ +
+
+ Batch Settings: +
+
+
+ Count +
+
+
+ +
+
+
+
+ Size +
+
+
+ +
+
+
+
+ Duration +
+
+
+ +
+
+
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css index 58dcfcd005b6..4a3fad4f1906 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css @@ -125,6 +125,7 @@ div.remote-port-description { div.concurrent-task-container { float: left; + width: 200px; } img.concurrent-tasks-info { @@ -133,7 +134,11 @@ img.concurrent-tasks-info { } div.compression-container { - float: right; + float: left; +} + +div.batch-settings-container { + margin-top: 8px; } div.remote-port-transmission-container { @@ -165,7 +170,7 @@ div.disabled-transmission-switch { #remote-port-concurrent-tasks { font-size: 11px !important; float: left; - width: 75%; + width: 266px; } #remote-port-use-compression-container { @@ -179,4 +184,10 @@ div.disabled-transmission-switch { #remote-port-concurrent-task-header { margin-top: 5px; -} \ No newline at end of file +} + +div.batch-setting { + margin-right: 8px; + width: 30%; + float: left; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js index 519a00bbdde9..5350ff920dc0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js @@ -73,9 +73,19 @@ handler: { click: function () { var remotePortConcurrentTasks = $('#remote-port-concurrent-tasks').val(); + var remotePortBatchCount = $('#remote-port-batch-count').val(); + var portValidationErrors = new Array(); // ensure the property name and value is specified - if ($.isNumeric(remotePortConcurrentTasks)) { + if (!$.isNumeric(remotePortConcurrentTasks)) { + portValidationErrors.push("Concurrent tasks must be an integer value."); + } + + if (remotePortBatchCount && !$.isNumeric(remotePortBatchCount)) { + portValidationErrors.push("Batch Settings: count must be an integer value."); + } + + if (portValidationErrors.length == 0) { var remoteProcessGroupId = $('#remote-process-group-ports-id').text(); var remoteProcessGroupData = d3.select('#id-' + remoteProcessGroupId).datum(); var remotePortId = $('#remote-port-id').text(); @@ -87,7 +97,10 @@ id: remotePortId, groupId: remoteProcessGroupId, useCompression: $('#remote-port-use-compression').hasClass('checkbox-checked'), - concurrentlySchedulableTaskCount: remotePortConcurrentTasks + concurrentlySchedulableTaskCount: remotePortConcurrentTasks, + batchCount: remotePortBatchCount, + batchSize: $('#remote-port-batch-size').val(), + batchDuration: $('#remote-port-batch-duration').val() } }; @@ -121,6 +134,9 @@ // set the new values $('#' + remotePortId + '-concurrent-tasks').text(remotePort.concurrentlySchedulableTaskCount); $('#' + remotePortId + '-compression').text(compressionLabel); + $('#' + remotePortId + '-batch-count').text(typeof(remotePort.batchCount) === 'number' ? remotePort.batchCount : ''); + $('#' + remotePortId + '-batch-size').text(remotePort.batchSize); + $('#' + remotePortId + '-batch-duration').text(remotePort.batchDuration); }).fail(function (xhr, status, error) { if (xhr.status === 400) { var errors = xhr.responseText.split('\n'); @@ -146,7 +162,9 @@ } else { nfDialog.showOkDialog({ headerText: 'Remote Process Group Ports', - dialogContent: 'Concurrent tasks must be an integer value.' + dialogContent: portValidationErrors.reduce(function (prev, curr) { + return typeof(prev) === 'string' ? prev + ' ' + curr : curr; + }) }); // close the dialog @@ -175,6 +193,9 @@ $('#remote-port-name').text(''); $('#remote-port-concurrent-tasks').val(''); $('#remote-port-use-compression').removeClass('checkbox-checked checkbox-unchecked'); + $('#remote-port-batch-count').val(''); + $('#remote-port-batch-size').val(''); + $('#remote-port-batch-duration').val(''); } } }); @@ -287,9 +308,12 @@ var portName = $('#' + portId + '-name').text(); var portConcurrentTasks = $('#' + portId + '-concurrent-tasks').text(); var portCompression = $('#' + portId + '-compression').text() === 'Yes'; + var batchCount = $('#' + portId + '-batch-count').text(); + var batchSize = $('#' + portId + '-batch-size').text(); + var batchDuration = $('#' + portId + '-batch-duration').text(); // show the configuration dialog - configureRemotePort(port.id, portName, portConcurrentTasks, portCompression, portType); + configureRemotePort(port.id, portName, portConcurrentTasks, portCompression, batchCount, batchSize, batchDuration, portType); }).appendTo(portContainerEditContainer); // show/hide the edit button as appropriate @@ -472,6 +496,54 @@ '' + '').appendTo(compressionContainer); + // clear: Concurrent Tasks, Compressed + $('
').appendTo(portContainerDetailsContainer); + + // Batch related settings + var batchSettingsContainer = $('
') + .append($('
Batch Settings' + + '
')) + .appendTo(portContainerDetailsContainer); + + batchSettingsContainer.find('div.batch-settings-info').qtip($.extend({}, + nf.Common.config.tooltipConfig, + { + content: 'The preferred batch settings in a transaction for this port.' + + ' NiFi will transfer as many flow files as they are remaining in a queue,' + + ' until it reaches to one of these limits.' + + ' If none of them is specified, NiFi batches flow files up to ' + + (portType === 'input' ? '500ms for sending to an input port' : '5s for receiving from an output port') + + ' by default.' + })); + + var batchCount = $('
').append($('
').text(typeof(port.batchCount) === 'number' ? port.batchCount : '')); + var batchSize = $('
').append($('
').text(port.batchSize)); + var batchDuration = $('
').append($('
').text(port.batchDuration)); + + // add this ports batch count + $('
' + + '
' + + 'Count' + + '
' + + '
' + + '
').append(batchCount).appendTo(batchSettingsContainer); + + // add this ports batch size + $('
' + + '
' + + 'Size' + + '
' + + '
' + + '
').append(batchSize).appendTo(batchSettingsContainer); + + // add this ports batch duration + $('
' + + '
' + + 'Duration' + + '
' + + '
' + + '
').append(batchDuration).appendTo(batchSettingsContainer); + // clear $('
').appendTo(portContainer); @@ -489,9 +561,12 @@ * @argument {string} portName The port name * @argument {int} portConcurrentTasks The number of concurrent tasks for the port * @argument {boolean} portCompression The compression flag for the port + * @argument {int} batchCount The flow file count in a batch transaction + * @argument {string} batchSize The size of flow files in a batch transaction + * @argument {string} batchDuration The duration of a batch transaction * @argument {string} portType The type of port this is */ - var configureRemotePort = function (portId, portName, portConcurrentTasks, portCompression, portType) { + var configureRemotePort = function (portId, portName, portConcurrentTasks, portCompression, batchCount, batchSize, batchDuration, portType) { // set port identifiers $('#remote-port-id').text(portId); $('#remote-port-type').text(portType); @@ -503,6 +578,9 @@ } $('#remote-port-use-compression').addClass(checkState); $('#remote-port-concurrent-tasks').val(portConcurrentTasks); + $('#remote-port-batch-count').val(batchCount); + $('#remote-port-batch-size').val(batchSize); + $('#remote-port-batch-duration').val(batchDuration); // set the port name $('#remote-port-name').text(portName).ellipsis(); @@ -549,6 +627,12 @@ var connectedInputPorts = []; var disconnectedInputPorts = []; + var nameComparator = function (a, b) { + var nameA = a.name.toUpperCase(); + var nameB = b.name.toUpperCase(); + return nameA < nameB ? -1 : (nameA > nameB ? 1 : 0); + }; + // show connected ports first var inputPortContainer = $('#remote-process-group-input-ports-container'); $.each(remoteProcessGroupContents.inputPorts, function (_, inputPort) { @@ -559,6 +643,10 @@ } }); + // sort by port name within each port list + connectedInputPorts.sort(nameComparator); + disconnectedInputPorts.sort(nameComparator); + // add all connected input ports $.each(connectedInputPorts, function (_, inputPort) { createPortOption(inputPortContainer, inputPort, 'input'); @@ -586,6 +674,10 @@ } }); + // sort by port name within each port list + connectedOutputPorts.sort(nameComparator); + disconnectedOutputPorts.sort(nameComparator); + // add all connected output ports $.each(connectedOutputPorts, function (_, outputPort) { createPortOption(outputPortContainer, outputPort, 'output'); From 64f674ff7c07cb6fb677996a3c561f4cef5b7d10 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 21 Apr 2017 14:53:50 +0900 Subject: [PATCH 2/2] NIFI-1202: Site-to-Site batch settings. Incorporated review comments: - Show 'No value set' when a batch configuration is not set - Updated batch settings tooltip to clearly explain how it works the configuration works differently for input and output ports. - Updated DTO by separating batch settings to BatchSettingsDTO to indicate count, size and duration are a set of configurations. --- .../nifi/web/api/dto/BatchSettingsDTO.java | 76 +++++++++++++++++++ .../api/dto/RemoteProcessGroupPortDTO.java | 45 ++--------- .../nifi/controller/FlowController.java | 10 ++- .../nifi/audit/RemoteProcessGroupAuditor.java | 9 ++- .../apache/nifi/web/api/dto/DtoFactory.java | 19 +++-- .../impl/StandardRemoteProcessGroupDAO.java | 54 ++++++------- .../audit/TestRemoteProcessGroupAuditor.java | 25 +++--- .../TestStandardRemoteProcessGroupDAO.java | 21 ++--- .../canvas/nf-remote-process-group-ports.js | 65 +++++++++++----- 9 files changed, 211 insertions(+), 113 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java new file mode 100644 index 000000000000..e1d63f813d89 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +/** + * Details of batch settings of a remote process group port. + */ +@XmlType(name = "batchSettings") +public class BatchSettingsDTO { + + private Integer count; + private String size; + private String duration; + + /** + * @return preferred number of flow files to include in a transaction + */ + @ApiModelProperty( + value = "Preferred number of flow files to include in a transaction." + ) + public Integer getCount() { + return count; + } + + public void setCount(Integer count) { + this.count = count; + } + + /** + * @return preferred number of bytes to include in a transaction + */ + @ApiModelProperty( + value = "Preferred number of bytes to include in a transaction." + ) + public String getSize() { + return size; + } + + public void setSize(String size) { + this.size = size; + } + + /** + * @return preferred amount of time that a transaction should span + */ + @ApiModelProperty( + value = "Preferred amount of time that a transaction should span." + ) + public String getDuration() { + return duration; + } + + public void setDuration(String duration) { + this.duration = duration; + } + + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java index 70bbf717150b..2a34d9c7e907 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java @@ -35,9 +35,7 @@ public class RemoteProcessGroupPortDTO { private Boolean exists; private Boolean targetRunning; private Boolean connected; - private Integer batchCount; - private String batchSize; - private String batchDuration; + private BatchSettingsDTO batchSettings; /** * @return comments as configured in the target port @@ -180,48 +178,19 @@ public void setConnected(Boolean connected) { } /** - * @return preferred number of flow files to include in a transaction + * @return batch settings for data transmission */ @ApiModelProperty( - value = "Preferred number of bytes to include in a transaction." + value = "The batch settings for data transmission." ) - public Integer getBatchCount() { - return batchCount; + public BatchSettingsDTO getBatchSettings() { + return batchSettings; } - public void setBatchCount(Integer batchCount) { - this.batchCount = batchCount; + public void setBatchSettings(BatchSettingsDTO batchSettings) { + this.batchSettings = batchSettings; } - /** - * @return preferred number of bytes to include in a transaction - */ - @ApiModelProperty( - value = "Preferred number of bytes to include in a transaction." - ) - public String getBatchSize() { - return batchSize; - } - - public void setBatchSize(String batchSize) { - this.batchSize = batchSize; - } - - /** - * @return preferred amount of time that a transaction should span - */ - @ApiModelProperty( - value = "Preferred amount of time that a transaction should span." - ) - public String getBatchDuration() { - return batchDuration; - } - - public void setBatchDuration(String batchDuration) { - this.batchDuration = batchDuration; - } - - @Override public int hashCode() { return 923847 + String.valueOf(name).hashCode(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 763b620287e4..b628668d1a5c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -195,6 +195,7 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -2011,9 +2012,12 @@ private Set convertRemotePort(final Set dto.getUseCompression() != null, RemoteGroupPort::isUseCompression) .setConvertName(PORT_NAME_CONVERT), new ConfigurationRecorder("Batch Count", - dto -> dto.getBatchCount() != null, RemoteGroupPort::getBatchCount), + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getCount() != null, RemoteGroupPort::getBatchCount) + .setConvertName(PORT_NAME_CONVERT), new ConfigurationRecorder("Batch Size", - dto -> dto.getBatchSize() != null, RemoteGroupPort::getBatchSize), + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getSize() != null, RemoteGroupPort::getBatchSize) + .setConvertName(PORT_NAME_CONVERT), new ConfigurationRecorder("Batch Duration", - dto -> dto.getBatchDuration() != null, RemoteGroupPort::getBatchDuration) + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getDuration() != null, RemoteGroupPort::getBatchDuration) + .setConvertName(PORT_NAME_CONVERT) ); /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index fa1de06826e6..51548118be46 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1492,9 +1492,12 @@ public RemoteProcessGroupPortDTO createRemoteProcessGroupPortDto(final RemoteGro dto.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks()); dto.setUseCompression(port.isUseCompression()); dto.setExists(port.getTargetExists()); - dto.setBatchCount(port.getBatchCount()); - dto.setBatchSize(port.getBatchSize()); - dto.setBatchDuration(port.getBatchDuration()); + + final BatchSettingsDTO batchDTO = new BatchSettingsDTO(); + batchDTO.setCount(port.getBatchCount()); + batchDTO.setSize(port.getBatchSize()); + batchDTO.setDuration(port.getBatchDuration()); + dto.setBatchSettings(batchDTO); // determine if this port is currently connected to another component locally if (ConnectableType.REMOTE_OUTPUT_PORT.equals(port.getConnectableType())) { @@ -2965,9 +2968,13 @@ public RemoteProcessGroupPortDTO copy(final RemoteProcessGroupPortDTO original) copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount()); copy.setUseCompression(original.getUseCompression()); copy.setExists(original.getExists()); - copy.setBatchCount(original.getBatchCount()); - copy.setBatchSize(original.getBatchSize()); - copy.setBatchDuration(original.getBatchDuration()); + final BatchSettingsDTO batchOrg = original.getBatchSettings(); + if (batchOrg != null) { + final BatchSettingsDTO batchCopy = new BatchSettingsDTO(); + batchCopy.setCount(batchOrg.getCount()); + batchCopy.setSize(batchOrg.getSize()); + batchCopy.setDuration(batchOrg.getDuration()); + } return copy; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index e5f4900061f4..039244227c95 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -34,6 +34,7 @@ import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; @@ -206,9 +207,7 @@ private void verifyUpdatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO re // verify update when appropriate if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), remoteProcessGroupPortDto.getUseCompression(), - remoteProcessGroupPortDto.getBatchCount(), - remoteProcessGroupPortDto.getBatchSize(), - remoteProcessGroupPortDto.getBatchDuration())) { + remoteProcessGroupPortDto.getBatchSettings())) { port.verifyCanUpdate(); } } @@ -224,25 +223,28 @@ private List validateProposedRemoteProcessGroupPortConfiguration(RemoteG validationErrors.add(String.format("Concurrent tasks for port '%s' must be a positive integer.", remoteGroupPort.getName())); } - final Integer batchCount = remoteProcessGroupPortDTO.getBatchCount(); - if (isNotNull(batchCount) && batchCount < 0) { - validationErrors.add(String.format("Batch count for port '%s' must be a positive integer.", remoteGroupPort.getName())); - } + final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDTO.getBatchSettings(); + if (batchSettingsDTO != null) { + final Integer batchCount = batchSettingsDTO.getCount(); + if (isNotNull(batchCount) && batchCount < 0) { + validationErrors.add(String.format("Batch count for port '%s' must be a positive integer.", remoteGroupPort.getName())); + } - final String batchSize = remoteProcessGroupPortDTO.getBatchSize(); - if (isNotNull(batchSize) && batchSize.length() > 0 - && !DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase()).matches()) { - validationErrors.add(String.format("Batch size for port '%s' must be of format " + - " where is a non-negative integer and is a supported Data" - + " Unit, such as: B, KB, MB, GB, TB", remoteGroupPort.getName())); - } + final String batchSize = batchSettingsDTO.getSize(); + if (isNotNull(batchSize) && batchSize.length() > 0 + && !DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase()).matches()) { + validationErrors.add(String.format("Batch size for port '%s' must be of format " + + " where is a non-negative integer and is a supported Data" + + " Unit, such as: B, KB, MB, GB, TB", remoteGroupPort.getName())); + } - final String batchDuration = remoteProcessGroupPortDTO.getBatchDuration(); - if (isNotNull(batchDuration) && batchDuration.length() > 0 - && !FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase()).matches()) { - validationErrors.add(String.format("Batch duration for port '%s' must be of format " + - " where is a non-negative integer and TimeUnit is a supported Time Unit, such " - + "as: nanos, millis, secs, mins, hrs, days", remoteGroupPort.getName())); + final String batchDuration = batchSettingsDTO.getDuration(); + if (isNotNull(batchDuration) && batchDuration.length() > 0 + && !FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase()).matches()) { + validationErrors.add(String.format("Batch duration for port '%s' must be of format " + + " where is a non-negative integer and TimeUnit is a supported Time Unit, such " + + "as: nanos, millis, secs, mins, hrs, days", remoteGroupPort.getName())); + } } return validationErrors; @@ -349,13 +351,11 @@ private void updatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remotePr port.setUseCompression(remoteProcessGroupPortDto.getUseCompression()); } - final Integer batchCount = remoteProcessGroupPortDto.getBatchCount(); - final String batchSize = remoteProcessGroupPortDto.getBatchSize(); - final String batchDuration = remoteProcessGroupPortDto.getBatchDuration(); - if (isAnyNotNull(batchCount, batchSize, batchDuration)) { - port.setBatchCount(batchCount); - port.setBatchSize(batchSize); - port.setBatchDuration(batchDuration); + final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDto.getBatchSettings(); + if (isNotNull(batchSettingsDTO)) { + port.setBatchCount(batchSettingsDTO.getCount()); + port.setBatchSize(batchSettingsDTO.getSize()); + port.setBatchDuration(batchSettingsDTO.getDuration()); } final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java index d0aeef612ba2..ea7fa7d41eaa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java @@ -29,6 +29,7 @@ import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; @@ -434,9 +435,13 @@ private Collection updateProcessGroupInputPortConfiguration(RemoteProces } when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount()); when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression()); - when(updatedRPGPort.getBatchCount()).thenReturn(inputRPGPortDTO.getBatchCount()); - when(updatedRPGPort.getBatchSize()).thenReturn(inputRPGPortDTO.getBatchSize()); - when(updatedRPGPort.getBatchDuration()).thenReturn(inputRPGPortDTO.getBatchDuration()); + + final BatchSettingsDTO batchSettings = inputRPGPortDTO.getBatchSettings(); + if (batchSettings != null) { + when(updatedRPGPort.getBatchCount()).thenReturn(batchSettings.getCount()); + when(updatedRPGPort.getBatchSize()).thenReturn(batchSettings.getSize()); + when(updatedRPGPort.getBatchDuration()).thenReturn(batchSettings.getDuration()); + } when(joinPoint.proceed()).thenReturn(updatedRPGPort); @@ -565,9 +570,11 @@ public void testConfigurePortBatchSettings() throws Throwable { when(existingRPGPort.getName()).thenReturn("input-port-1"); final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO(); - inputRPGPortDTO.setBatchCount(1234); - inputRPGPortDTO.setBatchSize("64KB"); - inputRPGPortDTO.setBatchDuration("10sec"); + final BatchSettingsDTO batchSettingsDTO = new BatchSettingsDTO(); + batchSettingsDTO.setCount(1234); + batchSettingsDTO.setSize("64KB"); + batchSettingsDTO.setDuration("10sec"); + inputRPGPortDTO.setBatchSettings(batchSettingsDTO); final Collection actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort); @@ -575,14 +582,14 @@ public void testConfigurePortBatchSettings() throws Throwable { final Iterator iterator = actions.iterator(); Action action = iterator.next(); assertEquals(Operation.Configure, action.getOperation()); - assertConfigureDetails(action.getActionDetails(), "Batch Count", "0", "1234"); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Count", "0", "1234"); action = iterator.next(); assertEquals(Operation.Configure, action.getOperation()); - assertConfigureDetails(action.getActionDetails(), "Batch Size", "", "64KB"); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Size", "", "64KB"); action = iterator.next(); assertEquals(Operation.Configure, action.getOperation()); - assertConfigureDetails(action.getActionDetails(), "Batch Duration", "", "10sec"); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Duration", "", "10sec"); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java index 60fb050bbbb9..f68a115d6e49 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java @@ -21,6 +21,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.junit.Test; @@ -78,6 +79,8 @@ public void testVerifyUpdateInputPort() { final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO(); dto.setGroupId(remoteProcessGroupId); dto.setId(remoteProcessGroupInputPortId); + final BatchSettingsDTO batchSettings = new BatchSettingsDTO(); + dto.setBatchSettings(batchSettings); // Empty input values should pass validation. dao.verifyUpdateInputPort(remoteProcessGroupId, dto); @@ -90,33 +93,33 @@ public void testVerifyUpdateInputPort() { validate(dao, dto); // Batch count - dto.setBatchCount(-1); + batchSettings.setCount(-1); validate(dao, dto, "Batch count", "positive integer"); - dto.setBatchCount(0); + batchSettings.setCount(0); validate(dao, dto); - dto.setBatchCount(1000); + batchSettings.setCount(1000); validate(dao, dto); // Batch size - dto.setBatchSize("AB"); + batchSettings.setSize("AB"); validate(dao, dto, "Batch size", "Data Size"); - dto.setBatchSize("10 days"); + batchSettings.setSize("10 days"); validate(dao, dto, "Batch size", "Data Size"); - dto.setBatchSize("300MB"); + batchSettings.setSize("300MB"); validate(dao, dto); // Batch duration - dto.setBatchDuration("AB"); + batchSettings.setDuration("AB"); validate(dao, dto, "Batch duration", "Time Unit"); - dto.setBatchDuration("10 KB"); + batchSettings.setDuration("10 KB"); validate(dao, dto, "Batch duration", "Time Unit"); - dto.setBatchDuration("10 secs"); + batchSettings.setDuration("10 secs"); validate(dao, dto); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js index 5350ff920dc0..4232cabfdd34 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js @@ -98,9 +98,11 @@ groupId: remoteProcessGroupId, useCompression: $('#remote-port-use-compression').hasClass('checkbox-checked'), concurrentlySchedulableTaskCount: remotePortConcurrentTasks, - batchCount: remotePortBatchCount, - batchSize: $('#remote-port-batch-size').val(), - batchDuration: $('#remote-port-batch-duration').val() + batchSettings : { + count: remotePortBatchCount, + size: $('#remote-port-batch-size').val(), + duration: $('#remote-port-batch-duration').val() + } } }; @@ -134,9 +136,12 @@ // set the new values $('#' + remotePortId + '-concurrent-tasks').text(remotePort.concurrentlySchedulableTaskCount); $('#' + remotePortId + '-compression').text(compressionLabel); - $('#' + remotePortId + '-batch-count').text(typeof(remotePort.batchCount) === 'number' ? remotePort.batchCount : ''); - $('#' + remotePortId + '-batch-size').text(remotePort.batchSize); - $('#' + remotePortId + '-batch-duration').text(remotePort.batchDuration); + + var batchSettings = getBatchSettingsDisplayValues(remotePort); + $('#' + remotePortId + '-batch-count').text(batchSettings.count); + $('#' + remotePortId + '-batch-size').text(batchSettings.size); + $('#' + remotePortId + '-batch-duration').text(batchSettings.duration); + }).fail(function (xhr, status, error) { if (xhr.status === 400) { var errors = xhr.responseText.split('\n'); @@ -253,6 +258,26 @@ }); }; + /** + * Create and return an object contains count, size and duration values to display. + * If port does not have batch settings or batch setting value is not defined, 'No value set' is displayed. + */ + var getBatchSettingsDisplayValues = function (port) { + var values = {}; + var batchSettings = port.batchSettings; + if (batchSettings) { + values.count = typeof(batchSettings.count) === 'number' ? batchSettings.count : 'No value set'; + values.size = batchSettings.size ? batchSettings.size : 'No value set'; + values.duration = batchSettings.duration ? batchSettings.duration : 'No value set'; + } else { + // if it doesn't have batch settings, clear values + values.count = 'No value set'; + values.size = 'No value set'; + values.duration = 'No value set'; + } + return values; + } + /** * Creates the markup for configuration concurrent tasks for a port. * @@ -508,17 +533,21 @@ batchSettingsContainer.find('div.batch-settings-info').qtip($.extend({}, nf.Common.config.tooltipConfig, { - content: 'The preferred batch settings in a transaction for this port.' + - ' NiFi will transfer as many flow files as they are remaining in a queue,' + - ' until it reaches to one of these limits.' + - ' If none of them is specified, NiFi batches flow files up to ' + - (portType === 'input' ? '500ms for sending to an input port' : '5s for receiving from an output port') + - ' by default.' + content: (portType === 'input' + ? 'The batch settings to control how this NiFi sends data to the remote input port in a transaction.' + + ' This NiFi will transfer as much flow files as they are queued in incoming relationships,' + + ' until any of these limits is met.' + + ' If none of these setting is specified, this NiFi uses 500 milliseconds batch duration by default.' + : 'The batch settings to tell the remote NiFi how this NiFi prefers to receive data from the remote output port in a transaction.' + + ' The remote NiFi will use these settings as a hint to control batch data transferring.' + + ' However, actual behavior depends on the version of remote NiFi instance.' + + ' Recent version of NiFi uses 5 seconds for batch duration if none of these setting is specified.') })); - var batchCount = $('
').append($('
').text(typeof(port.batchCount) === 'number' ? port.batchCount : '')); - var batchSize = $('
').append($('
').text(port.batchSize)); - var batchDuration = $('
').append($('
').text(port.batchDuration)); + var batchSettings = getBatchSettingsDisplayValues(port); + var batchCount = $('
').append($('
').text(batchSettings.count)); + var batchSize = $('
').append($('
').text(batchSettings.size)); + var batchDuration = $('
').append($('
').text(batchSettings.duration)); // add this ports batch count $('
' + @@ -578,9 +607,9 @@ } $('#remote-port-use-compression').addClass(checkState); $('#remote-port-concurrent-tasks').val(portConcurrentTasks); - $('#remote-port-batch-count').val(batchCount); - $('#remote-port-batch-size').val(batchSize); - $('#remote-port-batch-duration').val(batchDuration); + $('#remote-port-batch-count').val(batchCount === 'No value set' ? null : batchCount); + $('#remote-port-batch-size').val(batchSize === 'No value set' ? null : batchSize); + $('#remote-port-batch-duration').val(batchDuration === 'No value set' ? null : batchDuration); // set the port name $('#remote-port-name').text(portName).ellipsis();