Skip to content

Commit

Permalink
Add support for byte ranges to job output requests in java client
Browse files Browse the repository at this point in the history
  • Loading branch information
tgianos committed Dec 3, 2021
1 parent 760be83 commit 2a8ec8f
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 13 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ configure((Set<Project>) ext.javaProjects) {
mavenBom "com.google.protobuf:protobuf-bom:${protobuf_version}"
mavenBom "com.squareup.okhttp3:okhttp-bom:4.9.0"
mavenBom "io.grpc:grpc-bom:1.33.1"
mavenBom "org.spockframework:spock-bom:2.0-M4-groovy-2.5"
mavenBom "org.testcontainers:testcontainers-bom:1.15.0"
mavenBom "org.spockframework:spock-bom:2.0-groovy-2.5"
mavenBom "org.testcontainers:testcontainers-bom:1.16.2"
}
dependencies {
dependency("com.beust:jcommander:1.81")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ abstract class GenieClientIntegrationTestBase {
// is faster as in agent mode the tests are much slower than embedded. Also once we move to boot 2.3 we can
// leverage their layered jars to produce less changing images.
@Container
private static final GenericContainer GENIE = new GenericContainer("netflixoss/genie-app:latest.candidate")
private static final GenericContainer GENIE = new GenericContainer("netflixoss/genie-app:latest.release")
.waitingFor(Wait.forHttp("/admin/health").forStatusCode(200).withStartupTimeout(Duration.ofMinutes(1L)))
.withExposedPorts(8080);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,18 @@ void canSubmitJob() throws Exception {
Assertions
.assertThat(IOUtils.toString(this.jobClient.getJobStdout(echoJobId), StandardCharsets.UTF_8))
.isEqualTo("hello\n");
Assertions
.assertThat(IOUtils.toString(this.jobClient.getJobStdout(echoJobId, null, null), StandardCharsets.UTF_8))
.isEqualTo("hello\n");
Assertions
.assertThat(IOUtils.toString(this.jobClient.getJobStdout(echoJobId, 4L, null), StandardCharsets.UTF_8))
.isEqualTo("o\n");
Assertions
.assertThat(IOUtils.toString(this.jobClient.getJobStdout(echoJobId, 0L, 3L), StandardCharsets.UTF_8))
.isEqualTo("hell");
Assertions
.assertThat(IOUtils.toString(this.jobClient.getJobStdout(echoJobId, null, 2L), StandardCharsets.UTF_8))
.isEqualTo("o\n");
Assertions
.assertThat(IOUtils.toString(this.jobClient.getJobStderr(echoJobId), StandardCharsets.UTF_8))
.isBlank();
Expand Down
132 changes: 124 additions & 8 deletions genie-client/src/main/java/com/netflix/genie/client/JobClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public class JobClient {
// NOTE(review): the usages of the next three constants are outside the visible part of this file.
private static final String STATUS = "status";
private static final String ATTACHMENT = "attachment";
private static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
// Path sent to the service when the caller supplies no output file path.
private static final String EMPTY_STRING = "";
// Inclusive lower bound accepted for byte range start/end values.
private static final int ZERO = 0;
// Output file paths used to request a job's standard output and standard error.
private static final String STDOUT = "stdout";
private static final String STDERR = "stderr";

private final JobService jobService;
private final int maxStatusRetries;
Expand Down Expand Up @@ -536,7 +540,35 @@ public List<Application> getJobApplications(
* @throws IOException For Network and other IO issues.
*/
public InputStream getJobStdout(final String jobId) throws IOException, GenieClientException {
    // No byte range requested: delegate to the ranged overload with both bounds unset.
    return this.getJobStdout(jobId, null, null);
}

/**
 * Fetch the stdout of a job from Genie, optionally restricted to a range of bytes.
 *
 * <p>
 * Range Logic:
 * <ul>
 * <li>only {@literal rangeStart}: from the start byte to the end of the available content</li>
 * <li>both {@literal rangeStart} and {@literal rangeEnd}: that range of bytes, if they exist</li>
 * <li>only {@literal rangeEnd}: the last {@literal rangeEnd} bytes of the file, if they exist</li>
 * </ul>
 *
 * @param jobId      The id of the job whose output is desired.
 * @param rangeStart The start byte of the file to retrieve. Optional. Greater than or equal to 0.
 * @param rangeEnd   The end byte of the file to retrieve. Optional. Greater than or equal to 0. Must not be
 *                   less than {@literal rangeStart}.
 * @return An input stream to the output contents.
 * @throws GenieClientException If the response received is not 2xx.
 * @throws IOException          For Network and other IO issues.
 */
public InputStream getJobStdout(
    final String jobId,
    @Nullable final Long rangeStart,
    @Nullable final Long rangeEnd
) throws IOException, GenieClientException {
    // stdout is served like any other output file, under the STDOUT path.
    final InputStream stdoutStream = this.getJobOutputFile(jobId, STDOUT, rangeStart, rangeEnd);
    return stdoutStream;
}

/**
Expand All @@ -547,10 +579,36 @@ public InputStream getJobStdout(final String jobId) throws IOException, GenieCli
* @throws GenieClientException If the response received is not 2xx.
* @throws IOException For Network and other IO issues.
*/
public InputStream getJobStderr(final String jobId) throws IOException, GenieClientException {
    // No byte range requested: both bounds are passed as null to the ranged overload.
    final InputStream stderrStream = this.getJobStderr(jobId, null, null);
    return stderrStream;
}

/**
* Method to fetch the stderr of a job from Genie.
*
* <p>
* Range Logic:
* <p>
* {@literal rangeStart} but no {@literal rangeEnd} then go from the start byte to the end of available content
* <p>
* {@literal rangeStart} and {@literal rangeEnd} return that range of bytes from the file if they exist
* <p>
* If only {@literal rangeEnd} then return the last number of those bytes from the file if they exist
*
* @param jobId The id of the job whose stderr is desired.
* @param rangeStart The start byte of the file to retrieve. Optional. Greater than or equal to 0.
* @param rangeEnd The end byte of the file to retrieve. Optional. Greater than or equal to 0. Must be
* greater than {@literal rangeStart}.
* @return An input stream to the stderr contents.
* @throws GenieClientException If the response received is not 2xx.
* @throws IOException For Network and other IO issues.
*/
public InputStream getJobStderr(
final String jobId
final String jobId,
@Nullable final Long rangeStart,
@Nullable final Long rangeEnd
) throws IOException, GenieClientException {
return getJobOutputFile(jobId, "stderr");
return this.getJobOutputFile(jobId, STDERR, rangeStart, rangeEnd);
}

/**
Expand All @@ -568,16 +626,74 @@ public InputStream getJobStderr(
* @throws IOException For Network and other IO issues.
*/
public InputStream getJobOutputFile(
final String jobId, final String outputFilePath
final String jobId,
final String outputFilePath
) throws IOException, GenieClientException {
return this.getJobOutputFile(jobId, outputFilePath, null, null);
}

/**
 * Method to fetch an output file for a job from Genie, optionally restricted to a range of bytes.
 *
 * <p>
 * <b>NOTE</b>: If the specified outputFilePath is a directory, then the directory
 * manifest is returned.
 * </p>
 *
 * <p>
 * Range Logic:
 * <ul>
 * <li>neither bound: no {@literal Range} header is sent</li>
 * <li>only {@literal rangeStart}: from the start byte to the end of the available content</li>
 * <li>both {@literal rangeStart} and {@literal rangeEnd}: that range of bytes, if they exist</li>
 * <li>only {@literal rangeEnd}: the last {@literal rangeEnd} bytes of the file, if they exist</li>
 * </ul>
 *
 * @param jobId          The id of the job whose output file is desired.
 * @param outputFilePath The path to the file within output directory. Empty or null requests the empty path.
 * @param rangeStart     The start byte of the file to retrieve. Optional. Greater than or equal to 0.
 * @param rangeEnd       The end byte of the file to retrieve. Optional. Greater than or equal to 0. Must not
 *                       be less than {@literal rangeStart}.
 * @return An input stream to the output file contents.
 * @throws IllegalArgumentException If jobId is empty, a bound is negative, or rangeEnd is less than rangeStart.
 * @throws GenieClientException     If the response received is not 2xx.
 * @throws IOException              For Network and other IO issues.
 * @see <a href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range">Range Header Documentation</a>
 */
public InputStream getJobOutputFile(
    final String jobId,
    final String outputFilePath,
    @Nullable final Long rangeStart,
    @Nullable final Long rangeEnd
) throws IOException, GenieClientException {
    if (StringUtils.isEmpty(jobId)) {
        throw new IllegalArgumentException("Missing required parameter: jobId.");
    }
    // Validate the bounds before any request is issued.
    final String rangeHeader = buildRangeHeader(rangeStart, rangeEnd);
    final String pathArg = StringUtils.isEmpty(outputFilePath) ? EMPTY_STRING : outputFilePath;
    // Only the ranged service call carries a Range header; an unranged request must not send one.
    final ResponseBody body = rangeHeader == null
        ? this.jobService.getJobOutputFile(jobId, pathArg).execute().body()
        : this.jobService.getJobOutputFile(jobId, pathArg, rangeHeader).execute().body();
    if (body == null) {
        throw new GenieClientException(String.format("No data for %s returned", outputFilePath));
    }
    return body.byteStream();
}

/**
 * Validate the optional range bounds and turn them into an HTTP {@literal Range} header value.
 *
 * @param rangeStart The optional start byte.
 * @param rangeEnd   The optional end byte, or the suffix length when no start is given.
 * @return The header value, or {@literal null} when neither bound was supplied.
 * @throws IllegalArgumentException If a bound is negative or rangeEnd is less than rangeStart.
 */
@Nullable
private static String buildRangeHeader(@Nullable final Long rangeStart, @Nullable final Long rangeEnd) {
    if (rangeStart != null && rangeStart < ZERO) {
        throw new IllegalArgumentException("Range start must be greater than or equal to 0");
    }
    if (rangeEnd != null && rangeEnd < ZERO) {
        throw new IllegalArgumentException("Range end must be greater than or equal to 0");
    }
    if (rangeStart == null && rangeEnd == null) {
        return null;
    }
    if (rangeStart == null) {
        // Suffix form: the last rangeEnd bytes.
        return "bytes=-" + rangeEnd;
    }
    if (rangeEnd == null) {
        // Open-ended form: from rangeStart to the end of the content.
        return "bytes=" + rangeStart + "-";
    }
    if (rangeEnd < rangeStart) {
        // Equal bounds are allowed, so the message says "or equal".
        throw new IllegalArgumentException("Range end must be greater than or equal to range start");
    }
    return "bytes=" + rangeStart + "-" + rangeEnd;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import retrofit2.http.Body;
import retrofit2.http.DELETE;
import retrofit2.http.GET;
import retrofit2.http.Header;
import retrofit2.http.Multipart;
import retrofit2.http.POST;
import retrofit2.http.Part;
Expand Down Expand Up @@ -74,7 +75,8 @@ public interface JobService {
@POST(JOBS_URL_SUFFIX)
Call<Void> submitJobWithAttachments(
@Part("request") JobRequest request,
@Part List<MultipartBody.Part> attachments);
@Part List<MultipartBody.Part> attachments
);

/**
* Method to get all jobs from Genie.
Expand Down Expand Up @@ -147,7 +149,29 @@ Call<JsonNode> getJobs(
@GET(JOBS_URL_SUFFIX + "/{id}/output/{path}")
Call<ResponseBody> getJobOutputFile(
@Path("id") String jobId,
@Path(value = "path", encoded = true) String outputFilePath);
@Path(value = "path", encoded = true) String outputFilePath
);

/**
 * Method to fetch partial output file for a job from Genie.
 *
 * <p>
 * <b>NOTE</b>: If the specified outputFilePath is a directory, then the directory
 * manifest is returned.
 * </p>
 *
 * @param jobId          The id of the job whose output file is desired.
 * @param outputFilePath The path to the file within output directory.
 * @param range          The value sent as the HTTP {@literal Range} request header, for example
 *                       {@literal bytes=0-3}, {@literal bytes=4-} or {@literal bytes=-2}.
 * @return A callable object.
 */
@Streaming
@GET(JOBS_URL_SUFFIX + "/{id}/output/{path}")
Call<ResponseBody> getJobOutputFile(
@Path("id") String jobId,
@Path(value = "path", encoded = true) String outputFilePath,
@Header("Range") String range
);

/**
* Method to get Job status.
Expand Down

0 comments on commit 2a8ec8f

Please sign in to comment.