Skip to content

Commit

Permalink
Fix restore of thin-provisioned disks to VM
Browse files Browse the repository at this point in the history
The restore via imageio requires the exact number of bytes
to be transferred, this is now implemented by saving the
number of bytes backed in restoreobjects.

Also fixed a typo so that getting the correct actual_size from
OVF data now works.

Finally added transfer rate statistics for each disk image
in the job log.
  • Loading branch information
sduehr authored and pstorz committed Dec 16, 2019
1 parent 9d14b65 commit 6d55aaf
Showing 1 changed file with 127 additions and 9 deletions.
136 changes: 127 additions & 9 deletions core/src/plugins/filed/BareosFdPluginOvirt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io
import time
import uuid
import json

import lxml.etree

Expand Down Expand Up @@ -60,9 +61,7 @@ def __init__(self, context, plugindef):

def parse_plugin_definition(self, context, plugindef):
'''
We have default options that should work out of the box in the most use cases
that the mysql/mariadb is on the same host and can be accessed without user/password information,
e.g. with a valid my.cnf for user root.
Parses the plugin arguments
'''
super(BareosFdPluginOvirt, self).parse_plugin_definition(context, plugindef)

Expand Down Expand Up @@ -153,7 +152,32 @@ def start_backup_file(self, context, savepkt):
"BareosFdPluginOvirt:start_backup_file() Error: %s" % str(e))
self.ovirt.end_transfer(context)
return bRCs['bRC_Error']

elif 'disk_metadata' in self.backup_obj:
# save disk metadata as restoreobject

disk_alias = self.backup_obj['disk_metadata']['alias']
disk_id = self.backup_obj['disk_metadata']['id']
snapshot_id = self.backup_obj['snapshot_id']
disk_metadata_json = json.dumps({'disk_metadata': self.backup_obj['disk_metadata']})

bareosfd.DebugMessage(
context, 100,
"BareosFdPluginOvirt:start_backup_file() backup disk metadata '%s'('%s'/'%s') of VM '%s': %s\n" % (
disk_alias, disk_id, snapshot_id, self.backup_obj['vmname'], disk_metadata_json))

savepkt.type = bFileType['FT_RESTORE_FIRST']
savepkt.fname = "/VMS/%s-%s/%s-%s/%s.metadata" % (self.backup_obj['vmname'], self.backup_obj['vmid'], disk_alias, disk_id, snapshot_id)
savepkt.object_name = savepkt.fname
savepkt.object = bytearray(disk_metadata_json)
savepkt.object_len = len(savepkt.object)
savepkt.object_index = int(time.time())

else:
bareosfd.JobMessage(
context, bJobMessageType['M_FATAL'],
"BareosFdPluginOvirt:start_backup_file(): Invalid data in backup_obj, keys: %s\n" %
(self.backup_obj.keys()))
return bRCs['bRC_Error']

bareosfd.JobMessage(
Expand Down Expand Up @@ -418,13 +442,73 @@ def handle_plugin_event(self, context, event):
return bRCs['bRC_OK']

def end_backup_file(self, context):
bareosfd.DebugMessage(
context, 100,
"BareosFdPluginOvirt::end_backup_file() entry point in Python called\n")
if self.ovirt.transfer_start_time:
elapsed_seconds = round(time.time() - self.ovirt.transfer_start_time, 2)
download_rate = round(self.ovirt.init_bytes_to_transf / 1000.0 / elapsed_seconds, 1)
bareosfd.JobMessage(
context, bJobMessageType['M_INFO'],
" Transfer time: %s s bytes: %s rate: %s KB/s\n" % (elapsed_seconds, self.ovirt.init_bytes_to_transf, download_rate))
self.ovirt.transfer_start_time = None

if self.ovirt.backup_objects:
return bRCs['bRC_More']
else:
return bRCs['bRC_OK']

# def end_restore_file(self, context):
# def restore_object_data(self, context, ROP):
def end_restore_file(self, context):
bareosfd.DebugMessage(
context, 100,
"BareosFdPluginOvirt::end_restore_file() entry point in Python called\n")
if self.ovirt.transfer_start_time:
elapsed_seconds = round(time.time() - self.ovirt.transfer_start_time, 2)
download_rate = round(self.ovirt.init_bytes_to_transf / 1000.0 / elapsed_seconds, 1)
bareosfd.JobMessage(
context, bJobMessageType['M_INFO'],
" Upload time: %s s bytes: %s rate: %s KB/s\n" % (elapsed_seconds, self.ovirt.init_bytes_to_transf, download_rate))
self.ovirt.transfer_start_time = None
return bRCs['bRC_OK']

def restore_object_data(self, context, ROP):
"""
Note:
This is called in two cases:
- on diff/inc backup (should be called only once)
- on restore (for every job id being restored)
But at the point in time called, it is not possible
to distinguish which of them it is, because job type
is "I" until the bEventStartBackupJob event
"""
bareosfd.DebugMessage(
context, 100,
"BareosFdPluginOvirt:restore_object_data() called with ROP:%s\n" %
(ROP))
bareosfd.DebugMessage(
context, 100,
"ROP.object_name(%s): %s\n" %
(type(ROP.object_name), ROP.object_name))
bareosfd.DebugMessage(
context, 100,
"ROP.plugin_name(%s): %s\n" %
(type(ROP.plugin_name), ROP.plugin_name))
bareosfd.DebugMessage(
context, 100,
"ROP.object_len(%s): %s\n" %
(type(ROP.object_len), ROP.object_len))
bareosfd.DebugMessage(
context, 100,
"ROP.object_full_len(%s): %s\n" %
(type(ROP.object_full_len), ROP.object_full_len))
bareosfd.DebugMessage(
context, 100,
"ROP.object(%s): %s\n" %
(type(ROP.object), ROP.object))
ro_data = json.loads(str(ROP.object))
self.ovirt.disk_metadata_by_id[ro_data['disk_metadata']['id']] = ro_data['disk_metadata']

return bRCs['bRC_OK']


class BareosOvirtWrapper(object):
Expand Down Expand Up @@ -452,9 +536,12 @@ def __init__(self):

self.proxy_connection = None
self.bytes_to_transf = None
self.init_bytes_to_transf = None
self.transfer_start_time = None

self.backup_objects = None
self.restore_objects = None
self.disk_metadata_by_id = {}

self.old_new_ids = {}

Expand Down Expand Up @@ -778,10 +865,24 @@ def start_download(self, context, snapshot, disk):

self.bytes_to_transf = int(self.response.getheader('Content-Length'))

self.backup_objects.insert(0, {
'vmname': self.vm.name,
'vmid': self.vm.id,
'snapshot_id': snapshot.id,
'disk_metadata': {
'id': disk.id,
'alias': disk.alias,
'effective_size': self.bytes_to_transf
}
})

bareosfd.JobMessage(
context, bJobMessageType['M_INFO'],
" Transfer disk snapshot of %s bytes\n" % (str(self.bytes_to_transf)))

self.init_bytes_to_transf = self.bytes_to_transf
self.transfer_start_time = time.time()

def process_download(self, context, chunk_size):

chunk = ""
Expand Down Expand Up @@ -1113,8 +1214,8 @@ def get_or_add_vm_disk(self, context, obj, disk_id=None):
size = int(obj['size']) * 2**30

actual_size = 0
if 'actual-size' in obj:
actual_size = int(obj['actual-size']) * 2**30
if 'actual_size' in obj:
actual_size = int(obj['actual_size']) * 2**30

disk_interface = types.DiskInterface.VIRTIO
if 'disk-interface' in obj:
Expand Down Expand Up @@ -1171,6 +1272,15 @@ def start_upload(self, context, disk):
bareosfd.JobMessage(
context, bJobMessageType['M_INFO'],
"Uploading disk '%s'('%s')\n" % (disk.alias, disk.id))
bareosfd.DebugMessage(
context, 100,
"Uploading disk '%s'('%s')\n" % (disk.alias, disk.id))
bareosfd.DebugMessage(
context, 200,
"old_new_ids: %s\n" % (self.old_new_ids))
bareosfd.DebugMessage(
context, 200,
"self.restore_objects: %s\n" % (self.restore_objects))

self.transfer_service = self.get_transfer_service(types.ImageTransfer(image=types.Image(id=disk.id),
direction=types.ImageTransferDirection.UPLOAD))
Expand All @@ -1182,7 +1292,14 @@ def start_upload(self, context, disk):
self.proxy_connection.putrequest("PUT", proxy_url.path)
self.proxy_connection.putheader('Authorization', transfer.signed_ticket)

self.init_bytes_to_transf = self.bytes_to_transf = int(disk.actual_size)
# To prevent from errors on transfer, the exact number of bytes that
# will be sent must be used, we call it effective_size here. It was
# saved at backup time in restoreobjects, see start_backup_file(),
# and restoreobjects are restored before any file.
new_old_ids = {v: k for k, v in self.old_new_ids.items()}
old_id = new_old_ids[disk.id]
effective_size = self.disk_metadata_by_id[old_id]['effective_size']
self.init_bytes_to_transf = self.bytes_to_transf = effective_size

content_range = "bytes %d-%d/%d" % (0, self.bytes_to_transf - 1, self.bytes_to_transf)
self.proxy_connection.putheader('Content-Range', content_range)
Expand All @@ -1192,6 +1309,7 @@ def start_upload(self, context, disk):
bareosfd.JobMessage(
context, bJobMessageType['M_INFO'],
" Upload disk of %s bytes\n" % (str(self.bytes_to_transf)))
self.transfer_start_time = time.time()

def process_ovf(self, context, chunk):
if self.ovf_data is None:
Expand Down Expand Up @@ -1299,7 +1417,7 @@ def end_vm_backup(self, context):
# Close the connection to the server:
self.connection.close()

def wait_for_snapshot_removal(self, snapshot_id, timeout=60, delay=10):
def wait_for_snapshot_removal(self, snapshot_id, timeout=30, delay=10):
t_start = int(time.time())
snaps_service = self.vm_service.snapshots_service()

Expand Down

0 comments on commit 6d55aaf

Please sign in to comment.