Skip to content

Commit

Permalink
SUBMARINE-442. Support get job's log in submarine-server REST API
Browse files Browse the repository at this point in the history
### What is this PR for?

Now we have the "jobs" resource in REST which can do CRUD. We also need a "logs" API to get the job's log output. The URI could be "api/v1/logs"
It should accept parameters like "jobid". Initially, the logs could be aggregated logs of all containers.
Streaming is preferred so that the python client can enable a fancy way for the end-user to check logs

### What type of PR is it?
Feature

### Todos
* [x] - get logs so far

### What is the Jira issue?
https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-442

### How should this be tested?
Create a job
Visit /api/v1/logs or /api/v1/logs/{jobid} with a browser

### Screenshots (if appropriate)
http://127.0.0.1:8080/api/v1/jobs/logs
![image](https://user-images.githubusercontent.com/19265751/79961593-8a294c80-84b9-11ea-85ef-9367e17fecc9.png)

http://127.0.0.1:8080/api/v1/jobs/logs/job_1587481945001_0001
![image](https://user-images.githubusercontent.com/19265751/79961674-a0370d00-84b9-11ea-908f-b6bdeceeb6eb.png)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: JohnTing <jot.johnting@gmail.com>

Closes #263 from JohnTing/SUBMARINE-442 and squashes the following commits:

7195a77 [JohnTing] test12
1bb806f [JohnTing] test12
319222c [JohnTing] SUBMARINE-442
  • Loading branch information
JohnTing authored and xunliu committed Apr 28, 2020
1 parent ef02ebe commit 87b2296
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@ submarine-cloud/bin/*

submarine-security/spark-security/dependency-reduced-pom.xml
submarine-security/spark-security/derby.log

# vscode file
.project
.classpath
.settings
.factorypath
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.submarine.server.api.job;

import java.util.ArrayList;
import java.util.List;

public class JobLog {
private String jobId;
private List<podLog> logContent;

class podLog {
String podName;
String podLog;
podLog(String podName, String podLog) {
this.podName = podName;
this.podLog = podLog;
}
}

public JobLog() {
logContent = new ArrayList<podLog>();
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public String getJobId() {
return jobId;
}

public void addPodLog(String name, String log) {
logContent.add(new podLog(name, log));
}

public void clearPodLog() {
logContent.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,20 @@ public interface JobSubmitter {
* @throws SubmarineRuntimeException running error
*/
Job deleteJob(JobSpec jobSpec) throws SubmarineRuntimeException;

/**
* Get the pod log list in the job
* @param Job job
* @return object
* @throws SubmarineRuntimeException running error
*/
JobLog getJobLog(JobSpec jobSpec, String jobId) throws SubmarineRuntimeException;

/**
* Get the pod name list in the job
* @param Job job
* @return object
* @throws SubmarineRuntimeException running error
*/
JobLog getJobLogName(JobSpec jobSpec, String jobId) throws SubmarineRuntimeException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.submarine.server.api.job.JobSubmitter;
import org.apache.submarine.server.api.job.Job;
import org.apache.submarine.server.api.job.JobId;
import org.apache.submarine.server.api.job.JobLog;
import org.apache.submarine.server.api.spec.JobSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -164,6 +165,43 @@ public Job deleteJob(String id) throws SubmarineRuntimeException {
return job;
}

/**
* List job logs
* @param status job status, if null will return all job logs
* @return job log list
* @throws SubmarineRuntimeException the service error
*/
public List<JobLog> listJobLogsByStatus(String status) throws SubmarineRuntimeException {
List<JobLog> jobLogList = new ArrayList<JobLog>();
for (Map.Entry<String, Job> entry : cachedJobMap.entrySet()) {
String jobId = entry.getKey();
Job job = entry.getValue();
JobSpec spec = job.getSpec();
Job patchJob = submitter.findJob(spec);
LOG.info("Found job: {}", patchJob.getStatus());
if (status == null || status.toLowerCase().equals(patchJob.getStatus().toLowerCase())) {
job.rebuild(patchJob);
jobLogList.add(submitter.getJobLogName(spec, jobId));
}
}
return jobLogList;
}

/**
* Get job log
* @param id job id
* @return object
* @throws SubmarineRuntimeException the service error
*/
public JobLog getJobLog(String id) throws SubmarineRuntimeException {
checkJobId(id);
Job job = cachedJobMap.get(id);
JobSpec spec = job.getSpec();
Job patchJob = submitter.findJob(spec);
job.rebuild(patchJob);
return submitter.getJobLog(spec, id);
}

private void checkSpec(JobSpec spec) throws SubmarineRuntimeException {
if (spec == null) {
throw new SubmarineRuntimeException(Status.OK.getStatusCode(), "Invalid job spec.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.server.job.JobManager;
import org.apache.submarine.server.api.job.Job;
import org.apache.submarine.server.api.job.JobLog;
import org.apache.submarine.server.api.spec.JobSpec;
import org.apache.submarine.server.response.JsonResponse;

Expand Down Expand Up @@ -132,6 +133,32 @@ public Response deleteJob(@PathParam(RestConstants.JOB_ID) String id) {
return parseJobServiceException(e);
}
}

@GET
@Path("/logs")
public Response listLog(@QueryParam("status") String status) {
try {
List<JobLog> jobLogList = jobManager.listJobLogsByStatus(status);
return new JsonResponse.Builder<List<JobLog>>(Response.Status.OK).
result(jobLogList).build();

} catch (SubmarineRuntimeException e) {
return parseJobServiceException(e);
}
}

@GET
@Path("/logs/{id}")
public Response getLog(@PathParam(RestConstants.JOB_ID) String id) {
try {
JobLog jobLog = jobManager.getJobLog(id);
return new JsonResponse.Builder<JobLog>(Response.Status.OK).
result(jobLog).build();

} catch (SubmarineRuntimeException e) {
return parseJobServiceException(e);
}
}

private Response parseJobServiceException(SubmarineRuntimeException e) {
return new JsonResponse.Builder<String>(e.getCode()).message(e.getMessage()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ public class RestConstants {

public static final String NODES = "nodes";
public static final String NODE = "node";

public static final String LOGS = "logs";
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.JSON;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.apis.CustomObjectsApi;
import io.kubernetes.client.models.V1Pod;
import io.kubernetes.client.models.V1PodList;
import io.kubernetes.client.models.V1Status;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
Expand All @@ -37,6 +40,8 @@
import org.apache.submarine.server.api.exception.InvalidSpecException;
import org.apache.submarine.server.api.job.JobSubmitter;
import org.apache.submarine.server.api.job.Job;
import org.apache.submarine.server.api.job.JobLog;
import org.apache.submarine.server.api.spec.JobLibrarySpec;
import org.apache.submarine.server.api.spec.JobSpec;
import org.apache.submarine.server.submitter.k8s.util.MLJobConverter;
import org.apache.submarine.server.submitter.k8s.model.MLJob;
Expand All @@ -52,9 +57,14 @@ public class K8sJobSubmitter implements JobSubmitter {

private static final String KUBECONFIG_ENV = "KUBECONFIG";

private static final String TF_JOB_SELECTOR_KEY = "tf-job-name=";
private static final String PYTORCH_JOB_SELECTOR_KEY = "pytorch-job-name=";

// K8s API client for CRD
private CustomObjectsApi api;

private CoreV1Api coreApi;

public K8sJobSubmitter() {}

@Override
Expand All @@ -79,6 +89,9 @@ public void initialize(SubmarineConfiguration conf) {
if (api == null) {
api = new CustomObjectsApi();
}
if (coreApi == null) {
coreApi = new CoreV1Api(client);
}
}

@Override
Expand All @@ -95,8 +108,10 @@ public Job createJob(JobSpec jobSpec) throws SubmarineRuntimeException {
mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob, "true");
job = parseResponseObject(object, ParseOp.PARSE_OP_RESULT);
} catch (InvalidSpecException e) {
LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(), e);
throw new SubmarineRuntimeException(200, e.getMessage());
} catch (ApiException e) {
LOG.error("K8s submitter: parse Job object failed by " + e.getMessage(), e);
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
}
return job;
Expand Down Expand Up @@ -165,11 +180,68 @@ private Job parseResponseObject(Object object, ParseOp op) throws SubmarineRunti
return MLJobConverter.toJobFromStatus(status);
}
} catch (JsonSyntaxException e) {
LOG.warn("K8s submitter: parse response object failed by " + e.getMessage(), e);
LOG.error("K8s submitter: parse response object failed by " + e.getMessage(), e);
}
throw new SubmarineRuntimeException(500, "K8s Submitter parse upstream response failed.");
}

@Override
public JobLog getJobLogName(JobSpec jobSpec, String jobId) {
JobLog jobLog = new JobLog();
jobLog.setJobId(jobId);
try {
final V1PodList podList = coreApi.listNamespacedPod(
jobSpec.getNamespace(),
"false", null, null,
getJobLabelSelector(jobSpec), null, null,
null, null);
for (V1Pod pod: podList.getItems()) {
String podName = pod.getMetadata().getName();
jobLog.addPodLog(podName, null);
}
} catch (final ApiException e) {
LOG.error("Error when listing pod for job:" + jobSpec.getName(), e.getMessage());
}
return jobLog;
}

@Override
public JobLog getJobLog(JobSpec jobSpec, String jobId) {
JobLog jobLog = new JobLog();
jobLog.setJobId(jobId);
try {
final V1PodList podList = coreApi.listNamespacedPod(
jobSpec.getNamespace(),
"false", null, null,
getJobLabelSelector(jobSpec), null, null,
null, null);

for (V1Pod pod : podList.getItems()) {
String podName = pod.getMetadata().getName();
String namespace = pod.getMetadata().getNamespace();
String podLog = coreApi.readNamespacedPodLog(
podName, namespace, null, Boolean.FALSE,
Integer.MAX_VALUE, null, Boolean.FALSE,
Integer.MAX_VALUE, null, Boolean.FALSE);

jobLog.addPodLog(podName, podLog);
}
} catch (final ApiException e) {
LOG.error("Error when listing pod for job:" + jobSpec.getName(), e.getMessage());
}
return jobLog;
}

private String getJobLabelSelector(JobSpec jobSpec) {
// TODO(JohnTing): SELECTOR_KEY should be obtained from individual models in MLJOB
if (jobSpec.getLibrarySpec()
.getName().equalsIgnoreCase(JobLibrarySpec.SupportedMLFramework.TENSORFLOW.getName())) {
return TF_JOB_SELECTOR_KEY + jobSpec.getName();
} else {
return PYTORCH_JOB_SELECTOR_KEY + jobSpec.getName();
}
}

private enum ParseOp {
PARSE_OP_RESULT,
PARSE_OP_DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class JobManagerRestApiIT extends AbstractSubmarineServerTest {
/** Key is the ml framework name, the value is the operator */
private static Map<String, KfOperator> kfOperatorMap;
private static String JOB_PATH = "/api/" + RestConstants.V1 + "/" + RestConstants.JOBS;
private static String JOB_LOG_PATH = JOB_PATH + "/" + RestConstants.LOGS;

private Gson gson = new GsonBuilder()
.registerTypeAdapter(JobId.class, new JobIdSerializer())
Expand Down Expand Up @@ -151,6 +153,9 @@ private void run(String body, String patchBody, String contentType) throws Excep
Job foundJob = gson.fromJson(gson.toJson(jsonResponse.getResult()), Job.class);
verifyGetJobApiResult(createdJob, foundJob);

// get log list
// TODO(JohnTing): Test the job log after creating the job

// patch
// TODO(jiwq): the commons-httpclient not support patch method
// https://tools.ietf.org/html/rfc5789
Expand Down Expand Up @@ -252,6 +257,16 @@ public void testNotFoundJob() throws Exception {
Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), jsonResponse.getCode());
}

@Test
public void testListJobLog() throws Exception {
GetMethod getMethod = httpGet(JOB_LOG_PATH);
Assert.assertEquals(Response.Status.OK.getStatusCode(), getMethod.getStatusCode());

String json = getMethod.getResponseBodyAsString();
JsonResponse jsonResponse = gson.fromJson(json, JsonResponse.class);
Assert.assertEquals(Response.Status.OK.getStatusCode(), jsonResponse.getCode());
}

String loadContent(String resourceName) throws Exception {
URL fileUrl = this.getClass().getResource("/" + resourceName);
LOG.info("Resource file: " + fileUrl);
Expand Down

0 comments on commit 87b2296

Please sign in to comment.