From c0d2fdb845adcc14a29744baf97a4f63decab1d9 Mon Sep 17 00:00:00 2001 From: Amelia Downs Date: Fri, 15 Dec 2023 12:29:45 +0000 Subject: [PATCH] add availability zone to endpoint * get it to show up in /routes endpoint * remove easyjson. it has not been released since 2021 and has had no commits in 19 months. --- mbus/subscriber.go | 27 ++- mbus/subscriber_easyjson.go | 374 ------------------------------------ mbus/subscriber_test.go | 66 +++++++ registry/registry_test.go | 2 +- route/pool.go | 5 + route/pool_test.go | 9 +- 6 files changed, 90 insertions(+), 393 deletions(-) delete mode 100644 mbus/subscriber_easyjson.go diff --git a/mbus/subscriber.go b/mbus/subscriber.go index c4382243..4eec703a 100644 --- a/mbus/subscriber.go +++ b/mbus/subscriber.go @@ -17,30 +17,26 @@ import ( "code.cloudfoundry.org/localip" "code.cloudfoundry.org/routing-api/models" - "github.com/mailru/easyjson" "github.com/nats-io/nats.go" "github.com/uber-go/zap" ) -// RegistryMessage defines the format of a route registration/unregistration -// easyjson:json -// -//go:generate easyjson --all subscriber.go type RegistryMessage struct { + App string `json:"app"` + AvailabilityZone string `json:"availability_zone"` + EndpointUpdatedAtNs int64 `json:"endpoint_updated_at_ns"` Host string `json:"host"` + IsolationSegment string `json:"isolation_segment"` Port uint16 `json:"port"` + PrivateInstanceID string `json:"private_instance_id"` + PrivateInstanceIndex string `json:"private_instance_index"` Protocol string `json:"protocol"` - TLSPort uint16 `json:"tls_port"` - Uris []route.Uri `json:"uris"` - Tags map[string]string `json:"tags"` - App string `json:"app"` - StaleThresholdInSeconds int `json:"stale_threshold_in_seconds"` RouteServiceURL string `json:"route_service_url"` - PrivateInstanceID string `json:"private_instance_id"` ServerCertDomainSAN string `json:"server_cert_domain_san"` - PrivateInstanceIndex string `json:"private_instance_index"` - IsolationSegment string `json:"isolation_segment"` - EndpointUpdatedAtNs int64 `json:"endpoint_updated_at_ns"` + StaleThresholdInSeconds int `json:"stale_threshold_in_seconds"` + TLSPort uint16 `json:"tls_port"` + Tags map[string]string `json:"tags"` + Uris []route.Uri `json:"uris"` } func (rm *RegistryMessage) makeEndpoint(http2Enabled bool) (*route.Endpoint, error) { @@ -60,6 +56,7 @@ func (rm *RegistryMessage) makeEndpoint(http2Enabled bool) (*route.Endpoint, err return route.NewEndpoint(&route.EndpointOpts{ AppId: rm.App, + AvailabilityZone: rm.AvailabilityZone, Host: rm.Host, Port: port, Protocol: protocol, @@ -296,7 +293,7 @@ func (s *Subscriber) sendStartMessage() error { func createRegistryMessage(data []byte) (*RegistryMessage, error) { var msg RegistryMessage - jsonErr := easyjson.Unmarshal(data, &msg) + jsonErr := json.Unmarshal(data, &msg) if jsonErr != nil { return nil, jsonErr } diff --git a/mbus/subscriber_easyjson.go b/mbus/subscriber_easyjson.go deleted file mode 100644 index 91825efc..00000000 --- a/mbus/subscriber_easyjson.go +++ /dev/null @@ -1,374 +0,0 @@ -// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT. - -package mbus - -import ( - route "code.cloudfoundry.org/gorouter/route" - json "encoding/json" - easyjson "github.com/mailru/easyjson" - jlexer "github.com/mailru/easyjson/jlexer" - jwriter "github.com/mailru/easyjson/jwriter" -) - -// suppress unused package warning -var ( - _ *json.RawMessage - _ *jlexer.Lexer - _ *jwriter.Writer - _ easyjson.Marshaler -) - -func easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus(in *jlexer.Lexer, out *startMessageParams) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeString() - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus(out *jwriter.Writer, in startMessageParams) { - out.RawByte('{') - first := true - _ = first - out.RawByte('}') -} - -// MarshalJSON supports json.Marshaler interface -func (v startMessageParams) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus(&w, v) - return w.Buffer.BuildBytes(), w.Error -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v startMessageParams) MarshalEasyJSON(w *jwriter.Writer) { - easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus(w, v) -} - -// UnmarshalJSON supports json.Unmarshaler interface -func (v *startMessageParams) UnmarshalJSON(data []byte) error { - r := jlexer.Lexer{Data: data} - easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus(&r, v) - return r.Error() -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *startMessageParams) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus(l, v) -} -func easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus1(in *jlexer.Lexer, out *Subscriber) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeString() - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus1(out *jwriter.Writer, in Subscriber) { - out.RawByte('{') - first := true - _ = first - out.RawByte('}') -} - -// MarshalJSON supports json.Marshaler interface -func (v Subscriber) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus1(&w, v) - return w.Buffer.BuildBytes(), w.Error -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v Subscriber) MarshalEasyJSON(w *jwriter.Writer) { - easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus1(w, v) -} - -// UnmarshalJSON supports json.Unmarshaler interface -func (v *Subscriber) UnmarshalJSON(data []byte) error { - r := jlexer.Lexer{Data: data} - easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus1(&r, v) - return r.Error() -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *Subscriber) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus1(l, v) -} -func easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus2(in *jlexer.Lexer, out *RegistryMessage) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeString() - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "host": - out.Host = string(in.String()) - case "port": - out.Port = uint16(in.Uint16()) - case "protocol": - out.Protocol = string(in.String()) - case "tls_port": - out.TLSPort = uint16(in.Uint16()) - case "uris": - if in.IsNull() { - in.Skip() - out.Uris = nil - } else { - in.Delim('[') - if out.Uris == nil { - if !in.IsDelim(']') { - out.Uris = make([]route.Uri, 0, 4) - } else { - out.Uris = []route.Uri{} - } - } else { - out.Uris = (out.Uris)[:0] - } - for !in.IsDelim(']') { - var v1 route.Uri - v1 = route.Uri(in.String()) - out.Uris = append(out.Uris, v1) - in.WantComma() - } - in.Delim(']') - } - case "tags": - if in.IsNull() { - in.Skip() - } else { - in.Delim('{') - if !in.IsDelim('}') { - out.Tags = make(map[string]string) - } else { - out.Tags = nil - } - for !in.IsDelim('}') { - key := string(in.String()) - in.WantColon() - var v2 string - v2 = string(in.String()) - (out.Tags)[key] = v2 - in.WantComma() - } - in.Delim('}') - } - case "app": - out.App = string(in.String()) - case "stale_threshold_in_seconds": - out.StaleThresholdInSeconds = int(in.Int()) - case "route_service_url": - out.RouteServiceURL = string(in.String()) - case "private_instance_id": - out.PrivateInstanceID = string(in.String()) - case "server_cert_domain_san": - out.ServerCertDomainSAN = string(in.String()) - case "private_instance_index": - out.PrivateInstanceIndex = string(in.String()) - case "isolation_segment": - out.IsolationSegment = string(in.String()) - case "endpoint_updated_at_ns": - out.EndpointUpdatedAtNs = int64(in.Int64()) - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus2(out *jwriter.Writer, in RegistryMessage) { - out.RawByte('{') - first := true - _ = first - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"host\":") - out.String(string(in.Host)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"port\":") - out.Uint16(uint16(in.Port)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"protocol\":") - out.String(string(in.Protocol)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"tls_port\":") - out.Uint16(uint16(in.TLSPort)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"uris\":") - if in.Uris == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v3, v4 := range in.Uris { - if v3 > 0 { - out.RawByte(',') - } - out.String(string(v4)) - } - out.RawByte(']') - } - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"tags\":") - if in.Tags == nil && (out.Flags&jwriter.NilMapAsEmpty) == 0 { - out.RawString(`null`) - } else { - out.RawByte('{') - v5First := true - for v5Name, v5Value := range in.Tags { - if !v5First { - out.RawByte(',') - } - v5First = false - out.String(string(v5Name)) - out.RawByte(':') - out.String(string(v5Value)) - } - out.RawByte('}') - } - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"app\":") - out.String(string(in.App)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"stale_threshold_in_seconds\":") - out.Int(int(in.StaleThresholdInSeconds)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"route_service_url\":") - out.String(string(in.RouteServiceURL)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"private_instance_id\":") - out.String(string(in.PrivateInstanceID)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"server_cert_domain_san\":") - out.String(string(in.ServerCertDomainSAN)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"private_instance_index\":") - out.String(string(in.PrivateInstanceIndex)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"isolation_segment\":") - out.String(string(in.IsolationSegment)) - if !first { - out.RawByte(',') - } - first = false - out.RawString("\"endpoint_updated_at_ns\":") - out.Int64(int64(in.EndpointUpdatedAtNs)) - out.RawByte('}') -} - -// MarshalJSON supports json.Marshaler interface -func (v RegistryMessage) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus2(&w, v) - return w.Buffer.BuildBytes(), w.Error -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v RegistryMessage) MarshalEasyJSON(w *jwriter.Writer) { - easyjson639f989aEncodeCodeCloudfoundryOrgGorouterMbus2(w, v) -} - -// UnmarshalJSON supports json.Unmarshaler interface -func (v *RegistryMessage) UnmarshalJSON(data []byte) error { - r := jlexer.Lexer{Data: data} - easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus2(&r, v) - return r.Error() -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *RegistryMessage) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson639f989aDecodeCodeCloudfoundryOrgGorouterMbus2(l, v) -} diff --git a/mbus/subscriber_test.go b/mbus/subscriber_test.go index 8f92dd07..2768e536 100644 --- a/mbus/subscriber_test.go +++ b/mbus/subscriber_test.go @@ -349,6 +349,72 @@ var _ = Describe("Subscriber", func() { }) }) + Context("when the message contains an availability_zone", func() { + BeforeEach(func() { + sub = mbus.NewSubscriber(natsClient, registry, cfg, reconnected, l) + process = ifrit.Invoke(sub) + Eventually(process.Ready()).Should(BeClosed()) + }) + + It("endpoint is constructed with an availability_zone", func() { + msg := mbus.RegistryMessage{ + Host: "host", + App: "app", + Uris: []route.Uri{"test.example.com"}, + AvailabilityZone: "zone-meow", + } + + data, err := json.Marshal(msg) + Expect(err).NotTo(HaveOccurred()) + + err = natsClient.Publish("router.register", data) + Expect(err).ToNot(HaveOccurred()) + + Eventually(registry.RegisterCallCount).Should(Equal(1)) + _, originalEndpoint := registry.RegisterArgsForCall(0) + expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "host", + AppId: "app", + Protocol: "http1", + AvailabilityZone: "zone-meow", + }) + + Expect(originalEndpoint).To(Equal(expectedEndpoint)) + }) + }) + + Context("when the message does not contain an availability_zone", func() { + BeforeEach(func() { + sub = mbus.NewSubscriber(natsClient, registry, cfg, reconnected, l) + process = ifrit.Invoke(sub) + Eventually(process.Ready()).Should(BeClosed()) + }) + + It("endpoint is constructed without an availability_zone", func() { + msg := mbus.RegistryMessage{ + Host: "host", + App: "app", + Uris: []route.Uri{"test.example.com"}, + } + + data, err := json.Marshal(msg) + Expect(err).NotTo(HaveOccurred()) + + err = natsClient.Publish("router.register", data) + Expect(err).ToNot(HaveOccurred()) + + Eventually(registry.RegisterCallCount).Should(Equal(1)) + _, originalEndpoint := registry.RegisterArgsForCall(0) + expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "host", + AppId: "app", + Protocol: "http1", + }) + + Expect(originalEndpoint).To(Equal(expectedEndpoint)) + }) + }) + Context("when the message does not contain a protocol", func() { BeforeEach(func() { sub = mbus.NewSubscriber(natsClient, registry, cfg, reconnected, l) diff --git a/registry/registry_test.go b/registry/registry_test.go index 7d78fee4..37df8f44 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -1353,7 +1353,7 @@ var _ = Describe("RouteRegistry", func() { marshalled, err := json.Marshal(r) Expect(err).NotTo(HaveOccurred()) - Expect(string(marshalled)).To(Equal(`{"foo":[{"address":"192.168.1.1:1234","protocol":"http2","tls":false,"ttl":-1,"route_service_url":"https://my-routeService.com","tags":null}]}`)) + Expect(string(marshalled)).To(Equal(`{"foo":[{"address":"192.168.1.1:1234","availability_zone":"","protocol":"http2","tls":false,"ttl":-1,"route_service_url":"https://my-routeService.com","tags":null}]}`)) r.Unregister("foo", m) marshalled, err = json.Marshal(r) Expect(err).NotTo(HaveOccurred()) diff --git a/route/pool.go b/route/pool.go index f54bb5ea..de80c3a0 100644 --- a/route/pool.go +++ b/route/pool.go @@ -61,6 +61,7 @@ type ProxyRoundTripper interface { type Endpoint struct { ApplicationId string + AvailabilityZone string addr string Protocol string Tags map[string]string @@ -159,6 +160,7 @@ type EndpointPool struct { type EndpointOpts struct { AppId string + AvailabilityZone string Host string Port uint16 Protocol string @@ -177,6 +179,7 @@ type EndpointOpts struct { func NewEndpoint(opts *EndpointOpts) *Endpoint { return &Endpoint{ ApplicationId: opts.AppId, + AvailabilityZone: opts.AvailabilityZone, addr: fmt.Sprintf("%s:%d", opts.Host, opts.Port), Protocol: opts.Protocol, Tags: opts.Tags, @@ -484,6 +487,7 @@ func (e *endpointElem) isOverloaded() bool { func (e *Endpoint) MarshalJSON() ([]byte, error) { var jsonObj struct { Address string `json:"address"` + AvailabilityZone string `json:"availability_zone"` Protocol string `json:"protocol"` TLS bool `json:"tls"` TTL int `json:"ttl"` @@ -495,6 +499,7 @@ func (e *Endpoint) MarshalJSON() ([]byte, error) { } jsonObj.Address = e.addr + jsonObj.AvailabilityZone = e.AvailabilityZone jsonObj.Protocol = e.Protocol jsonObj.TLS = e.IsTLS() jsonObj.RouteServiceUrl = e.RouteServiceUrl diff --git a/route/pool_test.go b/route/pool_test.go index 9eab7a06..5a2bba2d 100644 --- a/route/pool_test.go +++ b/route/pool_test.go @@ -697,6 +697,7 @@ var _ = Describe("EndpointPool", func() { It("marshals json", func() { e := route.NewEndpoint(&route.EndpointOpts{ + AvailabilityZone: "az-meow", Host: "1.2.3.4", Port: 5678, Protocol: "http1", @@ -720,7 +721,7 @@ var _ = Describe("EndpointPool", func() { json, err := pool.MarshalJSON() Expect(err).ToNot(HaveOccurred()) - Expect(string(json)).To(Equal(`[{"address":"1.2.3.4:5678","protocol":"http1","tls":false,"ttl":-1,"route_service_url":"https://my-rs.com","tags":null},{"address":"5.6.7.8:5678","protocol":"http2","tls":true,"ttl":-1,"tags":null,"private_instance_id":"pvt_test_instance_id","server_cert_domain_san":"pvt_test_san"}]`)) + Expect(string(json)).To(Equal(`[{"address":"1.2.3.4:5678","availability_zone":"az-meow","protocol":"http1","tls":false,"ttl":-1,"route_service_url":"https://my-rs.com","tags":null},{"address":"5.6.7.8:5678","availability_zone":"","protocol":"http2","tls":true,"ttl":-1,"tags":null,"private_instance_id":"pvt_test_instance_id","server_cert_domain_san":"pvt_test_san"}]`)) }) Context("when endpoints do not have empty tags", func() { @@ -729,6 +730,7 @@ var _ = Describe("EndpointPool", func() { sample_tags := map[string]string{ "some-key": "some-value"} e = route.NewEndpoint(&route.EndpointOpts{ + AvailabilityZone: "az-meow", Host: "1.2.3.4", Port: 5678, Protocol: "http2", @@ -742,7 +744,7 @@ var _ = Describe("EndpointPool", func() { pool.Put(e) json, err := pool.MarshalJSON() Expect(err).ToNot(HaveOccurred()) - Expect(string(json)).To(Equal(`[{"address":"1.2.3.4:5678","protocol":"http2","tls":false,"ttl":-1,"route_service_url":"https://my-rs.com","tags":{"some-key":"some-value"}}]`)) + Expect(string(json)).To(Equal(`[{"address":"1.2.3.4:5678","availability_zone":"az-meow","protocol":"http2","tls":false,"ttl":-1,"route_service_url":"https://my-rs.com","tags":{"some-key":"some-value"}}]`)) }) }) @@ -751,6 +753,7 @@ var _ = Describe("EndpointPool", func() { BeforeEach(func() { sample_tags := map[string]string{} e = route.NewEndpoint(&route.EndpointOpts{ + AvailabilityZone: "az-meow", Host: "1.2.3.4", Port: 5678, Protocol: "http2", @@ -765,7 +768,7 @@ var _ = Describe("EndpointPool", func() { pool.Put(e) json, err := pool.MarshalJSON() Expect(err).ToNot(HaveOccurred()) - Expect(string(json)).To(Equal(`[{"address":"1.2.3.4:5678","protocol":"http2","tls":false,"ttl":-1,"route_service_url":"https://my-rs.com","tags":{}}]`)) + Expect(string(json)).To(Equal(`[{"address":"1.2.3.4:5678","availability_zone":"az-meow","protocol":"http2","tls":false,"ttl":-1,"route_service_url":"https://my-rs.com","tags":{}}]`)) }) }) })