Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ public class PeerStatus {

private final PeerDescription description;
private final int numFlowFiles;
private final boolean queryForPeers;

public PeerStatus(final PeerDescription description, final int numFlowFiles) {
public PeerStatus(final PeerDescription description, final int numFlowFiles, final boolean queryForPeers) {
this.description = description;
this.numFlowFiles = numFlowFiles;
this.queryForPeers = queryForPeers;
}

public PeerDescription getPeerDescription() {
Expand All @@ -34,6 +36,13 @@ public int getFlowFileCount() {
return numFlowFiles;
}

/**
* @return <code>true</code> if this node can be queried for its peers, <code>false</code> otherwise.
*/
public boolean isQueryForPeers() {
return queryForPeers;
}

@Override
public String toString() {
return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void persistPeerStatuses(final Set<PeerStatus> statuses) {

for (final PeerStatus status : statuses) {
final PeerDescription description = status.getPeerDescription();
final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n";
final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
out.write(line.getBytes(StandardCharsets.UTF_8));
}

Expand All @@ -120,15 +120,17 @@ private static Set<PeerStatus> recoverPersistedPeerStatuses(final File file) thr
String line;
while ((line = reader.readLine()) != null) {
final String[] splits = line.split(Pattern.quote(":"));
if (splits.length != 3) {
if (splits.length != 3 && splits.length != 4) {
continue;
}

final String hostname = splits[0];
final int port = Integer.parseInt(splits[1]);
final boolean secure = Boolean.parseBoolean(splits[2]);

statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1));
final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);

statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
}
}

Expand Down Expand Up @@ -172,7 +174,7 @@ List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final
final int index = n % destinations.size();
PeerStatus status = destinations.get(index);
if (status == null) {
status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount());
status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
destinations.set(index, status);
break;
} else {
Expand Down Expand Up @@ -306,7 +308,7 @@ private Set<PeerStatus> getPeerStatuses() {
if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
for (final PeerStatus status : cache.getStatuses()) {
final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1);
final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
equalizedSet.add(equalizedStatus);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,45 +93,44 @@ public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
final URI clusterUrl;
try {
clusterUrl = new URI(config.getUrl());
} catch (URISyntaxException e) {
} catch (final URISyntaxException e) {
throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e);
}

try (
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())
) {
String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) {
final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());

int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
apiClient.setConnectTimeoutMillis(timeoutMillis);
apiClient.setReadTimeoutMillis(timeoutMillis);
Collection<PeerDTO> peers = apiClient.getPeers();

final Collection<PeerDTO> peers = apiClient.getPeers();
if(peers == null || peers.size() == 0){
throw new IOException("Couldn't get any peer to communicate with. " + clusterApiUrl + " returned zero peers.");
}

return peers.stream()
.map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount()))
// Convert the PeerDTO's to PeerStatus objects. Use 'true' for the query-peer-for-peers flag because Site-to-Site over HTTP
// was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed.
return peers.stream().map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount(), true))
.collect(Collectors.toSet());
}
}

@Override
public Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {

int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);

PeerStatus peerStatus;
while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) {
logger.debug("peerStatus={}", peerStatus);

CommunicationsSession commSession = new HttpCommunicationsSession();
String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
final CommunicationsSession commSession = new HttpCommunicationsSession();
final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
commSession.setUri(nodeApiUrl);
String clusterUrl = config.getUrl();
Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);
final String clusterUrl = config.getUrl();
final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);

int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
final int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
String portId = config.getPortIdentifier();
if (StringUtils.isEmpty(portId)) {
portId = siteInfoProvider.getPortIdentifier(config.getPortName(), direction);
Expand All @@ -141,7 +140,7 @@ public Transaction createTransaction(TransferDirection direction) throws Handsha
}
}

SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy());
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy());

apiClient.setBaseUrl(peer.getUrl());
apiClient.setConnectTimeoutMillis(timeoutMillis);
Expand All @@ -157,7 +156,8 @@ public Transaction createTransaction(TransferDirection direction) throws Handsha
try {
transactionUrl = apiClient.initiateTransaction(direction, portId);
commSession.setUserDn(apiClient.getTrustedPeerDn());
} catch (Exception e) {
} catch (final Exception e) {
apiClient.close();
logger.debug("Penalizing a peer due to {}", e.getMessage());
peerSelector.penalize(peer, penaltyMillis);

Expand All @@ -170,8 +170,8 @@ public Transaction createTransaction(TransferDirection direction) throws Handsha
}

// We found a valid peer to communicate with.
Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
config.isUseCompression(), portId, penaltyMillis, config.getEventReporter());
transaction.initialize(apiClient, transactionUrl);

Expand All @@ -183,7 +183,7 @@ public Transaction createTransaction(TransferDirection direction) throws Handsha

}

private String resolveNodeApiUrl(PeerDescription description) {
private String resolveNodeApiUrl(final PeerDescription description) {
return (description.isSecure() ? "https" : "http") + "://" + description.getHostname() + ":" + description.getPort() + "/nifi-api";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,9 @@
*/
package org.apache.nifi.remote.client.socket;

import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;

import javax.net.ssl.SSLContext;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
Expand All @@ -63,9 +41,33 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;
import javax.net.ssl.SSLContext;

import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EndpointConnectionPool implements PeerStatusProvider {

Expand All @@ -84,6 +86,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {

private volatile int commsTimeout;
private volatile boolean shutdown = false;
private volatile Set<PeerStatus> lastFetchedQueryablePeers;

private final SiteInfoProvider siteInfoProvider;
private final PeerSelector peerSelector;
Expand Down Expand Up @@ -145,8 +148,7 @@ public EndpointConnection getEndpointConnection(final TransferDirection directio
return getEndpointConnection(direction, null);
}

public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config)
throws IOException {
public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException {
//
// Attempt to get a connection state that already exists for this URL.
//
Expand Down Expand Up @@ -358,15 +360,13 @@ private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
}
}

public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
final String hostname = clusterUrl.getHost();
final Integer port = siteInfoProvider.getSiteToSitePort();
if (port == null) {
throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
}
private Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException {
final String hostname = peerDescription.getHostname();
final int port = peerDescription.getPort();

final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);

final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
final SocketClientProtocol clientProtocol = new SocketClientProtocol();
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
Expand Down Expand Up @@ -414,6 +414,50 @@ public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
return peerStatuses;
}

@Override
public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();

// Look at all of the peers that we fetched last time.
final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
if (lastFetched != null && !lastFetched.isEmpty()) {
lastFetched.stream().map(peer -> peer.getPeerDescription())
.forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
}

// Always add the configured node info to the list of peers to communicate with
final String hostname = clusterUrl.getHost();
final Integer port = siteInfoProvider.getSiteToSitePort();
if (port == null) {
throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
}

final boolean secure = siteInfoProvider.isSecure();
peersToRequestClusterInfoFrom.add(new PeerDescription(hostname, port, secure));

Exception lastFailure = null;
for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
try {
final Set<PeerStatus> statuses = fetchRemotePeerStatuses(peerDescription);
lastFetchedQueryablePeers = statuses.stream()
.filter(p -> p.isQueryForPeers())
.collect(Collectors.toSet());

return statuses;
} catch (final Exception e) {
logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster", peerDescription.getHostname(), peerDescription.getPort());
lastFailure = e;
}
}

final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
if (lastFailure != null) {
ioe.addSuppressed(lastFailure);
}

throw ioe;
}

private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
final PeerDescription description = peerStatus.getPeerDescription();
return establishSiteToSiteConnection(description.getHostname(), description.getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession {
private Transaction.TransactionState status = Transaction.TransactionState.TRANSACTION_STARTED;
private ResponseCode responseCode;

public HttpServerCommunicationsSession(InputStream inputStream, OutputStream outputStream, String transactionId){
public HttpServerCommunicationsSession(final InputStream inputStream, final OutputStream outputStream, final String transactionId, final String userDn) {
super();
input.setInputStream(inputStream);
output.setOutputStream(outputStream);
this.transactionId = transactionId;
setUserDn(userDn);
}

// This status is only needed by HttpFlowFileServerProtocol, HttpClientTransaction has its own status.
Expand All @@ -46,7 +47,7 @@ public Transaction.TransactionState getStatus() {
return status;
}

public void setStatus(Transaction.TransactionState status) {
public void setStatus(final Transaction.TransactionState status) {
this.status = status;
}

Expand All @@ -58,11 +59,11 @@ public ResponseCode getResponseCode() {
return responseCode;
}

public void setResponseCode(ResponseCode responseCode) {
public void setResponseCode(final ResponseCode responseCode) {
this.responseCode = responseCode;
}

public void putHandshakeParam(HandshakeProperty key, String value) {
public void putHandshakeParam(final HandshakeProperty key, final String value) {
handshakeParams.put(key.name(), value);
}

Expand Down
Loading