Skip to content

Commit

Permalink
ref(main.go,mode): eagerly create grpc conn to tiller (#208)
Browse files Browse the repository at this point in the history
Fixes #140
  • Loading branch information
arschles committed Sep 28, 2016
1 parent 9a5970d commit bfe621c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 38 deletions.
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ func main() {
ctx, cancelFn := context.WithCancel(rootCtx)
defer cancelFn()

if err := modeutils.Run(ctx, httpCl, cfg.Mode, cfg.BrokerName, errCh, cfg.WatchNamespaces); err != nil {
cleanup, err := modeutils.Run(ctx, httpCl, cfg.Mode, cfg.BrokerName, errCh, cfg.WatchNamespaces)
if err != nil {
logger.Criticalf("Error starting %s mode: %s", cfg.Mode, err)
exitWithCode(cancelFn, 1)
}
defer cleanup()

// Start the API server
go api.Serve(errCh)
Expand Down
23 changes: 15 additions & 8 deletions mode/helm/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,50 @@ import (
"net/http"

"github.com/deis/steward/mode"
"google.golang.org/grpc"
"k8s.io/client-go/1.4/kubernetes/typed/core/v1"
)

// GetComponents returns suitable implementations of the Cataloger and Lifecycler interfaces
// GetComponents returns tiller-backed implementations of the Cataloger and Lifecycler interfaces. Also returns the underlying gRPC connection used for Tiller communications. Callers should close this connection when done communicating with tiller
func GetComponents(
ctx context.Context,
httpCl *http.Client,
cmNamespacer v1.ConfigMapsGetter,
) (mode.Cataloger, *mode.Lifecycler, error) {
) (*grpc.ClientConn, mode.Cataloger, *mode.Lifecycler, error) {
helmCfg, err := getConfig()
if err != nil {
logger.Errorf("getting helm config (%s)", err)
return nil, nil, err
return nil, nil, nil, err
}
logger.Infof("starting in Helm mode with Tiller backend at %s:%d", helmCfg.TillerIP, helmCfg.TillerPort)

provBehavior, err := provisionBehaviorFromString(helmCfg.ProvisionBehavior)
if err != nil {
logger.Errorf("parsing provision behavior from '%s' (%s)", helmCfg.ProvisionBehavior, err)
return nil, nil, err
return nil, nil, nil, err
}

chart, _, err := getChart(ctx, httpCl, helmCfg.ChartURL)
if err != nil {
logger.Errorf("getting chart from %s (%s)", helmCfg.ChartURL, err)
return nil, nil, err
return nil, nil, nil, err
}

tillerHost := fmt.Sprintf("%s:%d", helmCfg.TillerIP, helmCfg.TillerPort)
creatorDeleter := newTillerReleaseCreatorDeleter(tillerHost)
conn, err := grpc.Dial(tillerHost, grpc.WithInsecure())
if err != nil {
return nil, nil, nil, err
}

creatorDeleter := newTillerReleaseCreatorDeleter(conn)

cataloger := newCataloger(helmCfg)
lifecycler, err := newLifecycler(ctx, chart, helmCfg.ChartInstallNS, provBehavior, creatorDeleter, cmNamespacer)
if err != nil {
logger.Errorf("creating a new helm mode lifecycler (%s)", err)
return nil, nil, err
conn.Close()
return nil, nil, nil, err
}

return cataloger, lifecycler, nil
return conn, cataloger, lifecycler, nil
}
20 changes: 5 additions & 15 deletions mode/helm/tiller_release_creator_deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,11 @@ import (
)

type tillerRCD struct {
tillerHost string
conn *grpc.ClientConn
}

func (t tillerRCD) Create(ch *chart.Chart, installNS string) (*rls.InstallReleaseResponse, error) {
c, err := grpc.Dial(t.tillerHost, grpc.WithInsecure())
if err != nil {
return nil, err
}
defer c.Close()
rlsCl := rls.NewReleaseServiceClient(c)
rlsCl := rls.NewReleaseServiceClient(t.conn)
ctx := context.Background()
req := &rls.InstallReleaseRequest{
Chart: ch,
Expand All @@ -30,12 +25,7 @@ func (t tillerRCD) Create(ch *chart.Chart, installNS string) (*rls.InstallReleas
}

func (t tillerRCD) Delete(relName string) (*rls.UninstallReleaseResponse, error) {
c, err := grpc.Dial(t.tillerHost, grpc.WithInsecure())
if err != nil {
return nil, err
}
defer c.Close()
rlsCl := rls.NewReleaseServiceClient(c)
rlsCl := rls.NewReleaseServiceClient(t.conn)
ctx := context.Background()
req := &rls.UninstallReleaseRequest{
Name: relName,
Expand All @@ -46,6 +36,6 @@ func (t tillerRCD) Delete(relName string) (*rls.UninstallReleaseResponse, error)
}

// newTillerReleaseCreatorDeleter returns a new ReleaseCreatorDeleter implemented with a tiller backend
func newTillerReleaseCreatorDeleter(tillerHost string) ReleaseCreatorDeleter {
return tillerRCD{tillerHost: tillerHost}
func newTillerReleaseCreatorDeleter(conn *grpc.ClientConn) ReleaseCreatorDeleter {
return tillerRCD{conn: conn}
}
33 changes: 19 additions & 14 deletions mode/utils/mode_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/deis/steward/mode/cf"
"github.com/deis/steward/mode/cmd"
"github.com/deis/steward/mode/helm"
"google.golang.org/grpc"
"k8s.io/client-go/1.4/kubernetes"
"k8s.io/client-go/1.4/pkg/api"
"k8s.io/client-go/1.4/pkg/api/errors"
Expand All @@ -24,49 +25,53 @@ const (
)

// Run publishes the underlying broker's service offerings to the catalog, then starts Steward's
// control loop in the specified mode.
// control loop in the specified mode. returns a function that should be used by the caller to clean up, should the run stop. this function will be
func Run(
ctx context.Context,
httpCl *http.Client,
modeStr string,
brokerName string,
errCh chan<- error,
namespaces []string,
) error {
) (func(), error) {

config, err := rest.InClusterConfig()
if err != nil {
return err
return nil, err
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return errGettingK8sClient{Original: err}
return nil, errGettingK8sClient{Original: err}
}

cleanupFunc := func() {}

var cataloger mode.Cataloger
var lifecycler *mode.Lifecycler
// Get the right implementations of mode.Cataloger and mode.Lifecycler
switch modeStr {
case cfMode:
cataloger, lifecycler, err = cf.GetComponents(ctx, httpCl)
if err != nil {
return err
return nil, err
}
case helmMode:
var err error
cataloger, lifecycler, err = helm.GetComponents(ctx, httpCl, k8sClient)
// ignore the returned connection. we're going to hold onto it for the whole lifespan of this steward, so we don't need to close it
var conn *grpc.ClientConn
conn, cataloger, lifecycler, err = helm.GetComponents(ctx, httpCl, k8sClient)
if err != nil {
return err
return nil, err
}
cleanupFunc = func() { conn.Close() }
case cmdMode:
fallthrough
case commandMode:
cataloger, lifecycler, err = cmd.GetComponents(k8sClient)
if err != nil {
return err
return nil, err
}
default:
return errUnrecognizedMode{mode: modeStr}
return nil, errUnrecognizedMode{mode: modeStr}
}

// Everything else does not vary by mode...
Expand All @@ -77,25 +82,25 @@ func Run(

_, err = tpr.Create(k8s.ServiceCatalog3PR)
if err != nil && !errors.IsAlreadyExists(err) {
return errCreatingThirdPartyResource{Original: err}
return nil, errCreatingThirdPartyResource{Original: err}
}

catalogInteractor := k8s.NewK8sServiceCatalogInteractor(k8sClient.CoreClient.RESTClient)
published, err := publishCatalog(brokerName, cataloger, catalogInteractor)
if err != nil {
return errPublishingServiceCatalog{Original: err}
return nil, errPublishingServiceCatalog{Original: err}
}
logger.Infof("published %d entries into the service catalog", len(published))

evtNamespacer := claim.NewConfigMapsInteractorNamespacer(k8sClient)
lookup, err := k8s.FetchServiceCatalogLookup(catalogInteractor)
if err != nil {
return errGettingServiceCatalogLookupTable{Original: err}
return nil, errGettingServiceCatalogLookupTable{Original: err}
}
logger.Infof("created service catalog lookup with %d items", lookup.Len())
claim.StartControlLoops(ctx, evtNamespacer, k8sClient, *lookup, lifecycler, namespaces, errCh)

return nil
return cleanupFunc, nil
}

// Does the following:
Expand Down

0 comments on commit bfe621c

Please sign in to comment.