Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -202,6 +203,11 @@ List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final
}
}

// Shuffle destinations to provide better distribution.
// Without this, same host will be used continuously, especially when remote peers have the same number of queued files.
// Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations.
Collections.shuffle(destinations, new Random(0));

final StringBuilder distributionDescription = new StringBuilder();
distributionDescription.append("New Weighted Distribution of Nodes:");
for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand All @@ -59,6 +60,40 @@ private Map<String, Integer> calculateAverageSelectedCount(Set<PeerStatus> colle
}));
}

@Test
public void testFormulateDestinationListForOutputEven() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();
collection.add(new PeerStatus(new PeerDescription("Node1", 1111, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node2", 2222, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node3", 3333, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node4", 4444, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node5", 5555, true), 4096, true));

PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);

final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);

logger.info("selectedCounts={}", selectedCounts);

int consecutiveSamePeerCount = 0;
PeerStatus previousPeer = null;
for (PeerStatus peer : destinations) {
if (previousPeer != null && peer.getPeerDescription().equals(previousPeer.getPeerDescription())) {
consecutiveSamePeerCount++;
// The same peer shouldn't be used consecutively (number of nodes - 1) times or more.
if (consecutiveSamePeerCount >= (collection.size() - 1)) {
fail("The same peer is returned consecutively too frequently.");
}
} else {
consecutiveSamePeerCount = 0;
}
previousPeer = peer;
}

}

@Test
public void testFormulateDestinationListForOutput() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;

import com.wordnik.swagger.annotations.ApiModelProperty;

import javax.xml.bind.annotation.XmlType;

/**
* Details of batch settings of a remote process group port.
*/
@XmlType(name = "batchSettings")
public class BatchSettingsDTO {

private Integer count;
private String size;
private String duration;

/**
* @return preferred number of flow files to include in a transaction
*/
@ApiModelProperty(
value = "Preferred number of flow files to include in a transaction."
)
public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}

/**
* @return preferred number of bytes to include in a transaction
*/
@ApiModelProperty(
value = "Preferred number of bytes to include in a transaction."
)
public String getSize() {
return size;
}

public void setSize(String size) {
this.size = size;
}

/**
* @return preferred amount of time that a transaction should span
*/
@ApiModelProperty(
value = "Preferred amount of time that a transaction should span."
)
public String getDuration() {
return duration;
}

public void setDuration(String duration) {
this.duration = duration;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class RemoteProcessGroupPortDTO {
private Boolean exists;
private Boolean targetRunning;
private Boolean connected;
private BatchSettingsDTO batchSettings;

/**
* @return comments as configured in the target port
Expand Down Expand Up @@ -176,6 +177,20 @@ public void setConnected(Boolean connected) {
this.connected = connected;
}

/**
* @return batch settings for data transmission
*/
@ApiModelProperty(
value = "The batch settings for data transmission."
)
public BatchSettingsDTO getBatchSettings() {
return batchSettings;
}

public void setBatchSettings(BatchSettingsDTO batchSettings) {
this.batchSettings = batchSettings;
}

@Override
public int hashCode() {
return 923847 + String.valueOf(name).hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ public interface RemoteProcessGroupPortDescriptor {
*/
Boolean getUseCompression();

/**
* @return Preferred number of flow files to include in a transaction
*/
Integer getBatchCount();

/**
* @return Preferred number of bytes to include in a transaction
*/
String getBatchSize();

/**
* @return Preferred amount of for a transaction to span
*/
String getBatchDuration();

/**
* @return Whether or not the target port exists
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,16 @@ public RemoteGroupPort(String id, String name, ProcessGroup processGroup, Connec
public abstract boolean getTargetExists();

public abstract boolean isTargetRunning();

public abstract Integer getBatchCount();

public abstract void setBatchCount(Integer batchCount);

public abstract String getBatchSize();

public abstract void setBatchSize(String batchSize);

public abstract String getBatchDuration();

public abstract void setBatchDuration(String batchDuration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
Expand Down Expand Up @@ -2011,6 +2012,12 @@ private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<Remote
descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount());
descriptor.setTransmitting(port.isTransmitting());
descriptor.setUseCompression(port.getUseCompression());
final BatchSettingsDTO batchSettings = port.getBatchSettings();
if (batchSettings != null) {
descriptor.setBatchCount(batchSettings.getCount());
descriptor.setBatchSize(batchSettings.getSize());
descriptor.setBatchDuration(batchSettings.getDuration());
}
remotePorts.add(descriptor);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ public static RemoteProcessGroupPortDescriptor getRemoteProcessGroupPort(final E
descriptor.setComments(getString(element, "comments"));
descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
descriptor.setUseCompression(getBoolean(element, "useCompression"));
descriptor.setBatchCount(getOptionalInt(element, "batchCount"));
descriptor.setBatchSize(getString(element, "batchSize"));
descriptor.setBatchDuration(getString(element, "batchDuration"));
descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState")));

return descriptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,18 @@ private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPo
addTextElement(element, "scheduledState", port.getScheduledState().name());
addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));
final Integer batchCount = port.getBatchCount();
if (batchCount != null && batchCount > 0) {
addTextElement(element, "batchCount", batchCount);
}
final String batchSize = port.getBatchSize();
if (batchSize != null && batchSize.length() > 0) {
addTextElement(element, "batchSize", batchSize);
}
final String batchDuration = port.getBatchDuration();
if (batchDuration != null && batchDuration.length() > 0) {
addTextElement(element, "batchDuration", batchDuration);
}

parentElement.appendChild(element);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ public int compare(final Element o1, final Element o2) {
}

private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) {
for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression"}) {
for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
Expand Down Expand Up @@ -632,6 +633,15 @@ private void addOutputPort(final RemoteProcessGroupPortDescriptor descriptor) {
if (descriptor.getUseCompression() != null) {
port.setUseCompression(descriptor.getUseCompression());
}
if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) {
port.setBatchCount(descriptor.getBatchCount());
}
if (!StringUtils.isBlank(descriptor.getBatchSize())) {
port.setBatchSize(descriptor.getBatchSize());
}
if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
port.setBatchDuration(descriptor.getBatchDuration());
}
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -697,6 +707,15 @@ private void addInputPort(final RemoteProcessGroupPortDescriptor descriptor) {
if (descriptor.getUseCompression() != null) {
port.setUseCompression(descriptor.getUseCompression());
}
if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) {
port.setBatchCount(descriptor.getBatchCount());
}
if (!StringUtils.isBlank(descriptor.getBatchSize())) {
port.setBatchSize(descriptor.getBatchSize());
}
if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
port.setBatchDuration(descriptor.getBatchDuration());
}

inputPorts.put(descriptor.getId(), port);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr
private Integer concurrentlySchedulableTaskCount;
private Boolean transmitting;
private Boolean useCompression;
private Integer batchCount;
private String batchSize;
private String batchDuration;
private Boolean exists;
private Boolean targetRunning;
private Boolean connected;
Expand Down Expand Up @@ -94,6 +97,33 @@ public void setUseCompression(Boolean useCompression) {
this.useCompression = useCompression;
}

@Override
public Integer getBatchCount() {
return batchCount;
}

public void setBatchCount(Integer batchCount) {
this.batchCount = batchCount;
}

@Override
public String getBatchSize() {
return batchSize;
}

public void setBatchSize(String batchSize) {
this.batchSize = batchSize;
}

@Override
public String getBatchDuration() {
return batchDuration;
}

public void setBatchDuration(String batchDuration) {
this.batchDuration = batchDuration;
}

@Override
public Boolean getExists() {
return exists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@
<xs:sequence>
<xs:element name="maxConcurrentTasks" type="xs:positiveInteger"></xs:element>
<xs:element name="useCompression" type="xs:boolean"></xs:element>
<xs:element name="batchCount" type="xs:positiveInteger" minOccurs="0" maxOccurs="1" />
<xs:element name="batchSize" type="xs:string" minOccurs="0" maxOccurs="1" />
<xs:element name="batchDuration" type="xs:string" minOccurs="0" maxOccurs="1" />
</xs:sequence>
</xs:extension>
</xs:complexContent>
Expand Down
Loading