Skip to content

Commit

Permalink
Fix NodePort mode in GKE. (#575)
Browse files Browse the repository at this point in the history
- open node ports in Firewall instead of service ports.
- Deleted firewall rule when Ingress is deleted.
- everything is logged with an ID to easily track relevant logs.
  • Loading branch information
tamalsaha committed Dec 13, 2017
1 parent 52d2fa4 commit 1e9e0a1
Show file tree
Hide file tree
Showing 40 changed files with 1,116 additions and 224 deletions.
12 changes: 7 additions & 5 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions glide.yaml
Expand Up @@ -60,6 +60,8 @@ import:
version: v4.0.0
- package: k8s.io/kubernetes
version: ~1.7.0
- package: github.com/google/uuid
version: master
testImport:
- package: github.com/stretchr/testify
version: v1.1.4
7 changes: 5 additions & 2 deletions pkg/certificate/controller.go
@@ -1,6 +1,7 @@
package certificate

import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
Expand Down Expand Up @@ -42,10 +43,12 @@ type Controller struct {
acmeUser *ACMEUser
acmeClient *acme.Client
store *CertStore
logger *log.Logger
}

func NewController(kubeClient clientset.Interface, extClient acs.VoyagerV1beta1Interface, opt config.Options, tpr *api.Certificate) (*Controller, error) {
func NewController(ctx context.Context, kubeClient clientset.Interface, extClient acs.VoyagerV1beta1Interface, opt config.Options, tpr *api.Certificate) (*Controller, error) {
ctrl := &Controller{
logger: log.New(ctx),
KubeClient: kubeClient,
VoyagerClient: extClient,
Opt: opt,
Expand Down Expand Up @@ -161,7 +164,7 @@ func (c *Controller) getACMEClient() error {
registered := c.acmeUser.Registration != nil && c.acmeUser.Key != nil

if c.acmeUser.Key == nil {
log.Infoln("No ACME user found, registering a new ACME user")
c.logger.Infoln("No ACME user found, registering a new ACME user")
userKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return fmt.Errorf("failed to generate key for Acme User")
Expand Down
7 changes: 3 additions & 4 deletions pkg/certificate/controller_test.go
Expand Up @@ -9,7 +9,6 @@ import (
"testing"
"time"

"github.com/appscode/go/log"
api "github.com/appscode/voyager/apis/voyager/v1beta1"
acf "github.com/appscode/voyager/client/fake"
"github.com/appscode/voyager/pkg/config"
Expand Down Expand Up @@ -119,7 +118,7 @@ func TestCreate(t *testing.T) {
list, err := fakeController.KubeClient.CoreV1().Secrets("").List(metav1.ListOptions{})
if err == nil {
for _, item := range list.Items {
log.Infoln("List for Secrets that created", item.Name, item.Namespace)
c.logger.Infoln("List for Secrets that created", item.Name, item.Namespace)
}
}

Expand All @@ -135,8 +134,8 @@ func TestCreate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
log.Infoln(cert.Status)
log.Infoln(cert.Status.LastIssuedCertificate)
c.logger.Infoln(cert.Status)
c.logger.Infoln(cert.Status.LastIssuedCertificate)
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/ingress/controller.go
@@ -1,10 +1,12 @@
package ingress

import (
"context"
"io/ioutil"
"os"
"sync"

"github.com/appscode/go/log"
"github.com/appscode/go/types"
v1u "github.com/appscode/kutil/core/v1"
api "github.com/appscode/voyager/apis/voyager/v1beta1"
Expand Down Expand Up @@ -47,10 +49,12 @@ type controller struct {
// contains raw configMap data parsed from the cfg file.
HAProxyConfig string

logger *log.Logger
sync.Mutex
}

func NewController(
ctx context.Context,
kubeClient clientset.Interface,
crdClient apiextensionsclient.Interface,
extClient acs.VoyagerV1beta1Interface,
Expand All @@ -61,11 +65,11 @@ func NewController(
ingress *api.Ingress) Controller {
switch ingress.LBType() {
case api.LBTypeHostPort:
return NewHostPortController(kubeClient, crdClient, extClient, promClient, serviceLister, endpointsLister, opt, ingress)
return NewHostPortController(ctx, kubeClient, crdClient, extClient, promClient, serviceLister, endpointsLister, opt, ingress)
case api.LBTypeNodePort:
return NewNodePortController(kubeClient, crdClient, extClient, promClient, serviceLister, endpointsLister, opt, ingress)
return NewNodePortController(ctx, kubeClient, crdClient, extClient, promClient, serviceLister, endpointsLister, opt, ingress)
case api.LBTypeLoadBalancer:
return NewLoadBalancerController(kubeClient, crdClient, extClient, promClient, serviceLister, endpointsLister, opt, ingress)
return NewLoadBalancerController(ctx, kubeClient, crdClient, extClient, promClient, serviceLister, endpointsLister, opt, ingress)
}
return nil
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/ingress/create.go
Expand Up @@ -5,7 +5,6 @@ import (
"reflect"

"github.com/appscode/go/errors"
"github.com/appscode/go/log"
api "github.com/appscode/voyager/apis/voyager/v1beta1"
kerr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -34,7 +33,7 @@ func (c *controller) ensureConfigMap() error {
"haproxy.cfg": c.HAProxyConfig,
},
}
log.Infof("Creating ConfigMap %s/%s", cm.Namespace, cm.Name)
c.logger.Infof("Creating ConfigMap %s/%s", cm.Namespace, cm.Name)
_, err = c.KubeClient.CoreV1().ConfigMaps(c.Ingress.Namespace).Create(cm)
return err
} else if err != nil {
Expand All @@ -56,7 +55,7 @@ func (c *controller) ensureConfigMap() error {
}

if needsUpdate {
log.Infof("Updating ConfigMap %s/%s", cm.Namespace, cm.Name)
c.logger.Infof("Updating ConfigMap %s/%s", cm.Namespace, cm.Name)
_, err = c.KubeClient.CoreV1().ConfigMaps(c.Ingress.Namespace).Update(cm)
if err != nil {
return errors.FromErr(err).Err()
Expand Down Expand Up @@ -143,20 +142,20 @@ func (c *controller) ensureStatsService() error {

s, err := c.KubeClient.CoreV1().Services(c.Ingress.Namespace).Get(c.Ingress.StatsServiceName(), metav1.GetOptions{})
if kerr.IsNotFound(err) {
log.Infof("Creating Service %s/%s", svc.Namespace, svc.Name)
c.logger.Infof("Creating Service %s/%s", svc.Namespace, svc.Name)
_, err := c.KubeClient.CoreV1().Services(c.Ingress.Namespace).Create(svc)
if err != nil {
return errors.FromErr(err).Err()
}
return err
} else if err != nil {
log.Errorln(err)
c.logger.Errorln(err)
return errors.FromErr(err).Err()
}
s.Labels = svc.Labels
s.Annotations = svc.Annotations
s.Spec = svc.Spec
log.Infof("Updating Service %s/%s", s.Namespace, s.Name)
c.logger.Infof("Updating Service %s/%s", s.Namespace, s.Name)
_, err = c.KubeClient.CoreV1().Services(s.Namespace).Update(s)
if err != nil {
return errors.FromErr(err).Err()
Expand Down
4 changes: 4 additions & 0 deletions pkg/ingress/delete.go
Expand Up @@ -8,6 +8,7 @@ import (
)

func (c *controller) deleteConfigMap() error {
c.logger.Infof("Deleting ConfigMap %s/%s", c.Ingress.Namespace, c.Ingress.OffshootName())
err := c.KubeClient.CoreV1().ConfigMaps(c.Ingress.Namespace).Delete(c.Ingress.OffshootName(), &metav1.DeleteOptions{})
if err != nil && !kerr.IsNotFound(err) {
return errors.FromErr(err).Err()
Expand All @@ -21,6 +22,7 @@ func (c *controller) deletePodsForSelector(selector *metav1.LabelSelector) error
if err != nil {
return err
}
c.logger.Infof("Deleting Pods in namespace %s with label", c.Ingress.Namespace, r.String())
err = c.KubeClient.CoreV1().Pods(c.Ingress.Namespace).DeleteCollection(&metav1.DeleteOptions{
GracePeriodSeconds: types.Int64P(0),
}, metav1.ListOptions{
Expand All @@ -33,6 +35,7 @@ func (c *controller) deletePodsForSelector(selector *metav1.LabelSelector) error
}

func (c *controller) ensureServiceDeleted() error {
c.logger.Infof("Deleting Service %s/%s", c.Ingress.Namespace, c.Ingress.OffshootName())
err := c.KubeClient.CoreV1().Services(c.Ingress.Namespace).Delete(c.Ingress.OffshootName(), &metav1.DeleteOptions{})
if err != nil && !kerr.IsNotFound(err) {
return errors.FromErr(err).Err()
Expand All @@ -41,6 +44,7 @@ func (c *controller) ensureServiceDeleted() error {
}

func (c *controller) ensureStatsServiceDeleted() error {
c.logger.Infof("Deleting Deployment %s/%s", c.Ingress.Namespace, c.Ingress.StatsServiceName())
err := c.KubeClient.CoreV1().Services(c.Ingress.Namespace).Delete(
c.Ingress.StatsServiceName(),
&metav1.DeleteOptions{},
Expand Down
45 changes: 24 additions & 21 deletions pkg/ingress/hostport.go
@@ -1,6 +1,7 @@
package ingress

import (
"context"
"reflect"
"strconv"

Expand Down Expand Up @@ -35,6 +36,7 @@ type hostPortController struct {
var _ Controller = &hostPortController{}

func NewHostPortController(
ctx context.Context,
kubeClient clientset.Interface,
crdClient apiextensionsclient.Interface,
extClient acs.VoyagerV1beta1Interface,
Expand All @@ -43,8 +45,9 @@ func NewHostPortController(
endpointsLister core.EndpointsLister,
opt config.Options,
ingress *api.Ingress) Controller {
ctrl := &hostPortController{
c := &hostPortController{
controller: &controller{
logger: log.New(ctx),
KubeClient: kubeClient,
CRDClient: crdClient,
VoyagerClient: extClient,
Expand All @@ -56,29 +59,29 @@ func NewHostPortController(
recorder: eventer.NewEventRecorder(kubeClient, "voyager operator"),
},
}
log.Infoln("Initializing cloud manager for provider", opt.CloudProvider)
c.logger.Infoln("Initializing cloud manager for provider", opt.CloudProvider)
if opt.CloudProvider == "aws" || opt.CloudProvider == "gce" || opt.CloudProvider == "azure" {
cloudInterface, err := cloudprovider.InitCloudProvider(opt.CloudProvider, opt.CloudConfigFile)
if err != nil {
log.Errorln("Failed to initialize cloud provider:"+opt.CloudProvider, err)
c.logger.Errorln("Failed to initialize cloud provider:"+opt.CloudProvider, err)
} else {
log.Infoln("Initialized cloud provider: "+opt.CloudProvider, cloudInterface)
ctrl.CloudManager = cloudInterface
c.logger.Infoln("Initialized cloud provider: "+opt.CloudProvider, cloudInterface)
c.CloudManager = cloudInterface
}
} else if opt.CloudProvider == "gke" {
cloudInterface, err := cloudprovider.InitCloudProvider("gce", opt.CloudConfigFile)
if err != nil {
log.Errorln("Failed to initialize cloud provider:"+opt.CloudProvider, err)
c.logger.Errorln("Failed to initialize cloud provider:"+opt.CloudProvider, err)
} else {
log.Infoln("Initialized cloud provider: "+opt.CloudProvider, cloudInterface)
ctrl.CloudManager = cloudInterface
c.logger.Infoln("Initialized cloud provider: "+opt.CloudProvider, cloudInterface)
c.CloudManager = cloudInterface
}
} else if opt.CloudProvider == "minikube" {
ctrl.CloudManager = &fakecloudprovider.FakeCloud{}
c.CloudManager = &fakecloudprovider.FakeCloud{}
} else {
log.Infoln("No cloud manager found for provider", opt.CloudProvider)
c.logger.Infoln("No cloud manager found for provider", opt.CloudProvider)
}
return ctrl
return c
}

func (c *hostPortController) IsExists() bool {
Expand Down Expand Up @@ -413,22 +416,22 @@ func (c *hostPortController) EnsureFirewall(svc *apiv1.Service) error {
func (c *hostPortController) Delete() {
err := c.deletePods()
if err != nil {
log.Errorln(err)
c.logger.Errorln(err)
}
err = c.deleteConfigMap()
if err != nil {
log.Errorln(err)
c.logger.Errorln(err)
}
if c.Opt.EnableRBAC {
if err := c.ensureRBACDeleted(); err != nil {
log.Errorln(err)
c.logger.Errorln(err)
}
}

// delete service
err = c.ensureServiceDeleted()
if err != nil {
log.Errorln(err)
c.logger.Errorln(err)
}

if c.CloudManager != nil {
Expand All @@ -440,7 +443,7 @@ func (c *hostPortController) Delete() {
},
})
if err != nil {
log.Errorln(err)
c.logger.Errorln(err)
}
}
}
Expand All @@ -450,7 +453,7 @@ func (c *hostPortController) Delete() {
}
monSpec, err := c.Ingress.MonitorSpec()
if err != nil {
log.Errorln(err)
c.logger.Errorln(err)
}
if monSpec != nil && monSpec.Prometheus != nil {
ctrl := monitor.NewPrometheusController(c.KubeClient, c.CRDClient, c.PromClient)
Expand Down Expand Up @@ -503,13 +506,13 @@ func (c *hostPortController) ensureService(old *api.Ingress) (*apiv1.Service, er
desired := c.newService()
current, err := c.KubeClient.CoreV1().Services(c.Ingress.Namespace).Get(desired.Name, metav1.GetOptions{})
if kerr.IsNotFound(err) {
log.Infof("Creating Service %s/%s", desired.Namespace, desired.Name)
c.logger.Infof("Creating Service %s/%s", desired.Namespace, desired.Name)
return c.KubeClient.CoreV1().Services(c.Ingress.Namespace).Create(desired)
} else if err != nil {
return nil, err
}
if svc, needsUpdate := c.serviceRequiresUpdate(current, desired, old); needsUpdate {
log.Infof("Updating Service %s/%s", desired.Namespace, desired.Name)
c.logger.Infof("Updating Service %s/%s", desired.Namespace, desired.Name)
return c.KubeClient.CoreV1().Services(c.Ingress.Namespace).Update(svc)
}
return current, nil
Expand Down Expand Up @@ -619,7 +622,7 @@ func (c *hostPortController) ensurePods(old *api.Ingress) (*apps.Deployment, err
desired := c.newPods()
current, err := c.KubeClient.AppsV1beta1().Deployments(c.Ingress.Namespace).Get(desired.Name, metav1.GetOptions{})
if kerr.IsNotFound(err) {
log.Infof("Creating Deployment %s/%s", desired.Namespace, desired.Name)
c.logger.Infof("Creating Deployment %s/%s", desired.Namespace, desired.Name)
return c.KubeClient.AppsV1beta1().Deployments(c.Ingress.Namespace).Create(desired)
} else if err != nil {
return nil, err
Expand Down Expand Up @@ -692,7 +695,7 @@ func (c *hostPortController) ensurePods(old *api.Ingress) (*apps.Deployment, err
current.Spec.Template.Spec.ServiceAccountName = desired.Spec.Template.Spec.ServiceAccountName
}
if needsUpdate {
log.Infof("Updating Deployment %s/%s", desired.Namespace, desired.Name)
c.logger.Infof("Updating Deployment %s/%s", desired.Namespace, desired.Name)
current, err = c.KubeClient.AppsV1beta1().Deployments(c.Ingress.Namespace).Update(current)
return current, err
}
Expand Down

0 comments on commit 1e9e0a1

Please sign in to comment.