Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support status sync for Gateway API resources #752

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 71 additions & 5 deletions pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"istio.io/istio/pilot/pkg/server"
"istio.io/istio/pilot/pkg/serviceregistry/aggregate"
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
"istio.io/istio/pilot/pkg/serviceregistry/serviceentry"
"istio.io/istio/pilot/pkg/xds"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config"
Expand Down Expand Up @@ -137,6 +138,7 @@ type Server struct {
multiclusterController *multicluster.Controller
configController model.ConfigStoreController
configStores []model.ConfigStoreController
serviceEntryController *serviceentry.Controller
httpServer *http.Server
httpMux *http.ServeMux
grpcServer *grpc.Server
Expand All @@ -148,6 +150,9 @@ type Server struct {
var (
PodNamespace = env.RegisterStringVar("POD_NAMESPACE", "higress-system", "").Get()
PodName = env.RegisterStringVar("POD_NAME", "", "").Get()
// Revision is the value of the Istio control plane revision, e.g. "canary",
// and is the value used by the "istio.io/rev" label.
Revision = env.Register("REVISION", "", "").Get()
)

func NewServer(args *ServerArgs) (*Server, error) {
Expand All @@ -174,7 +179,7 @@ func NewServer(args *ServerArgs) (*Server, error) {
s.initKubeClient,
s.initXdsServer,
s.initHttpServer,
s.initConfigController,
s.initControllers,
s.initRegistryEventHandlers,
s.initAuthenticators,
}
Expand All @@ -197,6 +202,10 @@ func NewServer(args *ServerArgs) (*Server, error) {
return s, nil
}

func (s *Server) ServiceController() *aggregate.Controller {
return s.environment.ServiceDiscovery.(*aggregate.Controller)
}

// initRegistryEventHandlers sets up event handlers for config updates
func (s *Server) initRegistryEventHandlers() error {
log.Info("initializing registry event handlers")
Expand All @@ -220,15 +229,14 @@ func (s *Server) initRegistryEventHandlers() error {
return nil
}

func (s *Server) initConfigController() error {
ns := PodNamespace
func (s *Server) initControllers() error {
options := common.Options{
Enable: true,
ClusterId: s.RegistryOptions.KubeOptions.ClusterID,
IngressClass: s.IngressClass,
WatchNamespace: s.WatchNamespace,
EnableStatus: s.EnableStatus,
SystemNamespace: ns,
SystemNamespace: PodNamespace,
GatewaySelectorKey: s.GatewaySelectorKey,
GatewaySelectorValue: s.GatewaySelectorValue,
}
Expand All @@ -238,8 +246,17 @@ func (s *Server) initConfigController() error {

s.initMulticluster(options)
s.initSDSServer(options)
if err := s.initConfigController(options); err != nil {
return fmt.Errorf("error initializing config controller: %v", err)
}
if err := s.initServiceControllers(options); err != nil {
return fmt.Errorf("error initializing service controllers: %v", err)
}
return nil
}

ingressConfig := translation.NewIngressTranslation(s.kubeClient, s.xdsServer, ns, options.ClusterId)
func (s *Server) initConfigController(options common.Options) error {
ingressConfig := translation.NewIngressTranslation(s.environment, s.kubeClient, s.xdsServer, options.SystemNamespace, options.ClusterId)
ingressConfig.AddLocalCluster(options)

s.configStores = append(s.configStores, ingressConfig)
Expand All @@ -262,6 +279,7 @@ func (s *Server) initConfigController() error {
go s.configController.Run(stop)
return nil
})

return nil
}

Expand Down Expand Up @@ -290,6 +308,54 @@ func (s *Server) initSDSServer(options common.Options) {
s.environment.CredentialsController = creds
}

func (s *Server) initServiceControllers(options common.Options) error {
serviceControllers := s.ServiceController()

s.serviceEntryController = serviceentry.NewController(
s.configController, s.xdsServer,
serviceentry.WithClusterID(s.RegistryOptions.KubeOptions.ClusterID),
)
serviceControllers.AddRegistry(s.serviceEntryController)

if err := s.initKubeRegistry(options); err != nil {
return err
}

// Defer running of the service controllers.
s.server.RunComponent("service controllers", func(stop <-chan struct{}) error {
go serviceControllers.Run(stop)
return nil
})

return nil
}

// initKubeRegistry creates all the k8s service controllers under this pilot
func (s *Server) initKubeRegistry(options common.Options) (err error) {
s.RegistryOptions.KubeOptions.ClusterID = options.ClusterId
s.RegistryOptions.KubeOptions.Metrics = s.environment
s.RegistryOptions.KubeOptions.XDSUpdater = s.xdsServer
s.RegistryOptions.KubeOptions.MeshNetworksWatcher = s.environment.NetworksWatcher
s.RegistryOptions.KubeOptions.MeshWatcher = s.environment.Watcher
s.RegistryOptions.KubeOptions.SystemNamespace = options.SystemNamespace
s.RegistryOptions.KubeOptions.MeshServiceController = s.ServiceController()
// pass namespace to k8s service registry
s.RegistryOptions.KubeOptions.DiscoveryNamespacesFilter = s.multiclusterController.DiscoveryNamespacesFilter
s.multiclusterController.AddHandler(kubecontroller.NewMulticluster(PodName,
s.kubeClient.Kube(),
s.RegistryOptions.ClusterRegistriesNamespace,
s.RegistryOptions.KubeOptions,
s.serviceEntryController,
s.configController,
//s.istiodCertBundleWatcher,
nil,
Revision,
false,
s.environment.ClusterLocal(),
s.server))

return
}
func (s *Server) Start(stop <-chan struct{}) error {
if err := s.multiclusterController.Run(stop); err != nil {
return err
Expand Down
13 changes: 8 additions & 5 deletions pkg/ingress/config/ingress_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type IngressConfig struct {
ingressRouteCache model.IngressRouteCollection
ingressDomainCache model.IngressDomainCollection

environment *istiomodel.Environment
localKubeClient kube.Client

virtualServiceHandlers []istiomodel.EventHandler
Expand Down Expand Up @@ -152,13 +153,15 @@ type IngressConfig struct {
clusterId cluster.ID
}

func NewIngressConfig(localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpdater, namespace string, clusterId cluster.ID) *IngressConfig {
func NewIngressConfig(environment *istiomodel.Environment, localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpdater,
namespace string, clusterId cluster.ID) *IngressConfig {
if clusterId == "Kubernetes" {
clusterId = ""
}
config := &IngressConfig{
remoteIngressControllers: make(map[cluster.ID]common.IngressController),
remoteGatewayControllers: make(map[cluster.ID]common.GatewayController),
environment: environment,
localKubeClient: localKubeClient,
XDSUpdater: xdsUpdater,
annotationHandler: annotations.NewAnnotationHandlerManager(),
Expand Down Expand Up @@ -234,7 +237,7 @@ func (m *IngressConfig) AddLocalCluster(options common.Options) {
}
m.remoteIngressControllers[options.ClusterId] = ingressController

m.remoteGatewayControllers[options.ClusterId] = gateway.NewController(m.localKubeClient, options)
m.remoteGatewayControllers[options.ClusterId] = gateway.NewController(m.environment, m.localKubeClient, options)
}

func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) []config.Config {
Expand Down Expand Up @@ -1477,9 +1480,9 @@ func (m *IngressConfig) Run(stop <-chan struct{}) {
_ = remoteGatewayController.SetWatchErrorHandler(m.watchErrorHandler)
go remoteGatewayController.Run(stop)
}
//go m.mcpbridgeController.Run(stop)
//go m.wasmPluginController.Run(stop)
//go m.http2rpcController.Run(stop)
go m.mcpbridgeController.Run(stop)
go m.wasmPluginController.Run(stop)
go m.http2rpcController.Run(stop)
go m.configmapMgr.HigressConfigController.Run(stop)
}

Expand Down
42 changes: 30 additions & 12 deletions pkg/ingress/kube/gateway/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
kubecredentials "istio.io/istio/pilot/pkg/credentials/kube"
"istio.io/istio/pilot/pkg/model"
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
"istio.io/istio/pilot/pkg/status"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/collection"
Expand All @@ -45,14 +46,16 @@ type gatewayController struct {
destinationRuleHandlers []model.EventHandler
envoyFilterHandlers []model.EventHandler

environment *model.Environment
store model.ConfigStoreController
credsController credentials.MulticlusterController
istioController *istiogateway.Controller
statusManager *status.Manager

resourceUpToDate atomic.Bool
}

func NewController(client kube.Client, options common.Options) common.GatewayController {
func NewController(environment *model.Environment, client kube.Client, options common.Options) common.GatewayController {
domainSuffix := util.GetDomainSuffix()
opts := crdclient.Option{
Revision: bootstrap.Revision,
Expand All @@ -71,22 +74,25 @@ func NewController(client kube.Client, options common.Options) common.GatewayCon
clusterId := options.ClusterId
credsController := kubecredentials.NewMulticluster(clusterId)
credsController.ClusterAdded(&multicluster.Cluster{ID: clusterId, Client: client}, nil)
istioController := istiogateway.NewController(client, store, client.CrdWatcher().WaitForCRD, credsController, kubecontroller.Options{DomainSuffix: domainSuffix})
istioController := istiogateway.NewController(environment, client, store, client.CrdWatcher().WaitForCRD, credsController, kubecontroller.Options{DomainSuffix: domainSuffix})
if options.GatewaySelectorKey != "" {
istioController.DefaultGatewaySelector = map[string]string{options.GatewaySelectorKey: options.GatewaySelectorValue}
}

var statusManager *status.Manager = nil
if options.EnableStatus {
// TODO: Add status sync support
//istioController.SetStatusWrite(true,)
statusManager = status.NewManager(store)
istioController.SetStatusWrite(true, statusManager)
} else {
IngressLog.Infof("Disable status update for cluster %s", clusterId)
}

return &gatewayController{
environment: environment,
store: store,
credsController: credsController,
istioController: istioController,
statusManager: statusManager,
}
}

Expand All @@ -100,10 +106,7 @@ func (g *gatewayController) Get(typ config.GroupVersionKind, name, namespace str

func (g *gatewayController) List(typ config.GroupVersionKind, namespace string) []config.Config {
if g.resourceUpToDate.CompareAndSwap(false, true) {
err := g.istioController.Reconcile(model.NewPushContext())
if err != nil {
IngressLog.Errorf("failed to recompute Gateway API resources: %v", err)
}
_ = g.reconcile()
}
return g.istioController.List(typ, namespace)
}
Expand Down Expand Up @@ -148,6 +151,9 @@ func (g *gatewayController) Run(stop <-chan struct{}) {
})
go g.store.Run(stop)
go g.istioController.Run(stop)
if g.statusManager != nil {
g.statusManager.Start(stop)
}
}

func (g *gatewayController) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error {
Expand All @@ -158,14 +164,26 @@ func (g *gatewayController) SetWatchErrorHandler(f func(r *cache.Reflector, err
func (g *gatewayController) HasSynced() bool {
ret := g.istioController.HasSynced()
if ret {
err := g.istioController.Reconcile(model.NewPushContext())
if err != nil {
IngressLog.Errorf("failed to recompute Gateway API resources: %v", err)
}
_ = g.reconcile()
}
return ret
}

func (g *gatewayController) reconcile() error {
//ps := model.NewPushContext()
//if err := ps.InitContext(g.environment, nil, nil); err != nil {
// IngressLog.Errorf("failed to init PushContext. use empty PushContext instead: %v", err)
//}
err := g.istioController.Reconcile(model.NewPushContext())
if err != nil {
IngressLog.Errorf("failed to recompute Gateway API resources: %v", err)
}
return err
}

//func (g *gatewayController) initPushContext(ps *model.PushContext) error {
//}

func (g *gatewayController) onEvent(prev config.Config, curr config.Config, event model.Event) {
g.resourceUpToDate.Store(false)

Expand Down
45 changes: 38 additions & 7 deletions pkg/ingress/kube/gateway/istio/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Updated based on Istio codebase by Higress

package istio

import (
Expand All @@ -32,10 +34,11 @@ import (
// GatewayContext contains a minimal subset of push context functionality to be exposed to GatewayAPIControllers
type GatewayContext struct {
ps *model.PushContext
si *serviceIndex
}

func NewGatewayContext(ps *model.PushContext) GatewayContext {
return GatewayContext{ps}
func NewGatewayContext(ps *model.PushContext, si *serviceIndex) GatewayContext {
return GatewayContext{ps, si}
}

// ResolveGatewayInstances attempts to resolve all instances that a gateway will be exposed on.
Expand All @@ -50,6 +53,9 @@ func NewGatewayContext(ps *model.PushContext) GatewayContext {
func (gc GatewayContext) ResolveGatewayInstances(
namespace string,
gwsvcs []string,
// Start - Updated by Higress
gatewaySelector map[string]string,
// End - Updated by Higress
servers []*networking.Server,
) (internal, external, pending, warns []string) {
ports := map[int]struct{}{}
Expand All @@ -60,11 +66,30 @@ func (gc GatewayContext) ResolveGatewayInstances(
foundExternal := sets.New[string]()
foundPending := sets.New[string]()
warnings := []string{}
// Start - Added by Higress
if gatewaySelector != nil && len(gatewaySelector) != 0 {
gwsvcs = append([]string{}, gwsvcs...)
for _, svc := range gc.si.all {
matches := true
for k, v := range gatewaySelector {
if svc.Attributes.Labels[k] != v {
matches = false
break
}
}
if matches {
gwsvcs = append(gwsvcs, string(svc.Hostname))
}
}
}
// End - Added by Higress
for _, g := range gwsvcs {
svc, f := gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)][namespace]
// Start - Updated by Higress
svc, f := gc.si.HostnameAndNamespace[host.Name(g)][namespace]
// End - Updated by Higress
if !f {
otherNamespaces := []string{}
for ns := range gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)] {
for ns := range gc.si.HostnameAndNamespace[host.Name(g)] {
otherNamespaces = append(otherNamespaces, `"`+ns+`"`) // Wrap in quotes for output
}
if len(otherNamespaces) > 0 {
Expand All @@ -78,7 +103,9 @@ func (gc GatewayContext) ResolveGatewayInstances(
}
svcKey := svc.Key()
for port := range ports {
instances := gc.ps.ServiceInstancesByPort(svc, port, nil)
// Start - Updated by Higress
instances := gc.si.ServiceInstancesByPort(svc, port, nil)
// End - Updated by Higress
if len(instances) > 0 {
foundInternal.Insert(fmt.Sprintf("%s:%d", g, port))
if svc.Attributes.ClusterExternalAddresses.Len() > 0 {
Expand All @@ -93,7 +120,9 @@ func (gc GatewayContext) ResolveGatewayInstances(
}
}
} else {
instancesByPort := gc.ps.ServiceInstances(svcKey)
// Start - Updated by Higress
instancesByPort := gc.si.ServiceInstances(svcKey)
// End - Updated by Higress
if instancesEmpty(instancesByPort) {
warnings = append(warnings, fmt.Sprintf("no instances found for hostname %q", g))
} else {
Expand Down Expand Up @@ -121,7 +150,9 @@ func (gc GatewayContext) ResolveGatewayInstances(
}

func (gc GatewayContext) GetService(hostname, namespace string) *model.Service {
return gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(hostname)][namespace]
// Start - Updated by Higress
return gc.si.HostnameAndNamespace[host.Name(hostname)][namespace]
// End - Updated by Higress
}

func instancesEmpty(m map[int][]*model.ServiceInstance) bool {
Expand Down
Loading