Skip to content

Commit

Permalink
Templatize RestTemplate usage for Job API forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
tgianos committed Sep 6, 2017
1 parent 4f641ad commit f22b616
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
* @since 3.0.0
*/
@Slf4j
public final class ControllerUtils {
final class ControllerUtils {

/**
* Constructor.
*/
protected ControllerUtils() {
private ControllerUtils() {
}

/**
Expand All @@ -46,7 +46,7 @@ protected ControllerUtils() {
* @param request The http servlet request.
* @return The remaining path
*/
public static String getRemainingPath(final HttpServletRequest request) {
static String getRemainingPath(final HttpServletRequest request) {
String path = (String) request.getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE);
if (path != null) {
final String bestMatchPattern
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public class JobRestController {
private static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
private static final String FORWARDED_FOR_HEADER = "X-Forwarded-For";
private static final String NAME_HEADER_COOKIE = "cookie";
private static final String JOB_API_TEMPLATE = "/api/v3/jobs/{id}";
private static final String EMPTY_STRING = "";
private static final String COMMA = ",";

private final JobCoordinatorService jobCoordinatorService;
private final JobSearchService jobSearchService;
Expand Down Expand Up @@ -208,13 +211,9 @@ public JobRestController(
@PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE)
@ResponseStatus(HttpStatus.ACCEPTED)
public ResponseEntity<Void> submitJob(
@Valid
@RequestBody
final JobRequest jobRequest,
@RequestHeader(value = FORWARDED_FOR_HEADER, required = false)
final String clientHost,
@RequestHeader(value = HttpHeaders.USER_AGENT, required = false)
final String userAgent,
@Valid @RequestBody final JobRequest jobRequest,
@RequestHeader(value = FORWARDED_FOR_HEADER, required = false) final String clientHost,
@RequestHeader(value = HttpHeaders.USER_AGENT, required = false) final String userAgent,
final HttpServletRequest httpServletRequest
) throws GenieException {
log.info("[submitJob] Called json method type to submit job: {}", jobRequest);
Expand All @@ -236,15 +235,10 @@ public ResponseEntity<Void> submitJob(
@PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
@ResponseStatus(HttpStatus.ACCEPTED)
public ResponseEntity<Void> submitJob(
@Valid
@RequestPart("request")
final JobRequest jobRequest,
@RequestPart("attachment")
final MultipartFile[] attachments,
@RequestHeader(value = FORWARDED_FOR_HEADER, required = false)
final String clientHost,
@RequestHeader(value = HttpHeaders.USER_AGENT, required = false)
final String userAgent,
@Valid @RequestPart("request") final JobRequest jobRequest,
@RequestPart("attachment") final MultipartFile[] attachments,
@RequestHeader(value = FORWARDED_FOR_HEADER, required = false) final String clientHost,
@RequestHeader(value = HttpHeaders.USER_AGENT, required = false) final String userAgent,
final HttpServletRequest httpServletRequest
) throws GenieException {
log.info("[submitJob] Called multipart method to submit job: {}", jobRequest);
Expand All @@ -266,7 +260,7 @@ private ResponseEntity<Void> handleSubmitJob(
// get client's host from the context
final String localClientHost;
if (StringUtils.isNotBlank(clientHost)) {
localClientHost = clientHost.split(",")[0];
localClientHost = clientHost.split(COMMA)[0];
} else {
localClientHost = httpServletRequest.getRemoteAddr();
}
Expand Down Expand Up @@ -354,8 +348,7 @@ private ResponseEntity<Void> handleSubmitJob(
*/
@GetMapping(value = "/{id}", produces = MediaTypes.HAL_JSON_VALUE)
public JobResource getJob(
@PathVariable("id")
final String id) throws GenieException {
@PathVariable("id") final String id) throws GenieException {
log.info("[getJob] Called for job with id: {}", id);
return this.jobResourceAssembler.toResource(this.jobSearchService.getJob(id));
}
Expand All @@ -369,8 +362,7 @@ public JobResource getJob(
*/
@GetMapping(value = "/{id}/status", produces = MediaType.APPLICATION_JSON_VALUE)
public JsonNode getJobStatus(
@PathVariable("id")
final String id) throws GenieException {
@PathVariable("id") final String id) throws GenieException {
log.debug("[getJobStatus] Called for job with id: {}", id);
final JsonNodeFactory factory = JsonNodeFactory.instance;
return factory
Expand Down Expand Up @@ -402,34 +394,20 @@ public JsonNode getJobStatus(
@GetMapping(produces = MediaTypes.HAL_JSON_VALUE)
@ResponseStatus(HttpStatus.OK)
public PagedResources<JobSearchResultResource> findJobs(
@RequestParam(value = "id", required = false)
final String id,
@RequestParam(value = "name", required = false)
final String name,
@RequestParam(value = "user", required = false)
final String user,
@RequestParam(value = "status", required = false)
final Set<String> statuses,
@RequestParam(value = "tag", required = false)
final Set<String> tags,
@RequestParam(value = "clusterName", required = false)
final String clusterName,
@RequestParam(value = "clusterId", required = false)
final String clusterId,
@RequestParam(value = "commandName", required = false)
final String commandName,
@RequestParam(value = "commandId", required = false)
final String commandId,
@RequestParam(value = "minStarted", required = false)
final Long minStarted,
@RequestParam(value = "maxStarted", required = false)
final Long maxStarted,
@RequestParam(value = "minFinished", required = false)
final Long minFinished,
@RequestParam(value = "maxFinished", required = false)
final Long maxFinished,
@PageableDefault(sort = {"created"}, direction = Sort.Direction.DESC)
final Pageable page,
@RequestParam(value = "id", required = false) final String id,
@RequestParam(value = "name", required = false) final String name,
@RequestParam(value = "user", required = false) final String user,
@RequestParam(value = "status", required = false) final Set<String> statuses,
@RequestParam(value = "tag", required = false) final Set<String> tags,
@RequestParam(value = "clusterName", required = false) final String clusterName,
@RequestParam(value = "clusterId", required = false) final String clusterId,
@RequestParam(value = "commandName", required = false) final String commandName,
@RequestParam(value = "commandId", required = false) final String commandId,
@RequestParam(value = "minStarted", required = false) final Long minStarted,
@RequestParam(value = "maxStarted", required = false) final Long maxStarted,
@RequestParam(value = "minFinished", required = false) final Long minFinished,
@RequestParam(value = "maxFinished", required = false) final Long maxFinished,
@PageableDefault(sort = {"created"}, direction = Sort.Direction.DESC) final Pageable page,
final PagedResourcesAssembler<JobSearchResult> assembler
) throws GenieException {
log.info(
Expand Down Expand Up @@ -525,10 +503,8 @@ public PagedResources<JobSearchResultResource> findJobs(
@DeleteMapping(value = "/{id}")
@ResponseStatus(HttpStatus.ACCEPTED)
public void killJob(
@PathVariable("id")
final String id,
@RequestHeader(name = JobConstants.GENIE_FORWARDED_FROM_HEADER, required = false)
final String forwardedFrom,
@PathVariable("id") final String id,
@RequestHeader(name = JobConstants.GENIE_FORWARDED_FROM_HEADER, required = false) final String forwardedFrom,
final HttpServletRequest request,
final HttpServletResponse response
) throws GenieException, IOException, ServletException {
Expand All @@ -539,22 +515,25 @@ public void killJob(
final String jobHostname = this.jobSearchService.getJobHost(id);
if (!this.hostName.equals(jobHostname)) {
log.info("Job {} is not on this node. Forwarding kill request to {}", id, jobHostname);
final String forwardUrl = buildForwardURL(request, jobHostname);
final String forwardHost = this.buildForwardHost(jobHostname);
try {
//Need to forward job
restTemplate.execute(forwardUrl, HttpMethod.DELETE,
this.restTemplate.execute(
forwardHost + JOB_API_TEMPLATE,
HttpMethod.DELETE,
forwardRequest -> copyRequestHeaders(request, forwardRequest),
(final ClientHttpResponse forwardResponse) -> {
response.setStatus(HttpStatus.ACCEPTED.value());
copyResponseHeaders(response, forwardResponse);
return null;
}
},
id
);
} catch (HttpStatusCodeException e) {
log.error("Failed killing job on {}. Error: {}", forwardUrl, e.getMessage());
log.error("Failed killing job on {}. Error: {}", forwardHost, e.getMessage());
response.sendError(e.getStatusCode().value(), e.getStatusText());
} catch (Exception e) {
log.error("Failed killing job on {}. Error: {}", forwardUrl, e.getMessage());
log.error("Failed killing job on {}. Error: {}", forwardHost, e.getMessage());
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value(), e.getMessage());
}

Expand All @@ -579,8 +558,7 @@ public void killJob(
@GetMapping(value = "/{id}/request", produces = MediaTypes.HAL_JSON_VALUE)
@ResponseStatus(HttpStatus.OK)
public JobRequestResource getJobRequest(
@PathVariable("id")
final String id) throws GenieException {
@PathVariable("id") final String id) throws GenieException {
log.info("[getJobRequest] Called for job request with id {}", id);
return this.jobRequestResourceAssembler.toResource(this.jobSearchService.getJobRequest(id));
}
Expand All @@ -595,8 +573,7 @@ public JobRequestResource getJobRequest(
@GetMapping(value = "/{id}/execution", produces = MediaTypes.HAL_JSON_VALUE)
@ResponseStatus(HttpStatus.OK)
public JobExecutionResource getJobExecution(
@PathVariable("id")
final String id
@PathVariable("id") final String id
) throws GenieException {
log.info("[getJobExecution] Called for job execution with id {}", id);
return this.jobExecutionResourceAssembler.toResource(this.jobSearchService.getJobExecution(id));
Expand All @@ -612,8 +589,7 @@ public JobExecutionResource getJobExecution(
@GetMapping(value = "/{id}/cluster", produces = MediaTypes.HAL_JSON_VALUE)
@ResponseStatus(HttpStatus.OK)
public ClusterResource getJobCluster(
@PathVariable("id")
final String id
@PathVariable("id") final String id
) throws GenieException {
log.info("[getJobCluster] Called for job with id {}", id);
return this.clusterResourceAssembler.toResource(this.jobSearchService.getJobCluster(id));
Expand All @@ -629,8 +605,7 @@ public ClusterResource getJobCluster(
@GetMapping(value = "/{id}/command", produces = MediaTypes.HAL_JSON_VALUE)
@ResponseStatus(HttpStatus.OK)
public CommandResource getJobCommand(
@PathVariable("id")
final String id) throws GenieException {
@PathVariable("id") final String id) throws GenieException {
log.info("[getJobCommand] Called for job with id {}", id);
return this.commandResourceAssembler.toResource(this.jobSearchService.getJobCommand(id));
}
Expand All @@ -645,8 +620,7 @@ public CommandResource getJobCommand(
@GetMapping(value = "/{id}/applications", produces = MediaTypes.HAL_JSON_VALUE)
@ResponseStatus(HttpStatus.OK)
public List<ApplicationResource> getJobApplications(
@PathVariable("id")
final String id) throws GenieException {
@PathVariable("id") final String id) throws GenieException {
log.info("[getJobApplications] Called for job with id {}", id);

return this.jobSearchService
Expand Down Expand Up @@ -676,14 +650,13 @@ public List<ApplicationResource> getJobApplications(
produces = MediaType.ALL_VALUE
)
public void getJobOutput(
@PathVariable("id")
final String id,
@RequestHeader(name = JobConstants.GENIE_FORWARDED_FROM_HEADER, required = false)
final String forwardedFrom,
@PathVariable("id") final String id,
@RequestHeader(name = JobConstants.GENIE_FORWARDED_FROM_HEADER, required = false) final String forwardedFrom,
final HttpServletRequest request,
final HttpServletResponse response
) throws IOException, ServletException, GenieException {
log.info("[getJobOutput] Called for job with id: {}", id);
final String path = ControllerUtils.getRemainingPath(request);

// if forwarded from isn't null it's already been forwarded to this node. Assume data is on this node.
if (this.jobsProperties.getForwarding().isEnabled() && forwardedFrom == null) {
Expand All @@ -693,26 +666,28 @@ public void getJobOutput(
final String jobHostname = this.jobSearchService.getJobHost(id);
if (!this.hostName.equals(jobHostname)) {
log.info("Job {} is not or was not run on this node. Forwarding to {}", id, jobHostname);
final String forwardUrl = buildForwardURL(request, jobHostname);
final String forwardHost = this.buildForwardHost(jobHostname);
try {
this.restTemplate.execute(forwardUrl, HttpMethod.GET,
this.restTemplate.execute(
forwardHost + JOB_API_TEMPLATE + "/output/{path}",
HttpMethod.GET,
forwardRequest -> copyRequestHeaders(request, forwardRequest),
new ResponseExtractor<Void>() {
@Override
public Void extractData(final ClientHttpResponse forwardResponse) throws IOException {
response.setStatus(HttpStatus.OK.value());
copyResponseHeaders(response, forwardResponse);
// Documentation I could find pointed to the HttpEntity reading the bytes off
// the stream so this should resolve memory problems if the file returned is large
ByteStreams.copy(forwardResponse.getBody(), response.getOutputStream());
return null;
}
});
} catch (HttpStatusCodeException e) {
log.error("Failed getting the remote job output from {}. Error: {}", forwardUrl, e.getMessage());
(ResponseExtractor<Void>) forwardResponse -> {
response.setStatus(HttpStatus.OK.value());
copyResponseHeaders(response, forwardResponse);
// Documentation I could find pointed to the HttpEntity reading the bytes off
// the stream so this should resolve memory problems if the file returned is large
ByteStreams.copy(forwardResponse.getBody(), response.getOutputStream());
return null;
},
id,
path == null ? EMPTY_STRING : path
);
} catch (final HttpStatusCodeException e) {
log.error("Failed getting the remote job output from {}. Error: {}", forwardHost, e.getMessage());
response.sendError(e.getStatusCode().value(), e.getStatusText());
} catch (Exception e) {
log.error("Failed getting the remote job output from {}. Error: {}", forwardUrl, e.getMessage());
} catch (final Exception e) {
log.error("Failed getting the remote job output from {}. Error: {}", forwardHost, e.getMessage());
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value(), e.getMessage());
}

Expand All @@ -722,7 +697,6 @@ public Void extractData(final ClientHttpResponse forwardResponse) throws IOExcep
}

log.info("Job {} is running or was run on this node. Fetching requested resource...", id);
final String path = ControllerUtils.getRemainingPath(request);
if (StringUtils.isNotBlank(path)) {
request.setAttribute(GenieResourceHttpRequestHandler.GENIE_JOB_IS_ROOT_DIRECTORY, false);
} else {
Expand All @@ -734,13 +708,12 @@ public Void extractData(final ClientHttpResponse forwardResponse) throws IOExcep
this.resourceHttpRequestHandler.handleRequest(request, response);
}

private String buildForwardURL(final HttpServletRequest request, final String jobHostname) {
private String buildForwardHost(final String jobHostname) {
return this.jobsProperties.getForwarding().getScheme()
+ "://"
+ jobHostname
+ ":"
+ this.jobsProperties.getForwarding().getPort()
+ request.getRequestURI();
+ this.jobsProperties.getForwarding().getPort();
}

private void copyRequestHeaders(final HttpServletRequest request, final ClientHttpRequest forwardRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@
import org.hibernate.validator.constraints.NotBlank;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.web.client.ResponseExtractor;
import org.springframework.web.client.RestTemplate;

import javax.validation.constraints.NotNull;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
Expand Down Expand Up @@ -106,16 +104,12 @@ public void getFile(
this.restTemplate.execute(
srcRemotePath,
HttpMethod.GET,
requestEntity ->
requestEntity.getHeaders().setAccept(Lists.newArrayList(MediaType.ALL)),
new ResponseExtractor<Void>() {
@Override
public Void extractData(final ClientHttpResponse response) throws IOException {
// Documentation I could find pointed to the HttpEntity reading the bytes off
// the stream so this should resolve memory problems if the file returned is large
FileUtils.copyInputStreamToFile(response.getBody(), outputFile);
return null;
}
requestEntity -> requestEntity.getHeaders().setAccept(Lists.newArrayList(MediaType.ALL)),
(ResponseExtractor<Void>) response -> {
// Documentation I could find pointed to the HttpEntity reading the bytes off
// the stream so this should resolve memory problems if the file returned is large
FileUtils.copyInputStreamToFile(response.getBody(), outputFile);
return null;
}
);
} catch (GenieException | RuntimeException e) {
Expand Down
Loading

0 comments on commit f22b616

Please sign in to comment.