Skip to content

Commit

Permalink
[backport/1.8.x] connect: use stronger validation that ingress gatewa…
Browse files Browse the repository at this point in the history
…ys have compatible protocols defined for their upstreams (#8494)

Backport of #8470 to 1.8.x
  • Loading branch information
rboyer committed Aug 13, 2020
1 parent 010a8eb commit 7983023
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 97 deletions.
3 changes: 3 additions & 0 deletions .changelog/8494.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
[backport/1.8.x] connect: use stronger validation that ingress gateways have compatible protocols defined for their upstreams
```
169 changes: 112 additions & 57 deletions agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,6 @@ func (s *Store) validateProposedConfigEntryInGraph(
if err != nil {
return err
}
err = s.validateProposedIngressProtocolsInServiceGraph(tx, next, entMeta)
if err != nil {
return err
}
case structs.TerminatingGateway:
err := s.checkGatewayClash(tx, name, structs.TerminatingGateway, structs.IngressGateway, entMeta)
if err != nil {
Expand Down Expand Up @@ -402,7 +398,11 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
) error {
// Collect all of the chains that could be affected by this change
// including our own.
checkChains := make(map[structs.ServiceID]struct{})
var (
checkChains = make(map[structs.ServiceID]struct{})
checkIngress []*structs.IngressGatewayConfigEntry
enforceIngressProtocolsMatch bool
)

if validateAllChains {
// Must be proxy-defaults/global.
Expand All @@ -419,45 +419,138 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{}
}
}

_, entries, err := s.configEntriesByKindTxn(tx, nil, structs.IngressGateway, structs.WildcardEnterpriseMeta())
if err != nil {
return err
}
for _, entry := range entries {
ingress, ok := entry.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", entry)
}
checkIngress = append(checkIngress, ingress)
}

} else if kind == structs.IngressGateway {
// Checking an ingress pointing to multiple chains.

// This is the case for deleting a config entry
if next == nil {
return nil
}

ingress, ok := next.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", next)
}
checkIngress = append(checkIngress, ingress)

// When editing an ingress-gateway directly we are stricter about
// validating the protocol equivalence.
enforceIngressProtocolsMatch = true

} else {
// Must be a single chain.

sid := structs.NewServiceID(name, entMeta)
checkChains[sid] = struct{}{}

iter, err := tx.Get(configTableName, "link", sid)
if err != nil {
return err
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(structs.ConfigEntry)
checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{}
switch entry.GetKind() {
case structs.ServiceRouter, structs.ServiceSplitter, structs.ServiceResolver:
svcID := structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())
checkChains[svcID] = struct{}{}
case structs.IngressGateway:
ingress, ok := entry.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", entry)
}
checkIngress = append(checkIngress, ingress)
}
}
if err != nil {
return err
}

// Ensure if any ingress is affected that we fetch all of the chains needed
// to fully validate that ingress.
for _, ingress := range checkIngress {
for _, svcID := range ingress.ListRelatedServices() {
checkChains[svcID] = struct{}{}
}
}

overrides := map[structs.ConfigEntryKindName]structs.ConfigEntry{
{Kind: kind, Name: name}: next,
}

for chain, _ := range checkChains {
if err := s.testCompileDiscoveryChain(tx, nil, chain.ID, overrides, &chain.EnterpriseMeta); err != nil {
var (
svcProtocols = make(map[structs.ServiceID]string)
svcTopNodeType = make(map[structs.ServiceID]string)
)
for chain := range checkChains {
protocol, topNode, err := s.testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta)
if err != nil {
return err
}
svcProtocols[chain] = protocol
svcTopNodeType[chain] = topNode.Type
}

// Now validate all of our ingress gateways.
for _, e := range checkIngress {
for _, listener := range e.Listeners {
expectedProto := listener.Protocol
for _, service := range listener.Services {
if service.Name == structs.WildcardSpecifier {
continue
}
svcID := structs.NewServiceID(service.Name, &service.EnterpriseMeta)

svcProto := svcProtocols[svcID]

if svcProto != expectedProto {
// The only time an ingress gateway and its upstreams can
// have differing protocols is when:
//
// 1. ingress is tcp and the target is not-tcp
// AND
// 2. the disco chain has a resolver as the top node
topNodeType := svcTopNodeType[svcID]
if enforceIngressProtocolsMatch ||
(expectedProto != "tcp") ||
(expectedProto == "tcp" && topNodeType != structs.DiscoveryGraphNodeTypeResolver) {
return fmt.Errorf(
"service %q has protocol %q, which does not match defined listener protocol %q",
svcID.String(),
svcProto,
expectedProto,
)
}
}
}
}
}

return nil
}

// testCompileDiscoveryChain speculatively compiles a discovery chain with
// pending modifications to see if it would be valid. Also returns the computed
// protocol and topmost discovery chain node.
func (s *Store) testCompileDiscoveryChain(
tx *memdb.Txn,
ws memdb.WatchSet,
chainName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
entMeta *structs.EnterpriseMeta,
) error {
) (string, *structs.DiscoveryGraphNode, error) {
_, speculativeEntries, err := s.readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides, entMeta)
if err != nil {
return err
return "", nil, err
}

// Note we use an arbitrary namespace and datacenter as those would not
Expand All @@ -472,8 +565,12 @@ func (s *Store) testCompileDiscoveryChain(
UseInDatacenter: "dc1",
Entries: speculativeEntries,
}
_, err = discoverychain.Compile(req)
return err
chain, err := discoverychain.Compile(req)
if err != nil {
return "", nil, err
}

return chain.Protocol, chain.Nodes[chain.StartNode], nil
}

// ReadDiscoveryChainConfigEntries will query for the full discovery chain for
Expand Down Expand Up @@ -841,48 +938,6 @@ func (s *Store) configEntryWithOverridesTxn(
return s.configEntryTxn(tx, ws, kind, name, entMeta)
}

func (s *Store) validateProposedIngressProtocolsInServiceGraph(
tx *memdb.Txn,
next structs.ConfigEntry,
entMeta *structs.EnterpriseMeta,
) error {
// This is the case for deleting a config entry
if next == nil {
return nil
}
ingress, ok := next.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", next)
}

validationFn := func(svc structs.ServiceName, expectedProto string) error {
_, svcProto, err := s.protocolForService(tx, nil, svc)
if err != nil {
return err
}

if svcProto != expectedProto {
return fmt.Errorf("service %q has protocol %q, which does not match defined listener protocol %q",
svc.String(), svcProto, expectedProto)
}

return nil
}

for _, l := range ingress.Listeners {
for _, s := range l.Services {
if s.Name == structs.WildcardSpecifier {
continue
}
err := validationFn(s.ToServiceName(), l.Protocol)
if err != nil {
return err
}
}
}
return nil
}

// protocolForService returns the service graph protocol associated to the
// provided service, checking all relevant config entries.
func (s *Store) protocolForService(
Expand Down
Loading

0 comments on commit 7983023

Please sign in to comment.