Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Autoscaling #437

Merged
merged 14 commits into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,23 @@ examples/models/sagemaker/scikit_learn_iris_deployment.json
#go example
examples/wrappers/go/.idea

util/custom-resource-definitions/hpa-spec.json
wrappers/s2i/python_openvino/s2i

components/outlier-detection/seq2seq-lstm/data/ECG5000.txt
components/outlier-detection/seq2seq-lstm/data/ECG5000_TEST.arff
components/outlier-detection/seq2seq-lstm/data/ECG5000_TEST.txt
components/outlier-detection/seq2seq-lstm/data/ECG5000_TRAIN.arff
components/outlier-detection/seq2seq-lstm/data/ECG5000_TRAIN.txt

examples/models/nodejs_mnist/model.json
examples/models/nodejs_mnist/weights.bin
examples/models/nodejs_tensorflow/model.json
examples/models/nodejs_tensorflow/weights.bin
examples/models/onnx_resnet50/resnet50.tar.gz.1
examples/models/onnx_resnet50/resnet50.tar.gz.2
examples/models/onnx_resnet50/resnet50/

doc/_build
doc/source/_static/cluster-manager

Expand Down
1 change: 0 additions & 1 deletion api-frontend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ update_proto:
cp -v ../proto/seldon_deployment.proto src/main/proto/
cp -v ../proto/prediction.proto src/main/proto/
cp -vr ../proto/k8s/k8s.io src/main/proto
cp -v ../proto/k8s/v1.proto src/main/proto
cp -vr ../proto/tensorflow/tensorflow src/main/proto

port_forward_api_server:
Expand Down
1 change: 0 additions & 1 deletion api-frontend/Makefile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ update_proto: download_protos_k8s download_protos_tensorflow
cp -v ../proto/seldon_deployment.proto src/main/proto/
cp -v ../proto/prediction.proto src/main/proto/
cp -vr ../proto/k8s/k8s.io src/main/proto
cp -v ../proto/k8s/v1.proto src/main/proto
cp -vr ../proto/tensorflow/tensorflow src/main/proto

update_swagger:
Expand Down
10 changes: 4 additions & 6 deletions api-frontend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<clearOutputDirectory>false</clearOutputDirectory>
<excludes>
<exclude>k8s.io/**/*.proto</exclude>
</excludes>
</configuration>
<executions>
<execution>
Expand Down Expand Up @@ -257,11 +260,6 @@
<version>3.2.0rc2</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.hamcrest/hamcrest-all -->
<dependency>
<groupId>org.hamcrest</groupId>
Expand Down Expand Up @@ -307,7 +305,7 @@
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>0.2</version>
<version>3.0.0</version>
<scope>compile</scope>
</dependency>

Expand Down
1 change: 0 additions & 1 deletion cluster-manager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ cache_dependencies:
update_proto:
cp -v ../proto/seldon_deployment.proto src/main/proto/
cp -vr ../proto/k8s/k8s.io src/main/proto
cp -v ../proto/k8s/v1.proto src/main/proto

cluster-manager-client-secret.txt:
@openssl rand -base64 12 > $@ && echo "created: $@"
Expand Down
3 changes: 1 addition & 2 deletions cluster-manager/Makefile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ push_image_private_repo:
clean:
mvn clean -B
rm -fr src/main/proto/*
rm -rfv java_client

download_protos:
cd ../proto/k8s ; make create_protos

update_proto: download_protos
cp -v ../proto/seldon_deployment.proto src/main/proto/
cp -vr ../proto/k8s/k8s.io src/main/proto
cp -v ../proto/k8s/v1.proto src/main/proto



6 changes: 5 additions & 1 deletion cluster-manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@
</extensions>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
Expand Down Expand Up @@ -202,7 +207,6 @@
<checkStaleness>true</checkStaleness>
<excludes>
<exclude>k8s.io/**/*.proto</exclude>
<exclude>**/v1.proto</exclude>
</excludes>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.seldon.clustermanager.config;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Provide multi threaded default scheduling for Spring schedulers
* @author clive
*
*/
@Configuration
@EnableScheduling
public class SchedulerConfig {

@Bean(destroyMethod = "shutdown")
public Executor taskScheduler() {
return Executors.newScheduledThreadPool(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,19 @@
import io.kubernetes.client.models.ExtensionsV1beta1Deployment;
import io.kubernetes.client.models.ExtensionsV1beta1DeploymentStatus;
import io.kubernetes.client.models.V1OwnerReference;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import io.seldon.clustermanager.ClusterManagerProperites;
import io.seldon.clustermanager.k8s.client.K8sApiProvider;
import io.seldon.clustermanager.k8s.client.K8sClientProvider;
import io.seldon.clustermanager.k8s.tasks.K8sTaskScheduler;
import io.seldon.clustermanager.k8s.tasks.SeldonDeploymentTaskKey;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

/**
* Watch deployments created by Seldon Deployments to update the status of the owned Seldon Deployment
* @author clive
*
*/
@Component
public class DeploymentWatcher {

Expand All @@ -50,22 +57,92 @@ public class DeploymentWatcher {
private int resourceVersion = 0;
private int resourceVersionProcessed = 0;

private final K8sTaskScheduler taskScheduler;
private final SeldonDeploymentStatusUpdate statusUpdater;
private final K8sClientProvider k8sClientProvider;
private final K8sApiProvider k8sApiProvider;
private final String namespace;
private final boolean clusterWide;


/**
* Runnable to call the status updater
* @author clive
*
*/
public static class StatusUpdateTask implements Runnable {

private final SeldonDeploymentStatusUpdate statusUpdater;
private final String sdepName;
private final String version;
private final String namespace;
private final String depName;
private final Integer replicas;
private final Integer replicasReady;
public StatusUpdateTask(SeldonDeploymentStatusUpdate statusUpdater,String sdepName, String version, String namespace,String depName, Integer replicas,Integer replicasReady) {
this.statusUpdater = statusUpdater;
this.sdepName = sdepName;
this.version = version;
this.namespace = namespace;
this.depName = depName;
this.replicas = replicas;
this.replicasReady = replicasReady;
}

@Override
public void run() {
statusUpdater.updateStatus(sdepName, version, depName, replicas, replicasReady, namespace);
}
}

/**
* Taks to remove Seldon Deployment status when deployment removed
* @author clive
*
*/
public static class StatusRemoveTask implements Runnable {

private final SeldonDeploymentStatusUpdate statusUpdater;
private final String sdepName;
private final String version;
private final String namespace;
private final String depName;
public StatusRemoveTask(SeldonDeploymentStatusUpdate statusUpdater,String sdepName, String version, String namespace,String depName) {
this.statusUpdater = statusUpdater;
this.sdepName = sdepName;
this.version = version;
this.namespace = namespace;
this.depName = depName;
}

@Override
public void run() {
statusUpdater.removeStatus(sdepName, version, depName, namespace);
}
}


@Autowired
public DeploymentWatcher(K8sApiProvider k8sApiProvider,K8sClientProvider k8sClientProvider,ClusterManagerProperites clusterManagerProperites,SeldonDeploymentStatusUpdate statusUpdater)
public DeploymentWatcher(K8sApiProvider k8sApiProvider,K8sClientProvider k8sClientProvider,
ClusterManagerProperites clusterManagerProperites,SeldonDeploymentStatusUpdate statusUpdater,
K8sTaskScheduler taskScheduler)
{
this.statusUpdater = statusUpdater;
this.namespace = StringUtils.isEmpty(clusterManagerProperites.getNamespace()) ? "default" : clusterManagerProperites.getNamespace();
this.clusterWide = !clusterManagerProperites.isSingleNamespace();
this.k8sClientProvider = k8sClientProvider;
this.k8sApiProvider = k8sApiProvider;
this.taskScheduler = taskScheduler;
}

/**
* Watch for owned deployments
* @param resourceVersion - last resource version returned
* @param resourceVersionProcessed - last resource version processed
* @return the new resource version
* @throws ApiException
* @throws IOException
*/
public int watchDeployments(int resourceVersion,int resourceVersionProcessed) throws ApiException, IOException
{
String rs = null;
Expand Down Expand Up @@ -130,7 +207,9 @@ public int watchDeployments(int resourceVersion,int resourceVersionProcessed) th
String namespace = StringUtils.isEmpty(item.object.getMetadata().getNamespace()) ? "default" : item.object.getMetadata().getNamespace();
ExtensionsV1beta1DeploymentStatus status = item.object.getStatus();
logger.info("{} {} {} replicas:{} replicasAvailable(ready):{} replicasUnavilable:{} replicasReady(available):{}",item.type,mlDepName,depName,status.getReplicas(),status.getReadyReplicas(),status.getUnavailableReplicas(),status.getAvailableReplicas());
statusUpdater.updateStatus(mlDepName, depName, item.object.getStatus().getReplicas(),item.object.getStatus().getReadyReplicas(),namespace);
final String version = SeldonDeploymentUtils.getVersionFromApiVersion(ownerRef.getApiVersion());
//statusUpdater.updateStatus(mlDep, depName, item.object.getStatus().getReplicas(),item.object.getStatus().getReadyReplicas());
taskScheduler.submit(new SeldonDeploymentTaskKey(mlDepName, version, namespace), new StatusUpdateTask(statusUpdater, mlDepName, version, namespace, depName, item.object.getStatus().getReplicas(),item.object.getStatus().getReadyReplicas()));
}
}
break;
Expand All @@ -144,7 +223,9 @@ public int watchDeployments(int resourceVersion,int resourceVersionProcessed) th
ExtensionsV1beta1DeploymentStatus status = item.object.getStatus();
logger.info("{} {} {} replicas:{} replicasAvailable(ready):{} replicasUnavilable:{} replicasReady(available):{}",item.type,mlDepName,depName,status.getReplicas(),status.getReadyReplicas(),status.getUnavailableReplicas(),status.getAvailableReplicas());
String namespace = StringUtils.isEmpty(item.object.getMetadata().getNamespace()) ? "default" : item.object.getMetadata().getNamespace();
statusUpdater.removeStatus(mlDepName,depName,namespace);
final String version = SeldonDeploymentUtils.getVersionFromApiVersion(ownerRef.getApiVersion());
//statusUpdater.removeStatus(mlDep,depName);
taskScheduler.submit(new SeldonDeploymentTaskKey(mlDepName, version, namespace), new StatusRemoveTask(statusUpdater, mlDepName, version, namespace, depName));
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@

import io.kubernetes.client.models.ExtensionsV1beta1DeploymentList;
import io.kubernetes.client.models.V1ServiceList;
import io.kubernetes.client.models.V2beta1HorizontalPodAutoscalerList;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

/**
* Interactions to update and get details about Seldon deployments and associated resources
* @author clive
*
*/
public interface KubeCRDHandler {

public void updateRaw(String json,String seldonDeploymentName,String namespace);
public void updateRaw(String json, String seldonDeploymentName, String version, String namespace);
public void updateSeldonDeploymentStatus(SeldonDeployment mlDep);
public SeldonDeployment getSeldonDeployment(String name,String namespace);
public SeldonDeployment getSeldonDeployment(String name, String version, String namespace);
public ExtensionsV1beta1DeploymentList getOwnedDeployments(String seldonDeploymentName,String namespace);
public V1ServiceList getOwnedServices(String seldonDeploymentName,String namespace);
public V2beta1HorizontalPodAutoscalerList getOwnedHPAs(String seldonDeploymentName,String namespace);
}
Loading