From 96397766d22e080f27ca813fb4bd339450d2e638 Mon Sep 17 00:00:00 2001
From: Richard Gooch
Date: Tue, 29 Jan 2019 06:47:50 -0800
Subject: [PATCH 1/4] Add proto/hypervisor.CopyVm messages.

---
 proto/hypervisor/messages.go | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/proto/hypervisor/messages.go b/proto/hypervisor/messages.go
index c8658a12..9c2b9d71 100644
--- a/proto/hypervisor/messages.go
+++ b/proto/hypervisor/messages.go
@@ -95,6 +95,21 @@ type CommitImportedVmResponse struct {
 	Error string
 }
 
+type CopyVmRequest struct { // Sent to the Hypervisor that will host the copy.
+	AccessToken      []byte        // Access token for the source VM.
+	DhcpTimeout      time.Duration // Used when starting the new VM.
+	IpAddress        net.IP        // Address of the source VM.
+	SourceHypervisor string        // Address of the source Hypervisor.
+}
+
+type CopyVmResponse struct { // Multiple responses are sent.
+	DhcpTimedOut    bool   // Set in the final response.
+	Error           string // If not empty, the copy failed.
+	Final           bool   // If true, this is the final response.
+	IpAddress       net.IP // Address of the new VM (final response).
+	ProgressMessage string // If not empty, a progress update.
+}
+
 type CreateVmRequest struct {
 	DhcpTimeout      time.Duration
 	ImageDataSize    uint64

From 4959de0ebc18cf9f7671fe800ddecba222efcd44 Mon Sep 17 00:00:00 2001
From: Richard Gooch
Date: Wed, 30 Jan 2019 08:17:51 -0800
Subject: [PATCH 2/4] Add hypervisor/manager.Manager.CopyVm() method.

---
 hypervisor/manager/api.go |   5 ++
 hypervisor/manager/vm.go  | 159 +++++++++++++++++++++++++++++++++++---
 2 files changed, 154 insertions(+), 10 deletions(-)

diff --git a/hypervisor/manager/api.go b/hypervisor/manager/api.go
index a95c1dcc..f8ca37aa 100644
--- a/hypervisor/manager/api.go
+++ b/hypervisor/manager/api.go
@@ -137,6 +137,11 @@ func (m *Manager) CommitImportedVm(ipAddr net.IP,
 	return m.commitImportedVm(ipAddr, authInfo)
 }
 
+func (m *Manager) CopyVm(conn *srpc.Conn, request proto.CopyVmRequest,
+	encoder srpc.Encoder) error {
+	return m.copyVm(conn, request, encoder)
+}
+
 func (m *Manager) CreateVm(conn *srpc.Conn, decoder srpc.Decoder,
 	encoder srpc.Encoder) error {
 	return m.createVm(conn, decoder, encoder)
diff --git a/hypervisor/manager/vm.go b/hypervisor/manager/vm.go
index 74f46e5a..7eeb038f 100644
--- a/hypervisor/manager/vm.go
+++ b/hypervisor/manager/vm.go
@@ -323,6 +323,130 @@ func (m *Manager) commitImportedVm(ipAddr net.IP,
 	return nil
 }
 
+// copyVm creates a new VM on this Hypervisor as a copy of the VM with address
+// request.IpAddress on request.SourceHypervisor. Progress messages and then a
+// final response are written to encoder.
+func (m *Manager) copyVm(conn *srpc.Conn, request proto.CopyVmRequest,
+	encoder srpc.Encoder) error {
+	m.Logger.Debugln(1, "CopyVm() starting")
+	hypervisor, err := srpc.DialHTTP("tcp", request.SourceHypervisor, 0)
+	if err != nil {
+		return err
+	}
+	defer hypervisor.Close()
+	defer func() { // Best effort: discard the access token on the source.
+		req := proto.DiscardVmAccessTokenRequest{
+			AccessToken: request.AccessToken,
+			IpAddress:   request.IpAddress}
+		var reply proto.DiscardVmAccessTokenResponse
+		hypervisor.RequestReply("Hypervisor.DiscardVmAccessToken",
+			req, &reply)
+	}()
+	getInfoRequest := proto.GetVmInfoRequest{request.IpAddress}
+	var getInfoReply proto.GetVmInfoResponse
+	err = hypervisor.RequestReply("Hypervisor.GetVmInfo", getInfoRequest,
+		&getInfoReply)
+	if err != nil {
+		return err
+	}
+	accessToken := request.AccessToken
+	vmInfo := getInfoReply.VmInfo
+	vmInfo.Address = proto.Address{}
+	vmInfo.SecondaryAddresses = nil
+	vmInfo.Uncommitted = false
+	vm, err := m.allocateVm(proto.CreateVmRequest{
+		DhcpTimeout: request.DhcpTimeout,
+		VmInfo:      vmInfo,
+	}, conn.GetAuthInformation())
+	if err != nil {
+		return err
+	}
+	vm.OwnerUsers = getInfoReply.VmInfo.OwnerUsers
+	vm.Volumes = vmInfo.Volumes
+	if err := <-tryAllocateMemory(vmInfo.MemoryInMiB); err != nil {
+		return err
+	}
+	var secondaryVolumes []proto.Volume
+	for index, volume := range vmInfo.Volumes {
+		if index > 0 {
+			secondaryVolumes = append(secondaryVolumes, volume)
+		}
+	}
+	err = vm.setupVolumes(vmInfo.Volumes[0].Size, secondaryVolumes,
+		vmInfo.SpreadVolumes)
+	if err != nil {
+		return err
+	}
+	defer func() { // Evaluate vm at return time, not defer time.
+		if vm == nil {
+			return
+		}
+		vm.cleanup()
+		if getInfoReply.VmInfo.State == proto.StateRunning {
+			hyperclient.StartVm(hypervisor, request.IpAddress, accessToken)
+		}
+	}()
+	vm.ownerUsers = make(map[string]struct{}, len(vm.OwnerUsers))
+	for _, username := range vm.OwnerUsers {
+		vm.ownerUsers[username] = struct{}{}
+	}
+	if err := os.MkdirAll(vm.dirname, dirPerms); err != nil {
+		return err
+	}
+	// Begin copying over the volumes.
+	err = sendVmCopyMessage(conn, encoder, "initial volume(s) copy")
+	if err != nil {
+		return err
+	}
+	err = vm.migrateVmVolumes(hypervisor, request.IpAddress, accessToken)
+	if err != nil {
+		return err
+	}
+	if getInfoReply.VmInfo.State != proto.StateStopped {
+		err = sendVmCopyMessage(conn, encoder, "stopping VM")
+		if err != nil {
+			return err
+		}
+		err := hyperclient.StopVm(hypervisor, request.IpAddress,
+			request.AccessToken)
+		if err != nil {
+			return err
+		}
+		err = sendVmCopyMessage(conn, encoder, "update volume(s)")
+		if err != nil {
+			return err
+		}
+		err = vm.migrateVmVolumes(hypervisor, request.IpAddress, accessToken)
+		if err != nil {
+			return err
+		}
+	}
+	err = migratevmUserData(hypervisor, path.Join(vm.dirname, "user-data.raw"),
+		request.IpAddress, accessToken)
+	if err != nil {
+		return err
+	}
+	if err := sendVmCopyMessage(conn, encoder, "starting VM"); err != nil {
+		return err
+	}
+	dhcpTimedOut, err := vm.startManaging(request.DhcpTimeout, false)
+	if err != nil {
+		return err
+	}
+	vm.destroyTimer = time.AfterFunc(time.Second*15, vm.autoDestroy)
+	response := proto.CopyVmResponse{
+		DhcpTimedOut: dhcpTimedOut,
+		Final:        true,
+		IpAddress:    vm.Address.IpAddress,
+	}
+	if err := encoder.Encode(response); err != nil {
+		return err
+	}
+	vm = nil // Cancel cleanup.
+	m.Logger.Debugln(1, "CopyVm() finished")
+	return nil
+}
+
 func (m *Manager) createVm(conn *srpc.Conn, decoder srpc.Decoder,
 	encoder srpc.Encoder) error {
 
@@ -388,7 +512,9 @@ func (m *Manager) createVm(conn *srpc.Conn, decoder srpc.Decoder,
 		vm.ImageName = imageName
 		size := computeSize(request.MinimumFreeBytes, request.RoundupPower,
 			fs.EstimateUsage(0))
-		if err := vm.setupVolumes(size, request); err != nil {
+		err = vm.setupVolumes(size, request.SecondaryVolumes,
+			request.SpreadVolumes)
+		if err != nil {
 			return sendError(conn, encoder, err)
 		}
 		err = sendUpdate(conn, encoder, "unpacking image: "+imageName)
@@ -1036,7 +1162,8 @@ func (m *Manager) migrateVm(conn *srpc.Conn, decoder srpc.Decoder,
 	if err != nil {
 		return err
 	}
-	if err := vm.migrateVmVolumes(hypervisor, accessToken); err != nil {
+	err = vm.migrateVmVolumes(hypervisor, vm.Address.IpAddress, accessToken)
+	if err != nil {
 		return err
 	}
 	if vmInfo.State != proto.StateStopped {
@@ -1058,7 +1185,8 @@ func (m *Manager) migrateVm(conn *srpc.Conn, decoder srpc.Decoder,
 		if err != nil {
 			return err
 		}
-		if err := vm.migrateVmVolumes(hypervisor, accessToken); err != nil {
+		err = vm.migrateVmVolumes(hypervisor, vm.Address.IpAddress, accessToken)
+		if err != nil {
 			return err
 		}
 	}
@@ -1114,6 +1242,15 @@ func (m *Manager) migrateVm(conn *srpc.Conn, decoder srpc.Decoder,
 	return nil
 }
 
+func sendVmCopyMessage(conn *srpc.Conn, encoder srpc.Encoder,
+	message string) error {
+	request := proto.CopyVmResponse{ProgressMessage: message}
+	if err := encoder.Encode(request); err != nil {
+		return err
+	}
+	return conn.Flush()
+}
+
 func sendVmMigrationMessage(conn *srpc.Conn, encoder srpc.Encoder,
 	message string) error {
 	request := proto.MigrateVmResponse{ProgressMessage: message}
@@ -1197,10 +1334,10 @@ func migratevmUserData(hypervisor *srpc.Client, filename string,
 }
 
 func (vm *vmInfoType) migrateVmVolumes(hypervisor *srpc.Client,
-	accessToken []byte) error {
+	sourceIpAddr net.IP, accessToken []byte) error {
 	for index, volume := range vm.VolumeLocations {
 		_, err := migrateVmVolume(hypervisor, volume.Filename, uint(index),
-			vm.Volumes[index].Size, vm.Address.IpAddress, accessToken)
+			vm.Volumes[index].Size, sourceIpAddr, accessToken)
 		if err != nil {
 			return err
 		}
@@ -1861,10 +1998,12 @@ func (vm *vmInfoType) copyRootVolume(request proto.CreateVmRequest,
 	reader io.Reader, dataSize uint64) error {
 	size := computeSize(request.MinimumFreeBytes, request.RoundupPower,
 		dataSize)
-	if err := vm.setupVolumes(size, request); err != nil {
+	err := vm.setupVolumes(size, request.SecondaryVolumes,
+		request.SpreadVolumes)
+	if err != nil {
 		return err
 	}
-	err := copyData(vm.VolumeLocations[0].Filename, reader, dataSize,
+	err = copyData(vm.VolumeLocations[0].Filename, reader, dataSize,
 		privateFilePerms)
 	if err != nil {
 		return err
@@ -2049,9 +2188,9 @@ func (vm *vmInfoType) setState(state proto.State) {
 }
 
 func (vm *vmInfoType) setupVolumes(rootSize uint64,
-	request proto.CreateVmRequest) error {
+	secondaryVolumes []proto.Volume, spreadVolumes bool) error {
 	volumeDirectories, err := vm.manager.getVolumeDirectories(rootSize,
-		request.SecondaryVolumes, request.SpreadVolumes)
+		secondaryVolumes, spreadVolumes)
 	if err != nil {
 		return err
 	}
@@ -2063,7 +2202,7 @@ func (vm *vmInfoType) setupVolumes(rootSize uint64,
 	filename := path.Join(volumeDirectory, "root")
 	vm.VolumeLocations = append(vm.VolumeLocations,
 		volumeType{volumeDirectory, filename})
-	for index := range request.SecondaryVolumes {
+	for index := range secondaryVolumes {
 		volumeDirectory := path.Join(volumeDirectories[index+1], vm.ipAddress)
 		os.RemoveAll(volumeDirectory)
 		if err := os.MkdirAll(volumeDirectory, dirPerms); err != nil {

From 96f7cf275b618bbbc2045dd43fc2850ce1e0f80c Mon Sep 17 00:00:00 2001
From: Richard Gooch
Date: Thu, 31 Jan 2019 07:13:42 -0800
Subject: [PATCH 3/4] Add Hypervisor.CopyVm SRPC method.

---
 hypervisor/rpcd/api.go    |  1 +
 hypervisor/rpcd/copyVm.go | 23 +++++++++++++++++++++++
 2 files changed, 24 insertions(+)
 create mode 100644 hypervisor/rpcd/copyVm.go

diff --git a/hypervisor/rpcd/api.go b/hypervisor/rpcd/api.go
index 3b3ce95e..687a94ff 100644
--- a/hypervisor/rpcd/api.go
+++ b/hypervisor/rpcd/api.go
@@ -56,6 +56,7 @@ func Setup(manager *manager.Manager, dhcpServer DhcpServer,
 				"ChangeVmOwnerUsers",
 				"ChangeVmTags",
 				"CommitImportedVm",
+				"CopyVm",
 				"CreateVm",
 				"DeleteVmVolume",
 				"DestroyVm",
diff --git a/hypervisor/rpcd/copyVm.go b/hypervisor/rpcd/copyVm.go
new file mode 100644
index 00000000..01be0a82
--- /dev/null
+++ b/hypervisor/rpcd/copyVm.go
@@ -0,0 +1,23 @@
+package rpcd
+
+import (
+	"github.com/Symantec/Dominator/lib/srpc"
+	"github.com/Symantec/Dominator/proto/hypervisor"
+)
+
+func (t *srpcType) CopyVm(conn *srpc.Conn, decoder srpc.Decoder,
+	encoder srpc.Encoder) error { // Failures are sent in a CopyVmResponse.
+	if err := t.copyVm(conn, decoder, encoder); err != nil {
+		return encoder.Encode(hypervisor.CopyVmResponse{Error: err.Error()})
+	}
+	return nil
+}
+
+func (t *srpcType) copyVm(conn *srpc.Conn, decoder srpc.Decoder,
+	encoder srpc.Encoder) error { // Decodes the request, then hands off.
+	var request hypervisor.CopyVmRequest
+	if err := decoder.Decode(&request); err != nil {
+		return err
+	}
+	return t.manager.CopyVm(conn, request, encoder) // Manager sends responses.
+}

From 7d882ed00b6edd5409ce57a812a73fd84e33632d Mon Sep 17 00:00:00 2001
From: Richard Gooch
Date: Fri, 1 Feb 2019 07:49:13 -0800
Subject: [PATCH 4/4] Add copy-vm subcommand to vm-control tool.

---
 cmd/vm-control/README.md |   3 ++
 cmd/vm-control/copyVm.go | 107 +++++++++++++++++++++++++++++++++++++++
 cmd/vm-control/main.go   |   2 +
 3 files changed, 112 insertions(+)
 create mode 100644 cmd/vm-control/copyVm.go

diff --git a/cmd/vm-control/README.md b/cmd/vm-control/README.md
index 42c481c6..df840060 100644
--- a/cmd/vm-control/README.md
+++ b/cmd/vm-control/README.md
@@ -24,9 +24,12 @@ vm-control -h
 Some of the sub-commands available are:
 
 - **become-primary-vm-owner**: become the primary owner of a VM
+- **change-vm-destroy-protection**: enable/disable destroy protect for a VM
 - **change-vm-owner-users**: change the extra owners for a VM
 - **change-vm-tags**: change the tags for a VM
+- **copy-vm**: make a copy of a VM
 - **create-vm**: create a VM
+- **delete-vm-volume**: delete a specified volume from a VM
 - **destroy-vm**: destroy a VM (all ephemeral data and metadata are lost)
 - **discard-vm-old-image**: discard the previous root image for a VM
 - **discard-vm-old-user-data**: discard the previous user data for a VM
diff --git a/cmd/vm-control/copyVm.go b/cmd/vm-control/copyVm.go
new file mode 100644
index 00000000..ea4088de
--- /dev/null
+++ b/cmd/vm-control/copyVm.go
@@ -0,0 +1,107 @@
+package main
+
+import (
+	"encoding/gob"
+	"errors"
+	"fmt"
+	"net"
+	"os"
+
+	"github.com/Symantec/Dominator/lib/log"
+	"github.com/Symantec/Dominator/lib/srpc"
+	hyper_proto "github.com/Symantec/Dominator/proto/hypervisor"
+)
+
+func copyVmSubcommand(args []string, logger log.DebugLogger) {
+	if err := copyVm(args[0], logger); err != nil {
+		fmt.Fprintf(os.Stderr, "Error copying VM: %s\n", err)
+		os.Exit(1)
+	}
+	os.Exit(0)
+}
+
+func copyVm(vmHostname string, logger log.DebugLogger) error {
+	if vmIP, hypervisor, err := searchVmAndHypervisor(vmHostname); err != nil {
+		return err
+	} else {
+		return copyVmFromHypervisor(hypervisor, vmIP, logger)
+	}
+}
+
+func callCopyVm(client *srpc.Client, request hyper_proto.CopyVmRequest,
+	reply *hyper_proto.CopyVmResponse, logger log.DebugLogger) error {
+	conn, err := client.Call("Hypervisor.CopyVm")
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	encoder := gob.NewEncoder(conn)
+	decoder := gob.NewDecoder(conn)
+	if err := encoder.Encode(request); err != nil {
+		return err
+	}
+	if err := conn.Flush(); err != nil {
+		return err
+	}
+	for { // Read responses until an error or the final response.
+		var response hyper_proto.CopyVmResponse
+		if err := decoder.Decode(&response); err != nil {
+			return err
+		}
+		if response.Error != "" {
+			return errors.New(response.Error)
+		}
+		if response.ProgressMessage != "" {
+			logger.Debugln(0, response.ProgressMessage)
+		}
+		if response.Final {
+			*reply = response // Hand the final response to the caller.
+			return nil
+		}
+	}
+}
+
+func copyVmFromHypervisor(sourceHypervisorAddress string, vmIP net.IP,
+	logger log.DebugLogger) error {
+	destHypervisorAddress, err := getHypervisorAddress()
+	if err != nil {
+		return err
+	}
+	sourceHypervisor, err := dialHypervisor(sourceHypervisorAddress)
+	if err != nil {
+		return err
+	}
+	defer sourceHypervisor.Close()
+	accessToken, err := getVmAccessTokenClient(sourceHypervisor, vmIP)
+	if err != nil {
+		return err
+	}
+	defer discardAccessToken(sourceHypervisor, vmIP)
+	destHypervisor, err := dialHypervisor(destHypervisorAddress)
+	if err != nil {
+		return err
+	}
+	defer destHypervisor.Close()
+	request := hyper_proto.CopyVmRequest{
+		AccessToken:      accessToken,
+		DhcpTimeout:      *dhcpTimeout,
+		IpAddress:        vmIP,
+		SourceHypervisor: sourceHypervisorAddress,
+	}
+	var reply hyper_proto.CopyVmResponse
+	if err := callCopyVm(destHypervisor, request, &reply, logger); err != nil {
+		return err
+	}
+	if err := acknowledgeVm(destHypervisor, reply.IpAddress); err != nil {
+		return fmt.Errorf("error acknowledging VM: %s", err)
+	}
+	fmt.Println(reply.IpAddress) // Print the address of the new VM.
+	if reply.DhcpTimedOut {
+		return errors.New("DHCP ACK timed out")
+	}
+	if *dhcpTimeout > 0 {
+		logger.Debugln(0, "Received DHCP ACK")
+	}
+	return maybeWatchVm(destHypervisor, destHypervisorAddress, reply.IpAddress,
+		logger)
+}
diff --git a/cmd/vm-control/main.go b/cmd/vm-control/main.go
index 28684ada..c5fc6543 100644
--- a/cmd/vm-control/main.go
+++ b/cmd/vm-control/main.go
@@ -100,6 +100,7 @@ func printUsage() {
 	fmt.Fprintln(os.Stderr, " change-vm-destroy-protection IPaddr")
 	fmt.Fprintln(os.Stderr, " change-vm-owner-users IPaddr")
 	fmt.Fprintln(os.Stderr, " change-vm-tags IPaddr")
+	fmt.Fprintln(os.Stderr, " copy-vm IPaddr")
 	fmt.Fprintln(os.Stderr, " create-vm")
 	fmt.Fprintln(os.Stderr, " delete-vm-volume IPaddr")
 	fmt.Fprintln(os.Stderr, " destroy-vm IPaddr")
@@ -143,6 +144,7 @@ var subcommands = []subcommand{
 	{"change-vm-destroy-protection", 1, 1, changeVmDestroyProtectionSubcommand},
 	{"change-vm-owner-users", 1, 1, changeVmOwnerUsersSubcommand},
 	{"change-vm-tags", 1, 1, changeVmTagsSubcommand},
+	{"copy-vm", 1, 1, copyVmSubcommand},
 	{"create-vm", 0, 0, createVmSubcommand},
 	{"delete-vm-volume", 1, 1, deleteVmVolumeSubcommand},
 	{"destroy-vm", 1, 1, destroyVmSubcommand},