Merge pull request #560 from rgooch/master
Speed up pre-checking hypervisors in hyper-control rollout-image.
rgooch committed Feb 12, 2019
2 parents 22205fb + e28c7a1 commit b65ccc6
Showing 1 changed file with 85 additions and 55 deletions.
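
In outline, the commit parallelizes the pre-check: the per-host work (image-stream check, client setup, last-image poll) moves into a new setupHypervisor function, rolloutImage spawns one goroutine per hypervisor (each grabbing a slot from the FIFO CPU sharer via GrabCpu/ReleaseCpu), and the results are collected from a buffered channel, with nil meaning "skip this host". Below is a minimal, self-contained sketch of that fan-out/fan-in shape; precheck, hostnames, and the simulated delay are illustrative stand-ins, not code from the commit.

package main

import (
    "fmt"
    "time"
)

// precheck stands in for the real per-host setup (connect, check tags,
// poll the last image); returning nil means "skip this host", as in
// the commit.
func precheck(hostname string) *string {
    time.Sleep(100 * time.Millisecond) // simulate a slow RPC
    if hostname == "bad-host" {
        return nil
    }
    return &hostname
}

func main() {
    hostnames := []string{"hv1", "hv2", "bad-host", "hv3"}
    // Buffer the channel to the fan-out size so no sender ever blocks.
    results := make(chan *string, len(hostnames))
    for _, hostname := range hostnames {
        go func(hostname string) { // fan out: one goroutine per host
            results <- precheck(hostname)
        }(hostname)
    }
    var ready []string
    for range hostnames { // fan in: exactly one receive per goroutine
        if r := <-results; r != nil {
            ready = append(ready, *r)
        }
    }
    fmt.Println("ready to update:", ready)
}

Receiving exactly once per spawned goroutine guarantees none leak, and because the slow pre-checks now overlap, the wall-clock cost is roughly that of the slowest host rather than the sum of all of them.
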
cmd/hyper-control/rolloutImage.go: 140 changes (85 additions, 55 deletions)
@@ -106,48 +106,34 @@ func rolloutImage(imageName string, logger log.DebugLogger) error {
 	if err != nil {
 		return fmt.Errorf("failure getting tags: %s", err)
 	}
+	hypervisorsChannel := make(chan *hypervisorType, len(hypervisorAddresses))
 	for _, address := range hypervisorAddresses {
 		if hostname, _, err := net.SplitHostPort(address); err != nil {
 			return err
 		} else {
-			logger := prefixlogger.New(hostname+": ", logger)
-			tgs := tagsForHypervisors[hostname]
-			currentRequiredImage := tgs["RequiredImage"]
-			if currentRequiredImage != "" &&
-				path.Dir(currentRequiredImage) != path.Dir(imageName) {
-				logger.Printf(
-					"image stream: current=%s != new=%s, skipping\n",
-					path.Dir(currentRequiredImage), path.Dir(imageName))
-				continue
-			}
-			h := &hypervisorType{
-				healthAgentClientResource: rpcclientpool.New("tcp",
-					fmt.Sprintf("%s:%d", hostname, 6910), true, ""),
-				hostname: hostname,
-				hypervisorClientResource: srpc.NewClientResource("tcp",
-					fmt.Sprintf("%s:%d", hostname,
-						constants.HypervisorPortNumber)),
-				initialTags:          tgs,
-				initialUnhealthyList: make(map[string]struct{}),
-				logger:               logger,
-				subClientResource: srpc.NewClientResource("tcp",
-					fmt.Sprintf("%s:%d", hostname, constants.SubPortNumber)),
-			}
-			if lastImage, err := h.getLastImageName(cpuSharer); err != nil {
-				logger.Printf("skipping %s: %s\n", hostname, err)
-			} else if lastImage == imageName {
-				logger.Printf("%s already updated, skipping\n", hostname)
-			} else {
-				err := h.updateTagForHypervisor(
-					fleetManagerClientResource, "PlannedImage", imageName)
-				if err != nil {
-					return fmt.Errorf("%s: failure updating tags: %s",
-						hostname, err)
-				}
-				hypervisors = append(hypervisors, h)
-			}
+			go func(hostname string) {
+				cpuSharer.GrabCpu()
+				defer cpuSharer.ReleaseCpu()
+				hypervisor := setupHypervisor(hostname, imageName,
+					tagsForHypervisors[hostname], cpuSharer, logger)
+				hypervisorsChannel <- hypervisor
+			}(hostname)
 		}
 	}
+	for range hypervisorAddresses {
+		if hypervisor := <-hypervisorsChannel; hypervisor != nil {
+			err := hypervisor.updateTagForHypervisor(
+				fleetManagerClientResource, "PlannedImage", imageName)
+			if err != nil {
+				return fmt.Errorf("%s: failure updating tags: %s",
+					hypervisor.hostname, err)
+			}
+			hypervisors = append(hypervisors, hypervisor)
+		}
+	}
 	if len(hypervisors) < 1 {
 		return errors.New("no hypervisors to update")
 	}
 	logger.Debugln(0, "splitting unused/used Hypervisors")
 	unusedHypervisors, usedHypervisors := markUnusedHypervisors(hypervisors,
 		cpuSharer)
@@ -303,7 +289,7 @@ func markUnusedHypervisors(hypervisors []*hypervisorType,
 		client, err := h.hypervisorClientResource.GetHTTPWithDialer(nil,
 			dialer)
 		if err != nil {
-			h.logger.Println(err)
+			h.logger.Printf("error connecting to hypervisor: %s\n", err)
 			return
 		}
 		defer client.Put()
@@ -332,6 +318,42 @@ func markUnusedHypervisors(hypervisors []*hypervisorType,
 	return unusedHypervisors, usedHypervisors
 }
 
+func setupHypervisor(hostname string, imageName string, tgs tags.Tags,
+	cpuSharer *cpusharer.FifoCpuSharer,
+	logger log.DebugLogger) *hypervisorType {
+	logger = prefixlogger.New(hostname+": ", logger)
+	currentRequiredImage := tgs["RequiredImage"]
+	if currentRequiredImage != "" &&
+		path.Dir(currentRequiredImage) != path.Dir(imageName) {
+		logger.Printf(
+			"image stream: current=%s != new=%s, skipping\n",
+			path.Dir(currentRequiredImage), path.Dir(imageName))
+		return nil
+	}
+	h := &hypervisorType{
+		healthAgentClientResource: rpcclientpool.New("tcp",
+			fmt.Sprintf("%s:%d", hostname, 6910), true, ""),
+		hostname: hostname,
+		hypervisorClientResource: srpc.NewClientResource("tcp",
+			fmt.Sprintf("%s:%d", hostname,
+				constants.HypervisorPortNumber)),
+		initialTags:          tgs,
+		initialUnhealthyList: make(map[string]struct{}),
+		logger:               logger,
+		subClientResource: srpc.NewClientResource("tcp",
+			fmt.Sprintf("%s:%d", hostname, constants.SubPortNumber)),
+	}
+	if lastImage, err := h.getLastImageName(cpuSharer); err != nil {
+		logger.Printf("skipping: %s\n", err)
+		return nil
+	} else if lastImage == imageName {
+		logger.Println("already updated, skipping")
+		return nil
+	} else {
+		return h
+	}
+}
+
 func upgradeOneThenAll(fleetManagerClientResource *srpc.ClientResource,
 	imageName string, hypervisors map[*hypervisorType]struct{},
 	cpuSharer *cpusharer.FifoCpuSharer, maxConcurrent uint) error {
@@ -396,26 +418,20 @@ func (h *hypervisorType) getFailingHealthChecksOnce() (
 
 func (h *hypervisorType) getLastImageName(cpuSharer *cpusharer.FifoCpuSharer) (
 	string, error) {
-	stopTime := time.Now().Add(time.Minute * 5)
-	for ; time.Until(stopTime) > 0; cpuSharer.Sleep(time.Second) {
-		client, err := h.subClientResource.GetHTTP(nil, 0)
-		if err != nil {
-			h.logger.Debugln(0, err)
-			continue
-		}
-		request := sub_proto.PollRequest{ShortPollOnly: true}
-		var reply sub_proto.PollResponse
-		if err := subclient.CallPoll(client, request, &reply); err != nil {
-			client.Close()
-			if err != io.EOF {
-				h.logger.Debugf(0, "error polling sub: %s\n", err)
-			}
-			continue
-		}
-		client.Put()
-		return reply.LastSuccessfulImageName, nil
+	client, err := h.subClientResource.GetHTTP(nil, time.Second*15)
+	if err != nil {
+		return "", fmt.Errorf("error connecting to sub: %s", err)
+	}
+	defer client.Put()
+	request := sub_proto.PollRequest{ShortPollOnly: true}
+	var reply sub_proto.PollResponse
+	if err := subclient.CallPoll(client, request, &reply); err != nil {
+		client.Close()
+		if err != io.EOF {
+			return "", fmt.Errorf("error polling sub: %s", err)
+		}
 	}
-	return "", errors.New("timed out getting last image name")
+	return reply.LastSuccessfulImageName, nil
 }
 
 func (h *hypervisorType) updateTagForHypervisor(
@@ -467,7 +483,7 @@ func (h *hypervisorType) upgrade(clientResource *srpc.ClientResource,
 	stopTime := time.Now().Add(time.Minute * 15)
 	updateCompleted := false
 	for ; time.Until(stopTime) > 0; cpuSharer.Sleep(time.Second) {
-		if syncedImage, err := h.getLastImageName(cpuSharer); err != nil {
+		if syncedImage, err := h.waitLastImageName(cpuSharer); err != nil {
 			return err
 		} else if syncedImage == imageName {
 			updateCompleted = true
@@ -492,3 +508,17 @@ h.logger.Debugln(0, "still healthy")
 	h.logger.Debugln(0, "still healthy")
 	return nil
 }
+
+func (h *hypervisorType) waitLastImageName(cpuSharer *cpusharer.FifoCpuSharer) (
+	string, error) {
+	stopTime := time.Now().Add(time.Minute)
+	for ; time.Until(stopTime) > 0; cpuSharer.Sleep(time.Second * 5) {
+		imageName, err := h.getLastImageName(cpuSharer)
+		if err != nil {
+			h.logger.Debugln(0, err)
+			continue
+		}
+		return imageName, nil
+	}
+	return "", errors.New("timed out getting last image name")
+}
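
The other half of the speed-up is splitting the old five-minute retry loop out of getLastImageName: the function now makes a single attempt with a 15-second connection timeout, and the new waitLastImageName wraps it in a one-minute retry loop that sleeps five seconds between attempts via cpuSharer.Sleep (which, from its use here, appears intended to yield the shared CPU slot while waiting). The concurrent pre-check uses the fast single attempt, so an unreachable hypervisor no longer stalls the rollout for minutes, while upgrade keeps its patient polling via waitLastImageName. A sketch of that retry-until-deadline shape; the names and durations are illustrative, not the commit's:

package main

import (
    "errors"
    "fmt"
    "time"
)

// waitWithDeadline retries attempt until it succeeds or the deadline
// passes, mirroring the split between getLastImageName (one attempt)
// and waitLastImageName (the retry wrapper).
func waitWithDeadline(attempt func() (string, error),
    deadline, interval time.Duration) (string, error) {
    stopTime := time.Now().Add(deadline)
    for ; time.Until(stopTime) > 0; time.Sleep(interval) {
        if value, err := attempt(); err == nil {
            return value, nil
        }
    }
    return "", errors.New("timed out")
}

func main() {
    calls := 0
    attempt := func() (string, error) { // fails twice, then succeeds
        if calls++; calls < 3 {
            return "", errors.New("not ready")
        }
        return "image-v3", nil
    }
    name, err := waitWithDeadline(attempt, time.Minute, time.Millisecond)
    fmt.Println(name, err, calls) // image-v3 <nil> 3
}

Keeping the single-attempt function separate also lets each caller choose its own patience: the pre-check effectively gets one 15-second try, while upgrade polls for up to 15 minutes.
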
