Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,17 @@
*/
package org.apache.nifi.remote;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

import javax.net.ssl.SSLContext;
import javax.ws.rs.core.MediaType;

import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.entity.ControllerEntity;
import org.apache.nifi.web.util.WebUtils;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.nifi.web.util.WebUtils;

public class RemoteNiFiUtils {
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.MediaType;
import java.net.URI;

public static final String CONTROLLER_URI_PATH = "/controller";
public class RemoteNiFiUtils {

private static final int CONNECT_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 10000;
Expand All @@ -50,61 +37,6 @@ public RemoteNiFiUtils(final SSLContext sslContext) {
this.client = getClient(sslContext);
}

/**
* Gets the content at the specified URI.
*
* @param uri uri to retrieve
* @param timeoutMillis time to wait in millis
* @return response
* @throws ClientHandlerException ex
* @throws UniformInterfaceException ex
*/
public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
return get(uri, timeoutMillis, null);
}

/**
* Gets the content at the specified URI using the given query parameters.
*
* @param uri to retrieve
* @param timeoutMillis wait period in millis
* @param queryParams query parameters
* @return response
* @throws ClientHandlerException ex
* @throws UniformInterfaceException ex
*/
public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
// perform the request
WebResource webResource = client.resource(uri);
if (queryParams != null) {
for (final Map.Entry<String, String> queryEntry : queryParams.entrySet()) {
webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
}
}

webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);

return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
}

/**
* Performs a HEAD request to the specified URI.
*
* @param uri to retrieve
* @param timeoutMillis wait time in millis
* @return response
* @throws ClientHandlerException ex
* @throws UniformInterfaceException ex
*/
public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
// perform the request
WebResource webResource = client.resource(uri);
webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
return webResource.head();
}

private Client getClient(final SSLContext sslContext) {
final Client client;
if (sslContext == null) {
Expand All @@ -120,69 +52,7 @@ private Client getClient(final SSLContext sslContext) {
}

/**
* Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance is not configured to use Site-to-Site transfers.
*
* @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
* @param timeoutMillis wait time in millis
* @return port number
* @throws IOException ex
*/
public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getRemoteListeningPort(uriObject, timeoutMillis);
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}

public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getRemoteRootGroupId(uriObject, timeoutMillis);
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}

public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getController(uriObject, timeoutMillis).getInstanceId();
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}

/**
* Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance is not configured to use Site-to-Site transfers.
*
* @param uri the full URI to fetch, including the path.
* @return port
* @throws IOException ex
*/
private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
}

private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
return getController(uri, timeoutMillis).getId();
}

public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
final ClientResponse response = get(uri, timeoutMillis);

if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
final ControllerEntity entity = response.getEntity(ControllerEntity.class);
return entity.getController();
} else {
final String responseMessage = response.getEntity(String.class);
throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
}
}

/**
* Issues a registration request on behalf of the current user.
* Issues a registration request for this NiFi instance for the instance at the baseApiUri.
*
* @param baseApiUri uri to register with
* @return response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.nifi.remote;

import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException;
import org.apache.nifi.authorization.Resource;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -1119,6 +1121,10 @@ public Boolean getSecureFlag() {
}
}

private boolean isWebApiSecure() {
return targetUri.toString().toLowerCase().startsWith("https");
}

@Override
public boolean isSiteToSiteEnabled() {
readLock.lock();
Expand Down Expand Up @@ -1166,31 +1172,30 @@ public void run() {
} catch (SiteToSiteRestApiClient.HttpGetFailedException e) {

if (e.getResponseCode() == UNAUTHORIZED_STATUS_CODE) {
// TODO: implement registration request
/*
try {
// attempt to issue a registration request in case the target instance is a 0.x
final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString());
if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily())) {
logger.info("{} Issued a Request to communicate with remote instance", this);
} else {
logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{
this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()});
}
} catch (final Exception e) {
logger.error("{} Failed to request account due to {}", this, e.toString());
} catch (final Exception re) {
logger.error("{} Failed to request account due to {}", this, re.toString());
if (logger.isDebugEnabled()) {
logger.error("", e);
logger.error("", re);
}
}
*/
authorizationIssue = e.getDescription();

authorizationIssue = e.getDescription();
} else if (e.getResponseCode() == FORBIDDEN_STATUS_CODE) {
authorizationIssue = e.getDescription();
} else {
final String message = e.getDescription();
logger.warn("{} When communicating with remote instance, got unexpected result. {}",
new Object[]{this, e.getMessage()});
new Object[]{this, message});
authorizationIssue = "Unable to determine Site-to-Site availability.";
}
}
Expand Down
Loading