Skip to content
This repository has been archived by the owner on Jul 10, 2024. It is now read-only.

SUBMARINE-1021. Experiment Watcher #767

Closed
wants to merge 7 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.squareup.okhttp.OkHttpClient;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.Configuration;
Expand All @@ -46,8 +51,10 @@
import io.kubernetes.client.models.V1PodList;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1Status;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;

import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
import org.apache.submarine.server.api.Submitter;
Expand All @@ -68,6 +75,8 @@
import org.apache.submarine.server.submitter.k8s.model.ingressroute.IngressRouteSpec;
import org.apache.submarine.server.submitter.k8s.model.ingressroute.SpecRoute;
import org.apache.submarine.server.submitter.k8s.model.middlewares.Middlewares;
import org.apache.submarine.server.submitter.k8s.model.pytorchjob.PyTorchJob;
import org.apache.submarine.server.submitter.k8s.model.tfjob.TFJob;
import org.apache.submarine.server.submitter.k8s.parser.ExperimentSpecParser;
import org.apache.submarine.server.submitter.k8s.parser.NotebookSpecParser;
import org.apache.submarine.server.submitter.k8s.parser.ServeSpecParser;
Expand All @@ -79,7 +88,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* JobSubmitter for Kubernetes Cluster.
*/
Expand All @@ -100,12 +108,13 @@ public class K8sSubmitter implements Submitter {

private AppsV1Api appsV1Api;

private ApiClient client = null;

/**
 * Creates a submitter without doing any setup work.
 *
 * <p>The constructor body is empty; the Kubernetes API client is configured
 * later, in {@link #initialize(SubmarineConfiguration)}.
 */
public K8sSubmitter() {
}

@Override
public void initialize(SubmarineConfiguration conf) {
ApiClient client = null;
try {
String path = System.getenv(KUBECONFIG_ENV);
KubeConfig config = KubeConfig.loadKubeConfig(new FileReader(path));
Expand All @@ -119,6 +128,10 @@ public void initialize(SubmarineConfiguration conf) {
throw new SubmarineRuntimeException(500, "Initialize K8s submitter failed.");
}
} finally {
// Disable the read timeout so the watcher can block until the next change
OkHttpClient httpClient = client.getHttpClient();
httpClient.setReadTimeout(0, TimeUnit.SECONDS);
client.setHttpClient(httpClient);
Configuration.setDefaultApiClient(client);
}

Expand All @@ -133,7 +146,12 @@ public void initialize(SubmarineConfiguration conf) {
appsV1Api = new AppsV1Api();
}

client.setDebugging(true);
try {
watchExperiment();
} catch (Exception e){
LOG.error("Experiment watch failed. " + e.getMessage(), e);
}

}

@Override
Expand Down Expand Up @@ -164,7 +182,7 @@ public Experiment findExperiment(ExperimentSpec spec) throws SubmarineRuntimeExc
try {
MLJob mlJob = ExperimentSpecParser.parseJob(spec);
mlJob.getMetadata().setNamespace(getServerNamespace());

Object object = api.getNamespacedCustomObject(mlJob.getGroup(), mlJob.getVersion(),
mlJob.getMetadata().getNamespace(), mlJob.getPlural(), mlJob.getMetadata().getName());
experiment = parseExperimentResponseObject(object, ParseOp.PARSE_OP_RESULT);
Expand Down Expand Up @@ -451,7 +469,7 @@ public Notebook findNotebook(NotebookSpec spec) throws SubmarineRuntimeException
if (latestEvent.getReason().equalsIgnoreCase("Pulling")) {
notebook.setStatus(Notebook.Status.STATUS_PULLING.getValue());
notebook.setReason(latestEvent.getReason());
}
}
}
} catch (ApiException e) {
throw new SubmarineRuntimeException(e.getCode(), e.getMessage());
Expand Down Expand Up @@ -565,6 +583,89 @@ public ServeResponse deleteServe(ServeRequest spec)
}
}

public void watchExperiment() throws ApiException{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a unit test or integration test for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing in the meeting at Thursday, we think this function is hard to test because of its nature and it should be tested with the websocket developed following this PR. Maybe it is not too late to write a test for it at that time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay


Watch<MLJob> watchTF = Watch.createWatch(
client,
api.listNamespacedCustomObjectCall(
TFJob.CRD_TF_GROUP_V1,
TFJob.CRD_TF_VERSION_V1,
getServerNamespace(),
TFJob.CRD_TF_PLURAL_V1,
"true",
null,
null,
null,
null,
Boolean.TRUE,
null,
null
),
new TypeToken<Watch.Response<MLJob>>() {}.getType()
);

Watch<MLJob> watchPytorch = Watch.createWatch(
client,
api.listNamespacedCustomObjectCall(
PyTorchJob.CRD_PYTORCH_GROUP_V1,
PyTorchJob.CRD_PYTORCH_VERSION_V1,
getServerNamespace(),
PyTorchJob.CRD_PYTORCH_PLURAL_V1,
"true",
null,
null,
null,
null,
Boolean.TRUE,
null,
null
),
new TypeToken<Watch.Response<MLJob>>() {}.getType()
);

ExecutorService experimentThread = Executors.newFixedThreadPool(2);

experimentThread.execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Start watching on TFJobs...");
for (Watch.Response<MLJob> experiment : watchTF) {
LOG.info("{}", experiment.object.getStatus());
}
} finally {
LOG.info("WATCH TFJob END");
try {
watchTF.close();
} catch (Exception e){
LOG.error("{}", e.getMessage());
}
throw new RuntimeException();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new RuntimeException();
watchTF.close();
throw new RuntimeException();

}
}
});

experimentThread.execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Start watching on PytorchJobs...");
for (Watch.Response<MLJob> experiment : watchPytorch) {
LOG.info("{}", experiment.object.getStatus());
}
} finally {
LOG.info("WATCH PytorchJob END");
try {
watchPytorch.close();
} catch (Exception e){
LOG.error("{}", e.getMessage());
}
throw new RuntimeException();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new RuntimeException();
watchPytorch.close();
throw new RuntimeException();

}
}
});
}

public void createPersistentVolumeClaim(String pvcName, String namespace, String scName, String storage)
throws ApiException {
V1PersistentVolumeClaim pvc = VolumeSpecParser.parsePersistentVolumeClaim(pvcName, scName, storage);
Expand Down