From 3a2302eaabb23deed7ab88db4b8079d79205d906 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 29 Dec 2020 16:01:27 +0200 Subject: [PATCH 1/7] Add panic handler for telemetry pkg --- pkg/lib/telemetry/errors.go | 10 ++++++++++ pkg/lib/telemetry/handlers.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 pkg/lib/telemetry/handlers.go diff --git a/pkg/lib/telemetry/errors.go b/pkg/lib/telemetry/errors.go index 96b4a6898d..3820fec7af 100644 --- a/pkg/lib/telemetry/errors.go +++ b/pkg/lib/telemetry/errors.go @@ -17,14 +17,24 @@ limitations under the License. package telemetry import ( + "fmt" + "github.com/cortexlabs/cortex/pkg/lib/errors" ) const ( + ErrUnexpectedError = "telemetry.unexpected_error" ErrUserIDNotSpecified = "telemetry.user_id_not_specified" ErrSentryFlushTimeoutExceeded = "telemetry.sentry_flush_timeout_exceeded" ) +func ErrorUnexpectedError(msg string) error { + return errors.WithStack(&errors.Error{ + Kind: ErrUnexpectedError, + Message: fmt.Sprintf("unexpected error occurred: %s", msg), + }) +} + func ErrorUserIDNotSpecified() error { return errors.WithStack(&errors.Error{ Kind: ErrUserIDNotSpecified, diff --git a/pkg/lib/telemetry/handlers.go b/pkg/lib/telemetry/handlers.go new file mode 100644 index 0000000000..0320f1d0ab --- /dev/null +++ b/pkg/lib/telemetry/handlers.go @@ -0,0 +1,29 @@ +/* +Copyright 2020 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package telemetry + +func GoRoutineWithPanicHandler(f func()) { + go func() { + defer func() { + if r := recover(); r != nil { + err := r.(error) + Error(ErrorUnexpectedError(err.Error())) + } + }() + f() + }() +} From 1f1418f53bb10ab9dddab56637a0c00ba76e11dd Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 29 Dec 2020 17:58:07 +0200 Subject: [PATCH 2/7] Catch panics in operator/CLI --- cli/cluster/logs.go | 5 +-- cli/cmd/lib_manager.go | 6 ++-- pkg/lib/cron/cron.go | 5 +-- pkg/lib/parallel/errors.go | 18 ++++++++++ pkg/lib/parallel/parallel.go | 7 ++++ pkg/lib/routines/errors.go | 34 +++++++++++++++++++ .../handlers.go => routines/routines.go} | 12 +++++-- pkg/lib/telemetry/errors.go | 10 ------ pkg/operator/resources/batchapi/api.go | 17 +++++++--- pkg/operator/resources/batchapi/job.go | 13 +++++-- pkg/operator/resources/batchapi/logs.go | 9 +++-- pkg/operator/resources/realtimeapi/api.go | 9 +++-- pkg/operator/resources/realtimeapi/logs.go | 5 ++- pkg/operator/resources/resources.go | 5 +-- pkg/operator/resources/trafficsplitter/api.go | 9 +++-- 15 files changed, 129 insertions(+), 35 deletions(-) create mode 100644 pkg/lib/parallel/errors.go create mode 100644 pkg/lib/routines/errors.go rename pkg/lib/{telemetry/handlers.go => routines/routines.go} (81%) diff --git a/cli/cluster/logs.go b/cli/cluster/logs.go index 870044e2f1..9593bddde5 100644 --- a/cli/cluster/logs.go +++ b/cli/cluster/logs.go @@ -29,6 +29,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/exit" "github.com/cortexlabs/cortex/pkg/lib/json" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/gorilla/websocket" ) @@ -112,7 +113,7 @@ func streamLogs(operatorConfig OperatorConfig, path string, qParams ...map[strin } func handleConnection(connection *websocket.Conn, done chan struct{}) { - go func() { + routines.GoRoutineWithPanicHandler(func() { defer close(done) for { _, message, err := connection.ReadMessage() @@ -121,7 +122,7 @@ func handleConnection(connection *websocket.Conn, done chan struct{}) { } fmt.Println(string(message)) } - }() + }) } func closeConnection(connection *websocket.Conn, done chan struct{}, interrupt chan os.Signal) { diff --git a/cli/cmd/lib_manager.go b/cli/cmd/lib_manager.go index 3054350dd1..6f8bbc4ac9 100644 --- a/cli/cmd/lib_manager.go +++ b/cli/cmd/lib_manager.go @@ -34,6 +34,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/exit" "github.com/cortexlabs/cortex/pkg/lib/files" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/types/clusterconfig" "github.com/cortexlabs/yaml" dockertypes "github.com/docker/docker/api/types" @@ -91,12 +92,13 @@ func runManager(containerConfig *container.Config, addNewLineAfterPull bool, cop c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) caughtCtrlC := false - go func() { + + routines.GoRoutineWithPanicHandler(func() { <-c caughtCtrlC = true removeContainer() exit.Error(ErrorDockerCtrlC()) - }() + }) for _, copyPath := range copyToPaths { err = docker.CopyToContainer(containerInfo.ID, copyPath.input, copyPath.containerPath) diff --git a/pkg/lib/cron/cron.go b/pkg/lib/cron/cron.go index 21bc64a8d8..c60a472471 100644 --- a/pkg/lib/cron/cron.go +++ b/pkg/lib/cron/cron.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/routines" ) type Cron struct { @@ -39,7 +40,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron { } } - go func() { + routines.GoRoutineWithPanicHandler(func() { timer := time.NewTimer(0) defer timer.Stop() for { @@ -53,7 +54,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron { } timer.Reset(delay) } - }() + }) return Cron{ cronRun: cronRun, diff --git a/pkg/lib/parallel/errors.go b/pkg/lib/parallel/errors.go new file mode 100644 index 0000000000..90a63904b2 --- /dev/null +++ b/pkg/lib/parallel/errors.go @@ -0,0 +1,18 @@ +package parallel + +import ( + "fmt" + + "github.com/cortexlabs/cortex/pkg/lib/errors" +) + +const ( + ErrUnexpectedError = "parallel.unexpected_error" +) + +func ErrorUnexpectedError(msg string) error { + return errors.WithStack(&errors.Error{ + Kind: ErrUnexpectedError, + Message: fmt.Sprintf("unexpected error occurred in parallel execution: %s", msg), + }) +} diff --git a/pkg/lib/parallel/parallel.go b/pkg/lib/parallel/parallel.go index fb4c05a641..60638715b2 100644 --- a/pkg/lib/parallel/parallel.go +++ b/pkg/lib/parallel/parallel.go @@ -17,6 +17,8 @@ limitations under the License. package parallel import ( + "fmt" + "github.com/cortexlabs/cortex/pkg/lib/errors" ) @@ -41,6 +43,11 @@ func Run(fn func() error, fns ...func() error) []error { } go func() { + defer func() { + if r := recover(); r != nil { + errChannel <- ErrorUnexpectedError(fmt.Sprintf("%v", r)) + } + }() errChannel <- fn() }() } diff --git a/pkg/lib/routines/errors.go b/pkg/lib/routines/errors.go new file mode 100644 index 0000000000..b410dee3aa --- /dev/null +++ b/pkg/lib/routines/errors.go @@ -0,0 +1,34 @@ +/* +Copyright 2020 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package routines + +import ( + "fmt" + + "github.com/cortexlabs/cortex/pkg/lib/errors" +) + +const ( + ErrUnexpectedError = "routines.unexpected_error" +) + +func ErrorUnexpectedError(msg string) error { + return errors.WithStack(&errors.Error{ + Kind: ErrUnexpectedError, + Message: fmt.Sprintf("unexpected error occurred: %s", msg), + }) +} diff --git a/pkg/lib/telemetry/handlers.go b/pkg/lib/routines/routines.go similarity index 81% rename from pkg/lib/telemetry/handlers.go rename to pkg/lib/routines/routines.go index 0320f1d0ab..c6d2f77b35 100644 --- a/pkg/lib/telemetry/handlers.go +++ b/pkg/lib/routines/routines.go @@ -14,14 +14,20 @@ See the License for the specific language governing permissions and limitations under the License. */ -package telemetry +package routines + +import ( + "fmt" + + "github.com/cortexlabs/cortex/pkg/lib/exit" +) func GoRoutineWithPanicHandler(f func()) { go func() { defer func() { if r := recover(); r != nil { - err := r.(error) - Error(ErrorUnexpectedError(err.Error())) + err := ErrorUnexpectedError(fmt.Sprintf("%v", r)) + exit.Error(err) } }() f() diff --git a/pkg/lib/telemetry/errors.go b/pkg/lib/telemetry/errors.go index 3820fec7af..96b4a6898d 100644 --- a/pkg/lib/telemetry/errors.go +++ b/pkg/lib/telemetry/errors.go @@ -17,24 +17,14 @@ limitations under the License. package telemetry import ( - "fmt" - "github.com/cortexlabs/cortex/pkg/lib/errors" ) const ( - ErrUnexpectedError = "telemetry.unexpected_error" ErrUserIDNotSpecified = "telemetry.user_id_not_specified" ErrSentryFlushTimeoutExceeded = "telemetry.sentry_flush_timeout_exceeded" ) -func ErrorUnexpectedError(msg string) error { - return errors.WithStack(&errors.Error{ - Kind: ErrUnexpectedError, - Message: fmt.Sprintf("unexpected error occurred: %s", msg), - }) -} - func ErrorUserIDNotSpecified() error { return errors.WithStack(&errors.Error{ Kind: ErrUserIDNotSpecified, diff --git a/pkg/operator/resources/batchapi/api.go b/pkg/operator/resources/batchapi/api.go index ae6067a356..cda670790a 100644 --- a/pkg/operator/resources/batchapi/api.go +++ b/pkg/operator/resources/batchapi/api.go @@ -22,6 +22,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/parallel" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" @@ -51,14 +52,20 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, err = applyK8sResources(api, prevVirtualService) if err != nil { - go deleteK8sResources(api.Name) + routines.GoRoutineWithPanicHandler(func() { + deleteK8sResources(api.Name) + }) return nil, "", err } err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - go deleteK8sResources(api.Name) - go operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) + routines.GoRoutineWithPanicHandler(func() { + deleteK8sResources(api.Name) + }) + routines.GoRoutineWithPanicHandler(func() { + operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) + }) return nil, "", err } @@ -150,7 +157,9 @@ func deleteS3Resources(apiName string) error { }, func() error { prefix := spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName) - go config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while + routines.GoRoutineWithPanicHandler(func() { + config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while + }) return nil }, func() error { diff --git a/pkg/operator/resources/batchapi/job.go b/pkg/operator/resources/batchapi/job.go index 9be04bbb75..1850089059 100644 --- a/pkg/operator/resources/batchapi/job.go +++ b/pkg/operator/resources/batchapi/job.go @@ -21,6 +21,7 @@ import ( "time" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" @@ -127,7 +128,9 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err return nil, err } - go deployJob(apiSpec, &jobSpec, submission) + routines.GoRoutineWithPanicHandler(func() { + deployJob(apiSpec, &jobSpec, submission) + }) return &jobSpec, nil } @@ -256,12 +259,16 @@ func deleteJobRuntimeResources(jobKey spec.JobKey) error { func StopJob(jobKey spec.JobKey) error { jobState, err := getJobState(jobKey) if err != nil { - go deleteJobRuntimeResources(jobKey) + routines.GoRoutineWithPanicHandler(func() { + deleteJobRuntimeResources(jobKey) + }) return err } if !jobState.Status.IsInProgress() { - go deleteJobRuntimeResources(jobKey) + routines.GoRoutineWithPanicHandler(func() { + deleteJobRuntimeResources(jobKey) + }) return errors.Wrap(ErrorJobIsNotInProgress(), jobKey.UserString()) } diff --git a/pkg/operator/resources/batchapi/logs.go b/pkg/operator/resources/batchapi/logs.go index 3c6dd6ecff..7a4bfa7600 100644 --- a/pkg/operator/resources/batchapi/logs.go +++ b/pkg/operator/resources/batchapi/logs.go @@ -25,6 +25,7 @@ import ( awslib "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/cache" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/telemetry" @@ -63,9 +64,13 @@ func ReadLogs(jobKey spec.JobKey, socket *websocket.Conn) { defer close(podCheckCancel) if jobStatus.Status.IsInProgress() { - go streamFromCloudWatch(jobStatus, podCheckCancel, socket) + routines.GoRoutineWithPanicHandler(func() { + streamFromCloudWatch(jobStatus, podCheckCancel, socket) + }) } else { - go fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket) + routines.GoRoutineWithPanicHandler(func() { + fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket) + }) } pumpStdin(socket) diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 0392254fa0..1188d035d8 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -25,6 +25,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/parallel" "github.com/cortexlabs/cortex/pkg/lib/pointer" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/schema" @@ -67,14 +68,18 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A } if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil { - go deleteK8sResources(api.Name) + routines.GoRoutineWithPanicHandler(func() { + deleteK8sResources(api.Name) + }) return nil, "", err } if config.Provider == types.AWSProviderType { err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - go deleteK8sResources(api.Name) + routines.GoRoutineWithPanicHandler(func() { + deleteK8sResources(api.Name) + }) return nil, "", err } err = addAPIToDashboard(config.Cluster.ClusterName, api.Name) diff --git a/pkg/operator/resources/realtimeapi/logs.go b/pkg/operator/resources/realtimeapi/logs.go index 720dbda5d1..66f69ef1ea 100644 --- a/pkg/operator/resources/realtimeapi/logs.go +++ b/pkg/operator/resources/realtimeapi/logs.go @@ -25,6 +25,7 @@ import ( awslib "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/cache" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/telemetry" @@ -56,7 +57,9 @@ type fluentdLog struct { func ReadLogs(apiName string, socket *websocket.Conn) { podCheckCancel := make(chan struct{}) defer close(podCheckCancel) - go streamFromCloudWatch(apiName, podCheckCancel, socket) + routines.GoRoutineWithPanicHandler(func() { + streamFromCloudWatch(apiName, podCheckCancel, socket) + }) pumpStdin(socket) podCheckCancel <- struct{}{} } diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 722843d75f..b9e57b393e 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -26,6 +26,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/hash" "github.com/cortexlabs/cortex/pkg/lib/parallel" "github.com/cortexlabs/cortex/pkg/lib/pointer" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" @@ -287,7 +288,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) { } if deployedResource == nil { // Delete anyways just to be sure everything is deleted - go func() { + routines.GoRoutineWithPanicHandler(func() { err := parallel.RunFirstErr( func() error { return realtimeapi.DeleteAPI(apiName, keepCache) @@ -308,7 +309,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) { if err != nil { telemetry.Error(err) } - }() + }) return nil, ErrorAPINotDeployed(apiName) } diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index 3a1aeea2aa..45eb8e6136 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -23,6 +23,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/parallel" + "github.com/cortexlabs/cortex/pkg/lib/routines" "github.com/cortexlabs/cortex/pkg/operator/config" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/schema" @@ -44,13 +45,17 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) } if err := applyK8sVirtualService(api, prevVirtualService); err != nil { - go deleteK8sResources(api.Name) + routines.GoRoutineWithPanicHandler(func() { + deleteK8sResources(api.Name) + }) return nil, "", err } err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - go deleteK8sResources(api.Name) + routines.GoRoutineWithPanicHandler(func() { + deleteK8sResources(api.Name) + }) return nil, "", err } return api, fmt.Sprintf("created %s", api.Resource.UserString()), nil From 895f46f49f86bddf9977afb6ecc398fe6a59dea8 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 29 Dec 2020 18:12:58 +0200 Subject: [PATCH 3/7] Add missing Cortex license --- pkg/lib/parallel/errors.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/lib/parallel/errors.go b/pkg/lib/parallel/errors.go index 90a63904b2..c83ac6dff5 100644 --- a/pkg/lib/parallel/errors.go +++ b/pkg/lib/parallel/errors.go @@ -1,3 +1,19 @@ +/* +Copyright 2020 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package parallel import ( From e637edfff4055fb9afe534318e4dea59214b36fd Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 30 Dec 2020 00:10:38 +0200 Subject: [PATCH 4/7] Use CastRecoverError instead --- pkg/lib/parallel/errors.go | 34 ---------------------------------- pkg/lib/parallel/parallel.go | 4 +--- 2 files changed, 1 insertion(+), 37 deletions(-) delete mode 100644 pkg/lib/parallel/errors.go diff --git a/pkg/lib/parallel/errors.go b/pkg/lib/parallel/errors.go deleted file mode 100644 index c83ac6dff5..0000000000 --- a/pkg/lib/parallel/errors.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2020 Cortex Labs, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package parallel - -import ( - "fmt" - - "github.com/cortexlabs/cortex/pkg/lib/errors" -) - -const ( - ErrUnexpectedError = "parallel.unexpected_error" -) - -func ErrorUnexpectedError(msg string) error { - return errors.WithStack(&errors.Error{ - Kind: ErrUnexpectedError, - Message: fmt.Sprintf("unexpected error occurred in parallel execution: %s", msg), - }) -} diff --git a/pkg/lib/parallel/parallel.go b/pkg/lib/parallel/parallel.go index 60638715b2..d535daf191 100644 --- a/pkg/lib/parallel/parallel.go +++ b/pkg/lib/parallel/parallel.go @@ -17,8 +17,6 @@ limitations under the License. package parallel import ( - "fmt" - "github.com/cortexlabs/cortex/pkg/lib/errors" ) @@ -45,7 +43,7 @@ func Run(fn func() error, fns ...func() error) []error { go func() { defer func() { if r := recover(); r != nil { - errChannel <- ErrorUnexpectedError(fmt.Sprintf("%v", r)) + errChannel <- errors.CastRecoverError(r) } }() errChannel <- fn() From 221499453eafd4ab64ebc0995ba4c8a4f33df983 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 30 Dec 2020 00:12:10 +0200 Subject: [PATCH 5/7] Rename function --- cli/cluster/logs.go | 2 +- cli/cmd/lib_manager.go | 2 +- pkg/lib/cron/cron.go | 2 +- pkg/lib/routines/routines.go | 2 +- pkg/operator/resources/batchapi/api.go | 8 ++++---- pkg/operator/resources/batchapi/job.go | 6 +++--- pkg/operator/resources/batchapi/logs.go | 4 ++-- pkg/operator/resources/realtimeapi/api.go | 4 ++-- pkg/operator/resources/realtimeapi/logs.go | 2 +- pkg/operator/resources/resources.go | 2 +- pkg/operator/resources/trafficsplitter/api.go | 4 ++-- 11 files changed, 19 insertions(+), 19 deletions(-) diff --git a/cli/cluster/logs.go b/cli/cluster/logs.go index 9593bddde5..0e6041b37c 100644 --- a/cli/cluster/logs.go +++ b/cli/cluster/logs.go @@ -113,7 +113,7 @@ func streamLogs(operatorConfig OperatorConfig, path string, qParams ...map[strin } func handleConnection(connection *websocket.Conn, done chan struct{}) { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { defer close(done) for { _, message, err := connection.ReadMessage() diff --git a/cli/cmd/lib_manager.go b/cli/cmd/lib_manager.go index 6f8bbc4ac9..eef02c6747 100644 --- a/cli/cmd/lib_manager.go +++ b/cli/cmd/lib_manager.go @@ -93,7 +93,7 @@ func runManager(containerConfig *container.Config, addNewLineAfterPull bool, cop signal.Notify(c, os.Interrupt, syscall.SIGTERM) caughtCtrlC := false - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { <-c caughtCtrlC = true removeContainer() diff --git a/pkg/lib/cron/cron.go b/pkg/lib/cron/cron.go index c60a472471..558d75b27e 100644 --- a/pkg/lib/cron/cron.go +++ b/pkg/lib/cron/cron.go @@ -40,7 +40,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron { } } - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { timer := time.NewTimer(0) defer timer.Stop() for { diff --git a/pkg/lib/routines/routines.go b/pkg/lib/routines/routines.go index c6d2f77b35..fdb82fb2de 100644 --- a/pkg/lib/routines/routines.go +++ b/pkg/lib/routines/routines.go @@ -22,7 +22,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/exit" ) -func GoRoutineWithPanicHandler(f func()) { +func RunWithPanicHandler(f func()) { go func() { defer func() { if r := recover(); r != nil { diff --git a/pkg/operator/resources/batchapi/api.go b/pkg/operator/resources/batchapi/api.go index cda670790a..d8806a8ad1 100644 --- a/pkg/operator/resources/batchapi/api.go +++ b/pkg/operator/resources/batchapi/api.go @@ -52,7 +52,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, err = applyK8sResources(api, prevVirtualService) if err != nil { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) }) return nil, "", err @@ -60,10 +60,10 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) }) - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) }) return nil, "", err @@ -157,7 +157,7 @@ func deleteS3Resources(apiName string) error { }, func() error { prefix := spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName) - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while }) return nil diff --git a/pkg/operator/resources/batchapi/job.go b/pkg/operator/resources/batchapi/job.go index 1850089059..a81192c5b0 100644 --- a/pkg/operator/resources/batchapi/job.go +++ b/pkg/operator/resources/batchapi/job.go @@ -128,7 +128,7 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err return nil, err } - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deployJob(apiSpec, &jobSpec, submission) }) @@ -259,14 +259,14 @@ func deleteJobRuntimeResources(jobKey spec.JobKey) error { func StopJob(jobKey spec.JobKey) error { jobState, err := getJobState(jobKey) if err != nil { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deleteJobRuntimeResources(jobKey) }) return err } if !jobState.Status.IsInProgress() { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deleteJobRuntimeResources(jobKey) }) return errors.Wrap(ErrorJobIsNotInProgress(), jobKey.UserString()) diff --git a/pkg/operator/resources/batchapi/logs.go b/pkg/operator/resources/batchapi/logs.go index 7a4bfa7600..59ddac019e 100644 --- a/pkg/operator/resources/batchapi/logs.go +++ b/pkg/operator/resources/batchapi/logs.go @@ -64,11 +64,11 @@ func ReadLogs(jobKey spec.JobKey, socket *websocket.Conn) { defer close(podCheckCancel) if jobStatus.Status.IsInProgress() { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { streamFromCloudWatch(jobStatus, podCheckCancel, socket) }) } else { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket) }) } diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 1188d035d8..902198cc48 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -68,7 +68,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A } if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) }) return nil, "", err @@ -77,7 +77,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A if config.Provider == types.AWSProviderType { err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) }) return nil, "", err diff --git a/pkg/operator/resources/realtimeapi/logs.go b/pkg/operator/resources/realtimeapi/logs.go index 66f69ef1ea..a84b90925f 100644 --- a/pkg/operator/resources/realtimeapi/logs.go +++ b/pkg/operator/resources/realtimeapi/logs.go @@ -57,7 +57,7 @@ type fluentdLog struct { func ReadLogs(apiName string, socket *websocket.Conn) { podCheckCancel := make(chan struct{}) defer close(podCheckCancel) - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { streamFromCloudWatch(apiName, podCheckCancel, socket) }) pumpStdin(socket) diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index b9e57b393e..0ed52d01f5 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -288,7 +288,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) { } if deployedResource == nil { // Delete anyways just to be sure everything is deleted - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { err := parallel.RunFirstErr( func() error { return realtimeapi.DeleteAPI(apiName, keepCache) diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index 45eb8e6136..4ba8a2ea55 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -45,7 +45,7 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) } if err := applyK8sVirtualService(api, prevVirtualService); err != nil { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) }) return nil, "", err @@ -53,7 +53,7 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) if err != nil { - routines.GoRoutineWithPanicHandler(func() { + routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) }) return nil, "", err From 8b32cb7ddaa37d9191a03f1579343bd0f5215940 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Thu, 31 Dec 2020 04:29:32 +0200 Subject: [PATCH 6/7] Address review requests --- cli/cluster/logs.go | 2 +- cli/cmd/lib_manager.go | 2 +- pkg/lib/cron/cron.go | 2 +- pkg/lib/routines/errors.go | 34 ------------------- pkg/lib/routines/routines.go | 13 ++++--- pkg/operator/resources/batchapi/api.go | 8 ++--- pkg/operator/resources/batchapi/job.go | 6 ++-- pkg/operator/resources/batchapi/logs.go | 4 +-- pkg/operator/resources/realtimeapi/api.go | 4 +-- pkg/operator/resources/realtimeapi/logs.go | 2 +- pkg/operator/resources/resources.go | 2 +- pkg/operator/resources/trafficsplitter/api.go | 4 +-- 12 files changed, 26 insertions(+), 57 deletions(-) delete mode 100644 pkg/lib/routines/errors.go diff --git a/cli/cluster/logs.go b/cli/cluster/logs.go index 0e6041b37c..5a980544f9 100644 --- a/cli/cluster/logs.go +++ b/cli/cluster/logs.go @@ -122,7 +122,7 @@ func handleConnection(connection *websocket.Conn, done chan struct{}) { } fmt.Println(string(message)) } - }) + }, false) } func closeConnection(connection *websocket.Conn, done chan struct{}, interrupt chan os.Signal) { diff --git a/cli/cmd/lib_manager.go b/cli/cmd/lib_manager.go index eef02c6747..7370f040d6 100644 --- a/cli/cmd/lib_manager.go +++ b/cli/cmd/lib_manager.go @@ -98,7 +98,7 @@ func runManager(containerConfig *container.Config, addNewLineAfterPull bool, cop caughtCtrlC = true removeContainer() exit.Error(ErrorDockerCtrlC()) - }) + }, false) for _, copyPath := range copyToPaths { err = docker.CopyToContainer(containerInfo.ID, copyPath.input, copyPath.containerPath) diff --git a/pkg/lib/cron/cron.go b/pkg/lib/cron/cron.go index 558d75b27e..741814f441 100644 --- a/pkg/lib/cron/cron.go +++ b/pkg/lib/cron/cron.go @@ -54,7 +54,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron { } timer.Reset(delay) } - }) + }, false) return Cron{ cronRun: cronRun, diff --git a/pkg/lib/routines/errors.go b/pkg/lib/routines/errors.go deleted file mode 100644 index b410dee3aa..0000000000 --- a/pkg/lib/routines/errors.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2020 Cortex Labs, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package routines - -import ( - "fmt" - - "github.com/cortexlabs/cortex/pkg/lib/errors" -) - -const ( - ErrUnexpectedError = "routines.unexpected_error" -) - -func ErrorUnexpectedError(msg string) error { - return errors.WithStack(&errors.Error{ - Kind: ErrUnexpectedError, - Message: fmt.Sprintf("unexpected error occurred: %s", msg), - }) -} diff --git a/pkg/lib/routines/routines.go b/pkg/lib/routines/routines.go index fdb82fb2de..7dfba6181a 100644 --- a/pkg/lib/routines/routines.go +++ b/pkg/lib/routines/routines.go @@ -17,17 +17,20 @@ limitations under the License. package routines import ( - "fmt" - + "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/exit" + "github.com/cortexlabs/cortex/pkg/lib/telemetry" ) -func RunWithPanicHandler(f func()) { +func RunWithPanicHandler(f func(), exitOnPanic bool) { go func() { defer func() { if r := recover(); r != nil { - err := ErrorUnexpectedError(fmt.Sprintf("%v", r)) - exit.Error(err) + err := errors.CastRecoverError(r) + if exitOnPanic { + exit.Error(err) + } + telemetry.Error(err) } }() f() diff --git a/pkg/operator/resources/batchapi/api.go b/pkg/operator/resources/batchapi/api.go index d8806a8ad1..8d3ea44a5e 100644 --- a/pkg/operator/resources/batchapi/api.go +++ b/pkg/operator/resources/batchapi/api.go @@ -54,7 +54,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, if err != nil { routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) - }) + }, false) return nil, "", err } @@ -62,10 +62,10 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, if err != nil { routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) - }) + }, false) routines.RunWithPanicHandler(func() { operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway) - }) + }, false) return nil, "", err } @@ -159,7 +159,7 @@ func deleteS3Resources(apiName string) error { prefix := spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName) routines.RunWithPanicHandler(func() { config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while - }) + }, false) return nil }, func() error { diff --git a/pkg/operator/resources/batchapi/job.go b/pkg/operator/resources/batchapi/job.go index a81192c5b0..90c85dcc05 100644 --- a/pkg/operator/resources/batchapi/job.go +++ b/pkg/operator/resources/batchapi/job.go @@ -130,7 +130,7 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err routines.RunWithPanicHandler(func() { deployJob(apiSpec, &jobSpec, submission) - }) + }, false) return &jobSpec, nil } @@ -261,14 +261,14 @@ func StopJob(jobKey spec.JobKey) error { if err != nil { routines.RunWithPanicHandler(func() { deleteJobRuntimeResources(jobKey) - }) + }, false) return err } if !jobState.Status.IsInProgress() { routines.RunWithPanicHandler(func() { deleteJobRuntimeResources(jobKey) - }) + }, false) return errors.Wrap(ErrorJobIsNotInProgress(), jobKey.UserString()) } diff --git a/pkg/operator/resources/batchapi/logs.go b/pkg/operator/resources/batchapi/logs.go index 59ddac019e..434ca2c0dd 100644 --- a/pkg/operator/resources/batchapi/logs.go +++ b/pkg/operator/resources/batchapi/logs.go @@ -66,11 +66,11 @@ func ReadLogs(jobKey spec.JobKey, socket *websocket.Conn) { if jobStatus.Status.IsInProgress() { routines.RunWithPanicHandler(func() { streamFromCloudWatch(jobStatus, podCheckCancel, socket) - }) + }, false) } else { routines.RunWithPanicHandler(func() { fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket) - }) + }, false) } pumpStdin(socket) diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 902198cc48..ef52006798 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -70,7 +70,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil { routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) - }) + }, false) return nil, "", err } @@ -79,7 +79,7 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A if err != nil { routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) - }) + }, false) return nil, "", err } err = addAPIToDashboard(config.Cluster.ClusterName, api.Name) diff --git a/pkg/operator/resources/realtimeapi/logs.go b/pkg/operator/resources/realtimeapi/logs.go index a84b90925f..461a0c9939 100644 --- a/pkg/operator/resources/realtimeapi/logs.go +++ b/pkg/operator/resources/realtimeapi/logs.go @@ -59,7 +59,7 @@ func ReadLogs(apiName string, socket *websocket.Conn) { defer close(podCheckCancel) routines.RunWithPanicHandler(func() { streamFromCloudWatch(apiName, podCheckCancel, socket) - }) + }, false) pumpStdin(socket) podCheckCancel <- struct{}{} } diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 0ed52d01f5..31de86159f 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -309,7 +309,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) { if err != nil { telemetry.Error(err) } - }) + }, false) return nil, ErrorAPINotDeployed(apiName) } diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index 4ba8a2ea55..a4d0654cab 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -47,7 +47,7 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) if err := applyK8sVirtualService(api, prevVirtualService); err != nil { routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) - }) + }, false) return nil, "", err } @@ -55,7 +55,7 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) if err != nil { routines.RunWithPanicHandler(func() { deleteK8sResources(api.Name) - }) + }, false) return nil, "", err } return api, fmt.Sprintf("created %s", api.Resource.UserString()), nil From 829c7282a0a360f4fc59867e3215f9db4179ebb6 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 30 Dec 2020 21:11:28 -0800 Subject: [PATCH 7/7] Print stack trace --- pkg/lib/routines/routines.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/lib/routines/routines.go b/pkg/lib/routines/routines.go index 7dfba6181a..af7886a6c3 100644 --- a/pkg/lib/routines/routines.go +++ b/pkg/lib/routines/routines.go @@ -27,6 +27,7 @@ func RunWithPanicHandler(f func(), exitOnPanic bool) { defer func() { if r := recover(); r != nil { err := errors.CastRecoverError(r) + errors.PrintStacktrace(err) if exitOnPanic { exit.Error(err) }