Skip to content

Commit

Permalink
Merge pull request apache#5 from zenlint/developer
Browse files Browse the repository at this point in the history
Refactoring syncer arch, mv storage to syncer server, remove extra ev…
  • Loading branch information
zenlint committed May 14, 2019
2 parents e0e36ca + af6b674 commit 5639438
Show file tree
Hide file tree
Showing 30 changed files with 158 additions and 1,077 deletions.
11 changes: 4 additions & 7 deletions syncer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ package config

import (
"fmt"
"github.com/apache/servicecomb-service-center/syncer/plugins/servicecenter"
"os"

"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/serf"
"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/apache/servicecomb-service-center/syncer/plugins/repository/servicecenter"
"github.com/apache/servicecomb-service-center/syncer/plugins/storage/memory"
"github.com/apache/servicecomb-service-center/syncer/serf"
)

var (
Expand Down Expand Up @@ -53,8 +52,7 @@ type Config struct {
Profile string `yaml:"profile"`
EnableCompression bool `yaml:"enable_compression"`
AutoSync bool `yaml:"auto_sync"`
StoragePlugin string `json:"storage_plugin"`
RepositoryPlugin string `json:"repository_plugin"`
DatacenterPlugin string `json:"datacenter_plugin"`
}

// DefaultConfig returns the default config
Expand All @@ -72,8 +70,7 @@ func DefaultConfig() *Config {
DCAddr: fmt.Sprintf("127.0.0.1:%d", DefaultDCPort),
TickerInterval: DefaultTickerInterval,
Config: serfConf,
StoragePlugin: memory.PluginName,
RepositoryPlugin: servicecenter.PluginName,
DatacenterPlugin: servicecenter.PluginName,
}
}

Expand Down
72 changes: 16 additions & 56 deletions syncer/datacenter/datacenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,87 +17,47 @@
package datacenter

import (
"github.com/apache/servicecomb-service-center/pkg/log"
"context"

"github.com/apache/servicecomb-service-center/syncer/notify"
"github.com/apache/servicecomb-service-center/syncer/pkg/events"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/plugins"
"github.com/apache/servicecomb-service-center/syncer/plugins/repository"
"github.com/apache/servicecomb-service-center/syncer/plugins/storage"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
)

// Store interface of datacenter
type DataCenter interface {
//
OnEvent(event events.ContextEvent)
LocalInfo() *pb.SyncData
Stop()
GetSyncData(allMapping pb.SyncMapping) (*pb.SyncData, error)
SetSyncData(*pb.SyncData, pb.SyncMapping) (pb.SyncMapping, error)
}

type store struct {
repo repository.Repository
cache storage.Repository
datacenter plugins.Datacenter
}

// NewStore new store with endpoints
func NewDataCenter(endpoints []string) (DataCenter, error) {
repo, err := plugins.Plugins().Repository().New(endpoints)
datacenter, err := plugins.Plugins().Datacenter().New(endpoints)
if err != nil {
return nil, err
}

return &store{
repo: repo,
cache: plugins.Plugins().Storage(),
datacenter: datacenter,
}, nil
}

// Stop store
func (s *store) Stop() {
if s.cache == nil {
return
}
s.cache.Stop()
}

// LocalInfo Get local datacenter information from cache
func (s *store) LocalInfo() *pb.SyncData {
return s.cache.GetSyncData()
}

// OnEvent Handles events with internal type "ticker_trigger" or "pull_by_serf"
func (s *store) OnEvent(event events.ContextEvent) {
switch event.Type() {
case notify.EventTicker:
s.getLocalDataInfo(event)
case notify.EventPullBySerf:
s.syncSerfDataInfo(event)
default:
}
}

// getLocalDataInfo get local datacenter information form repo
func (s *store) getLocalDataInfo(event events.ContextEvent) {
ctx := event.Context()
data, err := s.repo.GetAll(ctx)
// GetSyncData Get data from datacenter instance, excluded the data from other syncer
func (s *store) GetSyncData(allMapping pb.SyncMapping) (*pb.SyncData, error) {
data, err := s.datacenter.GetAll(context.Background())
if err != nil {
log.Errorf(err, "Syncer discover instances failed")
return
return nil, err
}
s.exclude(data)
s.cache.SaveSyncData(data)

events.Dispatch(events.NewContextEvent(notify.EventDiscovery, nil))
s.exclude(data, allMapping)
return data, nil
}

// syncSerfDataInfo sync the datacenter information of other to local repo
func (s *store) syncSerfDataInfo(event events.ContextEvent) {
ctx := event.Context()
nodeData, ok := ctx.Value(event.Type()).(*pb.NodeDataInfo)
if !ok {
log.Error("save serf info failed", nil)
return
}
s.sync(nodeData)
// GetSyncData Get current datacenter information
func (s *store) SetSyncData(data *pb.SyncData, mapping pb.SyncMapping) (pb.SyncMapping, error) {
return s.sync(data, mapping)
}
93 changes: 0 additions & 93 deletions syncer/datacenter/datacenter_test.go

This file was deleted.

5 changes: 2 additions & 3 deletions syncer/datacenter/exclude.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
)

// exclude find out services data belonging to self
func (s *store) exclude(data *pb.SyncData) {
mapping := s.cache.GetAllMapping()
func (s *store) exclude(data *pb.SyncData, allMapping pb.SyncMapping) {
services := make([]*pb.SyncService, 0, 10)
for _, svc := range data.Services {
ins := s.excludeInstances(svc.Instances, mapping)
ins := s.excludeInstances(svc.Instances, allMapping)
if len(ins) > 0 {
svc.Instances = ins
services = append(services, svc)
Expand Down
23 changes: 11 additions & 12 deletions syncer/datacenter/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ package datacenter

import (
"context"
"github.com/apache/servicecomb-service-center/pkg/log"

"github.com/apache/servicecomb-service-center/pkg/log"
scpb "github.com/apache/servicecomb-service-center/server/core/proto"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
)

// sync synchronize information from other datacenters
func (s *store) sync(nodeData *pb.NodeDataInfo) {
allInstances, mapping := s.syncServiceInstances(nodeData.DataInfo.Services, s.cache.GetSyncMapping(nodeData.NodeName))
mapping = s.deleteInstances(allInstances, mapping)
s.cache.SaveSyncMapping(nodeData.NodeName, mapping)
func (s *store) sync(data *pb.SyncData, mapping pb.SyncMapping) (pb.SyncMapping, error) {
allInstances, mapping := s.syncServiceInstances(data.Services, mapping)
return s.deleteInstances(allInstances, mapping), nil
}

// syncServiceInstances register instances from other datacenter
Expand All @@ -43,8 +42,8 @@ func (s *store) syncServiceInstances(services []*pb.SyncService, mapping pb.Sync

// Send an instance heartbeat if the instance has already been registered
syncKey, ok := mapping[inst.InstanceId]
if ok && syncKey.InstanceID != ""{
err = s.repo.Heartbeat(ctx, svc.DomainProject, syncKey.ServiceID, syncKey.InstanceID)
if ok && syncKey.InstanceID != "" {
err = s.datacenter.Heartbeat(ctx, svc.DomainProject, syncKey.ServiceID, syncKey.InstanceID)
if err != nil {
log.Errorf(err, "Syncer heartbeat instance failed")
}
Expand All @@ -53,16 +52,16 @@ func (s *store) syncServiceInstances(services []*pb.SyncService, mapping pb.Sync

// Create microservice if the service to which the instance belongs does not exist
if serviceID == "" {
if serviceID, _ = s.repo.ServiceExistence(ctx, svc.DomainProject, svc.Service); serviceID == "" {
serviceID, err = s.repo.CreateService(ctx, svc.DomainProject, svc.Service)
if serviceID, _ = s.datacenter.ServiceExistence(ctx, svc.DomainProject, svc.Service); serviceID == "" {
serviceID, err = s.datacenter.CreateService(ctx, svc.DomainProject, svc.Service)
if err != nil {
log.Errorf(err, "Syncer create service failed")
}
}
}

// Register instance information when the instance does not exist
instanceID, err := s.repo.RegisterInstance(ctx, svc.DomainProject, serviceID, inst)
instanceID, err := s.datacenter.RegisterInstance(ctx, svc.DomainProject, serviceID, inst)
if err != nil {
log.Errorf(err, "Syncer create service failed")
continue
Expand Down Expand Up @@ -95,9 +94,9 @@ func (s *store) deleteInstances(ins []*scpb.MicroServiceInstance, mapping pb.Syn
continue
}

err := s.repo.UnregisterInstance(ctx, val.DomainProject, val.ServiceID, val.InstanceID)
err := s.datacenter.UnregisterInstance(ctx, val.DomainProject, val.ServiceID, val.InstanceID)
if err != nil {
log.Errorf(err,"Syncer delete service failed")
log.Errorf(err, "Syncer delete service failed")
}
}
return nm
Expand Down
78 changes: 0 additions & 78 deletions syncer/grpc/broker.go

This file was deleted.

Loading

0 comments on commit 5639438

Please sign in to comment.