Skip to content
5 changes: 3 additions & 2 deletions cli/cluster/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/exit"
"github.com/cortexlabs/cortex/pkg/lib/json"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/operator/schema"
"github.com/gorilla/websocket"
)
Expand Down Expand Up @@ -112,7 +113,7 @@ func streamLogs(operatorConfig OperatorConfig, path string, qParams ...map[strin
}

func handleConnection(connection *websocket.Conn, done chan struct{}) {
go func() {
routines.RunWithPanicHandler(func() {
defer close(done)
for {
_, message, err := connection.ReadMessage()
Expand All @@ -121,7 +122,7 @@ func handleConnection(connection *websocket.Conn, done chan struct{}) {
}
fmt.Println(string(message))
}
}()
}, false)
}

func closeConnection(connection *websocket.Conn, done chan struct{}, interrupt chan os.Signal) {
Expand Down
6 changes: 4 additions & 2 deletions cli/cmd/lib_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/exit"
"github.com/cortexlabs/cortex/pkg/lib/files"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
"github.com/cortexlabs/yaml"
dockertypes "github.com/docker/docker/api/types"
Expand Down Expand Up @@ -91,12 +92,13 @@ func runManager(containerConfig *container.Config, addNewLineAfterPull bool, cop
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
caughtCtrlC := false
go func() {

routines.RunWithPanicHandler(func() {
<-c
caughtCtrlC = true
removeContainer()
exit.Error(ErrorDockerCtrlC())
}()
}, false)

for _, copyPath := range copyToPaths {
err = docker.CopyToContainer(containerInfo.ID, copyPath.input, copyPath.containerPath)
Expand Down
5 changes: 3 additions & 2 deletions pkg/lib/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/routines"
)

type Cron struct {
Expand All @@ -39,7 +40,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron {
}
}

go func() {
routines.RunWithPanicHandler(func() {
timer := time.NewTimer(0)
defer timer.Stop()
for {
Expand All @@ -53,7 +54,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron {
}
timer.Reset(delay)
}
}()
}, false)

return Cron{
cronRun: cronRun,
Expand Down
5 changes: 5 additions & 0 deletions pkg/lib/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func Run(fn func() error, fns ...func() error) []error {
}

go func() {
defer func() {
if r := recover(); r != nil {
errChannel <- errors.CastRecoverError(r)
}
}()
errChannel <- fn()
}()
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/lib/routines/routines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2020 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package routines

import (
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/exit"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
)

func RunWithPanicHandler(f func(), exitOnPanic bool) {
go func() {
defer func() {
if r := recover(); r != nil {
err := errors.CastRecoverError(r)
errors.PrintStacktrace(err)
if exitOnPanic {
exit.Error(err)
}
telemetry.Error(err)
}
}()
f()
}()
}
17 changes: 13 additions & 4 deletions pkg/operator/resources/batchapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/parallel"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
"github.com/cortexlabs/cortex/pkg/operator/config"
"github.com/cortexlabs/cortex/pkg/operator/operator"
Expand Down Expand Up @@ -51,14 +52,20 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string,

err = applyK8sResources(api, prevVirtualService)
if err != nil {
go deleteK8sResources(api.Name)
routines.RunWithPanicHandler(func() {
deleteK8sResources(api.Name)
}, false)
return nil, "", err
}

err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
if err != nil {
go deleteK8sResources(api.Name)
go operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
routines.RunWithPanicHandler(func() {
deleteK8sResources(api.Name)
}, false)
routines.RunWithPanicHandler(func() {
operator.RemoveAPIFromAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
}, false)
return nil, "", err
}

Expand Down Expand Up @@ -150,7 +157,9 @@ func deleteS3Resources(apiName string) error {
},
func() error {
prefix := spec.BatchAPIJobPrefix(apiName, config.Cluster.ClusterName)
go config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while
routines.RunWithPanicHandler(func() {
config.AWS.DeleteS3Dir(config.Cluster.Bucket, prefix, true) // deleting job files may take a while
}, false)
return nil
},
func() error {
Expand Down
13 changes: 10 additions & 3 deletions pkg/operator/resources/batchapi/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/operator/config"
"github.com/cortexlabs/cortex/pkg/operator/operator"
Expand Down Expand Up @@ -127,7 +128,9 @@ func SubmitJob(apiName string, submission *schema.JobSubmission) (*spec.Job, err
return nil, err
}

go deployJob(apiSpec, &jobSpec, submission)
routines.RunWithPanicHandler(func() {
deployJob(apiSpec, &jobSpec, submission)
}, false)

return &jobSpec, nil
}
Expand Down Expand Up @@ -256,12 +259,16 @@ func deleteJobRuntimeResources(jobKey spec.JobKey) error {
func StopJob(jobKey spec.JobKey) error {
jobState, err := getJobState(jobKey)
if err != nil {
go deleteJobRuntimeResources(jobKey)
routines.RunWithPanicHandler(func() {
deleteJobRuntimeResources(jobKey)
}, false)
return err
}

if !jobState.Status.IsInProgress() {
go deleteJobRuntimeResources(jobKey)
routines.RunWithPanicHandler(func() {
deleteJobRuntimeResources(jobKey)
}, false)
return errors.Wrap(ErrorJobIsNotInProgress(), jobKey.UserString())
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/operator/resources/batchapi/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/cache"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
Expand Down Expand Up @@ -63,9 +64,13 @@ func ReadLogs(jobKey spec.JobKey, socket *websocket.Conn) {
defer close(podCheckCancel)

if jobStatus.Status.IsInProgress() {
go streamFromCloudWatch(jobStatus, podCheckCancel, socket)
routines.RunWithPanicHandler(func() {
streamFromCloudWatch(jobStatus, podCheckCancel, socket)
}, false)
} else {
go fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket)
routines.RunWithPanicHandler(func() {
fetchLogsFromCloudWatch(jobStatus, podCheckCancel, socket)
}, false)
}

pumpStdin(socket)
Expand Down
9 changes: 7 additions & 2 deletions pkg/operator/resources/realtimeapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/parallel"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/operator/config"
"github.com/cortexlabs/cortex/pkg/operator/operator"
"github.com/cortexlabs/cortex/pkg/operator/schema"
Expand Down Expand Up @@ -67,14 +68,18 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A
}

if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil {
go deleteK8sResources(api.Name)
routines.RunWithPanicHandler(func() {
deleteK8sResources(api.Name)
}, false)
return nil, "", err
}

if config.Provider == types.AWSProviderType {
err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
if err != nil {
go deleteK8sResources(api.Name)
routines.RunWithPanicHandler(func() {
deleteK8sResources(api.Name)
}, false)
return nil, "", err
}
err = addAPIToDashboard(config.Cluster.ClusterName, api.Name)
Expand Down
5 changes: 4 additions & 1 deletion pkg/operator/resources/realtimeapi/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/cache"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
Expand Down Expand Up @@ -56,7 +57,9 @@ type fluentdLog struct {
func ReadLogs(apiName string, socket *websocket.Conn) {
podCheckCancel := make(chan struct{})
defer close(podCheckCancel)
go streamFromCloudWatch(apiName, podCheckCancel, socket)
routines.RunWithPanicHandler(func() {
streamFromCloudWatch(apiName, podCheckCancel, socket)
}, false)
pumpStdin(socket)
podCheckCancel <- struct{}{}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/operator/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/hash"
"github.com/cortexlabs/cortex/pkg/lib/parallel"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/operator/config"
"github.com/cortexlabs/cortex/pkg/operator/operator"
Expand Down Expand Up @@ -287,7 +288,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) {
}
if deployedResource == nil {
// Delete anyways just to be sure everything is deleted
go func() {
routines.RunWithPanicHandler(func() {
err := parallel.RunFirstErr(
func() error {
return realtimeapi.DeleteAPI(apiName, keepCache)
Expand All @@ -308,7 +309,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) {
if err != nil {
telemetry.Error(err)
}
}()
}, false)
return nil, ErrorAPINotDeployed(apiName)
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/operator/resources/trafficsplitter/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/parallel"
"github.com/cortexlabs/cortex/pkg/lib/routines"
"github.com/cortexlabs/cortex/pkg/operator/config"
"github.com/cortexlabs/cortex/pkg/operator/operator"
"github.com/cortexlabs/cortex/pkg/operator/schema"
Expand All @@ -44,13 +45,17 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error)
}

if err := applyK8sVirtualService(api, prevVirtualService); err != nil {
go deleteK8sResources(api.Name)
routines.RunWithPanicHandler(func() {
deleteK8sResources(api.Name)
}, false)
return nil, "", err
}

err = operator.AddAPIToAPIGateway(*api.Networking.Endpoint, api.Networking.APIGateway)
if err != nil {
go deleteK8sResources(api.Name)
routines.RunWithPanicHandler(func() {
deleteK8sResources(api.Name)
}, false)
return nil, "", err
}
return api, fmt.Sprintf("created %s", api.Resource.UserString()), nil
Expand Down