Skip to content

Commit

Permalink
Merge pull request #555 from rgooch/master
Browse files Browse the repository at this point in the history
Add VM copy capability.
  • Loading branch information
rgooch committed Feb 1, 2019
2 parents 455c4f1 + 7d882ed commit 4c2e524
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 10 deletions.
3 changes: 3 additions & 0 deletions cmd/vm-control/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ vm-control -h
Some of the sub-commands available are:

- **become-primary-vm-owner**: become the primary owner of a VM
- **change-vm-destroy-protection**: enable/disable destroy protect for a VM
- **change-vm-owner-users**: change the extra owners for a VM
- **change-vm-tags**: change the tags for a VM
- **copy-vm**: make a copy of a VM
- **create-vm**: create a VM
- **delete-vm-volume**: delete a specified volume from a VM
- **destroy-vm**: destroy a VM (all ephemeral data and metadata are lost)
- **discard-vm-old-image**: discard the previous root image for a VM
- **discard-vm-old-user-data**: discard the previous user data for a VM
Expand Down
107 changes: 107 additions & 0 deletions cmd/vm-control/copyVm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package main

import (
"encoding/gob"
"errors"
"fmt"
"net"
"os"

"github.com/Symantec/Dominator/lib/log"
"github.com/Symantec/Dominator/lib/srpc"
hyper_proto "github.com/Symantec/Dominator/proto/hypervisor"
)

func copyVmSubcommand(args []string, logger log.DebugLogger) {
if err := copyVm(args[0], logger); err != nil {
fmt.Fprintf(os.Stderr, "Error copying VM: %s\n", err)
os.Exit(1)
}
os.Exit(0)
}

func copyVm(vmHostname string, logger log.DebugLogger) error {
if vmIP, hypervisor, err := searchVmAndHypervisor(vmHostname); err != nil {
return err
} else {
return copyVmFromHypervisor(hypervisor, vmIP, logger)
}
}

func callCopyVm(client *srpc.Client, request hyper_proto.CopyVmRequest,
reply *hyper_proto.CopyVmResponse, logger log.DebugLogger) error {
conn, err := client.Call("Hypervisor.CopyVm")
if err != nil {
return err
}
defer conn.Close()
encoder := gob.NewEncoder(conn)
decoder := gob.NewDecoder(conn)
if err := encoder.Encode(request); err != nil {
return err
}
if err := conn.Flush(); err != nil {
return err
}
for {
var response hyper_proto.CopyVmResponse
if err := decoder.Decode(&response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
if response.ProgressMessage != "" {
logger.Debugln(0, response.ProgressMessage)
}
if response.Final {
*reply = response
return nil
}
}
}

func copyVmFromHypervisor(sourceHypervisorAddress string, vmIP net.IP,
logger log.DebugLogger) error {
destHypervisorAddress, err := getHypervisorAddress()
if err != nil {
return err
}
sourceHypervisor, err := dialHypervisor(sourceHypervisorAddress)
if err != nil {
return err
}
defer sourceHypervisor.Close()
accessToken, err := getVmAccessTokenClient(sourceHypervisor, vmIP)
if err != nil {
return err
}
defer discardAccessToken(sourceHypervisor, vmIP)
destHypervisor, err := dialHypervisor(destHypervisorAddress)
if err != nil {
return err
}
defer destHypervisor.Close()
request := hyper_proto.CopyVmRequest{
AccessToken: accessToken,
DhcpTimeout: *dhcpTimeout,
IpAddress: vmIP,
SourceHypervisor: sourceHypervisorAddress,
}
var reply hyper_proto.CopyVmResponse
if err := callCopyVm(destHypervisor, request, &reply, logger); err != nil {
return err
}
if err := acknowledgeVm(destHypervisor, reply.IpAddress); err != nil {
return fmt.Errorf("error acknowledging VM: %s", err)
}
fmt.Println(reply.IpAddress)
if reply.DhcpTimedOut {
return errors.New("DHCP ACK timed out")
}
if *dhcpTimeout > 0 {
logger.Debugln(0, "Received DHCP ACK")
}
return maybeWatchVm(destHypervisor, destHypervisorAddress, reply.IpAddress,
logger)
}
2 changes: 2 additions & 0 deletions cmd/vm-control/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func printUsage() {
fmt.Fprintln(os.Stderr, " change-vm-destroy-protection IPaddr")
fmt.Fprintln(os.Stderr, " change-vm-owner-users IPaddr")
fmt.Fprintln(os.Stderr, " change-vm-tags IPaddr")
fmt.Fprintln(os.Stderr, " copy-vm IPaddr")
fmt.Fprintln(os.Stderr, " create-vm")
fmt.Fprintln(os.Stderr, " delete-vm-volume IPaddr")
fmt.Fprintln(os.Stderr, " destroy-vm IPaddr")
Expand Down Expand Up @@ -143,6 +144,7 @@ var subcommands = []subcommand{
{"change-vm-destroy-protection", 1, 1, changeVmDestroyProtectionSubcommand},
{"change-vm-owner-users", 1, 1, changeVmOwnerUsersSubcommand},
{"change-vm-tags", 1, 1, changeVmTagsSubcommand},
{"copy-vm", 1, 1, copyVmSubcommand},
{"create-vm", 0, 0, createVmSubcommand},
{"delete-vm-volume", 1, 1, deleteVmVolumeSubcommand},
{"destroy-vm", 1, 1, destroyVmSubcommand},
Expand Down
5 changes: 5 additions & 0 deletions hypervisor/manager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func (m *Manager) CommitImportedVm(ipAddr net.IP,
return m.commitImportedVm(ipAddr, authInfo)
}

func (m *Manager) CopyVm(conn *srpc.Conn, request proto.CopyVmRequest,
encoder srpc.Encoder) error {
return m.copyVm(conn, request, encoder)
}

func (m *Manager) CreateVm(conn *srpc.Conn, decoder srpc.Decoder,
encoder srpc.Encoder) error {
return m.createVm(conn, decoder, encoder)
Expand Down
153 changes: 143 additions & 10 deletions hypervisor/manager/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,124 @@ func (m *Manager) commitImportedVm(ipAddr net.IP,
return nil
}

func (m *Manager) copyVm(conn *srpc.Conn, request proto.CopyVmRequest,
encoder srpc.Encoder) error {
m.Logger.Debugln(1, "CopyVm() starting")
hypervisor, err := srpc.DialHTTP("tcp", request.SourceHypervisor, 0)
if err != nil {
return err
}
defer hypervisor.Close()
defer func() {
req := proto.DiscardVmAccessTokenRequest{
AccessToken: request.AccessToken,
IpAddress: request.IpAddress}
var reply proto.DiscardVmAccessTokenResponse
hypervisor.RequestReply("Hypervisor.DiscardVmAccessToken",
req, &reply)
}()
getInfoRequest := proto.GetVmInfoRequest{request.IpAddress}
var getInfoReply proto.GetVmInfoResponse
err = hypervisor.RequestReply("Hypervisor.GetVmInfo", getInfoRequest,
&getInfoReply)
if err != nil {
return err
}
accessToken := request.AccessToken
vmInfo := getInfoReply.VmInfo
vmInfo.Address = proto.Address{}
vmInfo.SecondaryAddresses = nil
vmInfo.Uncommitted = false
vm, err := m.allocateVm(proto.CreateVmRequest{
DhcpTimeout: request.DhcpTimeout,
VmInfo: vmInfo,
}, conn.GetAuthInformation())
vm.OwnerUsers = getInfoReply.VmInfo.OwnerUsers
vm.Volumes = vmInfo.Volumes
if err := <-tryAllocateMemory(vmInfo.MemoryInMiB); err != nil {
return err
}
var secondaryVolumes []proto.Volume
for index, volume := range vmInfo.Volumes {
if index > 0 {
secondaryVolumes = append(secondaryVolumes, volume)
}
}
err = vm.setupVolumes(vmInfo.Volumes[0].Size, secondaryVolumes,
vmInfo.SpreadVolumes)
if err != nil {
return err
}
defer func() { // Evaluate vm at return time, not defer time.
if vm == nil {
return
}
vm.cleanup()
if getInfoReply.VmInfo.State == proto.StateRunning {
hyperclient.StartVm(hypervisor, request.IpAddress, accessToken)
}
}()
vm.ownerUsers = make(map[string]struct{}, len(vm.OwnerUsers))
for _, username := range vm.OwnerUsers {
vm.ownerUsers[username] = struct{}{}
}
if err := os.MkdirAll(vm.dirname, dirPerms); err != nil {
return err
}
// Begin copying over the volumes.
err = sendVmCopyMessage(conn, encoder, "initial volume(s) copy")
if err != nil {
return err
}
err = vm.migrateVmVolumes(hypervisor, request.IpAddress, accessToken)
if err != nil {
return err
}
if getInfoReply.VmInfo.State != proto.StateStopped {
err = sendVmCopyMessage(conn, encoder, "stopping VM")
if err != nil {
return err
}
err := hyperclient.StopVm(hypervisor, request.IpAddress,
request.AccessToken)
if err != nil {
return err
}
err = sendVmCopyMessage(conn, encoder, "update volume(s)")
if err != nil {
return err
}
err = vm.migrateVmVolumes(hypervisor, request.IpAddress, accessToken)
if err != nil {
return err
}
}
err = migratevmUserData(hypervisor, path.Join(vm.dirname, "user-data.raw"),
request.IpAddress, accessToken)
if err != nil {
return err
}
if err := sendVmCopyMessage(conn, encoder, "starting VM"); err != nil {
return err
}
dhcpTimedOut, err := vm.startManaging(request.DhcpTimeout, false)
if err != nil {
return err
}
vm.destroyTimer = time.AfterFunc(time.Second*15, vm.autoDestroy)
response := proto.CopyVmResponse{
DhcpTimedOut: dhcpTimedOut,
Final: true,
IpAddress: vm.Address.IpAddress,
}
if err := encoder.Encode(response); err != nil {
return err
}
vm = nil // Cancel cleanup.
m.Logger.Debugln(1, "CopyVm() finished")
return nil
}

func (m *Manager) createVm(conn *srpc.Conn, decoder srpc.Decoder,
encoder srpc.Encoder) error {

Expand Down Expand Up @@ -388,7 +506,9 @@ func (m *Manager) createVm(conn *srpc.Conn, decoder srpc.Decoder,
vm.ImageName = imageName
size := computeSize(request.MinimumFreeBytes, request.RoundupPower,
fs.EstimateUsage(0))
if err := vm.setupVolumes(size, request); err != nil {
err = vm.setupVolumes(size, request.SecondaryVolumes,
request.SpreadVolumes)
if err != nil {
return sendError(conn, encoder, err)
}
err = sendUpdate(conn, encoder, "unpacking image: "+imageName)
Expand Down Expand Up @@ -1036,7 +1156,8 @@ func (m *Manager) migrateVm(conn *srpc.Conn, decoder srpc.Decoder,
if err != nil {
return err
}
if err := vm.migrateVmVolumes(hypervisor, accessToken); err != nil {
err = vm.migrateVmVolumes(hypervisor, vm.Address.IpAddress, accessToken)
if err != nil {
return err
}
if vmInfo.State != proto.StateStopped {
Expand All @@ -1058,7 +1179,8 @@ func (m *Manager) migrateVm(conn *srpc.Conn, decoder srpc.Decoder,
if err != nil {
return err
}
if err := vm.migrateVmVolumes(hypervisor, accessToken); err != nil {
err = vm.migrateVmVolumes(hypervisor, vm.Address.IpAddress, accessToken)
if err != nil {
return err
}
}
Expand Down Expand Up @@ -1114,6 +1236,15 @@ func (m *Manager) migrateVm(conn *srpc.Conn, decoder srpc.Decoder,
return nil
}

func sendVmCopyMessage(conn *srpc.Conn, encoder srpc.Encoder,
message string) error {
request := proto.CopyVmResponse{ProgressMessage: message}
if err := encoder.Encode(request); err != nil {
return err
}
return conn.Flush()
}

func sendVmMigrationMessage(conn *srpc.Conn, encoder srpc.Encoder,
message string) error {
request := proto.MigrateVmResponse{ProgressMessage: message}
Expand Down Expand Up @@ -1197,10 +1328,10 @@ func migratevmUserData(hypervisor *srpc.Client, filename string,
}

func (vm *vmInfoType) migrateVmVolumes(hypervisor *srpc.Client,
accessToken []byte) error {
sourceIpAddr net.IP, accessToken []byte) error {
for index, volume := range vm.VolumeLocations {
_, err := migrateVmVolume(hypervisor, volume.Filename, uint(index),
vm.Volumes[index].Size, vm.Address.IpAddress, accessToken)
vm.Volumes[index].Size, sourceIpAddr, accessToken)
if err != nil {
return err
}
Expand Down Expand Up @@ -1861,10 +1992,12 @@ func (vm *vmInfoType) copyRootVolume(request proto.CreateVmRequest,
reader io.Reader, dataSize uint64) error {
size := computeSize(request.MinimumFreeBytes, request.RoundupPower,
dataSize)
if err := vm.setupVolumes(size, request); err != nil {
err := vm.setupVolumes(size, request.SecondaryVolumes,
request.SpreadVolumes)
if err != nil {
return err
}
err := copyData(vm.VolumeLocations[0].Filename, reader, dataSize,
err = copyData(vm.VolumeLocations[0].Filename, reader, dataSize,
privateFilePerms)
if err != nil {
return err
Expand Down Expand Up @@ -2049,9 +2182,9 @@ func (vm *vmInfoType) setState(state proto.State) {
}

func (vm *vmInfoType) setupVolumes(rootSize uint64,
request proto.CreateVmRequest) error {
secondaryVolumes []proto.Volume, spreadVolumes bool) error {
volumeDirectories, err := vm.manager.getVolumeDirectories(rootSize,
request.SecondaryVolumes, request.SpreadVolumes)
secondaryVolumes, spreadVolumes)
if err != nil {
return err
}
Expand All @@ -2063,7 +2196,7 @@ func (vm *vmInfoType) setupVolumes(rootSize uint64,
filename := path.Join(volumeDirectory, "root")
vm.VolumeLocations = append(vm.VolumeLocations,
volumeType{volumeDirectory, filename})
for index := range request.SecondaryVolumes {
for index := range secondaryVolumes {
volumeDirectory := path.Join(volumeDirectories[index+1], vm.ipAddress)
os.RemoveAll(volumeDirectory)
if err := os.MkdirAll(volumeDirectory, dirPerms); err != nil {
Expand Down
1 change: 1 addition & 0 deletions hypervisor/rpcd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func Setup(manager *manager.Manager, dhcpServer DhcpServer,
"ChangeVmOwnerUsers",
"ChangeVmTags",
"CommitImportedVm",
"CopyVm",
"CreateVm",
"DeleteVmVolume",
"DestroyVm",
Expand Down
Loading

0 comments on commit 4c2e524

Please sign in to comment.