Skip to content

Commit

Permalink
Merge pull request #6 from jojimt/I256-vtep
Browse files Browse the repository at this point in the history
Fix #256
  • Loading branch information
shaleman committed Feb 2, 2016
2 parents 4bb428f + d36b6be commit c03ba09
Showing 1 changed file with 45 additions and 10 deletions.
55 changes: 45 additions & 10 deletions etcdService.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,31 @@ func (self *etcdPlugin) RegisterService(serviceInfo ServiceInfo) error {
return nil
}

// List all end points for a service
// GetService lists all end points for a service
func (self *etcdPlugin) GetService(name string) ([]ServiceInfo, error) {
keyName := "/contiv.io/service/" + name + "/"

_, srvcList, err := self.getServiceState(keyName)
return srvcList, err
}

func (self *etcdPlugin) getServiceState(key string) (uint64, []ServiceInfo, error) {

// Get the object from etcd client
resp, err := self.client.Get(keyName, true, true)
resp, err := self.client.Get(key, true, true)
if err != nil {
if strings.Contains(err.Error(), "Key not found") {
return nil, nil
return 0, nil, nil
} else {
log.Errorf("Error getting key %s. Err: %v", keyName, err)
return nil, err
log.Errorf("Error getting key %s. Err: %v", key, err)
return 0, nil, err
}

}

if !resp.Node.Dir {
log.Errorf("Err. Response is not a directory: %+v", resp.Node)
return nil, errors.New("Invalid Response from etcd")
return 0, nil, errors.New("Invalid Response from etcd")
}

srvcList := make([]ServiceInfo, 0)
Expand All @@ -91,13 +97,35 @@ func (self *etcdPlugin) GetService(name string) ([]ServiceInfo, error) {
err = json.Unmarshal([]byte(node.Value), &respSrvc)
if err != nil {
log.Errorf("Error parsing object %s, Err %v", node.Value, err)
return nil, err
return 0, nil, err
}

srvcList = append(srvcList, respSrvc)
}

return srvcList, nil
watchIndex := resp.EtcdIndex + 1
return watchIndex, srvcList, nil
}

// initServiceState reads the current state and injects it to the channel
// additionally, it returns the next index to watch
func (self *etcdPlugin) initServiceState(key string, eventCh chan WatchServiceEvent) (uint64, error) {
mIndex, srvcList, err := self.getServiceState(key)
if err != nil {
return mIndex, err
}

// walk each service and inject it as an add event
for _, srvInfo := range srvcList {
log.Infof("Sending service add event: %+v", srvInfo)
// Send Add event
eventCh <- WatchServiceEvent{
EventType: WatchServiceEventAdd,
ServiceInfo: srvInfo,
}
}

return mIndex, nil
}

// Watch for a service
Expand All @@ -111,9 +139,16 @@ func (self *etcdPlugin) WatchService(name string,

// Start the watch thread
go func() {
log.Infof("Watching for service: %s", keyName)
// Get current state and etcd index to watch
watchIndex, err := self.initServiceState(keyName, eventCh)
if err != nil {
log.Fatalf("Unable to watch service key: %s - %v", keyName,
err)
}

log.Infof("Watching for service: %s at index %v", keyName, watchIndex)
// Start the watch
_, err := self.client.Watch(keyName, 0, true, watchCh, watchStopCh)
_, err = self.client.Watch(keyName, watchIndex, true, watchCh, watchStopCh)
if (err != nil) && (err != etcd.ErrWatchStoppedByUser) {
log.Errorf("Error watching service %s. Err: %v", keyName, err)

Expand Down

0 comments on commit c03ba09

Please sign in to comment.