diff --git a/cli/cluster/logs.go b/cli/cluster/logs.go index 870044e2f1..5a980544f9 100644 --- a/cli/cluster/logs.go +++ b/cli/cluster/logs.go @@ -29,6 +29,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/exit" "github.com/cortexlabs/cortex/pkg/lib/json" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/gorilla/websocket" ) @@ -112,7 +113,7 @@ func streamLogs(operatorConfig OperatorConfig, path string, qParams ...map[strin } func handleConnection(connection *websocket.Conn, done chan struct{}) { - go func() { + routines.RunWithPanicHandler(func() { defer close(done) for { _, message, err := connection.ReadMessage() @@ -121,7 +122,7 @@ func handleConnection(connection *websocket.Conn, done chan struct{}) { } fmt.Println(string(message)) } - }() + }, false) } func closeConnection(connection *websocket.Conn, done chan struct{}, interrupt chan os.Signal) { diff --git a/cli/cmd/lib_manager.go b/cli/cmd/lib_manager.go index 3054350dd1..7370f040d6 100644 --- a/cli/cmd/lib_manager.go +++ b/cli/cmd/lib_manager.go @@ -34,6 +34,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/exit" "github.com/cortexlabs/cortex/pkg/lib/files" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/types/clusterconfig" "github.com/cortexlabs/yaml" dockertypes "github.com/docker/docker/api/types" @@ -91,12 +92,13 @@ func runManager(containerConfig *container.Config, addNewLineAfterPull bool, cop c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) caughtCtrlC := false - go func() { + + routines.RunWithPanicHandler(func() { <-c caughtCtrlC = true removeContainer() exit.Error(ErrorDockerCtrlC()) - }() + }, false) for _, copyPath := range copyToPaths { err = docker.CopyToContainer(containerInfo.ID, copyPath.input, copyPath.containerPath) diff --git a/pkg/lib/cron/cron.go 
b/pkg/lib/cron/cron.go index 21bc64a8d8..741814f441 100644 --- a/pkg/lib/cron/cron.go +++ b/pkg/lib/cron/cron.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/routines" ) type Cron struct { @@ -39,7 +40,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron { } } - go func() { + routines.RunWithPanicHandler(func() { timer := time.NewTimer(0) defer timer.Stop() for { @@ -53,7 +54,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron { } timer.Reset(delay) } - }() + }, false) return Cron{ cronRun: cronRun, diff --git a/pkg/lib/parallel/parallel.go b/pkg/lib/parallel/parallel.go index fb4c05a641..d535daf191 100644 --- a/pkg/lib/parallel/parallel.go +++ b/pkg/lib/parallel/parallel.go @@ -41,6 +41,11 @@ func Run(fn func() error, fns ...func() error) []error { } go func() { + defer func() { + if r := recover(); r != nil { + errChannel <- errors.CastRecoverError(r) + } + }() errChannel <- fn() }() } diff --git a/pkg/lib/routines/routines.go b/pkg/lib/routines/routines.go new file mode 100644 index 0000000000..af7886a6c3 --- /dev/null +++ b/pkg/lib/routines/routines.go @@ -0,0 +1,39 @@ +/* +Copyright 2020 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package routines + +import ( + "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/exit" + "github.com/cortexlabs/cortex/pkg/lib/telemetry" +) + +/* RunWithPanicHandler launches f in a new goroutine and returns without waiting for it. If f panics, a deferred handler recovers the value, converts it with errors.CastRecoverError, and prints it with errors.PrintStacktrace; when exitOnPanic is true it then calls exit.Error(err), and after that telemetry.Error(err) is invoked. NOTE(review): exit.Error presumably terminates the process, so the telemetry call would only run when exitOnPanic is false — confirm against pkg/lib/exit. */ func RunWithPanicHandler(f func(), exitOnPanic bool) { + go func() { + defer func() { + if r := recover(); r != nil { + err := errors.CastRecoverError(r) + errors.PrintStacktrace(err) + if exitOnPanic { + exit.Error(err) + } + telemetry.Error(err) + } + }() + f() + }() +} diff --git a/pkg/operator/resources/batchapi/api.go b/pkg/operator/resources/batchapi/api.go index ae6067a356..8d3ea44a5e 100644 --- a/pkg/operator/resources/batchapi/api.go +++ b/pkg/operator/resources/batchapi/api.go @@ -22,6 +22,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/parallel" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" @@ -51,14 +52,20 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, err = applyK8sResources(api, prevVirtualService) if err != nil { - go deleteK8sResources(api.Name) + routines.RunWithPanicHandler(func() { + deleteK8sResources(api.Name) + }, false) return nil, "", err } err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - go deleteK8sResources(api.Name) - go operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) + routines.RunWithPanicHandler(func() { + deleteK8sResources(api.Name) + }, false) + routines.RunWithPanicHandler(func() { + operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) + }, false) return nil, "", err } @@ -150,7 +157,9 @@ func deleteS3Resources(apiName string) error { }, func() error { prefix := spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName) - go config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, 
true) // deleting job files may take a while + routines.RunWithPanicHandler(func() { + config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while + }, false) return nil }, func() error { diff --git a/pkg/operator/resources/batchapi/job.go b/pkg/operator/resources/batchapi/job.go index 9be04bbb75..90c85dcc05 100644 --- a/pkg/operator/resources/batchapi/job.go +++ b/pkg/operator/resources/batchapi/job.go @@ -21,6 +21,7 @@ import ( "time" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" @@ -127,7 +128,9 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err return nil, err } - go deployJob(apiSpec, &jobSpec, submission) + routines.RunWithPanicHandler(func() { + deployJob(apiSpec, &jobSpec, submission) + }, false) return &jobSpec, nil } @@ -256,12 +259,16 @@ func deleteJobRuntimeResources(jobKey spec.JobKey) error { func StopJob(jobKey spec.JobKey) error { jobState, err := getJobState(jobKey) if err != nil { - go deleteJobRuntimeResources(jobKey) + routines.RunWithPanicHandler(func() { + deleteJobRuntimeResources(jobKey) + }, false) return err } if !jobState.Status.IsInProgress() { - go deleteJobRuntimeResources(jobKey) + routines.RunWithPanicHandler(func() { + deleteJobRuntimeResources(jobKey) + }, false) return errors.Wrap(ErrorJobIsNotInProgress(), jobKey.UserString()) } diff --git a/pkg/operator/resources/batchapi/logs.go b/pkg/operator/resources/batchapi/logs.go index 3c6dd6ecff..434ca2c0dd 100644 --- a/pkg/operator/resources/batchapi/logs.go +++ b/pkg/operator/resources/batchapi/logs.go @@ -25,6 +25,7 @@ import ( awslib "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/cache" "github.com/cortexlabs/cortex/pkg/lib/errors" + 
"github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/telemetry" @@ -63,9 +64,13 @@ func ReadLogs(jobKey spec.JobKey, socket *websocket.Conn) { defer close(podCheckCancel) if jobStatus.Status.IsInProgress() { - go streamFromCloudWatch(jobStatus, podCheckCancel, socket) + routines.RunWithPanicHandler(func() { + streamFromCloudWatch(jobStatus, podCheckCancel, socket) + }, false) } else { - go fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket) + routines.RunWithPanicHandler(func() { + fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket) + }, false) } pumpStdin(socket) diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 0392254fa0..ef52006798 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -25,6 +25,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/parallel" "github.com/cortexlabs/cortex/pkg/lib/pointer" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/schema" @@ -67,14 +68,18 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A } if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil { - go deleteK8sResources(api.Name) + routines.RunWithPanicHandler(func() { + deleteK8sResources(api.Name) + }, false) return nil, "", err } if config.Provider == types.AWSProviderType { err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - go deleteK8sResources(api.Name) + routines.RunWithPanicHandler(func() { + deleteK8sResources(api.Name) + }, false) return nil, "", err } err = addAPIToDashboard(config.Cluster.ClusterName, 
api.Name) diff --git a/pkg/operator/resources/realtimeapi/logs.go b/pkg/operator/resources/realtimeapi/logs.go index 720dbda5d1..461a0c9939 100644 --- a/pkg/operator/resources/realtimeapi/logs.go +++ b/pkg/operator/resources/realtimeapi/logs.go @@ -25,6 +25,7 @@ import ( awslib "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/cache" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/telemetry" @@ -56,7 +57,9 @@ type fluentdLog struct { func ReadLogs(apiName string, socket *websocket.Conn) { podCheckCancel := make(chan struct{}) defer close(podCheckCancel) - go streamFromCloudWatch(apiName, podCheckCancel, socket) + routines.RunWithPanicHandler(func() { + streamFromCloudWatch(apiName, podCheckCancel, socket) + }, false) pumpStdin(socket) podCheckCancel <- struct{}{} } diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 722843d75f..31de86159f 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -26,6 +26,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/hash" "github.com/cortexlabs/cortex/pkg/lib/parallel" "github.com/cortexlabs/cortex/pkg/lib/pointer" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" @@ -287,7 +288,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) { } if deployedResource == nil { // Delete anyways just to be sure everything is deleted - go func() { + routines.RunWithPanicHandler(func() { err := parallel.RunFirstErr( func() error { return realtimeapi.DeleteAPI(apiName, keepCache) @@ -308,7 +309,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, 
error) { if err != nil { telemetry.Error(err) } - }() + }, false) return nil, ErrorAPINotDeployed(apiName) } diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index 3a1aeea2aa..a4d0654cab 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -23,6 +23,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/parallel" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/schema" @@ -44,13 +45,17 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) } if err := applyK8sVirtualService(api, prevVirtualService); err != nil { - go deleteK8sResources(api.Name) + routines.RunWithPanicHandler(func() { + deleteK8sResources(api.Name) + }, false) return nil, "", err } err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - go deleteK8sResources(api.Name) + routines.RunWithPanicHandler(func() { + deleteK8sResources(api.Name) + }, false) return nil, "", err } return api, fmt.Sprintf("created %s", api.Resource.UserString()), nil