From fe0786cbe80db3ed59055085b8bfa30240bef07d Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 7 Oct 2020 19:19:00 -0700 Subject: [PATCH 01/10] Disallow modifying internal cluster images in running cluster --- cli/cmd/lib_cluster_config.go | 66 +++++- manager/install.sh | 415 ++++++++++++++++++++---------- 2 files changed, 306 insertions(+), 175 deletions(-) diff --git a/cli/cmd/lib_cluster_config.go b/cli/cmd/lib_cluster_config.go index e155eeaade..6c2c1f49df 100644 --- a/cli/cmd/lib_cluster_config.go +++ b/cli/cmd/lib_cluster_config.go @@ -343,6 +343,66 @@ func setConfigFieldsFromCached(userClusterConfig *clusterconfig.Config, cachedCl } userClusterConfig.APIGatewaySetting = cachedClusterConfig.APIGatewaySetting + if s.Obj(cachedClusterConfig.VPCCIDR) != s.Obj(userClusterConfig.VPCCIDR) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.VPCCIDRKey, cachedClusterConfig.VPCCIDR) + } + userClusterConfig.VPCCIDR = cachedClusterConfig.VPCCIDR + + if s.Obj(cachedClusterConfig.ImageDownloader) != s.Obj(userClusterConfig.ImageDownloader) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageDownloaderKey, cachedClusterConfig.ImageDownloader) + } + userClusterConfig.ImageDownloader = cachedClusterConfig.ImageDownloader + + if s.Obj(cachedClusterConfig.ImageRequestMonitor) != s.Obj(userClusterConfig.ImageRequestMonitor) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageRequestMonitorKey, cachedClusterConfig.ImageRequestMonitor) + } + userClusterConfig.ImageRequestMonitor = cachedClusterConfig.ImageRequestMonitor + + if s.Obj(cachedClusterConfig.ImageClusterAutoscaler) != s.Obj(userClusterConfig.ImageClusterAutoscaler) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageClusterAutoscalerKey, cachedClusterConfig.ImageClusterAutoscaler) + } + userClusterConfig.ImageClusterAutoscaler = cachedClusterConfig.ImageClusterAutoscaler + + if s.Obj(cachedClusterConfig.ImageMetricsServer) != s.Obj(userClusterConfig.ImageMetricsServer) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageMetricsServerKey, cachedClusterConfig.ImageMetricsServer) + } + userClusterConfig.ImageMetricsServer = cachedClusterConfig.ImageMetricsServer + + if s.Obj(cachedClusterConfig.ImageInferentia) != s.Obj(userClusterConfig.ImageInferentia) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageInferentiaKey, cachedClusterConfig.ImageInferentia) + } + userClusterConfig.ImageInferentia = cachedClusterConfig.ImageInferentia + + if s.Obj(cachedClusterConfig.ImageNeuronRTD) != s.Obj(userClusterConfig.ImageNeuronRTD) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageNeuronRTDKey, cachedClusterConfig.ImageNeuronRTD) + } + userClusterConfig.ImageNeuronRTD = cachedClusterConfig.ImageNeuronRTD + + if s.Obj(cachedClusterConfig.ImageNvidia) != s.Obj(userClusterConfig.ImageNvidia) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageNvidiaKey, cachedClusterConfig.ImageNvidia) + } + userClusterConfig.ImageNvidia = cachedClusterConfig.ImageNvidia + + if s.Obj(cachedClusterConfig.ImageFluentd) != s.Obj(userClusterConfig.ImageFluentd) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageFluentdKey, cachedClusterConfig.ImageFluentd) + } + userClusterConfig.ImageFluentd = cachedClusterConfig.ImageFluentd + + if s.Obj(cachedClusterConfig.ImageStatsd) != s.Obj(userClusterConfig.ImageStatsd) { + return
clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageStatsdKey, cachedClusterConfig.ImageStatsd) + } + userClusterConfig.ImageStatsd = cachedClusterConfig.ImageStatsd + + if s.Obj(cachedClusterConfig.ImageIstioProxy) != s.Obj(userClusterConfig.ImageIstioProxy) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageIstioProxyKey, cachedClusterConfig.ImageIstioProxy) + } + userClusterConfig.ImageIstioProxy = cachedClusterConfig.ImageIstioProxy + + if s.Obj(cachedClusterConfig.ImageIstioPilot) != s.Obj(userClusterConfig.ImageIstioPilot) { + return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.ImageIstioPilotKey, cachedClusterConfig.ImageIstioPilot) + } + userClusterConfig.ImageIstioPilot = cachedClusterConfig.ImageIstioPilot + if userClusterConfig.Spot != nil && *userClusterConfig.Spot != *cachedClusterConfig.Spot { return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.SpotKey, *cachedClusterConfig.Spot) } @@ -384,14 +444,8 @@ func setConfigFieldsFromCached(userClusterConfig *clusterconfig.Config, cachedCl return errors.Wrap(clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.OnDemandBackupKey, cachedClusterConfig.SpotConfig.OnDemandBackup), clusterconfig.SpotConfigKey) } } - userClusterConfig.SpotConfig = cachedClusterConfig.SpotConfig - if s.Obj(cachedClusterConfig.VPCCIDR) != s.Obj(userClusterConfig.VPCCIDR) { - return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.VPCCIDRKey, cachedClusterConfig.VPCCIDR) - } - userClusterConfig.VPCCIDR = cachedClusterConfig.VPCCIDR - return nil } diff --git a/manager/install.sh b/manager/install.sh index 06f4da6f54..40d798b939 100755 --- a/manager/install.sh +++ b/manager/install.sh @@ -23,64 +23,218 @@ mkdir /workspace arg1="$1" -function ensure_eks() { - # Cluster statuses: https://github.com/aws/aws-sdk-go/blob/master/service/eks/api.go#L2785 +function main() { + if [ "$arg1" = "--update" ]; then + cluster_configure + else + cluster_up + fi +} + +function cluster_up() { + create_eks + + start_pre_download_images + + if [ "$CORTEX_API_LOAD_BALANCER_SCHEME" == "internal" ] && [ "$CORTEX_API_GATEWAY" == "public" ]; then + create_vpc_link + fi + + echo -n "○ updating cluster configuration " + setup_configmap + setup_secrets + echo "✓" + + echo -n "○ configuring networking " + setup_istio + python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/apis.yaml.j2 > /workspace/apis.yaml + kubectl apply -f /workspace/apis.yaml >/dev/null + echo "✓" + + echo -n "○ configuring autoscaling " + python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/cluster-autoscaler.yaml.j2 > /workspace/cluster-autoscaler.yaml + kubectl apply -f /workspace/cluster-autoscaler.yaml >/dev/null + echo "✓" + + echo -n "○ configuring logging " + envsubst < manifests/fluentd.yaml | kubectl apply -f - >/dev/null + echo "✓" + + echo -n "○ configuring metrics " + envsubst < manifests/metrics-server.yaml | kubectl apply -f - >/dev/null + envsubst < manifests/statsd.yaml | kubectl apply -f - >/dev/null + echo "✓" + + if [[ "$CORTEX_INSTANCE_TYPE" == p* ]] || [[ "$CORTEX_INSTANCE_TYPE" == g* ]]; then + echo -n "○ configuring gpu support " + envsubst < manifests/nvidia.yaml | kubectl apply -f - >/dev/null + echo "✓" + fi + + if [[ "$CORTEX_INSTANCE_TYPE" == inf* ]]; then + echo -n "○ configuring inf support " + envsubst < manifests/inferentia.yaml | kubectl apply -f - >/dev/null + echo "✓" + fi + + if [ "$CORTEX_API_LOAD_BALANCER_SCHEME" == "internal" ] && [
"$CORTEX_API_GATEWAY" == "public" ]; then + create_vpc_link_integration + fi + + restart_operator + + validate_cortex + + await_pre_download_images + + if [ "$CORTEX_OPERATOR_LOAD_BALANCER_SCHEME" == "internal" ]; then + echo -e "\ncortex is ready! (it may take a few minutes for your private operator load balancer to finish initializing, but you may now set up VPC Peering)" + else + echo -e "\ncortex is ready!" + fi +} + +function cluster_configure() { + check_eks + + resize_nodegroup + + echo -n "○ updating cluster configuration " + setup_configmap + setup_secrets + echo "✓" + + restart_operator + + validate_cortex + + echo -e "\ncortex is ready!" +} + +# creates the eks cluster and configures kubectl +function create_eks() { set +e cluster_info=$(eksctl get cluster --name=$CORTEX_CLUSTER_NAME --region=$CORTEX_REGION -o json 2> /dev/null) cluster_info_exit_code=$? set -e - # No cluster - if [ $cluster_info_exit_code -ne 0 ]; then - if [ "$arg1" = "--update" ]; then - echo "error: there is no cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION; please update your configuration to point to an existing cortex cluster or create a cortex cluster with \`cortex cluster up\`" + # cluster already exists + if [ $cluster_info_exit_code -eq 0 ]; then + set +e + # cluster statuses: https://github.com/aws/aws-sdk-go/blob/master/service/eks/api.go#L6883 + cluster_status=$(echo "$cluster_info" | jq -r 'first | .Status') + set -e + + if [ "$cluster_status" == "ACTIVE" ]; then + echo "error: there is already a cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION" + exit 1 + elif [ "$cluster_status" == "DELETING" ]; then + echo "error: your cortex cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION is currently spinning down; please try again once it is completely deleted (may take a few minutes)" + exit 1 + elif [ "$cluster_status" == "CREATING" ]; then + echo "error: your cortex cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION is currently spinning up; please try again once it is ready" + exit 1 + elif [ "$cluster_status" == "UPDATING" ]; then + echo "error: your cortex cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION is currently updating; please try again once it is ready" + exit 1 + elif [ "$cluster_status" == "FAILED" ]; then + echo "error: your cortex cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION is failed; delete it with \`eksctl delete cluster --name=$CORTEX_CLUSTER_NAME --region=$CORTEX_REGION\` and try again" + exit 1 + else # cluster exists, but is has an unknown status (unexpected) + echo "error: there is already a cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION (status: ${cluster_status})" exit 1 fi + fi - echo -e "○ spinning up the cluster ... 
(this will take about 15 minutes)\n" - - python generate_eks.py $CORTEX_CLUSTER_CONFIG_FILE > /workspace/eks.yaml - eksctl create cluster --timeout=$EKSCTL_TIMEOUT --install-neuron-plugin=false -f /workspace/eks.yaml - - if [ "$CORTEX_SPOT" == "True" ]; then - asg_info=$(aws autoscaling describe-auto-scaling-groups --region $CORTEX_REGION --query "AutoScalingGroups[?contains(Tags[?Key==\`alpha.eksctl.io/cluster-name\`].Value, \`$CORTEX_CLUSTER_NAME\`)]|[?contains(Tags[?Key==\`alpha.eksctl.io/nodegroup-name\`].Value, \`ng-cortex-worker-spot\`)]") - asg_name=$(echo "$asg_info" | jq -r 'first | .AutoScalingGroupName') - if [ "$asg_name" = "" ] || [ "$asg_name" = "null" ]; then - echo -e "unable to find autoscaling group name from info:\n$asg_info" - exit 1 - fi - aws autoscaling suspend-processes --region $CORTEX_REGION --auto-scaling-group-name $asg_name --scaling-processes AZRebalance - fi + echo -e "○ spinning up the cluster (this will take about 15 minutes) ...\n" + python generate_eks.py $CORTEX_CLUSTER_CONFIG_FILE > /workspace/eks.yaml + eksctl create cluster --timeout=$EKSCTL_TIMEOUT --install-neuron-plugin=false -f /workspace/eks.yaml + echo - echo # cluster is ready - return + if [ "${CORTEX_SPOT,,}" == "true" ]; then + suspend_spot_az_rebalance fi + write_kubeconfig +} + +# checks that the eks cluster is active and configures kubectl +function check_eks() { set +e - cluster_status=$(echo "$cluster_info" | jq -r 'first | .Status') + cluster_info=$(eksctl get cluster --name=$CORTEX_CLUSTER_NAME --region=$CORTEX_REGION -o json 2> /dev/null) + cluster_info_exit_code=$? set -e - if [[ "$cluster_status" == "ACTIVE" ]] && [[ "$arg1" != "--update" ]]; then - echo "error: there is already a cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION" + # no cluster + if [ $cluster_info_exit_code -ne 0 ]; then + echo "error: there is no cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION; please update your configuration to point to an existing cortex cluster or create a cortex cluster with \`cortex cluster up\`" exit 1 fi + set +e + # cluster statuses: https://github.com/aws/aws-sdk-go/blob/master/service/eks/api.go#L6883 + cluster_status=$(echo "$cluster_info" | jq -r 'first | .Status') + set -e + if [ "$cluster_status" == "DELETING" ]; then echo "error: your cortex cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION is currently spinning down; please try again once it is completely deleted (may take a few minutes)" exit 1 - fi - - if [ "$cluster_status" == "CREATING" ]; then + elif [ "$cluster_status" == "CREATING" ]; then echo "error: your cortex cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION is currently spinning up; please try again once it is ready" exit 1 - fi - - if [ "$cluster_status" == "FAILED" ]; then + elif [ "$cluster_status" == "UPDATING" ]; then + echo "error: your cortex cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION is currently updating; please try again once it is ready" + exit 1 + elif [ "$cluster_status" == "FAILED" ]; then echo "error: your cortex cluster named \"$CORTEX_CLUSTER_NAME\" in $CORTEX_REGION is failed; delete it with \`eksctl delete cluster --name=$CORTEX_CLUSTER_NAME --region=$CORTEX_REGION\` and try again" exit 1 fi - # Check for change in min/max instances + # cluster status is ACTIVE or unknown (in which case we'll assume things are ok instead of erroring) + + write_kubeconfig +} + +function write_kubeconfig() { + eksctl utils write-kubeconfig --cluster=$CORTEX_CLUSTER_NAME --region=$CORTEX_REGION | grep -v "saved kubeconfig 
as" | grep -v "using region" | grep -v "eksctl version" || true + out=$(kubectl get pods 2>&1 || true); if [[ "$out" == *"must be logged in to the server"* ]]; then echo "error: your aws iam user does not have access to this cluster; to grant access, see https://docs.cortex.dev/v/${CORTEX_VERSION_MINOR}/miscellaneous/security#running-cortex-cluster-commands-from-different-iam-users"; exit 1; fi +} + +function setup_configmap() { + kubectl -n=default create configmap 'cluster-config' \ + --from-file='cluster.yaml'=$CORTEX_CLUSTER_CONFIG_FILE \ + -o yaml --dry-run=client | kubectl apply -f - >/dev/null + + kubectl -n=default create configmap 'env-vars' \ + --from-literal='CORTEX_VERSION'=$CORTEX_VERSION \ + --from-literal='CORTEX_REGION'=$CORTEX_REGION \ + --from-literal='AWS_REGION'=$CORTEX_REGION \ + --from-literal='CORTEX_BUCKET'=$CORTEX_BUCKET \ + --from-literal='CORTEX_TELEMETRY_DISABLE'=$CORTEX_TELEMETRY_DISABLE \ + --from-literal='CORTEX_TELEMETRY_SENTRY_DSN'=$CORTEX_TELEMETRY_SENTRY_DSN \ + --from-literal='CORTEX_TELEMETRY_SEGMENT_WRITE_KEY'=$CORTEX_TELEMETRY_SEGMENT_WRITE_KEY \ + --from-literal='CORTEX_DEV_DEFAULT_PREDICTOR_IMAGE_REGISTRY'=$CORTEX_DEV_DEFAULT_PREDICTOR_IMAGE_REGISTRY \ + -o yaml --dry-run=client | kubectl apply -f - >/dev/null +} + +function setup_secrets() { + kubectl -n=default create secret generic 'aws-credentials' \ + --from-literal='AWS_ACCESS_KEY_ID'=$CLUSTER_AWS_ACCESS_KEY_ID \ + --from-literal='AWS_SECRET_ACCESS_KEY'=$CLUSTER_AWS_SECRET_ACCESS_KEY \ + -o yaml --dry-run=client | kubectl apply -f - >/dev/null +} + +function restart_operator() { + echo -n "○ starting operator " + kubectl -n=default delete --ignore-not-found=true --grace-period=10 deployment operator >/dev/null 2>&1 + printed_dot="false" + until [ "$(kubectl -n=default get pods -l workloadID=operator -o json | jq -j '.items | length')" -eq "0" ]; do echo -n "."; printed_dot="true"; sleep 2; done + envsubst < manifests/operator.yaml | kubectl apply -f - >/dev/null + if [ "$printed_dot" == "true" ]; then echo " ✓"; else echo "✓"; fi +} + +function resize_nodegroup() { + # check for change in min/max instances asg_on_demand_info=$(aws autoscaling describe-auto-scaling-groups --region $CORTEX_REGION --query "AutoScalingGroups[?contains(Tags[?Key==\`alpha.eksctl.io/cluster-name\`].Value, \`$CORTEX_CLUSTER_NAME\`)]|[?contains(Tags[?Key==\`alpha.eksctl.io/nodegroup-name\`].Value, \`ng-cortex-worker-on-demand\`)]") asg_on_demand_length=$(echo "$asg_on_demand_info" | jq -r 'length') asg_on_demand_name="" @@ -162,114 +316,81 @@ function ensure_eks() { fi } -function main() { - mkdir -p /workspace - - # create cluster (if it doesn't already exist) - ensure_eks - - # create VPC Link for API Gateway - if [ "$arg1" != "--update" ] && [ "$CORTEX_API_LOAD_BALANCER_SCHEME" == "internal" ] && [ "$CORTEX_API_GATEWAY" == "public" ]; then - vpc_id=$(aws ec2 describe-vpcs --region $CORTEX_REGION --filters Name=tag:eksctl.cluster.k8s.io/v1alpha1/cluster-name,Values=$CORTEX_CLUSTER_NAME | jq .Vpcs[0].VpcId | tr -d '"') - if [ "$vpc_id" = "" ] || [ "$vpc_id" = "null" ]; then - echo "unable to find cortex vpc" - exit 1 - fi - - # filter all private subnets belonging to cortex cluster - private_subnets=$(aws ec2 describe-subnets --region $CORTEX_REGION --filters Name=vpc-id,Values=$vpc_id Name=tag:Name,Values=*Private* | jq -s '.[].Subnets[].SubnetId' | tr -d '"') - if [ "$private_subnets" = "" ] || [ "$private_subnets" = "null" ]; then - echo "unable to find cortex private subnets" - exit 1 - fi - - # get default 
security group for cortex VPC - default_security_group=$(aws ec2 describe-security-groups --region $CORTEX_REGION --filters Name=vpc-id,Values=$vpc_id Name=group-name,Values=default | jq -c .SecurityGroups[].GroupId | tr -d '"') - if [ "$default_security_group" = "" ] || [ "$default_security_group" = "null" ]; then - echo "unable to find cortex default security group" - exit 1 - fi - - # create VPC Link - create_vpc_link_output=$(aws apigatewayv2 create-vpc-link --region $CORTEX_REGION --tags "$CORTEX_TAGS_JSON" --name $CORTEX_CLUSTER_NAME --subnet-ids $private_subnets --security-group-ids $default_security_group) - vpc_link_id=$(echo $create_vpc_link_output | jq .VpcLinkId | tr -d '"') - if [ "$vpc_link_id" = "" ] || [ "$vpc_link_id" = "null" ]; then - echo -e "unable to extract vpc link ID from create-vpc-link output:\n$create_vpc_link_output" - exit 1 - fi +function suspend_spot_az_rebalance() { + asg_info=$(aws autoscaling describe-auto-scaling-groups --region $CORTEX_REGION --query "AutoScalingGroups[?contains(Tags[?Key==\`alpha.eksctl.io/cluster-name\`].Value, \`$CORTEX_CLUSTER_NAME\`)]|[?contains(Tags[?Key==\`alpha.eksctl.io/nodegroup-name\`].Value, \`ng-cortex-worker-spot\`)]") + asg_name=$(echo "$asg_info" | jq -r 'first | .AutoScalingGroupName') + if [ "$asg_name" = "" ] || [ "$asg_name" = "null" ]; then + echo -e "unable to find autoscaling group name from info:\n$asg_info" + exit 1 fi + aws autoscaling suspend-processes --region $CORTEX_REGION --auto-scaling-group-name $asg_name --scaling-processes AZRebalance +} - eksctl utils write-kubeconfig --cluster=$CORTEX_CLUSTER_NAME --region=$CORTEX_REGION | grep -v "saved kubeconfig as" | grep -v "using region" | grep -v "eksctl version" || true - out=$(kubectl get pods 2>&1 || true); if [[ "$out" == *"must be logged in to the server"* ]]; then echo "error: your aws iam user does not have access to this cluster; to grant access, see https://docs.cortex.dev/v/${CORTEX_VERSION_MINOR}/miscellaneous/security#running-cortex-cluster-commands-from-different-iam-users"; exit 1; fi - - # pre-download images on cortex cluster up - if [ "$arg1" != "--update" ]; then - if [[ "$CORTEX_INSTANCE_TYPE" == p* ]] || [[ "$CORTEX_INSTANCE_TYPE" == g* ]]; then - envsubst < manifests/image-downloader-gpu.yaml | kubectl apply -f - &>/dev/null - elif [[ "$CORTEX_INSTANCE_TYPE" == inf* ]]; then - envsubst < manifests/image-downloader-inf.yaml | kubectl apply -f - &>/dev/null - else - envsubst < manifests/image-downloader-cpu.yaml | kubectl apply -f - &>/dev/null - fi +function create_vpc_link() { + # get VPC ID + vpc_id=$(aws ec2 describe-vpcs --region $CORTEX_REGION --filters Name=tag:eksctl.cluster.k8s.io/v1alpha1/cluster-name,Values=$CORTEX_CLUSTER_NAME | jq .Vpcs[0].VpcId | tr -d '"') + if [ "$vpc_id" = "" ] || [ "$vpc_id" = "null" ]; then + echo "unable to find cortex vpc" + exit 1 fi - echo -n "○ updating cluster configuration " - setup_configmap - setup_secrets - echo "✓" + # filter all private subnets belonging to cortex cluster + private_subnets=$(aws ec2 describe-subnets --region $CORTEX_REGION --filters Name=vpc-id,Values=$vpc_id Name=tag:Name,Values=*Private* | jq -s '.[].Subnets[].SubnetId' | tr -d '"') + if [ "$private_subnets" = "" ] || [ "$private_subnets" = "null" ]; then + echo "unable to find cortex private subnets" + exit 1 + fi - echo -n "○ configuring networking " - setup_istio - python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/apis.yaml.j2 > /workspace/apis.yaml - kubectl apply -f /workspace/apis.yaml >/dev/null - echo 
"✓" + # get default security group for cortex VPC + default_security_group=$(aws ec2 describe-security-groups --region $CORTEX_REGION --filters Name=vpc-id,Values=$vpc_id Name=group-name,Values=default | jq -c .SecurityGroups[].GroupId | tr -d '"') + if [ "$default_security_group" = "" ] || [ "$default_security_group" = "null" ]; then + echo "unable to find cortex default security group" + exit 1 + fi - echo -n "○ configuring autoscaling " - python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/cluster-autoscaler.yaml.j2 > /workspace/cluster-autoscaler.yaml - kubectl apply -f /workspace/cluster-autoscaler.yaml >/dev/null - echo "✓" + # create VPC Link + create_vpc_link_output=$(aws apigatewayv2 create-vpc-link --region $CORTEX_REGION --tags "$CORTEX_TAGS_JSON" --name $CORTEX_CLUSTER_NAME --subnet-ids $private_subnets --security-group-ids $default_security_group) + vpc_link_id=$(echo $create_vpc_link_output | jq .VpcLinkId | tr -d '"') + if [ "$vpc_link_id" = "" ] || [ "$vpc_link_id" = "null" ]; then + echo -e "unable to extract vpc link ID from create-vpc-link output:\n$create_vpc_link_output" + exit 1 + fi +} - echo -n "○ configuring logging " - envsubst < manifests/fluentd.yaml | kubectl apply -f - >/dev/null +function create_vpc_link_integration() { + echo -n "○ creating api gateway vpc link integration " + api_id=$(python get_api_gateway_id.py) + python create_gateway_integration.py $api_id $vpc_link_id echo "✓" + echo -n "○ waiting for api gateway vpc link integration " + until [ "$(aws apigatewayv2 get-vpc-link --region $CORTEX_REGION --vpc-link-id $vpc_link_id | jq .VpcLinkStatus | tr -d '"')" = "AVAILABLE" ]; do echo -n "."; sleep 3; done + echo " ✓" +} - echo -n "○ configuring metrics " - envsubst < manifests/metrics-server.yaml | kubectl apply -f - >/dev/null - envsubst < manifests/statsd.yaml | kubectl apply -f - >/dev/null - echo "✓" +function setup_istio() { + envsubst < manifests/istio-namespace.yaml | kubectl apply -f - >/dev/null - if [[ "$CORTEX_INSTANCE_TYPE" == p* ]] || [[ "$CORTEX_INSTANCE_TYPE" == g* ]]; then - echo -n "○ configuring gpu support " - envsubst < manifests/nvidia.yaml | kubectl apply -f - >/dev/null - echo "✓" + if ! 
grep -q "istio-customgateway-certs" <<< $(kubectl get secret -n istio-system); then + WEBSITE=localhost + openssl req -subj "/C=US/CN=$WEBSITE" -newkey rsa:2048 -nodes -keyout $WEBSITE.key -x509 -days 3650 -out $WEBSITE.crt >/dev/null 2>&1 + kubectl create -n istio-system secret tls istio-customgateway-certs --key $WEBSITE.key --cert $WEBSITE.crt >/dev/null fi - if [[ "$CORTEX_INSTANCE_TYPE" == inf* ]]; then - echo -n "○ configuring inf support " - envsubst < manifests/inferentia.yaml | kubectl apply -f - >/dev/null - echo "✓" - fi + python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/istio.yaml.j2 > /workspace/istio.yaml + output_if_error istio-${ISTIO_VERSION}/bin/istioctl install -f /workspace/istio.yaml +} - # add VPC Link integration to API Gateway - if [ "$arg1" != "--update" ] && [ "$CORTEX_API_LOAD_BALANCER_SCHEME" == "internal" ] && [ "$CORTEX_API_GATEWAY" == "public" ]; then - echo -n "○ creating api gateway vpc link integration " - api_id=$(python get_api_gateway_id.py) - python create_gateway_integration.py $api_id $vpc_link_id - echo "✓" - echo -n "○ waiting for api gateway vpc link integration " - until [ "$(aws apigatewayv2 get-vpc-link --region $CORTEX_REGION --vpc-link-id $vpc_link_id | jq .VpcLinkStatus | tr -d '"')" = "AVAILABLE" ]; do echo -n "."; sleep 3; done - echo " ✓" +function start_pre_download_images() { + if [[ "$CORTEX_INSTANCE_TYPE" == p* ]] || [[ "$CORTEX_INSTANCE_TYPE" == g* ]]; then + envsubst < manifests/image-downloader-gpu.yaml | kubectl apply -f - &>/dev/null + elif [[ "$CORTEX_INSTANCE_TYPE" == inf* ]]; then + envsubst < manifests/image-downloader-inf.yaml | kubectl apply -f - &>/dev/null + else + envsubst < manifests/image-downloader-cpu.yaml | kubectl apply -f - &>/dev/null fi +} - echo -n "○ starting operator " - kubectl -n=default delete --ignore-not-found=true --grace-period=10 deployment operator >/dev/null 2>&1 - printed_dot="false" - until [ "$(kubectl -n=default get pods -l workloadID=operator -o json | jq -j '.items | length')" -eq "0" ]; do echo -n "."; printed_dot="true"; sleep 2; done - envsubst < manifests/operator.yaml | kubectl apply -f - >/dev/null - if [ "$printed_dot" == "true" ]; then echo " ✓"; else echo "✓"; fi - - validate_cortex - +function await_pre_download_images() { if kubectl get daemonset image-downloader -n=default &>/dev/null; then echo -n "○ downloading docker images " printed_dot="false" @@ -284,50 +405,6 @@ function main() { kubectl -n=default delete --ignore-not-found=true daemonset image-downloader &>/dev/null if [ "$printed_dot" == "true" ]; then echo " ✓"; else echo "✓"; fi fi - - if [ "$arg1" != "--update" ] && [ "$CORTEX_OPERATOR_LOAD_BALANCER_SCHEME" == "internal" ]; then - echo -e "\ncortex is ready! (it may take a few minutes for your private operator load balancer to finish initializing, but you may now set up VPC Peering)" - else - echo -e "\ncortex is ready!" 
- fi -} - -function setup_configmap() { - kubectl -n=default create configmap 'cluster-config' \ - --from-file='cluster.yaml'=$CORTEX_CLUSTER_CONFIG_FILE \ - -o yaml --dry-run=client | kubectl apply -f - >/dev/null - - kubectl -n=default create configmap 'env-vars' \ - --from-literal='CORTEX_VERSION'=$CORTEX_VERSION \ - --from-literal='CORTEX_REGION'=$CORTEX_REGION \ - --from-literal='AWS_REGION'=$CORTEX_REGION \ - --from-literal='CORTEX_BUCKET'=$CORTEX_BUCKET \ - --from-literal='CORTEX_TELEMETRY_DISABLE'=$CORTEX_TELEMETRY_DISABLE \ - --from-literal='CORTEX_TELEMETRY_SENTRY_DSN'=$CORTEX_TELEMETRY_SENTRY_DSN \ - --from-literal='CORTEX_TELEMETRY_SEGMENT_WRITE_KEY'=$CORTEX_TELEMETRY_SEGMENT_WRITE_KEY \ - --from-literal='CORTEX_DEV_DEFAULT_PREDICTOR_IMAGE_REGISTRY'=$CORTEX_DEV_DEFAULT_PREDICTOR_IMAGE_REGISTRY \ - -o yaml --dry-run=client | kubectl apply -f - >/dev/null -} - -function setup_secrets() { - kubectl -n=default create secret generic 'aws-credentials' \ - --from-literal='AWS_ACCESS_KEY_ID'=$CLUSTER_AWS_ACCESS_KEY_ID \ - --from-literal='AWS_SECRET_ACCESS_KEY'=$CLUSTER_AWS_SECRET_ACCESS_KEY \ - -o yaml --dry-run=client | kubectl apply -f - >/dev/null -} - -function setup_istio() { - envsubst < manifests/istio-namespace.yaml | kubectl apply -f - >/dev/null - - if ! grep -q "istio-customgateway-certs" <<< $(kubectl get secret -n istio-system); then - WEBSITE=localhost - openssl req -subj "/C=US/CN=$WEBSITE" -newkey rsa:2048 -nodes -keyout $WEBSITE.key -x509 -days 3650 -out $WEBSITE.crt >/dev/null 2>&1 - kubectl create -n istio-system secret tls istio-customgateway-certs --key $WEBSITE.key --cert $WEBSITE.crt >/dev/null - fi - - python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/istio.yaml.j2 > /workspace/istio.yaml - - output_if_error istio-${ISTIO_VERSION}/bin/istioctl install -f /workspace/istio.yaml } function validate_cortex() { From d2978362a27ab09b3dcce7cad5347282817fc417 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 7 Oct 2020 19:33:34 -0700 Subject: [PATCH 02/10] Add additional debugging info to cluster up failures --- manager/install.sh | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/manager/install.sh b/manager/install.sh index 40d798b939..d7cbd16ef1 100755 --- a/manager/install.sh +++ b/manager/install.sh @@ -421,6 +421,7 @@ function validate_cortex() { operator_endpoint="" operator_pod_name="" operator_pod_is_ready="" + operator_pod_status="" while true; do # 30 minute timeout @@ -430,6 +431,9 @@ function validate_cortex() { echo -e "\ndebugging info:" echo "operator pod name: $operator_pod_name" echo "operator pod is ready: $operator_pod_is_ready" + if [ "$operator_pod_status" != "" ]; then + echo "operator pod status: $operator_pod_status" + fi echo "operator pod ready cycles: $operator_pod_ready_cycles" echo "api load balancer status: $api_load_balancer" echo "operator load balancer status: $operator_load_balancer" @@ -454,8 +458,15 @@ function validate_cortex() { operator_pod_is_ready=$(kubectl -n=default get "$operator_pod_name" -o jsonpath='{.status.containerStatuses[0].ready}') if [ "$operator_pod_is_ready" == "true" ]; then ((operator_pod_ready_cycles++)) + operator_pod_status="" else operator_pod_ready_cycles=0 + operator_pod_status=$(kubectl -n=default get "$operator_pod_name" -o jsonpath='{.status.containerStatuses[0]}') + if [[ "$operator_pod_status" == *"ImagePullBackOff"* ]]; then + echo "error: the operator image you specified could not be pulled:" + echo $operator_pod_status + exit 1 + fi fi fi From 
cef5fdeed5bf1fe565703f0933f49d5b83fb57e5 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 7 Oct 2020 19:34:18 -0700 Subject: [PATCH 03/10] Update formatting --- manager/install.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manager/install.sh b/manager/install.sh index d7cbd16ef1..dd3176c308 100755 --- a/manager/install.sh +++ b/manager/install.sh @@ -463,7 +463,7 @@ function validate_cortex() { operator_pod_ready_cycles=0 operator_pod_status=$(kubectl -n=default get "$operator_pod_name" -o jsonpath='{.status.containerStatuses[0]}') if [[ "$operator_pod_status" == *"ImagePullBackOff"* ]]; then - echo "error: the operator image you specified could not be pulled:" + echo -e "\nerror: the operator image you specified could not be pulled:" echo $operator_pod_status exit 1 fi From 4c8b31a71ebbe051f91fc39a915959721de87ffc Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 7 Oct 2020 19:37:32 -0700 Subject: [PATCH 04/10] Update install.sh --- manager/install.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/manager/install.sh b/manager/install.sh index dd3176c308..b6059841e2 100755 --- a/manager/install.sh +++ b/manager/install.sh @@ -445,6 +445,7 @@ function validate_cortex() { echo "operator curl response (404 is expected):" curl --max-time 3 $operator_endpoint fi + echo exit 1 fi From 3fb931206124bc3e6e47a527f62c85cf18c63417 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 8 Oct 2020 09:03:05 -0700 Subject: [PATCH 05/10] Remove log_group cluster configuration and use cluster name instead --- cli/cmd/cluster.go | 2 +- cli/cmd/lib_cluster_config.go | 10 +--------- docs/cluster-management/config.md | 3 --- docs/cluster-management/uninstall.md | 4 ++-- docs/contributing/development.md | 1 - manager/manifests/fluentd.yaml | 2 +- pkg/operator/resources/batchapi/logging.go | 2 +- pkg/operator/resources/batchapi/metrics.go | 8 ++++---- pkg/operator/resources/realtimeapi/logs.go | 2 +- pkg/types/clusterconfig/clusterconfig.go | 9 --------- pkg/types/clusterconfig/config_key.go | 2 -- 11 files changed, 11 insertions(+), 34 deletions(-) diff --git a/cli/cmd/cluster.go b/cli/cmd/cluster.go index 698ef79874..c1fddedd85 100644 --- a/cli/cmd/cluster.go +++ b/cli/cmd/cluster.go @@ -183,7 +183,7 @@ var _upCmd = &cobra.Command{ exit.Error(err) } - err = createLogGroupIfNotFound(awsClient, clusterConfig.LogGroup, clusterConfig.Tags) + err = createLogGroupIfNotFound(awsClient, clusterConfig.ClusterName, clusterConfig.Tags) if err != nil { exit.Error(err) } diff --git a/cli/cmd/lib_cluster_config.go b/cli/cmd/lib_cluster_config.go index 6c2c1f49df..01fdea93d2 100644 --- a/cli/cmd/lib_cluster_config.go +++ b/cli/cmd/lib_cluster_config.go @@ -276,11 +276,6 @@ func setConfigFieldsFromCached(userClusterConfig *clusterconfig.Config, cachedCl } userClusterConfig.Bucket = cachedClusterConfig.Bucket - if userClusterConfig.LogGroup != "" && userClusterConfig.LogGroup != cachedClusterConfig.LogGroup { - return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.LogGroupKey, cachedClusterConfig.LogGroup) - } - userClusterConfig.LogGroup = cachedClusterConfig.LogGroup - if userClusterConfig.InstanceType != nil && *userClusterConfig.InstanceType != *cachedClusterConfig.InstanceType { return clusterconfig.ErrorConfigCannotBeChangedOnUpdate(clusterconfig.InstanceTypeKey, *cachedClusterConfig.InstanceType) } @@ -544,7 +539,7 @@ func confirmInstallClusterConfig(clusterConfig *clusterconfig.Config, awsCreds A if clusterConfig.SubnetVisibility == 
clusterconfig.PrivateSubnetVisibility { privateSubnetMsg = ", and will use private subnets for all EC2 instances" } - fmt.Printf("cortex will also create an s3 bucket (%s) and a cloudwatch log group (%s)%s\n\n", clusterConfig.Bucket, clusterConfig.LogGroup, privateSubnetMsg) + fmt.Printf("cortex will also create an s3 bucket (%s) and a cloudwatch log group (%s)%s\n\n", clusterConfig.Bucket, clusterConfig.ClusterName, privateSubnetMsg) if clusterConfig.APIGatewaySetting == clusterconfig.NoneAPIGatewaySetting { fmt.Print("warning: you've disabled API Gateway cluster-wide, so APIs will not be able to create API Gateway endpoints (they will still be reachable via the API load balancer; see https://docs.cortex.dev/deployments/networking for more information)\n\n") @@ -592,9 +587,6 @@ func clusterConfigConfirmationStr(clusterConfig clusterconfig.Config, awsCreds A } items.Add(clusterconfig.BucketUserKey, clusterConfig.Bucket) items.Add(clusterconfig.ClusterNameUserKey, clusterConfig.ClusterName) - if clusterConfig.LogGroup != defaultConfig.LogGroup { - items.Add(clusterconfig.LogGroupUserKey, clusterConfig.LogGroup) - } items.Add(clusterconfig.InstanceTypeUserKey, *clusterConfig.InstanceType) items.Add(clusterconfig.MinInstancesUserKey, *clusterConfig.MinInstances) diff --git a/docs/cluster-management/config.md b/docs/cluster-management/config.md index 6a4c359fbd..313cdb50c8 100644 --- a/docs/cluster-management/config.md +++ b/docs/cluster-management/config.md @@ -63,9 +63,6 @@ operator_load_balancer_scheme: internet-facing # must be "internet-facing" or " # if set to "none", no APIs will be allowed to use API Gateway api_gateway: public # must be "public" or "none" -# CloudWatch log group for cortex (default: ) -log_group: cortex - # additional tags to assign to aws resources for labelling and cost allocation (by default, all resources will be tagged with cortex.dev/cluster-name=) tags: # : map of key/value pairs diff --git a/docs/cluster-management/uninstall.md b/docs/cluster-management/uninstall.md index db2f5733bb..7482474dc5 100644 --- a/docs/cluster-management/uninstall.md +++ b/docs/cluster-management/uninstall.md @@ -39,8 +39,8 @@ aws s3 ls # delete the S3 bucket aws s3 rb --force s3:// -# delete the log group (replace with what was configured during installation, default: cortex) -aws logs describe-log-groups --log-group-name-prefix= --query logGroups[*].[logGroupName] --output text | xargs -I {} aws logs delete-log-group --log-group-name {} +# delete the log group (replace with the name of your cluster, default: cortex) +aws logs describe-log-groups --log-group-name-prefix= --query logGroups[*].[logGroupName] --output text | xargs -I {} aws logs delete-log-group --log-group-name {} ``` If you've configured a custom domain for your APIs, you may wish to remove the SSL Certificate and Hosted Zone for the domain by following these [instructions](../guides/custom-domain.md#cleanup). 
diff --git a/docs/contributing/development.md b/docs/contributing/development.md index b9b5e14ea4..5cce28dff8 100644 --- a/docs/contributing/development.md +++ b/docs/contributing/development.md @@ -139,7 +139,6 @@ min_instances: 2 max_instances: 5 bucket: cortex-cluster- region: us-west-2 -log_group: cortex cluster_name: cortex image_operator: XXXXXXXX.dkr.ecr.us-west-2.amazonaws.com/cortexlabs/operator:latest diff --git a/manager/manifests/fluentd.yaml b/manager/manifests/fluentd.yaml index a5b814bef4..a1aabf3139 100644 --- a/manager/manifests/fluentd.yaml +++ b/manager/manifests/fluentd.yaml @@ -199,7 +199,7 @@ spec: - name: AWS_REGION value: $CORTEX_REGION - name: LOG_GROUP_NAME - value: $CORTEX_LOG_GROUP + value: $CORTEX_CLUSTER_NAME - name: K8S_NODE_NAME # used by fluentd to avoid a k8s query valueFrom: fieldRef: diff --git a/pkg/operator/resources/batchapi/logging.go b/pkg/operator/resources/batchapi/logging.go index 609b93c0a7..f18f4e6e2e 100644 --- a/pkg/operator/resources/batchapi/logging.go +++ b/pkg/operator/resources/batchapi/logging.go @@ -30,7 +30,7 @@ import ( ) func logGroupNameForAPI(apiName string) string { - return fmt.Sprintf("%s/%s", config.Cluster.LogGroup, apiName) + return fmt.Sprintf("%s/%s", config.Cluster.ClusterName, apiName) } func logGroupNameForJob(jobKey spec.JobKey) string { diff --git a/pkg/operator/resources/batchapi/metrics.go b/pkg/operator/resources/batchapi/metrics.go index 996f028fea..6fb30e11dc 100644 --- a/pkg/operator/resources/batchapi/metrics.go +++ b/pkg/operator/resources/batchapi/metrics.go @@ -181,7 +181,7 @@ func batchMetricsDef(jobKey *spec.JobKey, period int64) []*cloudwatch.MetricData Label: aws.String("Succeeded"), MetricStat: &cloudwatch.MetricStat{ Metric: &cloudwatch.Metric{ - Namespace: aws.String(config.Cluster.LogGroup), + Namespace: aws.String(config.Cluster.ClusterName), MetricName: aws.String("Succeeded"), Dimensions: getJobDimensionsCounter(jobKey), }, @@ -194,7 +194,7 @@ func batchMetricsDef(jobKey *spec.JobKey, period int64) []*cloudwatch.MetricData Label: aws.String("Failed"), MetricStat: &cloudwatch.MetricStat{ Metric: &cloudwatch.Metric{ - Namespace: aws.String(config.Cluster.LogGroup), + Namespace: aws.String(config.Cluster.ClusterName), MetricName: aws.String("Failed"), Dimensions: getJobDimensionsCounter(jobKey), }, @@ -207,7 +207,7 @@ func batchMetricsDef(jobKey *spec.JobKey, period int64) []*cloudwatch.MetricData Label: aws.String("AverageTimePerBatch"), MetricStat: &cloudwatch.MetricStat{ Metric: &cloudwatch.Metric{ - Namespace: aws.String(config.Cluster.LogGroup), + Namespace: aws.String(config.Cluster.ClusterName), MetricName: aws.String("TimePerBatch"), Dimensions: getJobDimensionsHistogram(jobKey), }, @@ -220,7 +220,7 @@ func batchMetricsDef(jobKey *spec.JobKey, period int64) []*cloudwatch.MetricData Label: aws.String("Total"), MetricStat: &cloudwatch.MetricStat{ Metric: &cloudwatch.Metric{ - Namespace: aws.String(config.Cluster.LogGroup), + Namespace: aws.String(config.Cluster.ClusterName), MetricName: aws.String("TimePerBatch"), Dimensions: getJobDimensionsHistogram(jobKey), }, diff --git a/pkg/operator/resources/realtimeapi/logs.go b/pkg/operator/resources/realtimeapi/logs.go index 47b0cf3a9f..720dbda5d1 100644 --- a/pkg/operator/resources/realtimeapi/logs.go +++ b/pkg/operator/resources/realtimeapi/logs.go @@ -207,7 +207,7 @@ func getLogStreams(logGroupName string) (strset.Set, error) { } func getLogGroupName(apiName string) string { - return config.Cluster.LogGroup + "/" + apiName + return 
config.Cluster.ClusterName + "/" + apiName } func writeString(socket *websocket.Conn, message string) { diff --git a/pkg/types/clusterconfig/clusterconfig.go b/pkg/types/clusterconfig/clusterconfig.go index 1adbfa995c..6d62c534bf 100644 --- a/pkg/types/clusterconfig/clusterconfig.go +++ b/pkg/types/clusterconfig/clusterconfig.go @@ -68,7 +68,6 @@ type Config struct { AvailabilityZones []string `json:"availability_zones" yaml:"availability_zones"` SSLCertificateARN *string `json:"ssl_certificate_arn,omitempty" yaml:"ssl_certificate_arn,omitempty"` Bucket string `json:"bucket" yaml:"bucket"` - LogGroup string `json:"log_group" yaml:"log_group"` SubnetVisibility SubnetVisibility `json:"subnet_visibility" yaml:"subnet_visibility"` NATGateway NATGateway `json:"nat_gateway" yaml:"nat_gateway"` APILoadBalancerScheme LoadBalancerScheme `json:"api_load_balancer_scheme" yaml:"api_load_balancer_scheme"` @@ -288,13 +287,6 @@ var UserValidation = &cr.StructValidation{ Validator: validateBucketNameOrEmpty, }, }, - { - StructField: "LogGroup", - StringValidation: &cr.StringValidation{ - MaxLength: 63, - }, - DefaultField: "ClusterName", - }, { StructField: "SubnetVisibility", StringValidation: &cr.StringValidation{ @@ -1122,7 +1114,6 @@ func (cc *Config) UserTable() table.KeyValuePairs { items.Add(InstancePoolsUserKey, *cc.SpotConfig.InstancePools) items.Add(OnDemandBackupUserKey, s.YesNo(*cc.SpotConfig.OnDemandBackup)) } - items.Add(LogGroupUserKey, cc.LogGroup) items.Add(SubnetVisibilityUserKey, cc.SubnetVisibility) items.Add(NATGatewayUserKey, cc.NATGateway) items.Add(APILoadBalancerSchemeUserKey, cc.APILoadBalancerScheme) diff --git a/pkg/types/clusterconfig/config_key.go b/pkg/types/clusterconfig/config_key.go index 1126f64fc5..d7eee22df6 100644 --- a/pkg/types/clusterconfig/config_key.go +++ b/pkg/types/clusterconfig/config_key.go @@ -37,7 +37,6 @@ const ( AvailabilityZonesKey = "availability_zones" SSLCertificateARNKey = "ssl_certificate_arn" BucketKey = "bucket" - LogGroupKey = "log_group" SubnetVisibilityKey = "subnet_visibility" NATGatewayKey = "nat_gateway" APILoadBalancerSchemeKey = "api_load_balancer_scheme" @@ -80,7 +79,6 @@ const ( MaxPriceUserKey = "spot max price ($ per hour)" InstancePoolsUserKey = "spot instance pools" OnDemandBackupUserKey = "on demand backup" - LogGroupUserKey = "cloudwatch log group" SubnetVisibilityUserKey = "subnet visibility" NATGatewayUserKey = "nat gateway" APILoadBalancerSchemeUserKey = "api load balancer scheme" From 0a894a3840be3cb2b917bdf29f16e692aa01ed25 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 8 Oct 2020 09:39:11 -0700 Subject: [PATCH 06/10] Add cluster name to s3 prefix for all objects --- pkg/operator/endpoints/get_job.go | 3 ++- pkg/operator/endpoints/logs_job.go | 6 +++-- pkg/operator/endpoints/stop_job.go | 3 ++- pkg/operator/operator/storage.go | 2 +- pkg/operator/resources/batchapi/api.go | 10 ++++---- .../resources/batchapi/in_progress_cache.go | 10 ++++---- pkg/operator/resources/batchapi/job.go | 5 ++-- pkg/operator/resources/batchapi/job_state.go | 4 +-- .../batchapi/manage_resources_cron.go | 2 +- pkg/operator/resources/batchapi/queue.go | 2 +- pkg/operator/resources/realtimeapi/api.go | 10 ++++---- pkg/operator/resources/resources.go | 2 +- pkg/operator/resources/trafficsplitter/api.go | 8 +++--- pkg/types/spec/api.go | 25 +++++++++++-------- pkg/types/spec/job_spec.go | 15 +++++------ 15 files changed, 59 insertions(+), 48 deletions(-) diff --git a/pkg/operator/endpoints/get_job.go b/pkg/operator/endpoints/get_job.go index 
62f8dfa950..483403c96b 100644 --- a/pkg/operator/endpoints/get_job.go +++ b/pkg/operator/endpoints/get_job.go @@ -20,6 +20,7 @@ import ( "net/http" "github.com/cortexlabs/cortex/pkg/lib/urls" + "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/resources" "github.com/cortexlabs/cortex/pkg/operator/resources/batchapi" @@ -44,7 +45,7 @@ func GetJob(w http.ResponseWriter, r *http.Request) { return } - jobKey := spec.JobKey{APIName: apiName, ID: jobID} + jobKey := spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName} jobStatus, err := batchapi.GetJobStatus(jobKey) if err != nil { diff --git a/pkg/operator/endpoints/logs_job.go b/pkg/operator/endpoints/logs_job.go index 3fc68f510c..d613586b23 100644 --- a/pkg/operator/endpoints/logs_job.go +++ b/pkg/operator/endpoints/logs_job.go @@ -19,6 +19,7 @@ package endpoints import ( "net/http" + "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/resources" "github.com/cortexlabs/cortex/pkg/operator/resources/batchapi" "github.com/cortexlabs/cortex/pkg/types/spec" @@ -50,7 +51,8 @@ func ReadJobLogs(w http.ResponseWriter, r *http.Request) { defer socket.Close() batchapi.ReadLogs(spec.JobKey{ - APIName: deployedResource.Name, - ID: jobID, + APIName: deployedResource.Name, + ID: jobID, + ClusterName: config.Cluster.ClusterName, }, socket) } diff --git a/pkg/operator/endpoints/stop_job.go b/pkg/operator/endpoints/stop_job.go index 90aa419cce..fdcd76790d 100644 --- a/pkg/operator/endpoints/stop_job.go +++ b/pkg/operator/endpoints/stop_job.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" + "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/resources/batchapi" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types/spec" @@ -31,7 +32,7 @@ func StopJob(w http.ResponseWriter, r *http.Request) { apiName := vars["apiName"] jobID := vars["jobID"] - err := batchapi.StopJob(spec.JobKey{APIName: apiName, ID: jobID}) + err := batchapi.StopJob(spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName}) if err != nil { respondError(w, r, err) return diff --git a/pkg/operator/operator/storage.go b/pkg/operator/operator/storage.go index 6e19fae791..b8cc6d0b84 100644 --- a/pkg/operator/operator/storage.go +++ b/pkg/operator/operator/storage.go @@ -23,7 +23,7 @@ import ( ) func DownloadAPISpec(apiName string, apiID string) (*spec.API, error) { - s3Key := spec.Key(apiName, apiID) + s3Key := spec.Key(apiName, apiID, config.Cluster.ClusterName) var api spec.API if err := config.AWS.ReadJSONFromS3(&api, config.Cluster.Bucket, s3Key); err != nil { diff --git a/pkg/operator/resources/batchapi/api.go b/pkg/operator/resources/batchapi/api.go index 3dd803aef6..ba4d31be71 100644 --- a/pkg/operator/resources/batchapi/api.go +++ b/pkg/operator/resources/batchapi/api.go @@ -42,14 +42,14 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, return nil, "", err } - api := spec.GetAPISpec(apiConfig, projectID, "") // Deployment ID not needed for BatchAPI spec + api := spec.GetAPISpec(apiConfig, projectID, "", config.Cluster.ClusterName) // Deployment ID not needed for BatchAPI spec if prevVirtualService == nil { if err := config.AWS.UploadJSONToS3(api, config.Cluster.Bucket, api.Key); err != nil { return nil, "", errors.Wrap(err, "upload api spec") } - if err := 
config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil { + if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { return nil, "", errors.Wrap(err, "upload raw api spec") } @@ -79,7 +79,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil { + if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { return nil, "", errors.Wrap(err, "upload raw api spec") } @@ -153,11 +153,11 @@ func deleteK8sResources(apiName string) error { func deleteS3Resources(apiName string) error { return parallel.RunFirstErr( func() error { - prefix := filepath.Join("apis", apiName) + prefix := filepath.Join(config.Cluster.ClusterName, "apis", apiName) return config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) }, func() error { - prefix := spec.BatchAPIJobPrefix(apiName) + prefix := spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName) go config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while return nil }, diff --git a/pkg/operator/resources/batchapi/in_progress_cache.go b/pkg/operator/resources/batchapi/in_progress_cache.go index 79db60e160..24b1b6475e 100644 --- a/pkg/operator/resources/batchapi/in_progress_cache.go +++ b/pkg/operator/resources/batchapi/in_progress_cache.go @@ -29,7 +29,7 @@ var ( ) func inProgressS3Key(jobKey spec.JobKey) string { - return path.Join(_inProgressFilePrefix, jobKey.APIName, jobKey.ID) + return path.Join(config.Cluster.ClusterName, _inProgressFilePrefix, jobKey.APIName, jobKey.ID) } func jobKeyFromInProgressS3Key(s3Key string) spec.JobKey { @@ -37,7 +37,7 @@ func jobKeyFromInProgressS3Key(s3Key string) spec.JobKey { apiName := s3PathSplit[len(s3PathSplit)-2] jobID := s3PathSplit[len(s3PathSplit)-1] - return spec.JobKey{APIName: apiName, ID: jobID} + return spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName} } func uploadInProgressFile(jobKey spec.JobKey) error { @@ -57,7 +57,7 @@ func deleteInProgressFile(jobKey spec.JobKey) error { } func deleteAllInProgressFilesByAPI(apiName string) error { - err := config.AWS.DeleteS3Prefix(config.Cluster.Bucket, path.Join(_inProgressFilePrefix, apiName), true) + err := config.AWS.DeleteS3Prefix(config.Cluster.Bucket, path.Join(config.Cluster.ClusterName, _inProgressFilePrefix, apiName), true) if err != nil { return err } @@ -65,7 +65,7 @@ func deleteAllInProgressFilesByAPI(apiName string) error { } func listAllInProgressJobKeys() ([]spec.JobKey, error) { - s3Objects, err := config.AWS.ListS3Dir(config.Cluster.Bucket, _inProgressFilePrefix, false, nil) + s3Objects, err := config.AWS.ListS3Dir(config.Cluster.Bucket, path.Join(config.Cluster.ClusterName, _inProgressFilePrefix), false, nil) if err != nil { return nil, err } @@ -79,7 +79,7 @@ func listAllInProgressJobKeys() ([]spec.JobKey, error) { } func listAllInProgressJobKeysByAPI(apiName string) ([]spec.JobKey, error) { - s3Objects, err := config.AWS.ListS3Dir(config.Cluster.Bucket, path.Join(_inProgressFilePrefix, apiName), false, nil) + s3Objects, err := config.AWS.ListS3Dir(config.Cluster.Bucket, path.Join(config.Cluster.ClusterName, _inProgressFilePrefix, apiName), false, nil) if err != nil { return nil, err } diff --git 
a/pkg/operator/resources/batchapi/job.go b/pkg/operator/resources/batchapi/job.go index da3a28fb2b..4e8f934c73 100644 --- a/pkg/operator/resources/batchapi/job.go +++ b/pkg/operator/resources/batchapi/job.go @@ -78,8 +78,9 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err jobID := spec.MonotonicallyDecreasingID() jobKey := spec.JobKey{ - APIName: apiSpec.Name, - ID: jobID, + APIName: apiSpec.Name, + ID: jobID, + ClusterName: config.Cluster.ClusterName, } tags := map[string]string{ diff --git a/pkg/operator/resources/batchapi/job_state.go b/pkg/operator/resources/batchapi/job_state.go index c7722e8d71..bf4c609057 100644 --- a/pkg/operator/resources/batchapi/job_state.go +++ b/pkg/operator/resources/batchapi/job_state.go @@ -135,7 +135,7 @@ func getJobStateFromFiles(jobKey spec.JobKey, lastUpdatedFileMap map[string]time func getMostRecentlySubmittedJobStates(apiName string, count int) ([]*JobState, error) { // a single job state may include 5 files on average, overshoot the number of files needed - s3Objects, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, spec.BatchAPIJobPrefix(apiName), false, pointer.Int64(int64(count*_averageFilesPerJobState))) + s3Objects, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName), false, pointer.Int64(int64(count*_averageFilesPerJobState))) if err != nil { return nil, err } @@ -160,7 +160,7 @@ func getMostRecentlySubmittedJobStates(apiName string, count int) ([]*JobState, jobStateCount := 0 for _, jobID := range jobIDOrder { - jobState := getJobStateFromFiles(spec.JobKey{APIName: apiName, ID: jobID}, lastUpdatedMaps[jobID]) + jobState := getJobStateFromFiles(spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName}, lastUpdatedMaps[jobID]) jobStates = append(jobStates, &jobState) jobStateCount++ diff --git a/pkg/operator/resources/batchapi/manage_resources_cron.go b/pkg/operator/resources/batchapi/manage_resources_cron.go index 4beb6c3a63..60a2bfd8d2 100644 --- a/pkg/operator/resources/batchapi/manage_resources_cron.go +++ b/pkg/operator/resources/batchapi/manage_resources_cron.go @@ -141,7 +141,7 @@ func ManageJobResources() error { // existing k8sjob but job is not in progress for jobID := range strset.Difference(k8sJobIDSet, inProgressJobIDSet) { - jobKey := spec.JobKey{APIName: k8sJobMap[jobID].Labels["apiName"], ID: k8sJobMap[jobID].Labels["jobID"]} + jobKey := spec.JobKey{APIName: k8sJobMap[jobID].Labels["apiName"], ID: k8sJobMap[jobID].Labels["jobID"], ClusterName: config.Cluster.ClusterName} // delete both k8sjob and queue err := deleteJobRuntimeResources(jobKey) diff --git a/pkg/operator/resources/batchapi/queue.go b/pkg/operator/resources/batchapi/queue.go index 229487c1ad..c9bc0f6a85 100644 --- a/pkg/operator/resources/batchapi/queue.go +++ b/pkg/operator/resources/batchapi/queue.go @@ -58,7 +58,7 @@ func jobKeyFromQueueURL(queueURL string) spec.JobKey { apiNameSplit := dashSplit[1 : len(dashSplit)-1] apiName := strings.Join(apiNameSplit, "-") - return spec.JobKey{APIName: apiName, ID: jobID} + return spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName} } func createFIFOQueue(jobKey spec.JobKey, tags map[string]string) (string, error) { diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 9311270bad..02f3b12b3c 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -52,14 +52,14 @@ func 
UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A deploymentID = prevDeployment.Labels["deploymentID"] } - api := spec.GetAPISpec(apiConfig, projectID, deploymentID) + api := spec.GetAPISpec(apiConfig, projectID, deploymentID, config.Cluster.ClusterName) if prevDeployment == nil { if err := config.AWS.UploadJSONToS3(api, config.Cluster.Bucket, api.Key); err != nil { return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil { + if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { return nil, "", errors.Wrap(err, "upload raw api spec") } @@ -96,7 +96,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil { + if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { return nil, "", errors.Wrap(err, "upload raw api spec") } @@ -152,7 +152,7 @@ func RefreshAPI(apiName string, force bool) (string, error) { return "", err } - api = spec.GetAPISpec(api.API, api.ProjectID, deploymentID()) + api = spec.GetAPISpec(api.API, api.ProjectID, deploymentID(), config.Cluster.ClusterName) if err := config.AWS.UploadJSONToS3(api, config.Cluster.Bucket, api.Key); err != nil { return "", errors.Wrap(err, "upload api spec") @@ -436,7 +436,7 @@ func deleteK8sResources(apiName string) error { } func deleteS3Resources(apiName string) error { - prefix := filepath.Join("apis", apiName) + prefix := filepath.Join(config.Cluster.ClusterName, "apis", apiName) return config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) } diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 5631a91ff4..bd50d39b28 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -75,7 +75,7 @@ func GetDeployedResourceByNameOrNil(resourceName string) (*operator.DeployedReso func Deploy(projectBytes []byte, configFileName string, configBytes []byte, force bool) (*schema.DeployResponse, error) { projectID := hash.Bytes(projectBytes) - projectKey := spec.ProjectKey(projectID) + projectKey := spec.ProjectKey(projectID, config.Cluster.ClusterName) projectFileMap, err := archive.UnzipMemToMem(projectBytes) if err != nil { return nil, err diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index f7c6bd10a3..f0f4cd9420 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -37,13 +37,13 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) return nil, "", err } - api := spec.GetAPISpec(apiConfig, "", "") + api := spec.GetAPISpec(apiConfig, "", "", config.Cluster.ClusterName) if prevVirtualService == nil { if err := config.AWS.UploadJSONToS3(api, config.Cluster.Bucket, api.Key); err != nil { return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil { + if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { return nil, "", errors.Wrap(err, "upload raw api spec") } @@ -65,7 +65,7 
@@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey()); err != nil { + if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { return nil, "", errors.Wrap(err, "upload raw api spec") } @@ -198,6 +198,6 @@ func deleteK8sResources(apiName string) error { } func deleteS3Resources(apiName string) error { - prefix := filepath.Join("apis", apiName) + prefix := filepath.Join(config.Cluster.ClusterName, "apis", apiName) return config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) } diff --git a/pkg/types/spec/api.go b/pkg/types/spec/api.go index e13a9fc083..68bbed2226 100644 --- a/pkg/types/spec/api.go +++ b/pkg/types/spec/api.go @@ -66,7 +66,7 @@ APIID (uniquely identifies an api configuration for a given deployment) * APIs * DeploymentID (used for refreshing a deployment) */ -func GetAPISpec(apiConfig *userconfig.API, projectID string, deploymentID string) *API { +func GetAPISpec(apiConfig *userconfig.API, projectID string, deploymentID string, clusterName string) *API { var buf bytes.Buffer buf.WriteString(s.Obj(apiConfig.Resource)) @@ -93,13 +93,13 @@ func GetAPISpec(apiConfig *userconfig.API, projectID string, deploymentID string ID: apiID, SpecID: specID, PredictorID: predictorID, - Key: Key(apiConfig.Name, apiID), - PredictorKey: PredictorKey(apiConfig.Name, predictorID), + Key: Key(apiConfig.Name, apiID, clusterName), + PredictorKey: PredictorKey(apiConfig.Name, predictorID, clusterName), DeploymentID: deploymentID, LastUpdated: time.Now().Unix(), - MetadataRoot: MetadataRoot(apiConfig.Name), + MetadataRoot: MetadataRoot(apiConfig.Name, clusterName), ProjectID: projectID, - ProjectKey: ProjectKey(projectID), + ProjectKey: ProjectKey(projectID, clusterName), } } @@ -134,8 +134,9 @@ func (api *API) SubtractLocalModelIDs(apis ...*API) []string { return modelIDs.Slice() } -func PredictorKey(apiName string, predictorID string) string { +func PredictorKey(apiName string, predictorID string, clusterName string) string { return filepath.Join( + clusterName, "apis", apiName, "predictor", @@ -144,8 +145,9 @@ func PredictorKey(apiName string, predictorID string) string { ) } -func Key(apiName string, apiID string) string { +func Key(apiName string, apiID string, clusterName string) string { return filepath.Join( + clusterName, "apis", apiName, "api", @@ -154,8 +156,9 @@ func Key(apiName string, apiID string) string { ) } -func (api API) RawAPIKey() string { +func (api API) RawAPIKey(clusterName string) string { return filepath.Join( + clusterName, "apis", api.Name, "raw_api", @@ -164,16 +167,18 @@ func (api API) RawAPIKey() string { ) } -func MetadataRoot(apiName string) string { +func MetadataRoot(apiName string, clusterName string) string { return filepath.Join( + clusterName, "apis", apiName, "metadata", ) } -func ProjectKey(projectID string) string { +func ProjectKey(projectID string, clusterName string) string { return filepath.Join( + clusterName, "projects", projectID+".zip", ) diff --git a/pkg/types/spec/job_spec.go b/pkg/types/spec/job_spec.go index 391da34aa8..ca74aa3a5e 100644 --- a/pkg/types/spec/job_spec.go +++ b/pkg/types/spec/job_spec.go @@ -27,22 +27,23 @@ import ( ) type JobKey struct { - ID string `json:"job_id"` - APIName string `json:"api_name"` + ID string `json:"job_id"` + APIName string `json:"api_name"` + ClusterName 
string `json:"cluster_name"` } func (j JobKey) UserString() string { return fmt.Sprintf("%s (%s api)", j.ID, j.APIName) } -// e.g. <bucket_name>/jobs/<cortex_version>/<api_name>/<job_id>/spec.json +// e.g. <bucket_name>/<cluster_name>/jobs/<cortex_version>/<api_name>/<job_id>/spec.json func (j JobKey) SpecFilePath() string { return path.Join(j.Prefix(), "spec.json") } -// e.g. <bucket_name>/jobs/<cortex_version>/<api_name>/<job_id> +// e.g. <bucket_name>/<cluster_name>/jobs/<cortex_version>/<api_name>/<job_id> func (j JobKey) Prefix() string { - return s.EnsureSuffix(path.Join(BatchAPIJobPrefix(j.APIName), j.ID), "/") + return s.EnsureSuffix(path.Join(BatchAPIJobPrefix(j.APIName, j.ClusterName), j.ID), "/") } func (j JobKey) K8sName() string { @@ -65,6 +66,6 @@ type Job struct { StartTime time.Time `json:"start_time"` } -func BatchAPIJobPrefix(apiName string) string { - return filepath.Join("jobs", consts.CortexVersion, apiName) +func BatchAPIJobPrefix(apiName string, clusterName string) string { + return filepath.Join(clusterName, "jobs", consts.CortexVersion, apiName) } From 73976d44a90ac0f93508ac17663df561d72cbbaa Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 8 Oct 2020 09:43:48 -0700 Subject: [PATCH 07/10] Update api.go --- cli/local/api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/local/api.go b/cli/local/api.go index 4f680f5f9e..a784f663c6 100644 --- a/cli/local/api.go +++ b/cli/local/api.go @@ -70,7 +70,7 @@ func UpdateAPI(apiConfig *userconfig.API, configPath string, projectID string, d return nil, "", err } - newAPISpec := spec.GetAPISpec(apiConfig, projectID, _deploymentID) + newAPISpec := spec.GetAPISpec(apiConfig, projectID, _deploymentID, "") // apiConfig.Predictor.ModelPath was already added to apiConfig.Predictor.Models for ease of use if len(apiConfig.Predictor.Models) > 0 { From 4dd8b4cbe926c8b19cb85a50c323f474dba6f332 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 8 Oct 2020 09:44:57 -0700 Subject: [PATCH 08/10] Update cluster.go --- cli/cmd/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/cmd/cluster.go b/cli/cmd/cluster.go index c1fddedd85..6bfce5e33f 100644 --- a/cli/cmd/cluster.go +++ b/cli/cmd/cluster.go @@ -659,7 +659,7 @@ var _exportCmd = &cobra.Command{ exit.Error(err) } - err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiSpec.RawAPIKey(), path.Join(baseDir, apiSpec.FileName)) + err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiSpec.RawAPIKey(info.ClusterConfig.ClusterName), path.Join(baseDir, apiSpec.FileName)) if err != nil { exit.Error(err) } From 8f3cc76466684b892733ca549cfea1f0c3ea5fd8 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 8 Oct 2020 11:38:26 -0700 Subject: [PATCH 09/10] Update JobKey --- pkg/operator/endpoints/get_job.go | 3 +-- pkg/operator/endpoints/logs_job.go | 6 ++--- pkg/operator/endpoints/stop_job.go | 3 +-- pkg/operator/resources/batchapi/enqueue.go | 2 +- .../resources/batchapi/in_progress_cache.go | 2 +- pkg/operator/resources/batchapi/job.go | 9 ++++---- pkg/operator/resources/batchapi/job_state.go | 22 +++++++++---------- pkg/operator/resources/batchapi/k8s_specs.go | 6 ++--- .../batchapi/manage_resources_cron.go | 2 +- pkg/operator/resources/batchapi/queue.go | 2 +- pkg/types/spec/job_spec.go | 13 +++++------ 11 files changed, 32 insertions(+), 38 deletions(-) diff --git a/pkg/operator/endpoints/get_job.go b/pkg/operator/endpoints/get_job.go index 483403c96b..62f8dfa950 100644 --- a/pkg/operator/endpoints/get_job.go +++ b/pkg/operator/endpoints/get_job.go @@ -20,7 +20,6 @@ import ( "net/http" "github.com/cortexlabs/cortex/pkg/lib/urls" - "github.com/cortexlabs/cortex/pkg/operator/config"
"github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/resources" "github.com/cortexlabs/cortex/pkg/operator/resources/batchapi" @@ -45,7 +44,7 @@ func GetJob(w http.ResponseWriter, r *http.Request) { return } - jobKey := spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName} + jobKey := spec.JobKey{APIName: apiName, ID: jobID} jobStatus, err := batchapi.GetJobStatus(jobKey) if err != nil { diff --git a/pkg/operator/endpoints/logs_job.go b/pkg/operator/endpoints/logs_job.go index d613586b23..3fc68f510c 100644 --- a/pkg/operator/endpoints/logs_job.go +++ b/pkg/operator/endpoints/logs_job.go @@ -19,7 +19,6 @@ package endpoints import ( "net/http" - "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/resources" "github.com/cortexlabs/cortex/pkg/operator/resources/batchapi" "github.com/cortexlabs/cortex/pkg/types/spec" @@ -51,8 +50,7 @@ func ReadJobLogs(w http.ResponseWriter, r *http.Request) { defer socket.Close() batchapi.ReadLogs(spec.JobKey{ - APIName: deployedResource.Name, - ID: jobID, - ClusterName: config.Cluster.ClusterName, + APIName: deployedResource.Name, + ID: jobID, }, socket) } diff --git a/pkg/operator/endpoints/stop_job.go b/pkg/operator/endpoints/stop_job.go index fdcd76790d..90aa419cce 100644 --- a/pkg/operator/endpoints/stop_job.go +++ b/pkg/operator/endpoints/stop_job.go @@ -20,7 +20,6 @@ import ( "fmt" "net/http" - "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/resources/batchapi" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types/spec" @@ -32,7 +31,7 @@ func StopJob(w http.ResponseWriter, r *http.Request) { apiName := vars["apiName"] jobID := vars["jobID"] - err := batchapi.StopJob(spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName}) + err := batchapi.StopJob(spec.JobKey{APIName: apiName, ID: jobID}) if err != nil { respondError(w, r, err) return diff --git a/pkg/operator/resources/batchapi/enqueue.go b/pkg/operator/resources/batchapi/enqueue.go index 7779e1a1a3..3353e6609e 100644 --- a/pkg/operator/resources/batchapi/enqueue.go +++ b/pkg/operator/resources/batchapi/enqueue.go @@ -49,7 +49,7 @@ func randomMessageID() string { } func updateLiveness(jobKey spec.JobKey) error { - s3Key := path.Join(jobKey.Prefix(), _enqueuingLivenessFile) + s3Key := path.Join(jobKey.Prefix(config.Cluster.ClusterName), _enqueuingLivenessFile) err := config.AWS.UploadJSONToS3(time.Now(), config.Cluster.Bucket, s3Key) if err != nil { return errors.Wrap(err, "failed to update liveness", jobKey.UserString()) diff --git a/pkg/operator/resources/batchapi/in_progress_cache.go b/pkg/operator/resources/batchapi/in_progress_cache.go index 24b1b6475e..02458aeecb 100644 --- a/pkg/operator/resources/batchapi/in_progress_cache.go +++ b/pkg/operator/resources/batchapi/in_progress_cache.go @@ -37,7 +37,7 @@ func jobKeyFromInProgressS3Key(s3Key string) spec.JobKey { apiName := s3PathSplit[len(s3PathSplit)-2] jobID := s3PathSplit[len(s3PathSplit)-1] - return spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName} + return spec.JobKey{APIName: apiName, ID: jobID} } func uploadInProgressFile(jobKey spec.JobKey) error { diff --git a/pkg/operator/resources/batchapi/job.go b/pkg/operator/resources/batchapi/job.go index 4e8f934c73..b5c978feab 100644 --- a/pkg/operator/resources/batchapi/job.go +++ b/pkg/operator/resources/batchapi/job.go @@ -78,9 +78,8 @@ func 
SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err jobID := spec.MonotonicallyDecreasingID() jobKey := spec.JobKey{ - APIName: apiSpec.Name, - ID: jobID, - ClusterName: config.Cluster.ClusterName, + APIName: apiSpec.Name, + ID: jobID, } tags := map[string]string{ @@ -135,7 +134,7 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err func downloadJobSpec(jobKey spec.JobKey) (*spec.Job, error) { jobSpec := spec.Job{} - err := config.AWS.ReadJSONFromS3(&jobSpec, config.Cluster.Bucket, jobKey.SpecFilePath()) + err := config.AWS.ReadJSONFromS3(&jobSpec, config.Cluster.Bucket, jobKey.SpecFilePath(config.Cluster.ClusterName)) if err != nil { return nil, errors.Wrap(err, "unable to download job specification", jobKey.UserString()) } @@ -143,7 +142,7 @@ func downloadJobSpec(jobKey spec.JobKey) (*spec.Job, error) { } func uploadJobSpec(jobSpec *spec.Job) error { - err := config.AWS.UploadJSONToS3(jobSpec, config.Cluster.Bucket, jobSpec.SpecFilePath()) + err := config.AWS.UploadJSONToS3(jobSpec, config.Cluster.Bucket, jobSpec.SpecFilePath(config.Cluster.ClusterName)) if err != nil { return err } diff --git a/pkg/operator/resources/batchapi/job_state.go b/pkg/operator/resources/batchapi/job_state.go index bf4c609057..f69ba4962f 100644 --- a/pkg/operator/resources/batchapi/job_state.go +++ b/pkg/operator/resources/batchapi/job_state.go @@ -96,7 +96,7 @@ func getStatusCode(lastUpdatedMap map[string]time.Time) status.JobCode { } func getJobState(jobKey spec.JobKey) (*JobState, error) { - s3Objects, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, jobKey.Prefix(), false, nil) + s3Objects, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, jobKey.Prefix(config.Cluster.ClusterName), false, nil) if err != nil { return nil, errors.Wrap(err, "failed to get job state", jobKey.UserString()) } @@ -160,7 +160,7 @@ func getMostRecentlySubmittedJobStates(apiName string, count int) ([]*JobState, jobStateCount := 0 for _, jobID := range jobIDOrder { - jobState := getJobStateFromFiles(spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName}, lastUpdatedMaps[jobID]) + jobState := getJobStateFromFiles(spec.JobKey{APIName: apiName, ID: jobID}, lastUpdatedMaps[jobID]) jobStates = append(jobStates, &jobState) jobStateCount++ @@ -197,7 +197,7 @@ func setStatusForJob(jobKey spec.JobKey, jobStatus status.JobCode) error { } func setEnqueuingStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobEnqueuing.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobEnqueuing.String())) if err != nil { return err } @@ -211,7 +211,7 @@ func setEnqueuingStatus(jobKey spec.JobKey) error { } func setRunningStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobRunning.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobRunning.String())) if err != nil { return err } @@ -225,7 +225,7 @@ func setRunningStatus(jobKey spec.JobKey) error { } func setStoppedStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobStopped.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), 
status.JobStopped.String())) if err != nil { return err } @@ -239,7 +239,7 @@ func setSucceededStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobSucceeded.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobSucceeded.String())) if err != nil { return err } @@ -253,7 +253,7 @@ func setCompletedWithFailuresStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobCompletedWithFailures.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobCompletedWithFailures.String())) if err != nil { return err } @@ -267,7 +267,7 @@ func setWorkerErrorStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobWorkerError.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobWorkerError.String())) if err != nil { return err } @@ -281,7 +281,7 @@ func setWorkerOOMStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobWorkerOOM.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobWorkerOOM.String())) if err != nil { return err } @@ -295,7 +295,7 @@ func setEnqueueFailedStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobEnqueueFailed.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobEnqueueFailed.String())) if err != nil { return err } @@ -309,7 +309,7 @@ func setUnexpectedErrorStatus(jobKey spec.JobKey) error { - err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(), status.JobUnexpectedError.String())) + err := config.AWS.UploadStringToS3("", config.Cluster.Bucket, path.Join(jobKey.Prefix(config.Cluster.ClusterName), status.JobUnexpectedError.String())) if err != nil { return err } diff --git a/pkg/operator/resources/batchapi/k8s_specs.go b/pkg/operator/resources/batchapi/k8s_specs.go index 36bcde2346..a98ae147ea 100644 --- a/pkg/operator/resources/batchapi/k8s_specs.go +++ b/pkg/operator/resources/batchapi/k8s_specs.go @@ -51,7 +51,7 @@ func pythonPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) { if container.Name == operator.APIContainerName { containers[i].Env = append(container.Env, kcore.EnvVar{ Name: "CORTEX_JOB_SPEC", - Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(), + Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(config.Cluster.ClusterName), }) } } @@ -100,7 +100,7 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, erro { if container.Name == operator.APIContainerName { containers[i].Env = append(container.Env, kcore.EnvVar{ Name: "CORTEX_JOB_SPEC", - Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(), + Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(config.Cluster.ClusterName), }) } } @@ -150,7 +150,7 @@ func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) { if container.Name == operator.APIContainerName { containers[i].Env = append(container.Env, kcore.EnvVar{ Name: "CORTEX_JOB_SPEC", - Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(), + Value: "s3://" + config.Cluster.Bucket + "/" + job.SpecFilePath(config.Cluster.ClusterName), }) } } diff --git a/pkg/operator/resources/batchapi/manage_resources_cron.go b/pkg/operator/resources/batchapi/manage_resources_cron.go index 60a2bfd8d2..4beb6c3a63 100644 --- a/pkg/operator/resources/batchapi/manage_resources_cron.go +++ b/pkg/operator/resources/batchapi/manage_resources_cron.go @@ -141,7 +141,7 @@ func ManageJobResources() error { // existing k8sjob but job is not in progress for jobID := range strset.Difference(k8sJobIDSet, inProgressJobIDSet) { - jobKey := spec.JobKey{APIName: k8sJobMap[jobID].Labels["apiName"], ID: k8sJobMap[jobID].Labels["jobID"], ClusterName: config.Cluster.ClusterName} + jobKey := spec.JobKey{APIName: k8sJobMap[jobID].Labels["apiName"], ID: k8sJobMap[jobID].Labels["jobID"]} // delete both k8sjob and queue err := deleteJobRuntimeResources(jobKey) diff --git a/pkg/operator/resources/batchapi/queue.go b/pkg/operator/resources/batchapi/queue.go index c9bc0f6a85..229487c1ad 100644 --- a/pkg/operator/resources/batchapi/queue.go +++ b/pkg/operator/resources/batchapi/queue.go @@ -58,7 +58,7 @@ func jobKeyFromQueueURL(queueURL string) spec.JobKey { apiNameSplit := dashSplit[1 : len(dashSplit)-1] apiName := strings.Join(apiNameSplit, "-") - return spec.JobKey{APIName: apiName, ID: jobID, ClusterName: config.Cluster.ClusterName} + return spec.JobKey{APIName: apiName, ID: jobID} } func createFIFOQueue(jobKey spec.JobKey, tags map[string]string) (string, error) { diff --git a/pkg/types/spec/job_spec.go b/pkg/types/spec/job_spec.go index ca74aa3a5e..d63184d4aa 100644 --- a/pkg/types/spec/job_spec.go +++ b/pkg/types/spec/job_spec.go @@ -27,9 +27,8 @@ import ( ) type JobKey struct { - ID string `json:"job_id"` - APIName string `json:"api_name"` - ClusterName string `json:"cluster_name"` + ID string `json:"job_id"` + APIName string `json:"api_name"` } func (j JobKey) UserString() string { return fmt.Sprintf("%s (%s api)", j.ID, j.APIName) } // e.g. <bucket_name>/<cluster_name>/jobs/<cortex_version>/<api_name>/<job_id>/spec.json -func (j JobKey) SpecFilePath() string { - return path.Join(j.Prefix(), "spec.json") +func (j JobKey) SpecFilePath(clusterName string) string { - return path.Join(j.Prefix(clusterName), "spec.json") } // e.g. <bucket_name>/<cluster_name>/jobs/<cortex_version>/<api_name>/<job_id> -func (j JobKey) Prefix() string { - return s.EnsureSuffix(path.Join(BatchAPIJobPrefix(j.APIName, j.ClusterName), j.ID), "/") +func (j JobKey) Prefix(clusterName string) string { + return s.EnsureSuffix(path.Join(BatchAPIJobPrefix(j.APIName, clusterName), j.ID), "/") } func (j JobKey) K8sName() string { From 38fb6ff56e95bd4feae112062751bee2fc6f779a Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 8 Oct 2020 12:33:42 -0700 Subject: [PATCH 10/10] Use /verifycortex endpoint when checking operator readiness --- cli/cmd/lib_cli_config.go | 3 +-- manager/install.sh | 6 +++--- pkg/operator/endpoints/respond.go | 6 ++++++ pkg/operator/endpoints/verify_cortex.go | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/cli/cmd/lib_cli_config.go b/cli/cmd/lib_cli_config.go index 1e6b0b0047..ead82f1758 100644 --- a/cli/cmd/lib_cli_config.go +++ b/cli/cmd/lib_cli_config.go @@ -499,18 +499,17 @@ func validateOperatorEndpoint(endpoint string) (string, error) { if err != nil { return "", errors.Wrap(err, "verifying operator endpoint", url) } - req.Header.Set("Content-Type", "application/json") client := http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, }, } + response, err := client.Do(req) if err != nil { return "", ErrorInvalidOperatorEndpoint(url) } - if response.StatusCode != 200 { return "", ErrorInvalidOperatorEndpoint(url) } diff --git a/manager/install.sh b/manager/install.sh index b6059841e2..3d9eb5522c 100755 --- a/manager/install.sh +++ b/manager/install.sh @@ -442,8 +442,8 @@ function validate_cortex() { echo "operator endpoint reachable: $operator_endpoint_reachable" fi if [ "$operator_endpoint" != "" ]; then - echo "operator curl response (404 is expected):" - curl --max-time 3 $operator_endpoint + echo "operator curl response:" + curl --max-time 3 "${operator_endpoint}/verifycortex" fi echo exit 1 @@ -493,7 +493,7 @@ function validate_cortex() { if [ "$CORTEX_OPERATOR_LOAD_BALANCER_SCHEME" == "internet-facing" ]; then if [ "$operator_endpoint_reachable" != "true" ]; then - if ! curl --max-time 3 $operator_endpoint >/dev/null 2>&1; then + if ! curl --max-time 3 "${operator_endpoint}/verifycortex" >/dev/null 2>&1; then continue fi operator_endpoint_reachable="true" diff --git a/pkg/operator/endpoints/respond.go b/pkg/operator/endpoints/respond.go index 29033f4990..aa1d9e5f21 100644 --- a/pkg/operator/endpoints/respond.go +++ b/pkg/operator/endpoints/respond.go @@ -31,6 +31,12 @@ func respond(w http.ResponseWriter, response interface{}) { json.NewEncoder(w).Encode(response) } +func respondPlainText(w http.ResponseWriter, response string) { + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + w.Write([]byte(response)) +} + func respondError(w http.ResponseWriter, r *http.Request, err error, strs ...string) { respondErrorCode(w, r, http.StatusBadRequest, err, strs...) } diff --git a/pkg/operator/endpoints/verify_cortex.go b/pkg/operator/endpoints/verify_cortex.go index e3b86409f8..6972a13997 100644 --- a/pkg/operator/endpoints/verify_cortex.go +++ b/pkg/operator/endpoints/verify_cortex.go @@ -21,5 +21,5 @@ import ( ) func VerifyCortex(w http.ResponseWriter, r *http.Request) { - respond(w, "ok") + respondPlainText(w, "ok") }
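
Note on the S3 key changes above (through PATCH 09/10): the common thread is that every S3 key the operator writes is now namespaced under the cluster name, so that two clusters sharing a bucket cannot collide, and JobKey itself no longer carries the cluster name, since callers now pass it explicitly (the operator supplies config.Cluster.ClusterName). A minimal runnable sketch of the resulting key layout; the cortexVersion constant and the main driver are illustrative stand-ins, not part of the patch:

package main

import (
	"fmt"
	"path"
	"path/filepath"
)

// Illustrative; the real value comes from consts.CortexVersion.
const cortexVersion = "0.20.0"

// JobKey matches the struct after PATCH 09: a pure identifier, with no
// cluster name stored on it.
type JobKey struct {
	ID      string `json:"job_id"`
	APIName string `json:"api_name"`
}

// batchAPIJobPrefix mirrors spec.BatchAPIJobPrefix after the patch: the
// cluster name becomes the top-level prefix of every job-related S3 key.
func batchAPIJobPrefix(apiName string, clusterName string) string {
	return filepath.Join(clusterName, "jobs", cortexVersion, apiName)
}

// Prefix and SpecFilePath mirror the patched signatures: the caller supplies
// the cluster name instead of reading it off the key.
func (j JobKey) Prefix(clusterName string) string {
	return path.Join(batchAPIJobPrefix(j.APIName, clusterName), j.ID) + "/"
}

func (j JobKey) SpecFilePath(clusterName string) string {
	return path.Join(j.Prefix(clusterName), "spec.json")
}

func main() {
	job := JobKey{ID: "69d9c0013c2d0d97", APIName: "my-api"}
	// Two clusters sharing one bucket now write to disjoint prefixes.
	fmt.Println(job.SpecFilePath("cluster-a")) // cluster-a/jobs/0.20.0/my-api/69d9c0013c2d0d97/spec.json
	fmt.Println(job.SpecFilePath("cluster-b")) // cluster-b/jobs/0.20.0/my-api/69d9c0013c2d0d97/spec.json
}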
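
Note on the job_state.go changes in PATCH 09/10: the nine set*Status functions all follow one pattern, recording a job's lifecycle status as an empty S3 object whose key is the status name under the job's prefix; getJobState later lists that prefix to reconstruct the status. A hedged sketch of the shared shape; the setStatusMarker helper and the uploader interface are mine (the patch deliberately keeps nine separate functions), while UploadStringToS3 is the same call the patch uses via config.AWS:

package status

import "path"

// uploader is the one S3 call this pattern needs; in the operator it is
// satisfied by config.AWS.
type uploader interface {
	UploadStringToS3(body string, bucket string, key string) error
}

// setStatusMarker writes an empty marker object such as
// <cluster_name>/jobs/<cortex_version>/<api_name>/<job_id>/running; listing
// the job prefix and comparing marker timestamps then yields the job's
// most recent status.
func setStatusMarker(up uploader, bucket string, jobPrefix string, statusName string) error {
	return up.UploadStringToS3("", bucket, path.Join(jobPrefix, statusName))
}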
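
Note on PATCH 10/10: readiness checks previously hit the operator root and treated a 404 as success; the CLI and install.sh now request /verifycortex and expect a 200. A simplified Go sketch of the client-side check, using the same InsecureSkipVerify transport shown in the diff (the URL handling and error reporting in validateOperatorEndpoint are richer than this):

package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"time"
)

// operatorReady mirrors the patched readiness check: anything other than a
// 200 from /verifycortex means the operator is not (yet) reachable.
func operatorReady(endpoint string) bool {
	client := http.Client{
		Timeout: 3 * time.Second, // matches curl --max-time 3 in install.sh
		Transport: &http.Transport{
			// The load balancer may present a self-signed certificate.
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		},
	}
	resp, err := client.Get(endpoint + "/verifycortex")
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}

func main() {
	// Hypothetical endpoint, for illustration only.
	fmt.Println(operatorReady("https://example.elb.amazonaws.com"))
}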
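
On the server side, /verifycortex now returns a bare text/plain "ok" instead of the JSON encoding used by respond(), which keeps the curl probe in install.sh trivial to evaluate. A sketch of the handler wired into a plain net/http mux; the route registration and port are assumptions, since the patch only shows the handler and the respondPlainText helper:

package main

import "net/http"

// respondPlainText matches the helper added in the patch.
func respondPlainText(w http.ResponseWriter, response string) {
	w.Header().Set("Content-Type", "text/plain")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(response))
}

// verifyCortex answers readiness probes with a plain-text 200 "ok".
func verifyCortex(w http.ResponseWriter, r *http.Request) {
	respondPlainText(w, "ok")
}

func main() {
	http.HandleFunc("/verifycortex", verifyCortex)
	http.ListenAndServe(":8888", nil) // port is illustrative
}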