Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve watch for polaris registrar #2788

Merged
merged 7 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 21 additions & 10 deletions contrib/registry/polaris/polaris_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]gsvc.Serv
if err != nil {
return nil, err
}

serviceInstances := instancesToServiceInstances(instancesResponse.GetInstances())
// Service filter.
filteredServices := make([]gsvc.Service, 0)
Expand Down Expand Up @@ -78,26 +79,29 @@ func instancesToServiceInstances(instances []model.Instance) []gsvc.Service {
serviceInstances = make([]gsvc.Service, 0, len(instances))
endpointStr bytes.Buffer
)

for _, instance := range instances {
if instance.IsHealthy() {
endpointStr.WriteString(fmt.Sprintf("%s:%d%s", instance.GetHost(), instance.GetPort(), gsvc.EndpointsDelimiter))
}
}

for _, instance := range instances {
if instance.IsHealthy() {
serviceInstances = append(serviceInstances, instanceToServiceInstance(instance, gstr.TrimRight(endpointStr.String(), gsvc.EndpointsDelimiter)))
if endpointStr.Len() > 0 {
for _, instance := range instances {
if instance.IsHealthy() {
serviceInstances = append(serviceInstances, instanceToServiceInstance(instance, gstr.TrimRight(endpointStr.String(), gsvc.EndpointsDelimiter), ""))
}
}
}
return serviceInstances
}

func instanceToServiceInstance(instance model.Instance, endpointStr string) gsvc.Service {
// instanceToServiceInstance converts the instance to service instance.
// instanceID Must be null when creating and adding, and non-null when updating and deleting
func instanceToServiceInstance(instance model.Instance, endpointStr, instanceID string) gsvc.Service {
var (
s *gsvc.LocalService
metadata = instance.GetMetadata()
names = strings.Split(instance.GetService(), instanceIDSeparator)
// endpoints = gsvc.NewEndpoints(fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort()))
s *gsvc.LocalService
metadata = instance.GetMetadata()
names = strings.Split(instance.GetService(), instanceIDSeparator)
endpoints = gsvc.NewEndpoints(endpointStr)
)
if names != nil && len(names) > 4 {
Expand Down Expand Up @@ -126,9 +130,16 @@ func instanceToServiceInstance(instance model.Instance, endpointStr string) gsvc
Endpoints: endpoints,
}
}
return &Service{
service := &Service{
Service: s,
}
if instance.GetId() != "" {
service.ID = instance.GetId()
}
if gstr.Trim(instanceID) != "" {
service.ID = instanceID
}
return service
}

// trimAndReplace trims the prefix and suffix separator and replaces the separator in the middle.
Expand Down
82 changes: 72 additions & 10 deletions contrib/registry/polaris/polaris_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ func (w *Watcher) Proceed() ([]gsvc.Service, error) {
}
// handle DeleteEvent
if instanceEvent.DeleteEvent != nil {
var endpointStr bytes.Buffer
for _, instance := range instanceEvent.DeleteEvent.Instances {
// Iterate through existing service instances, deleting them if they exist
for i, serviceInstance := range w.ServiceInstances {
if serviceInstance.(*Service).ID == instance.GetId() {
// remove equal
endpointStr.WriteString(fmt.Sprintf("%s:%d%s", instance.GetHost(), instance.GetPort(), gsvc.EndpointsDelimiter))
if len(w.ServiceInstances) <= 1 {
w.ServiceInstances = w.ServiceInstances[0:0]
continue
Expand All @@ -80,32 +82,92 @@ func (w *Watcher) Proceed() ([]gsvc.Service, error) {
}
}
}
if endpointStr.Len() > 0 && len(w.ServiceInstances) > 0 {
var (
newEndpointStr bytes.Buffer
serviceEndpointStr = w.ServiceInstances[0].(*Service).GetEndpoints().String()
)
for _, address := range gstr.SplitAndTrim(serviceEndpointStr, gsvc.EndpointsDelimiter) {
if !gstr.Contains(endpointStr.String(), address) {
newEndpointStr.WriteString(fmt.Sprintf("%s%s", address, gsvc.EndpointsDelimiter))
}
}

for i := 0; i < len(w.ServiceInstances); i++ {
w.ServiceInstances[i] = instanceToServiceInstance(instanceEvent.DeleteEvent.Instances[0], gstr.TrimRight(newEndpointStr.String(), gsvc.EndpointsDelimiter), w.ServiceInstances[i].(*Service).ID)
}
}
}
// handle UpdateEvent
if instanceEvent.UpdateEvent != nil {
for i, serviceInstance := range w.ServiceInstances {
var endpointStr bytes.Buffer
var (
updateEndpointStr bytes.Buffer
newEndpointStr bytes.Buffer
)
for _, serviceInstance := range w.ServiceInstances {
// update the current department or all instances
for _, update := range instanceEvent.UpdateEvent.UpdateList {
if serviceInstance.(*Service).ID == update.Before.GetId() {
endpointStr.WriteString(fmt.Sprintf("%s:%d%s", update.After.GetHost(), update.After.GetPort(), gsvc.EndpointsDelimiter))
// update equal
if update.After.IsHealthy() {
newEndpointStr.WriteString(fmt.Sprintf("%s:%d%s", update.After.GetHost(), update.After.GetPort(), gsvc.EndpointsDelimiter))
}
updateEndpointStr.WriteString(fmt.Sprintf("%s:%d%s", update.Before.GetHost(), update.Before.GetPort(), gsvc.EndpointsDelimiter))
}
}
for _, update := range instanceEvent.UpdateEvent.UpdateList {
if serviceInstance.(*Service).ID == update.Before.GetId() {
w.ServiceInstances[i] = instanceToServiceInstance(update.After, gstr.TrimRight(endpointStr.String(), gsvc.EndpointsDelimiter))
}
if len(w.ServiceInstances) > 0 {
var serviceEndpointStr = w.ServiceInstances[0].(*Service).GetEndpoints().String()
// old instance addresses are culled
if updateEndpointStr.Len() > 0 {
for _, address := range gstr.SplitAndTrim(serviceEndpointStr, gsvc.EndpointsDelimiter) {
// If the historical instance is not in the change instance, it remains
if !gstr.Contains(updateEndpointStr.String(), address) {
newEndpointStr.WriteString(fmt.Sprintf("%s%s", address, gsvc.EndpointsDelimiter))
}
}
}
instance := instanceEvent.UpdateEvent.UpdateList[0].After
for i := 0; i < len(w.ServiceInstances); i++ {
w.ServiceInstances[i] = instanceToServiceInstance(instance, gstr.TrimRight(newEndpointStr.String(), gsvc.EndpointsDelimiter), w.ServiceInstances[i].(*Service).ID)
}
}
}
// handle AddEvent
if instanceEvent.AddEvent != nil {
w.ServiceInstances = append(
w.ServiceInstances,
instancesToServiceInstances(instanceEvent.AddEvent.Instances)...,
var (
newEndpointStr bytes.Buffer
allEndpointStr string
)
if len(w.ServiceInstances) > 0 {
allEndpointStr = w.ServiceInstances[0].(*Service).GetEndpoints().String()
}
for i := 0; i < len(instanceEvent.AddEvent.Instances); i++ {
instance := instanceEvent.AddEvent.Instances[i]
if instance.IsHealthy() {
address := fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort())
if !gstr.Contains(allEndpointStr, address) {
newEndpointStr.WriteString(fmt.Sprintf("%s%s", address, gsvc.EndpointsDelimiter))
}
}
}
if newEndpointStr.Len() > 0 {
allEndpointStr = fmt.Sprintf("%s%s", newEndpointStr.String(), allEndpointStr)
}
for i := 0; i < len(w.ServiceInstances); i++ {
w.ServiceInstances[i] = instanceToServiceInstance(instanceEvent.AddEvent.Instances[0], gstr.TrimRight(allEndpointStr, gsvc.EndpointsDelimiter), w.ServiceInstances[i].(*Service).ID)
}

for i := 0; i < len(instanceEvent.AddEvent.Instances); i++ {
instance := instanceEvent.AddEvent.Instances[i]
if instance.IsHealthy() {
w.ServiceInstances = append(w.ServiceInstances, instanceToServiceInstance(instance, gstr.TrimRight(allEndpointStr, gsvc.EndpointsDelimiter), ""))
}
}
}
}
}

return w.ServiceInstances, nil
}

Expand Down