Skip to content

Commit

Permalink
CA-83940: generalise "with_migrating_away" to "with_events_suppressed"
Browse files Browse the repository at this point in the history
This function temporarily stops events from xenopsd from being processed
for a particular VM. It is useful to prevent races between a foreground
thread and the background event thread e.g.

In Xapi_xenops.start:
  1. VBDs are set to currently_attached = true
  2. metadata is pushed to xenopsd
     <- at this point an event on the xenopsd VM would cause
        the VBD currently_attached field to be set to false so
        we block events
  3. the VM is started
     <- at this point it is safe to re-enable syncing.

Signed-off-by: David Scott <dave.scott@eu.citrix.com>
Whenever we want to stop events from xenopsd for a particula
  • Loading branch information
David Scott committed Jul 24, 2012
1 parent f0c78bc commit b4f7971
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 118 deletions.
6 changes: 3 additions & 3 deletions ocaml/xapi/xapi_vm_migrate.ml
Expand Up @@ -76,7 +76,7 @@ let pool_migrate ~__context ~vm ~host ~options =
let vm' = Db.VM.get_uuid ~__context ~self:vm in
begin try
Xapi_network.with_networks_attached_for_vm ~__context ~vm ~host (fun () ->
Xapi_xenops.with_migrating_away vm' (fun () ->
Xapi_xenops.with_events_suppressed ~__context ~self:vm (fun () ->
(* XXX: PR-1255: the live flag *)
info "xenops: VM.migrate %s to %s" vm' xenops_url;
XenopsAPI.VM.migrate dbg vm' [] [] xenops_url |> wait_for_task dbg |> success_task dbg |> ignore;
Expand Down Expand Up @@ -424,7 +424,7 @@ let migrate_send' ~__context ~vm ~dest ~live ~vdi_map ~vif_map ~options =
(* It's acceptable for the VM not to exist at this point; shutdown commutes with storage migrate *)
begin
try
Xapi_xenops.with_migrating_away vm_uuid
Xapi_xenops.with_events_suppressed ~__context ~self:vm
(fun () ->
XenopsAPI.VM.migrate dbg vm_uuid xenops_vdi_map xenops_vif_map xenops |> wait_for_task dbg |> success_task dbg |> ignore;
Xapi_xenops.Xenopsd_metadata.delete ~__context vm_uuid;
Expand Down Expand Up @@ -463,7 +463,7 @@ let migrate_send' ~__context ~vm ~dest ~live ~vdi_map ~vif_map ~options =
List.iter (fun vdi ->
if not (Xapi_fist.storage_motion_keep_vdi ())
then begin
(* In a cross-pool migrate, due to the Xapi_xenops.with_migrating_away call above,
(* In a cross-pool migrate, due to the Xapi_xenops.with_events_suppressed call above,
the VBDs are left 'currently-attached=true', because they haven't been resynced
by the destination host.
Expand Down
235 changes: 120 additions & 115 deletions ocaml/xapi/xapi_xenops.ml
Expand Up @@ -731,22 +731,18 @@ module Event = struct
) t
end

(* Ignore events on VMs which are migrating away *)
let migrating_away = Hashtbl.create 10
let migrating_away_m = Mutex.create ()

let with_migrating_away uuid f =
Mutex.execute migrating_away_m (fun () -> Hashtbl.replace migrating_away uuid ());
finally f
(fun () -> Mutex.execute migrating_away_m (fun () -> Hashtbl.remove migrating_away uuid))

let is_migrating_away uuid =
Mutex.execute migrating_away_m (fun () -> Hashtbl.mem migrating_away uuid)
let events_suppressed_on = Hashtbl.create 10
let events_suppressed_on_m = Mutex.create ()
let events_suppressed vm =
Mutex.execute events_suppressed_on_m
(fun () ->
Hashtbl.mem events_suppressed_on vm
)

let update_vm ~__context id =
try
let open Vm in
if is_migrating_away id
if events_suppressed id
then debug "xenopsd event: ignoring event for VM (VM %s migrating away)" id
else
let self = Db.VM.get_by_uuid ~__context ~uuid:id in
Expand Down Expand Up @@ -983,7 +979,7 @@ let update_vm ~__context id =
let update_vbd ~__context (id: (string * string)) =
try
let open Vbd in
if is_migrating_away (fst id)
if events_suppressed (fst id)
then debug "xenopsd event: ignoring event for VM (VM %s migrating away)" (fst id)
else
let vm = Db.VM.get_by_uuid ~__context ~uuid:(fst id) in
Expand Down Expand Up @@ -1043,7 +1039,7 @@ let update_vbd ~__context (id: (string * string)) =

let update_vif ~__context id =
try
if is_migrating_away (fst id)
if events_suppressed (fst id)
then debug "xenopsd event: ignoring event for VIF (VM %s migrating away)" (fst id)
else
let vm = Db.VM.get_by_uuid ~__context ~uuid:(fst id) in
Expand Down Expand Up @@ -1093,7 +1089,7 @@ let update_vif ~__context id =

let update_pci ~__context id =
try
if is_migrating_away (fst id)
if events_suppressed (fst id)
then debug "xenopsd event: ignoring event for PCI (VM %s migrating away)" (fst id)
else
let vm = Db.VM.get_by_uuid ~__context ~uuid:(fst id) in
Expand Down Expand Up @@ -1171,17 +1167,33 @@ let rec events_watch ~__context from =
List.iter
(function
| Vm id ->
debug "xenops event on VM %s" id;
update_vm ~__context id;
if events_suppressed id
then debug "ignoring xenops event on VM %s" id
else begin
debug "xenops event on VM %s" id;
update_vm ~__context id;
end
| Vbd id ->
debug "xenops event on VBD %s.%s" (fst id) (snd id);
update_vbd ~__context id
if events_suppressed (fst id)
then debug "ignoring xenops event on VBD %s.%s" (fst id) (snd id)
else begin
debug "xenops event on VBD %s.%s" (fst id) (snd id);
update_vbd ~__context id
end
| Vif id ->
debug "xenops event on VIF %s.%s" (fst id) (snd id);
update_vif ~__context id
if events_suppressed (fst id)
then debug "ignoring xenops event on VIF %s.%s" (fst id) (snd id)
else begin
debug "xenops event on VIF %s.%s" (fst id) (snd id);
update_vif ~__context id
end
| Pci id ->
debug "xenops event on PCI %s.%s" (fst id) (snd id);
update_pci ~__context id
if events_suppressed (fst id)
then debug "ignoring xenops event on PCI %s.%s" (fst id) (snd id)
else begin
debug "xenops event on PCI %s.%s" (fst id) (snd id);
update_pci ~__context id
end
| Task id ->
debug "xenops event on Task %s" id;
update_task ~__context id
Expand Down Expand Up @@ -1293,18 +1305,6 @@ let events_from_xenopsd () =
done
)

(* Ignore events on VMs which are shutting down *)
let shutting_down = Hashtbl.create 10
let shutting_down_m = Mutex.create ()

let with_shutting_down uuid f =
Mutex.execute shutting_down_m (fun () -> Hashtbl.replace shutting_down uuid ());
finally f
(fun () -> Mutex.execute shutting_down_m (fun () -> Hashtbl.remove shutting_down uuid))

(* XXX; PR-1255: this needs to be combined with the migration lock *)
let is_shutting_down uuid = Hashtbl.mem shutting_down uuid

(* If a xapi event thread is blocked, wake it up and cause it to re-register. This should be
called after updating Host.resident_VMs *)
let trigger_xenapi_reregister =
Expand Down Expand Up @@ -1359,16 +1359,11 @@ let events_from_xapi () =
(function
| { ty = "vm"; reference = vm' } ->
let vm = Ref.of_string vm' in
Mutex.execute shutting_down_m
(fun () ->
let id = id_of_vm ~__context ~self:vm in
let resident_here = Db.VM.get_resident_on ~__context ~self:vm = localhost in
let shutting_down = is_shutting_down id in
debug "Event on VM %s; resident_here = %b; shutting_down = %b" id resident_here shutting_down;
if resident_here && not shutting_down then begin
Xenopsd_metadata.update ~__context ~self:vm |> ignore
end
)
let id = id_of_vm ~__context ~self:vm in
let resident_here = Db.VM.get_resident_on ~__context ~self:vm = localhost in
debug "Event on VM %s; resident_here = %b" id resident_here;
if resident_here
then Xenopsd_metadata.update ~__context ~self:vm |> ignore
| _ -> ()
) from.events;
token := string_of_token from.token;
Expand Down Expand Up @@ -1471,6 +1466,23 @@ let refresh_vm ~__context ~self =
Client.UPDATES.refresh_vm dbg id;
Event.wait dbg ()

let with_events_suppressed ~__context ~self f =
let vm = id_of_vm ~__context ~self in
debug "suppressing xenops events on VM: %s" vm;
Mutex.execute events_suppressed_on_m
(fun () ->
Hashtbl.replace events_suppressed_on vm ()
);
finally f
(fun () ->
Mutex.execute events_suppressed_on_m
(fun () ->
Hashtbl.remove events_suppressed_on vm
);
debug "re-enabled xenops events on VM: %s" vm;
)


(* After this function is called, locally-generated events will be reflected
in the xapi pool metadata. When this function returns we believe that the
VM state is in 'sync' with xenopsd and the pool master where by 'sync'
Expand Down Expand Up @@ -1606,16 +1618,19 @@ let start ~__context ~self paused =
List.iter (fun self -> Db.VIF.set_currently_attached ~__context ~self ~value:true) (Db.VM.get_VIFs ~__context ~self);

debug "Sending VM %s configuration to xenopsd" (Ref.string_of self);
let id = Xenopsd_metadata.push ~__context ~upgrade:false ~self in
try
Xapi_network.with_networks_attached_for_vm ~__context ~vm:self
with_events_suppressed ~__context ~self
(fun () ->
info "xenops: VM.start %s" id;
Client.VM.start dbg id |> sync_with_task __context;
if not paused then begin
info "xenops: VM.unpause %s" id;
Client.VM.unpause dbg id |> sync __context;
end;
let id = Xenopsd_metadata.push ~__context ~upgrade:false ~self in
Xapi_network.with_networks_attached_for_vm ~__context ~vm:self
(fun () ->
info "xenops: VM.start %s" id;
Client.VM.start dbg id |> sync_with_task __context;
if not paused then begin
info "xenops: VM.unpause %s" id;
Client.VM.unpause dbg id |> sync __context;
end;
)
);
set_resident_on ~__context ~self;
with e ->
Expand Down Expand Up @@ -1663,20 +1678,18 @@ let shutdown ~__context ~self timeout =
(fun () ->
assert_resident_on ~__context ~self;
let id = id_of_vm ~__context ~self in
with_shutting_down id
(fun () ->
info "xenops: VM.shutdown %s" id;
let dbg = Context.string_of_task __context in
Client.VM.shutdown dbg id timeout |> sync_with_task __context;
Event.wait dbg ();
assert (Db.VM.get_power_state ~__context ~self = `Halted);
(* force_state_reset called from the xenopsd event loop above *)
assert (Db.VM.get_resident_on ~__context ~self = Ref.null);
List.iter
(fun vbd ->
assert (not(Db.VBD.get_currently_attached ~__context ~self:vbd))
) (Db.VM.get_VBDs ~__context ~self)
)

info "xenops: VM.shutdown %s" id;
let dbg = Context.string_of_task __context in
Client.VM.shutdown dbg id timeout |> sync_with_task __context;
Event.wait dbg ();
assert (Db.VM.get_power_state ~__context ~self = `Halted);
(* force_state_reset called from the xenopsd event loop above *)
assert (Db.VM.get_resident_on ~__context ~self = Ref.null);
List.iter
(fun vbd ->
assert (not(Db.VBD.get_currently_attached ~__context ~self:vbd))
) (Db.VM.get_VBDs ~__context ~self)
)

let suspend ~__context ~self =
Expand Down Expand Up @@ -1728,24 +1741,28 @@ let resume ~__context ~self ~start_paused ~force =
(fun () ->
let vdi = Db.VM.get_suspend_VDI ~__context ~self in
let disk = disk_of_vdi ~__context ~self:vdi |> Opt.unbox in
debug "Sending VM %s configuration to xenopsd" (Ref.string_of self);
let id = Xenopsd_metadata.push ~__context ~upgrade:false ~self in
(* NB we don't set resident_on because we don't want to
modify the VM.power_state, {VBD,VIF}.currently_attached in the
failures cases. This means we must remove the metadata from
xenopsd on failure. *)
begin try
Xapi_network.with_networks_attached_for_vm ~__context ~vm:self
(fun () ->
info "xenops: VM.resume %s from %s" id (disk |> rpc_of_disk |> Jsonrpc.to_string);
Client.VM.resume dbg id disk |> sync_with_task __context;
if not start_paused then begin
info "xenops: VM.unpause %s" id;
Client.VM.unpause dbg id |> sync_with_task __context;
end;
)
with_events_suppressed ~__context ~self
(fun () ->
debug "Sending VM %s configuration to xenopsd" (Ref.string_of self);
let id = Xenopsd_metadata.push ~__context ~upgrade:false ~self in
Xapi_network.with_networks_attached_for_vm ~__context ~vm:self
(fun () ->
info "xenops: VM.resume %s from %s" id (disk |> rpc_of_disk |> Jsonrpc.to_string);
Client.VM.resume dbg id disk |> sync_with_task __context;
if not start_paused then begin
info "xenops: VM.unpause %s" id;
Client.VM.unpause dbg id |> sync_with_task __context;
end;
)
)
with e ->
error "Caught exception resuming VM: %s" (string_of_exn e);
let id = id_of_vm ~__context ~self in
Xenopsd_metadata.delete ~__context id;
raise e
end;
Expand Down Expand Up @@ -1794,21 +1811,11 @@ let vbd_eject ~__context ~self =
(fun () ->
let vm = Db.VBD.get_VM ~__context ~self in
assert_resident_on ~__context ~self:vm;
(* XXX: PR-1255: move the offline stuff to the master/message_forwarding? *)
if Db.VM.get_power_state ~__context ~self:vm = `Halted then begin
Db.VBD.set_empty ~__context ~self ~value:true;
Db.VBD.set_VDI ~__context ~self ~value:Ref.null
end else begin
let vbd = md_of_vbd ~__context ~self in
info "xenops: VBD.eject %s.%s" (fst vbd.Vbd.id) (snd vbd.Vbd.id);
let dbg = Context.string_of_task __context in
Client.VBD.eject dbg vbd.Vbd.id |> sync_with_task __context;
Event.wait dbg ();
(* XXX: PR-1255: this is because a PV eject is an unplug, so the
event is different *)
Db.VBD.set_empty ~__context ~self ~value:true;
Db.VBD.set_VDI ~__context ~self ~value:Ref.null
end;
let vbd = md_of_vbd ~__context ~self in
info "xenops: VBD.eject %s.%s" (fst vbd.Vbd.id) (snd vbd.Vbd.id);
let dbg = Context.string_of_task __context in
Client.VBD.eject dbg vbd.Vbd.id |> sync_with_task __context;
Event.wait dbg ();
assert (Db.VBD.get_empty ~__context ~self);
assert (Db.VBD.get_VDI ~__context ~self = Ref.null)
)
Expand All @@ -1818,18 +1825,12 @@ let vbd_insert ~__context ~self ~vdi =
(fun () ->
let vm = Db.VBD.get_VM ~__context ~self in
assert_resident_on ~__context ~self:vm;
(* XXX: PR-1255: move the offline stuff to the master/message_forwarding? *)
if Db.VM.get_power_state ~__context ~self:vm = `Halted then begin
Db.VBD.set_VDI ~__context ~self ~value:vdi;
Db.VBD.set_empty ~__context ~self ~value:false
end else begin
let vbd = md_of_vbd ~__context ~self in
let disk = disk_of_vdi ~__context ~self:vdi |> Opt.unbox in
info "xenops: VBD.insert %s.%s %s" (fst vbd.Vbd.id) (snd vbd.Vbd.id) (disk |> rpc_of_disk |> Jsonrpc.to_string);
let dbg = Context.string_of_task __context in
Client.VBD.insert dbg vbd.Vbd.id disk |> sync_with_task __context;
Event.wait dbg ()
end;
let vbd = md_of_vbd ~__context ~self in
let disk = disk_of_vdi ~__context ~self:vdi |> Opt.unbox in
info "xenops: VBD.insert %s.%s %s" (fst vbd.Vbd.id) (snd vbd.Vbd.id) (disk |> rpc_of_disk |> Jsonrpc.to_string);
let dbg = Context.string_of_task __context in
Client.VBD.insert dbg vbd.Vbd.id disk |> sync_with_task __context;
Event.wait dbg ();
assert (not(Db.VBD.get_empty ~__context ~self));
assert (Db.VBD.get_VDI ~__context ~self = vdi)
)
Expand All @@ -1845,12 +1846,14 @@ let vbd_plug ~__context ~self =
(* Refresh the VBD metadata: *)
(try Client.VBD.remove dbg vbd.Vbd.id with Does_not_exist _ -> ());
info "xenops: VBD.add %s.%s" (fst vbd.Vbd.id) (snd vbd.Vbd.id);
let id = Client.VBD.add dbg vbd in
info "xenops: VBD.plug %s.%s" (fst vbd.Vbd.id) (snd vbd.Vbd.id);
finally
with_events_suppressed ~__context ~self:vm
(fun () ->
let id = Client.VBD.add dbg vbd in
info "xenops: VBD.plug %s.%s" (fst vbd.Vbd.id) (snd vbd.Vbd.id);
Client.VBD.plug dbg id |> sync_with_task __context;
) (fun () -> Event.wait dbg ());
);
refresh_vm ~__context ~self:vm;
Event.wait dbg ();
assert (Db.VBD.get_currently_attached ~__context ~self)
)

Expand Down Expand Up @@ -1886,16 +1889,18 @@ let vif_plug ~__context ~self =
let dbg = Context.string_of_task __context in
(* Refresh the VIF metadata: *)
(try Client.VIF.remove dbg vif.Vif.id with Does_not_exist _ -> ());
info "xenops: VIF.add %s.%s" (fst vif.Vif.id) (snd vif.Vif.id);
let id = Client.VIF.add dbg vif in
Xapi_network.with_networks_attached_for_vm ~__context ~vm (fun () ->
info "xenops: VIF.plug %s.%s" (fst vif.Vif.id) (snd vif.Vif.id);
finally
with_events_suppressed ~__context ~self:vm
(fun () ->
info "xenops: VIF.add %s.%s" (fst vif.Vif.id) (snd vif.Vif.id);
let id = Client.VIF.add dbg vif in
Client.VIF.plug dbg id |> sync_with_task __context;
) (fun () -> Event.wait dbg ());
assert (Db.VIF.get_currently_attached ~__context ~self)
)
);
);
refresh_vm ~__context ~self:vm;
Event.wait dbg ();
assert (Db.VIF.get_currently_attached ~__context ~self)
)

let vm_set_vm_data ~__context ~self =
Expand Down

0 comments on commit b4f7971

Please sign in to comment.