-
-
Notifications
You must be signed in to change notification settings - Fork 5
/
qmpcommon.py
255 lines (225 loc) · 8.9 KB
/
qmpcommon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#!/usr/bin/env python3
"""
Copyright (C) 2022 Michael Ablassmeier
Authors:
Michael Ablassmeier <abi@grinser.de>
This work is licensed under the terms of the GNU GPL, version 3. See
the LICENSE file in the top-level directory.
"""
import asyncio
import logging
import os
from time import sleep

from qemu.qmp import EventListener

from libqmpbackup import fs
class QmpCommon:
    """Common QMP helper functions.

    Wraps a QMP client object and provides helpers to query the virtual
    machine, build ``transaction`` action lists, add/remove backup target
    nodes, run the backup transaction and report block job progress.
    """

    def __init__(self, qmp):
        """Store the QMP client and create a logger for this module.

        :param qmp: QMP client; this class calls its ``execute()`` and
            ``listen()`` methods and reads its greeting data.
        """
        self.qmp = qmp
        self.log = logging.getLogger(__name__)

    async def show_vm_state(self):
        """Log the virtual machine state and check that it allows a backup.

        :raises RuntimeError: if the VM is not running and its status is
            neither ``prelaunch`` nor ``paused``.
        """
        status = await self.qmp.execute("query-status")
        if status["running"] is False and status["status"] not in (
            "prelaunch",
            "paused",
        ):
            raise RuntimeError(f"VM not ready for backup, state: [{status}]")
        self.log.info("VM is in state: [%s]", status["status"])

    async def show_name(self):
        """Log the name of the VM, if ``query-name`` returns one."""
        name = await self.qmp.execute("query-name")
        if name:
            self.log.info("VM Name: [%s]", name["name"])

    def show_version(self):
        """Log the QEMU version and package string from the QMP greeting."""
        hv_version = self.qmp._greeting._raw["QMP"]  # pylint: disable=W0212
        qemu = hv_version["version"]["qemu"]
        # Version components are reported in major.minor.micro order.
        self.log.info(
            "Qemu version: [%s.%s.%s] [%s]",
            qemu["major"],
            qemu["minor"],
            qemu["micro"],
            hv_version["version"]["package"],
        )

    @staticmethod
    def transaction_action(action, **kwargs):
        """Return a transaction action object.

        :param action: value stored under the ``type`` key.
        :param kwargs: action arguments; underscores in the keyword names
            are replaced by dashes (``job_id`` becomes ``job-id``).
        :return: dict with the keys ``type`` and ``data``.
        """
        return {
            "type": action,
            "data": {key.replace("_", "-"): value for key, value in kwargs.items()},
        }

    def transaction_bitmap_clear(self, node, name, **kwargs):
        """Return transaction action object for bitmap clear"""
        return self.transaction_action(
            "block-dirty-bitmap-clear", node=node, name=name, **kwargs
        )

    def transaction_bitmap_add(self, node, name, **kwargs):
        """Return transaction action object for bitmap add"""
        return self.transaction_action(
            "block-dirty-bitmap-add", node=node, name=name, **kwargs
        )

    async def prepare_target_devices(self, devices, target_files):
        """Create the required target devices for the blockdev-backup
        operation.

        :param devices: iterable of device objects; ``node`` and ``format``
            are read from each.
        :param target_files: mapping of device node to target file name.
        """
        self.log.info("Prepare backup target devices")
        for device in devices:
            target = target_files[device.node]
            targetdev = f"qmpbackup-{device.node}"
            await self.qmp.execute(
                "blockdev-add",
                arguments={
                    "driver": device.format,
                    "node-name": targetdev,
                    "file": {"driver": "file", "filename": target},
                },
            )

    async def remove_target_devices(self, devices):
        """Remove the named target devices after the blockdev-backup
        operation.

        :param devices: iterable of device objects; ``node`` is read from
            each to derive the target node name.
        """
        self.log.info("Cleanup added backup target devices")
        for device in devices:
            targetdev = f"qmpbackup-{device.node}"
            await self.qmp.execute(
                "blockdev-del",
                arguments={
                    "node-name": targetdev,
                },
            )

    def prepare_transaction(self, argv, devices):
        """Prepare the list of transaction actions for a backup.

        :param argv: options object; ``level``, ``compress`` and
            ``speed_limit`` are read from it.
        :param devices: iterable of device objects; ``node``, ``format`` and
            ``has_bitmap`` are read from each.
        :return: list of transaction action dicts.
        """
        sync = "full"
        if argv.level == "inc":
            sync = "incremental"

        bitmap_prefix = "qmpbackup"
        persistent = True
        if argv.level == "copy":
            self.log.info("Copy backup: no persistent bitmap will be created.")
            bitmap_prefix = f"qmpbackup-{argv.level}"
            persistent = False

        actions = []
        for device in devices:
            targetdev = f"qmpbackup-{device.node}"
            bitmap = f"{bitmap_prefix}-{device.node}"
            job_id = f"{device.node}"
            is_raw = device.format == "raw"

            # A new bitmap is added for non-raw devices without a bitmap on
            # full/copy backups, and for any device that already has a
            # bitmap on copy backups (the copy bitmap uses its own name).
            needs_new_bitmap = (
                not device.has_bitmap
                and not is_raw
                and argv.level in ("full", "copy")
            ) or (device.has_bitmap and argv.level == "copy")
            if needs_new_bitmap:
                self.log.info(
                    "Creating new bitmap: [%s] for device [%s]", bitmap, device.node
                )
                actions.append(
                    self.transaction_bitmap_add(
                        device.node, bitmap, persistent=persistent
                    )
                )

            # Compare with ==: `level in ("full")` would be a substring test.
            if device.has_bitmap and argv.level == "full" and not is_raw:
                self.log.info("Clearing existing bitmap for device: [%s]", device.node)
                actions.append(self.transaction_bitmap_clear(device.node, bitmap))

            compress = argv.compress
            if is_raw and compress:
                compress = False
                self.log.info("Disabling compression for raw device: [%s]", device.node)

            # Raw devices are always saved with a full sync and no bitmap,
            # even if an incremental backup was requested.
            if argv.level in ("full", "copy") or (argv.level == "inc" and is_raw):
                actions.append(
                    self.transaction_action(
                        "blockdev-backup",
                        device=device.node,
                        target=targetdev,
                        sync="full",
                        job_id=job_id,
                        speed=argv.speed_limit,
                        compress=compress,
                    )
                )
            else:
                actions.append(
                    self.transaction_action(
                        "blockdev-backup",
                        bitmap=bitmap,
                        device=device.node,
                        target=targetdev,
                        sync=sync,
                        job_id=job_id,
                        speed=argv.speed_limit,
                        compress=compress,
                    )
                )

        self.log.debug("Created transaction: %s", actions)
        return actions

    async def backup(self, argv, devices, qga):
        """Start the backup transaction and, while the backup is active,
        watch block job events and report progress.

        :param argv: options object, passed to ``prepare_transaction``.
        :param devices: iterable of device objects to back up.
        :param qga: guest agent handle passed to ``fs.thaw`` once the
            transaction has been issued, or ``False`` to skip the thaw.
        :raises RuntimeError: if a ``BLOCK_JOB_ERROR`` or
            ``BLOCK_JOB_CANCELLED`` event is received.
        """
        actions = self.prepare_transaction(argv, devices)
        listener = EventListener(
            (
                "BLOCK_JOB_COMPLETED",
                "BLOCK_JOB_CANCELLED",
                "BLOCK_JOB_ERROR",
                "BLOCK_JOB_READY",
                "BLOCK_JOB_PENDING",
                "JOB_STATUS_CHANGE",
            )
        )
        with self.qmp.listen(listener):
            await self.qmp.execute("transaction", arguments={"actions": actions})
            if qga is not False:
                fs.thaw(qga)
            async for event in listener:
                if event["event"] == "BLOCK_JOB_COMPLETED":
                    self.log.info("Saved all disks")
                    break
                if event["event"] in ("BLOCK_JOB_ERROR", "BLOCK_JOB_CANCELLED"):
                    raise RuntimeError(
                        f"Error during backup operation: {event['event']}"
                    )

                # NOTE(review): the polling loop runs per received event so
                # that progress is reported while jobs are still active;
                # confirm this nesting matches the intended behavior.
                while True:
                    jobs = await self.qmp.execute("query-block-jobs")
                    if not jobs:
                        break
                    self.progress(jobs, devices)
                    # Non-blocking sleep: time.sleep() would stall the event
                    # loop for the duration of the pause.
                    await asyncio.sleep(1)

    async def do_query_block(self):
        """Return list of attached block devices"""
        return await self.qmp.execute("query-block")

    async def remove_bitmaps(self, blockdev, prefix="qmpbackup"):
        """Remove existing bitmaps for block devices.

        :param blockdev: iterable of device objects; ``has_bitmap``,
            ``bitmaps`` and ``node`` are read from each.
        :param prefix: only bitmaps whose name contains this string are
            removed; all others are ignored.
        """
        for dev in blockdev:
            if not dev.has_bitmap:
                self.log.info("No bitmap set for device %s", dev.node)
                continue

            for bitmap in dev.bitmaps:
                bitmap_name = bitmap["name"]
                self.log.debug("Bitmap name: %s", bitmap_name)
                if prefix not in bitmap_name:
                    self.log.debug("Ignoring bitmap: %s", bitmap_name)
                    continue
                self.log.info("Removing bitmap: %s", bitmap_name)
                await self.qmp.execute(
                    "block-dirty-bitmap-remove",
                    arguments={"node": dev.node, "name": bitmap_name},
                )

    def progress(self, jobs, devices):
        """Report progress for active block jobs.

        :param jobs: list of job dicts; ``device``, ``offset`` and ``len``
            are read from each.
        :param devices: iterable of device objects; ``node`` and
            ``filename`` are read from each.
        """
        for device in devices:
            for job in jobs:
                if job["device"] != device.node:
                    continue
                offset = job["offset"]
                length = job["len"]
                # Guard on the divisor so a zero-length job reports 0%.
                percent = round(offset / length * 100) if length else 0
                self.log.info(
                    "[%s:%s] Wrote Offset: %s%% (%s of %s)",
                    job["device"],
                    os.path.basename(device.filename),
                    percent,
                    offset,
                    length,
                )