/
blockdevice.py
1824 lines (1528 loc) · 63.1 KB
/
blockdevice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- test-case-name: flocker.node.agents.test.test_blockdevice -*-
# Copyright Hybrid Logic Ltd. See LICENSE file for details.
"""
This module implements the parts of a block-device based dataset
convergence agent that can be re-used against many different kinds of block
devices.
"""
from uuid import UUID, uuid4
from subprocess import CalledProcessError, check_output, STDOUT
from stat import S_IRWXU, S_IRWXG, S_IRWXO
from errno import EEXIST
from bitmath import GiB
from eliot import MessageType, ActionType, Field, Logger
from eliot.serializers import identity
from zope.interface import implementer, Interface
from pyrsistent import PRecord, field
from characteristic import attributes
import psutil
from twisted.python.reflect import safe_repr
from twisted.internet.defer import succeed, fail, gatherResults
from twisted.python.filepath import FilePath
from twisted.python.components import proxyForInterface
from .. import (
IDeployer, IStateChange, sequentially, in_parallel, run_state_change
)
from .._deploy import NotInUseDatasets
from ...control import NodeState, Manifestation, Dataset, NonManifestDatasets
from ...common import auto_threaded
# Eliot is transitioning away from the "Logger instances all over the place"
# approach.  And it's hard to put Logger instances on PRecord subclasses which
# we have a lot of.  So just use this global logger for now.  The state
# changes in this module hand it to the Eliot actions and messages they
# create.
_logger = Logger()
# The size, in bytes (100 GiB), which will be assigned to datasets with an
# unspecified maximum_size.
# XXX: Make this configurable. FLOC-2044
DEFAULT_DATASET_SIZE = int(GiB(100).to_Byte().value)
@attributes(["dataset_id"])
class DatasetWithoutVolume(Exception):
    """
    An operation was attempted on a dataset that involves manipulating the
    dataset's volume but that volume could not be found.

    State changes in this module (for example ``UnmountBlockDevice``,
    ``AttachVolume`` and ``DetachVolume``) raise this when the backend's
    volume listing contains no volume for the dataset.

    :ivar UUID dataset_id: The unique identifier of the dataset the operation
        was meant to affect.
    """
class VolumeException(Exception):
    """
    Common base for the errors reported by ``IBlockDeviceAPI`` operations.

    :param unicode blockdevice_id: The unique identifier of the block device
        the failed operation was directed at.

    :ivar unicode blockdevice_id: The identifier given to the initializer.

    :raises TypeError: If ``blockdevice_id`` is not a ``unicode`` instance.
    """
    def __init__(self, blockdevice_id):
        if isinstance(blockdevice_id, unicode):
            Exception.__init__(self, blockdevice_id)
            self.blockdevice_id = blockdevice_id
            return
        # Anything other than unicode is a programming error in the caller;
        # report the offending value's repr.
        message = (
            'Unexpected blockdevice_id type. '
            'Expected unicode. '
            'Got {!r}.'
        ).format(blockdevice_id)
        raise TypeError(message)
class UnknownVolume(VolumeException):
    """
    The block device could not be found.

    The ``IBlockDeviceAPI`` method documentation names this as the error for
    a ``blockdevice_id`` which does not exist.
    """
class AlreadyAttachedVolume(VolumeException):
    """
    A failed attempt to attach a block device that is already attached.

    ``IBlockDeviceAPI.attach_volume`` is documented as raising this.
    """
class UnattachedVolume(VolumeException):
    """
    An attempt was made to operate on an unattached volume but the operation
    requires the volume to be attached.

    ``IBlockDeviceAPI.detach_volume`` and ``IBlockDeviceAPI.get_device_path``
    are documented as raising this.
    """
class DatasetExists(Exception):
    """
    Raised when a ``BlockDeviceVolume`` for the requested dataset_id is
    already present.

    :ivar blockdevice: The value the exception was constructed with
        (presumably the pre-existing ``BlockDeviceVolume`` -- confirm against
        the code that raises it).
    """
    def __init__(self, blockdevice):
        # Keep the value both in ``args`` (for repr / logging) and as a named
        # attribute.
        super(DatasetExists, self).__init__(blockdevice)
        self.blockdevice = blockdevice
class FilesystemExists(Exception):
    """
    Raised when creating a filesystem is refused because the block device
    already carries one.

    :ivar device: The device the exception was constructed with
        (``_ensure_no_filesystem`` passes the device it probed).
    """
    def __init__(self, device):
        # Keep the device both in ``args`` (for repr / logging) and as a named
        # attribute.
        super(FilesystemExists, self).__init__(device)
        self.device = device
# Logged under the key ``dataset``; the serializer records the ``dataset_id``
# attribute of the dataset object it is given.
DATASET = Field(
    u"dataset",
    lambda dataset: dataset.dataset_id,
    u"The unique identifier of a dataset."
)
# Logged under the key ``volume``; the serializer records the
# ``blockdevice_id`` attribute of the volume object it is given.
VOLUME = Field(
    u"volume",
    lambda volume: volume.blockdevice_id,
    u"The unique identifier of a volume."
)
# Logged under the key ``filesystem_type``; declared for ``unicode`` values.
FILESYSTEM_TYPE = Field.forTypes(
    u"filesystem_type",
    [unicode],
    u"The name of a filesystem."
)
# Logged under the key ``dataset_id``; the supplied value is converted with
# ``unicode`` before being logged.
DATASET_ID = Field(
    u"dataset_id",
    lambda dataset_id: unicode(dataset_id),
    u"The unique identifier of a dataset."
)
# Logged under the key ``mountpoint``; the serializer records the ``path``
# attribute of the object it is given.
MOUNTPOINT = Field(
    u"mountpoint",
    lambda path: path.path,
    u"The absolute path to the location on the node where the dataset will be "
    u"mounted.",
)
# Logged under the key ``block_device_path``; the serializer records the
# ``path`` attribute of the object it is given.
# NOTE(review): ``BLOCK_DEVICE_PATH`` in this module uses the same key --
# confirm the duplication is intentional.
DEVICE_PATH = Field(
    u"block_device_path",
    lambda path: path.path,
    u"The absolute path to the block device file on the node where the "
    u"dataset is attached.",
)
# Logged under the key ``block_device_id``; the supplied identifier is
# converted with ``unicode`` before being logged.
BLOCK_DEVICE_ID = Field(
    u"block_device_id",
    lambda blockdevice_id: unicode(blockdevice_id),
    u"The unique identifier of the underlying block device."
)
# Logged under the key ``block_device_size``; the value is handed to Eliot's
# ``identity`` serializer.
BLOCK_DEVICE_SIZE = Field(
    u"block_device_size",
    identity,
    u"The size of the underlying block device."
)
# Logged under the key ``block_device_compute_instance_id``; the value is
# handed to Eliot's ``identity`` serializer.
BLOCK_DEVICE_COMPUTE_INSTANCE_ID = Field(
    u"block_device_compute_instance_id",
    identity,
    u"An identifier for the host to which the underlying block device is "
    u"attached.",
)
# Logged under the key ``block_device_path``; the serializer records the
# ``path`` attribute of the object it is given.
# NOTE(review): ``DEVICE_PATH`` in this module uses the same key -- confirm
# the duplication is intentional.
BLOCK_DEVICE_PATH = Field(
    u"block_device_path",
    lambda path: path.path,
    u"The system device file for an attached block device."
)
# Every ``ActionType`` below is declared with its name, the fields logged
# when the action starts, an empty list of success fields and a description.
# Every ``MessageType`` is declared with its name, its fields and a
# description.

# Action wrapped around ``CreateBlockDeviceDataset``.
CREATE_BLOCK_DEVICE_DATASET = ActionType(
    u"agent:blockdevice:create",
    [DATASET, MOUNTPOINT],
    [],
    u"A block-device-backed dataset is being created.",
)
# Really this is the successful completion of CREATE_BLOCK_DEVICE_DATASET. It
# might be nice if these fields could just be added to the running action
# instead of being logged as a separate message (but still in the correct
# context). Or maybe this is fine as-is.
BLOCK_DEVICE_DATASET_CREATED = MessageType(
    u"agent:blockdevice:created",
    [DEVICE_PATH, BLOCK_DEVICE_ID, DATASET_ID, BLOCK_DEVICE_SIZE,
     BLOCK_DEVICE_COMPUTE_INSTANCE_ID],
    u"A block-device-backed dataset has been created.",
)
# Action wrapped around ``DestroyBlockDeviceDataset``.
DESTROY_BLOCK_DEVICE_DATASET = ActionType(
    u"agent:blockdevice:destroy",
    [DATASET_ID],
    [],
    u"A block-device-backed dataset is being destroyed.",
)
# Action wrapped around ``UnmountBlockDevice``.
UNMOUNT_BLOCK_DEVICE = ActionType(
    u"agent:blockdevice:unmount",
    [DATASET_ID],
    [],
    u"A block-device-backed dataset is being unmounted.",
)
# Written by ``UnmountBlockDevice`` once the volume and its device file are
# known, just before ``umount`` is run.
UNMOUNT_BLOCK_DEVICE_DETAILS = MessageType(
    u"agent:blockdevice:unmount:details",
    [VOLUME, BLOCK_DEVICE_PATH],
    u"The device file for a block-device-backed dataset has been discovered."
)
# Action wrapped around ``MountBlockDevice``.
MOUNT_BLOCK_DEVICE = ActionType(
    u"agent:blockdevice:mount",
    [DATASET_ID],
    [],
    u"A block-device-backed dataset is being mounted.",
)
# Written by ``MountBlockDevice`` once the volume and its device file are
# known, before the mountpoint is prepared.
MOUNT_BLOCK_DEVICE_DETAILS = MessageType(
    u"agent:blockdevice:mount:details",
    [VOLUME, BLOCK_DEVICE_PATH],
    u"The device file for a block-device-backed dataset has been discovered."
)
# Action wrapped around ``AttachVolume``.
ATTACH_VOLUME = ActionType(
    u"agent:blockdevice:attach_volume",
    [DATASET_ID],
    [],
    u"The volume for a block-device-backed dataset is being attached."
)
# Written by ``AttachVolume`` once the volume to attach has been found.
ATTACH_VOLUME_DETAILS = MessageType(
    u"agent:blockdevice:attach_volume:details",
    [VOLUME],
    u"The volume for a block-device-backed dataset has been discovered."
)
# Action wrapped around ``DetachVolume``.
DETACH_VOLUME = ActionType(
    u"agent:blockdevice:detach_volume",
    [DATASET_ID],
    [],
    u"The volume for a block-device-backed dataset is being detached."
)
# Written by ``DetachVolume`` once the volume to detach has been found.
DETACH_VOLUME_DETAILS = MessageType(
    u"agent:blockdevice:detach_volume:details",
    [VOLUME],
    u"The volume for a block-device-backed dataset has been discovered."
)
# Action wrapped around ``DestroyVolume``.
DESTROY_VOLUME = ActionType(
    u"agent:blockdevice:destroy_volume",
    [VOLUME],
    [],
    u"The volume for a block-device-backed dataset is being destroyed."
)
# Action wrapped around ``CreateFilesystem``.
CREATE_FILESYSTEM = ActionType(
    u"agent:blockdevice:create_filesystem",
    [VOLUME, FILESYSTEM_TYPE],
    [],
    u"A block device is being initialized with a filesystem.",
)
# Logged under the key ``invalid_value``.  ``safe_repr`` is used as the
# serializer, so the logged form is a repr of whatever object the backend
# returned rather than the object itself.
INVALID_DEVICE_PATH_VALUE = Field(
    u"invalid_value",
    lambda value: safe_repr(value),
    u"A value returned from IBlockDeviceAPI.get_device_path which could not "
    u"possibly be correct. This likely indicates a bug in the "
    "IBlockDeviceAPI implementation.",
)
# Message carrying the affected dataset and the offending value.  The name
# suggests it is written during state discovery; the code that writes it is
# not part of this section of the module.
INVALID_DEVICE_PATH = MessageType(
    u"agent:blockdevice:discover_state:invalid_device_path",
    [DATASET_ID, INVALID_DEVICE_PATH_VALUE],
    u"The device path given by the IBlockDeviceAPI implementation was "
    u"invalid.",
)
def _volume_field():
    """
    Build a mandatory ``PRecord`` ``field`` whose value must be a
    ``BlockDeviceVolume``.

    :return: The new ``field`` definition.
    """
    def unchanged(value):
        # Stand in for the automatic PRecord.create factory: callers supply
        # a ``BlockDeviceVolume`` directly, so no coercion magic is wanted.
        return value

    volume_field = field(
        type=BlockDeviceVolume, mandatory=True, factory=unchanged
    )
    return volume_field
class BlockDeviceVolume(PRecord):
    """
    A block device that may be attached to a host.

    :ivar unicode blockdevice_id: An identifier for the block device which is
        unique across the entire cluster.  For example, an EBS volume
        identifier (``vol-4282672b``).  This is used to address the block
        device for operations like attach and detach.
    :ivar int size: The size, in bytes, of the block device.
    :ivar unicode attached_to: An opaque identifier for the node to which the
        volume is attached or ``None`` if it is currently unattached.  The
        identifier is supplied by the ``IBlockDeviceAPI.compute_instance_id``
        method based on the underlying infrastructure services (for example, if
        the cluster runs on AWS, this is very likely an EC2 instance id).
    :ivar UUID dataset_id: The Flocker dataset ID associated with this volume.
    """
    # Cluster-wide identifier of the device; required.
    blockdevice_id = field(type=unicode, mandatory=True)
    # Size in bytes; required.
    size = field(type=int, mandatory=True)
    # Either a node identifier or ``None``; starts out as ``None``
    # (unattached) when not supplied.
    attached_to = field(
        type=(unicode, type(None)), initial=None, mandatory=True
    )
    # Dataset the volume belongs to; required.
    dataset_id = field(type=UUID, mandatory=True)
def _blockdevice_volume_from_datasetid(volumes, dataset_id):
"""
A helper to get the volume for a given dataset_id.
:param list volumes: The ``BlockDeviceVolume`` instances to inspect for a
match.
:param UUID dataset_id: The identifier of the dataset the volume of which
to find.
:return: Either a ``BlockDeviceVolume`` matching the given ``dataset_id``
or ``None`` if no such volume can be found.
"""
for volume in volumes:
if volume.dataset_id == dataset_id:
return volume
# Get rid of this in favor of calculating each individual operation in
# BlockDeviceDeployer.calculate_changes. FLOC-1772
@implementer(IStateChange)
class DestroyBlockDeviceDataset(PRecord):
    """
    Tear down the volume backing a dataset: unmount it, detach it and then
    destroy it, on the node where this state change runs.

    :ivar UUID dataset_id: The unique identifier of the dataset whose volume
        is to be destroyed.
    """
    dataset_id = field(type=UUID, mandatory=True)

    # This can be replaced with a regular attribute when the `_logger` argument
    # is no longer required by Eliot.
    @property
    def eliot_action(self):
        """
        The Eliot action in whose context this change is logged.
        """
        return DESTROY_BLOCK_DEVICE_DATASET(
            _logger, dataset_id=self.dataset_id
        )

    def run(self, deployer):
        """
        Run the unmount, detach and destroy steps one after another.

        :return: A ``Deferred``.  It has already fired with ``None`` when the
            backend lists no volume for the dataset; otherwise it is whatever
            ``run_state_change`` returns for the three steps.
        """
        known_volumes = deployer.block_device_api.list_volumes()
        volume = _blockdevice_volume_from_datasetid(
            known_volumes, self.dataset_id
        )
        if volume is not None:
            teardown = sequentially(
                changes=[
                    UnmountBlockDevice(dataset_id=self.dataset_id),
                    DetachVolume(dataset_id=self.dataset_id),
                    DestroyVolume(volume=volume),
                ]
            )
            return run_state_change(teardown, deployer)
        # Nothing to destroy.
        return succeed(None)
@implementer(IStateChange)
class CreateFilesystem(PRecord):
    """
    Create a filesystem on a block device.

    :ivar BlockDeviceVolume volume: The volume in which to create the
        filesystem.
    :ivar unicode filesystem: The name of the filesystem type to create.  For
        example, ``u"ext4"``.
    """
    volume = _volume_field()
    filesystem = field(type=unicode, mandatory=True)

    @property
    def eliot_action(self):
        """
        The Eliot action in whose context this change is logged.
        """
        return CREATE_FILESYSTEM(
            _logger, volume=self.volume, filesystem_type=self.filesystem
        )

    def run(self, deployer):
        """
        Run ``mkfs`` against the device file of ``volume``, refusing to do so
        if the device already holds a filesystem.

        :return: A ``Deferred`` which has already fired with ``None`` on
            success, or has already failed with the error raised by the
            filesystem check or by ``mkfs`` (for example
            ``FilesystemExists`` or ``CalledProcessError``).
        """
        # FLOC-1816 Make this asynchronous
        device = deployer.block_device_api.get_device_path(
            self.volume.blockdevice_id
        )
        try:
            _ensure_no_filesystem(device)
            check_output([
                b"mkfs", b"-t", self.filesystem.encode("ascii"),
                # This is ext4 specific, and ensures mke2fs doesn't ask
                # user interactively about whether they really meant to
                # format whole device rather than partition. It will be
                # removed once upstream bug is fixed. See FLOC-2085.
                b"-F",
                device.path
            ])
        except Exception:
            # ``fail()`` with no argument wraps the exception currently being
            # handled.  Only ``Exception`` subclasses are converted so that
            # KeyboardInterrupt / SystemExit still propagate.
            return fail()
        return succeed(None)
def _ensure_no_filesystem(device):
    """
    Verify that ``device`` does not hold a filesystem yet.

    :param device: An object whose ``path`` attribute names the device file
        to probe with ``blkid``.

    :raises: ``FilesystemExists`` if there is already a filesystem on
        ``device``.
    :raises: ``CalledProcessError`` if ``blkid`` fails in any way other than
        reporting "nothing found".

    :return: ``None``
    """
    probe = [b"blkid", b"-p", b"-u", b"filesystem", device.path]
    try:
        check_output(probe, stderr=STDOUT)
    except CalledProcessError as error:
        # According to the man page, exit status 2 means:
        #   the specified token was not found, or no (specified) devices
        #   could be identified
        #
        # Experimentation shows that there is no output in the case of the
        # former, and an error printed to stderr in the case of the
        # latter.
        #
        # FLOC-2388: We're assuming an interface. We should test this
        # assumption.
        nothing_found = error.returncode == 2 and not error.output
        if not nothing_found:
            raise
        # There is no filesystem on this device.
    else:
        # blkid succeeded, so it identified a filesystem.
        raise FilesystemExists(device)
def _valid_size(size):
"""
Pyrsistent invariant for filesystem size, which must be a multiple of 1024
bytes.
"""
if size % 1024 == 0:
return (True, "")
return (
False, "Filesystem size must be multiple of 1024, not %d" % (size,)
)
@implementer(IStateChange)
class MountBlockDevice(PRecord):
    """
    Mount the filesystem held on the block device backing a particular
    dataset's volume.

    :ivar UUID dataset_id: The unique identifier of the dataset associated with
        the filesystem to mount.
    :ivar FilePath mountpoint: The filesystem location at which to mount the
        volume's filesystem.  If this does not exist, it is created.
    """
    dataset_id = field(type=UUID, mandatory=True)
    mountpoint = field(type=FilePath, mandatory=True)

    @property
    def eliot_action(self):
        """
        The Eliot action in whose context this change is logged.
        """
        return MOUNT_BLOCK_DEVICE(_logger, dataset_id=self.dataset_id)

    def run(self, deployer):
        """
        Run the system ``mount`` tool to mount this change's volume's block
        device.  The volume must be attached to this node.

        :return: A ``Deferred`` which has already fired with ``None`` once
            the filesystem is mounted.  It has already failed with
            ``DatasetWithoutVolume`` if the backend lists no volume for the
            dataset, or with ``OSError`` if the mountpoint directory could
            not be created.
        """
        api = deployer.block_device_api
        volume = _blockdevice_volume_from_datasetid(
            api.list_volumes(), self.dataset_id
        )
        if volume is None:
            # Report a missing volume the same way the unmount, attach and
            # detach state changes do, rather than tripping over ``None``
            # with an AttributeError below.
            return fail(DatasetWithoutVolume(dataset_id=self.dataset_id))
        device = api.get_device_path(volume.blockdevice_id)
        MOUNT_BLOCK_DEVICE_DETAILS(
            volume=volume, block_device_path=device,
        ).write(_logger)

        # Create the directory where a device will be mounted.
        # The directory's parent's permissions will be set to only allow access
        # by owner, to limit access by other users on the node.
        try:
            self.mountpoint.makedirs()
        except OSError as e:
            # An already-existing mountpoint is fine; anything else is not.
            if e.errno != EEXIST:
                return fail()
        self.mountpoint.parent().chmod(S_IRWXU)

        # This should be asynchronous. FLOC-1797
        check_output([b"mount", device.path, self.mountpoint.path])

        # Remove lost+found to ensure filesystems always start out empty.
        # Mounted filesystem is also made world
        # writeable/readable/executable since we can't predict what user a
        # container will run as.  We make sure we change mounted
        # filesystem's root directory permissions, so we only do this
        # after the filesystem is mounted.  If other files exist we don't
        # bother with either change, since at that point user has modified
        # the volume and we don't want to undo their changes by mistake
        # (e.g. postgres doesn't like world-writeable directories once
        # it's initialized).

        # A better way is described in
        # https://clusterhq.atlassian.net/browse/FLOC-2074
        lostfound = self.mountpoint.child(b"lost+found")
        if self.mountpoint.children() == [lostfound]:
            lostfound.remove()
            self.mountpoint.chmod(S_IRWXU | S_IRWXG | S_IRWXO)
            self.mountpoint.restat()

        return succeed(None)
@implementer(IStateChange)
class UnmountBlockDevice(PRecord):
    """
    Unmount the filesystem that was mounted from the block device backing a
    particular dataset's volume.

    :ivar UUID dataset_id: The unique identifier of the dataset associated with
        the filesystem to unmount.
    """
    dataset_id = field(type=UUID, mandatory=True)

    @property
    def eliot_action(self):
        """
        The Eliot action in whose context this change is logged.
        """
        return UNMOUNT_BLOCK_DEVICE(_logger, dataset_id=self.dataset_id)

    def run(self, deployer):
        """
        Look up the dataset's volume and device file through the asynchronous
        block device API, then run the system ``umount`` tool on the device.
        The volume must be attached to this node and the corresponding block
        device mounted.

        :return: A ``Deferred`` that fires when ``umount`` has been run, or
            fails with ``DatasetWithoutVolume`` when no volume exists for the
            dataset.
        """
        api = deployer.async_block_device_api

        def locate_device(volume):
            if volume is None:
                # It was not actually found.
                raise DatasetWithoutVolume(dataset_id=self.dataset_id)
            getting_path = api.get_device_path(volume.blockdevice_id)
            # Carry the volume along with its device path for logging.
            getting_path.addCallback(lambda device: (volume, device))
            return getting_path

        def unmount(located):
            volume, device = located
            UNMOUNT_BLOCK_DEVICE_DETAILS(
                volume=volume, block_device_path=device
            ).write(_logger)
            # This should be asynchronous. FLOC-1797
            check_output([b"umount", device.path])

        unmounting = api.list_volumes()
        unmounting.addCallback(
            _blockdevice_volume_from_datasetid, self.dataset_id
        )
        unmounting.addCallback(locate_device)
        unmounting.addCallback(unmount)
        return unmounting
@implementer(IStateChange)
class AttachVolume(PRecord):
    """
    Attach a currently unattached volume to this node (the node of the
    deployer the change is run with).

    :ivar UUID dataset_id: The unique identifier of the dataset associated with
        the volume to attach.
    """
    dataset_id = field(type=UUID, mandatory=True)

    @property
    def eliot_action(self):
        """
        The Eliot action in whose context this change is logged.
        """
        return ATTACH_VOLUME(_logger, dataset_id=self.dataset_id)

    def run(self, deployer):
        """
        Use the deployer's ``IBlockDeviceAsyncAPI`` to find the dataset's
        volume and this node's compute instance id, then attach the former to
        the latter.

        :return: A ``Deferred`` that fires with the result of
            ``attach_volume``, or fails with ``DatasetWithoutVolume`` when no
            volume exists for the dataset.
        """
        api = deployer.async_block_device_api
        finding_volume = api.list_volumes()
        finding_volume.addCallback(
            _blockdevice_volume_from_datasetid, self.dataset_id
        )
        finding_instance = api.compute_instance_id()

        def attach(results):
            volume, compute_instance_id = results
            if volume is None:
                # It was not actually found.
                raise DatasetWithoutVolume(dataset_id=self.dataset_id)
            ATTACH_VOLUME_DETAILS(volume=volume).write(_logger)
            return api.attach_volume(
                volume.blockdevice_id,
                attach_to=compute_instance_id,
            )

        # Both lookups must complete before the attach can be issued.
        attaching = gatherResults([finding_volume, finding_instance])
        attaching.addCallback(attach)
        return attaching
@implementer(IStateChange)
class DetachVolume(PRecord):
    """
    Detach a dataset's volume from whichever node it is attached to.

    :ivar UUID dataset_id: The unique identifier of the dataset associated with
        the volume to detach.
    """
    dataset_id = field(type=UUID, mandatory=True)

    @property
    def eliot_action(self):
        """
        The Eliot action in whose context this change is logged.
        """
        return DETACH_VOLUME(_logger, dataset_id=self.dataset_id)

    def run(self, deployer):
        """
        Use the deployer's ``IBlockDeviceAsyncAPI`` to find the dataset's
        volume and detach it.

        :return: A ``Deferred`` that fires with the result of
            ``detach_volume``, or fails with ``DatasetWithoutVolume`` when no
            volume exists for the dataset.
        """
        api = deployer.async_block_device_api

        def detach(volume):
            if volume is None:
                # It was not actually found.
                raise DatasetWithoutVolume(dataset_id=self.dataset_id)
            DETACH_VOLUME_DETAILS(volume=volume).write(_logger)
            return api.detach_volume(volume.blockdevice_id)

        detaching = api.list_volumes()
        detaching.addCallback(
            _blockdevice_volume_from_datasetid, self.dataset_id
        )
        detaching.addCallback(detach)
        return detaching
@implementer(IStateChange)
class DestroyVolume(PRecord):
    """
    Destroy a volume's storage, and with it the volume's contents.

    :ivar BlockDeviceVolume volume: The volume to destroy.
    """
    volume = _volume_field()

    @property
    def eliot_action(self):
        """
        The Eliot action in whose context this change is logged.
        """
        return DESTROY_VOLUME(_logger, volume=self.volume)

    def run(self, deployer):
        """
        Ask the deployer's synchronous ``IBlockDeviceAPI`` to destroy the
        volume.

        :return: A ``Deferred`` which has already fired with ``None``.
        """
        # FLOC-1818 Make this asynchronous
        api = deployer.block_device_api
        target = self.volume.blockdevice_id
        api.destroy_volume(target)
        return succeed(None)
def allocated_size(allocation_unit, requested_size):
    """
    Compute the smallest multiple of ``allocation_unit`` that can hold
    ``requested_size``.

    :param int allocation_unit: The interval in ``bytes`` to which
        ``requested_size`` will be rounded up.
    :param int requested_size: The size in ``bytes`` that is required.

    :return: The ``allocated_size`` in ``bytes``.
    """
    unit = int(allocation_unit)
    wanted = int(requested_size)
    whole_units, leftover = divmod(wanted, unit)
    if leftover > 0:
        # A partial unit is needed; round up to the next boundary.
        return whole_units * unit + unit
    return wanted
def check_allocatable_size(allocation_unit, requested_size):
    """
    Verify that ``requested_size`` would not need rounding to be allocated.

    :param int allocation_unit: The interval in ``bytes`` to which
        ``requested_size`` will be rounded up.
    :param int requested_size: The size in ``bytes`` that is required.

    :raises: ``ValueError`` unless ``requested_size`` is exactly
        divisible by ``allocation_unit``.
    """
    rounded = allocated_size(allocation_unit, requested_size)
    if requested_size != rounded:
        complaint = 'Requested size {!r} is not divisible by {!r}'.format(
            requested_size, allocation_unit
        )
        raise ValueError(complaint)
# Get rid of this in favor of calculating each individual operation in
# BlockDeviceDeployer.calculate_changes. FLOC-1771
@implementer(IStateChange)
class CreateBlockDeviceDataset(PRecord):
    """
    An operation to create a new dataset on a newly created volume with a newly
    initialized filesystem.

    :ivar Dataset dataset: The dataset for which to create a block device.
    :ivar FilePath mountpoint: The path at which to mount the created device.
    """
    dataset = field(mandatory=True, type=Dataset)
    mountpoint = field(mandatory=True, type=FilePath)

    @property
    def eliot_action(self):
        """
        The Eliot action in whose context this change is logged.
        """
        return CREATE_BLOCK_DEVICE_DATASET(
            _logger,
            dataset=self.dataset, mountpoint=self.mountpoint
        )

    def run(self, deployer):
        """
        Create a block device, attach it to the local host, create an ``ext4``
        filesystem on the device and mount it.

        The backend operations (create, attach, device path lookup) are
        performed synchronously.

        See ``IStateChange.run`` for general argument and return type
        documentation.

        :returns: A ``Deferred`` that fires with ``None`` once the filesystem
            has been created and mounted, or a failed ``Deferred`` with a
            ``DatasetExists`` exception if a blockdevice with the required
            dataset_id already exists.
        """
        api = deployer.block_device_api
        try:
            # Parse the identifier once, inside the ``try`` so that a
            # malformed id is reported as a failed Deferred as before.
            dataset_id = UUID(hex=self.dataset.dataset_id)
            check_for_existing_dataset(api, dataset_id)
        except Exception:
            # ``fail()`` with no argument wraps the exception currently being
            # handled.  Only ``Exception`` subclasses are converted so that
            # KeyboardInterrupt / SystemExit still propagate.
            return fail()

        volume = api.create_volume(
            dataset_id=dataset_id,
            size=allocated_size(
                allocation_unit=api.allocation_unit(),
                requested_size=self.dataset.maximum_size,
            ),
        )

        # This duplicates AttachVolume now.
        volume = api.attach_volume(
            volume.blockdevice_id,
            attach_to=api.compute_instance_id(),
        )
        device = api.get_device_path(volume.blockdevice_id)

        create = CreateFilesystem(volume=volume, filesystem=u"ext4")
        d = run_state_change(create, deployer)

        mount = MountBlockDevice(dataset_id=dataset_id,
                                 mountpoint=self.mountpoint)
        d.addCallback(lambda _: run_state_change(mount, deployer))

        def passthrough(result):
            # Log the details of the finished dataset without altering the
            # result flowing down the callback chain.
            BLOCK_DEVICE_DATASET_CREATED(
                block_device_path=device,
                block_device_id=volume.blockdevice_id,
                dataset_id=volume.dataset_id,
                block_device_size=volume.size,
                block_device_compute_instance_id=volume.attached_to,
            ).write(_logger)
            return result
        d.addCallback(passthrough)
        return d
class IBlockDeviceAsyncAPI(Interface):
    """
    Common operations provided by all block device backends, exposed via
    asynchronous methods.

    Each method corresponds to the ``IBlockDeviceAPI`` method of the same
    name but returns a ``Deferred`` instead of a direct result.
    """
    def allocation_unit():
        """
        See ``IBlockDeviceAPI.allocation_unit``.

        :returns: A ``Deferred`` that fires with ``int`` size of the
            allocation_unit.
        """

    def compute_instance_id():
        """
        See ``IBlockDeviceAPI.compute_instance_id``.

        :returns: A ``Deferred`` that fires with ``unicode`` of a
            provider-specific node identifier which identifies the node where
            the method is run.
        """

    def create_volume(dataset_id, size):
        """
        See ``IBlockDeviceAPI.create_volume``.

        :returns: A ``Deferred`` that fires with a ``BlockDeviceVolume`` when
            the volume has been created.
        """

    def destroy_volume(blockdevice_id):
        """
        See ``IBlockDeviceAPI.destroy_volume``.

        :return: A ``Deferred`` that fires when the volume has been destroyed.
        """

    def attach_volume(blockdevice_id, attach_to):
        """
        See ``IBlockDeviceAPI.attach_volume``.

        :returns: A ``Deferred`` that fires with a ``BlockDeviceVolume`` with a
            ``attached_to`` attribute set to ``attach_to``.
        """

    def detach_volume(blockdevice_id):
        """
        See ``IBlockDeviceAPI.detach_volume``.

        :returns: A ``Deferred`` that fires when the volume has been detached.
        """

    def list_volumes():
        """
        See ``IBlockDeviceAPI.list_volumes``.

        :returns: A ``Deferred`` that fires with a ``list`` of
            ``BlockDeviceVolume`` instances.
        """

    def get_device_path(blockdevice_id):
        """
        See ``IBlockDeviceAPI.get_device_path``.

        :returns: A ``Deferred`` that fires with a ``FilePath`` for the device.
        """
class IBlockDeviceAPI(Interface):
    """
    Common operations provided by all block device backends, exposed via
    synchronous methods.

    Note: This is an early sketch of the interface and it'll be refined as
    real blockdevice providers are implemented.
    """
    def allocation_unit():
        """
        The size, in bytes, up to which ``IDeployer`` will round volume
        sizes before calling ``IBlockDeviceAPI.create_volume``.

        :rtype: ``int``
        """

    def compute_instance_id():
        """
        Get an identifier for this node.

        This will be compared against ``BlockDeviceVolume.attached_to``
        to determine which volumes are locally attached and it will be used
        with ``attach_volume`` to locally attach volumes.

        :returns: A ``unicode`` object giving a provider-specific node
            identifier which identifies the node where the method is run.
        """

    def create_volume(dataset_id, size):
        """
        Create a new volume.

        When called by ``IDeployer``, the supplied size will be
        rounded up to the nearest
        ``IBlockDeviceAPI.allocation_unit()``.

        :param UUID dataset_id: The Flocker dataset ID of the dataset on this
            volume.
        :param int size: The size of the new volume in bytes.
        :returns: A ``BlockDeviceVolume``.
        """

    def destroy_volume(blockdevice_id):
        """
        Destroy an existing volume.

        :param unicode blockdevice_id: The unique identifier for the volume to
            destroy.

        :raises UnknownVolume: If the supplied ``blockdevice_id`` does not
            exist.

        :return: ``None``
        """

    def attach_volume(blockdevice_id, attach_to):
        """
        Attach ``blockdevice_id`` to the node indicated by ``attach_to``.

        :param unicode blockdevice_id: The unique identifier for the block
            device being attached.
        :param unicode attach_to: An identifier like the one returned by the
            ``compute_instance_id`` method indicating the node to which to
            attach the volume.

        :raises UnknownVolume: If the supplied ``blockdevice_id`` does not
            exist.
        :raises AlreadyAttachedVolume: If the supplied ``blockdevice_id`` is
            already attached.

        :returns: A ``BlockDeviceVolume`` with a ``attached_to`` attribute set
            to ``attach_to``.
        """

    def detach_volume(blockdevice_id):
        """
        Detach ``blockdevice_id`` from whatever host it is attached to.

        :param unicode blockdevice_id: The unique identifier for the block
            device being detached.

        :raises UnknownVolume: If the supplied ``blockdevice_id`` does not
            exist.
        :raises UnattachedVolume: If the supplied ``blockdevice_id`` is
            not attached to anything.
        :returns: ``None``
        """

    def list_volumes():
        """
        List all the block devices available via the back end API.

        :returns: A ``list`` of ``BlockDeviceVolume`` instances.
        """

    def get_device_path(blockdevice_id):
        """
        Return the device path that has been allocated to the block device on
        the host to which it is currently attached.

        :param unicode blockdevice_id: The unique identifier for the block
            device.
        :raises UnknownVolume: If the supplied ``blockdevice_id`` does not
            exist.
        :raises UnattachedVolume: If the supplied ``blockdevice_id`` is
            not attached to a host.
        :returns: A ``FilePath`` for the device.
        """
@implementer(IBlockDeviceAsyncAPI)
@auto_threaded(IBlockDeviceAPI, "_reactor", "_sync", "_threadpool")
class _SyncToThreadedAsyncAPIAdapter(PRecord):
    """
    Adapt any ``IBlockDeviceAPI`` to ``IBlockDeviceAsyncAPI`` by running its
    methods in threads of a thread pool.

    The class body declares no methods of its own; the ``auto_threaded``
    decorator is applied with the synchronous interface and the names of the
    three fields below.
    """
    # NOTE(review): presumably the reactor used to deliver thread results --
    # confirm against ``auto_threaded``.
    _reactor = field()
    # NOTE(review): presumably the wrapped ``IBlockDeviceAPI`` provider --
    # confirm against ``auto_threaded``.
    _sync = field()
    # NOTE(review): presumably the thread pool the wrapped calls run in --
    # confirm against ``auto_threaded``.
    _threadpool = field()
def _blockdevicevolume_from_dataset_id(dataset_id, size,
                                       attached_to=None):
    """
    Build a ``BlockDeviceVolume`` whose ``blockdevice_id`` is computed from
    ``dataset_id`` (``u"block-"`` followed by the formatted dataset id).

    This is for convenience of implementation of the loopback backend (to
    avoid needing a separate data store for mapping dataset ids to block
    device ids and back again).

    Parameters accepted have the same meaning as the attributes of
    ``BlockDeviceVolume``.

    :return: The new ``BlockDeviceVolume``.
    """
    derived_id = u"block-{0}".format(dataset_id)
    return BlockDeviceVolume(
        blockdevice_id=derived_id,
        dataset_id=dataset_id,
        size=size,
        attached_to=attached_to,
    )
def _blockdevicevolume_from_blockdevice_id(blockdevice_id, size,
attached_to=None):