Skip to content

Commit

Permalink
Merge pull request #139 from frenetic-lang/crashes
Browse files Browse the repository at this point in the history
stability-related changes
  • Loading branch information
seliopou committed Jul 14, 2014
2 parents 104e0e1 + 6a00de6 commit baea625
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 75 deletions.
20 changes: 17 additions & 3 deletions async/Async_OpenFlow.mli
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ module Platform : sig
-> ?verbose:bool
-> ?log_disconnects:bool
-> ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ]
-> ?monitor_connections:bool
-> port:int
-> unit
-> t Deferred.t
Expand All @@ -65,6 +66,8 @@ module Platform : sig
-> m
-> [ `Drop of exn | `Sent of Time.t ] Deferred.t

val send_ignore_errors : t -> Client_id.t -> m -> unit

val send_to_all : t -> m -> unit

val client_addr_port
Expand Down Expand Up @@ -144,7 +147,11 @@ module Chunk : sig
| `Message of Client_id.t * m
]

val echo : (t, e, e) Stage.t
val set_monitor_interval : t -> Time.Span.t -> unit
val set_idle_wait : t -> Time.Span.t -> unit
val set_kill_wait : t -> Time.Span.t -> unit

val echo : (t, h, h) Stage.t
val handshake : int -> (t, e, h) Stage.t
end

Expand All @@ -165,8 +172,15 @@ module OpenFlow0x01 : sig
| `Message of Client_id.t * m
]

val switch_id_of_client : t -> Client_id.t -> SDN_Types.switchId
val client_id_of_switch : t -> SDN_Types.switchId -> Client_id.t
val switch_id_of_client_exn : t -> Client_id.t -> SDN_Types.switchId
val client_id_of_switch_exn : t -> SDN_Types.switchId -> Client_id.t

val switch_id_of_client : t -> Client_id.t -> SDN_Types.switchId option
val client_id_of_switch : t -> SDN_Types.switchId -> Client_id.t option

val set_monitor_interval : t -> Time.Span.t -> unit
val set_idle_wait : t -> Time.Span.t -> unit
val set_kill_wait : t -> Time.Span.t -> unit

val features : (t, e, f) Stage.t
end
Expand Down
45 changes: 32 additions & 13 deletions async/Async_OpenFlow0x01.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,41 @@ module Controller = struct
let close t = ChunkController.close t.sub
let has_client_id t = ChunkController.has_client_id t.sub
let send t s_id msg = ChunkController.send t.sub s_id (Message.marshal' msg)
let send_ignore_errors t s_id msg = ChunkController.send_ignore_errors t.sub s_id (Message.marshal' msg)
let send_to_all t msg = ChunkController.send_to_all t.sub (Message.marshal' msg)
let client_addr_port t = ChunkController.client_addr_port t.sub
let listening_port t = ChunkController.listening_port t.sub

(* XXX(seliopou): Raises `Not_found` if the client is no longer connected. *)
let switch_id_of_client t c_id = ClientMap.find_exn t.switches c_id
let client_id_of_switch t sw_id = SwitchMap.find_exn t.clients sw_id
let switch_id_of_client_exn t c_id = ClientMap.find_exn t.switches c_id
let client_id_of_switch_exn t sw_id = SwitchMap.find_exn t.clients sw_id

let switch_id_of_client t c_id = ClientMap.find t.switches c_id
let client_id_of_switch t sw_id = SwitchMap.find t.clients sw_id

let set_monitor_interval (t:t) (s:Time.Span.t) : unit =
ChunkController.set_monitor_interval t.sub s

let set_idle_wait (t:t) (s:Time.Span.t) : unit =
ChunkController.set_idle_wait t.sub s

let set_kill_wait (t:t) (s:Time.Span.t) : unit =
ChunkController.set_kill_wait t.sub s

let create ?max_pending_connections
?verbose
?log_disconnects
?buffer_age_limit ~port () =
?buffer_age_limit
?monitor_connections ~port () =
ChunkController.create ?max_pending_connections ?verbose ?log_disconnects
?buffer_age_limit ~port ()
?buffer_age_limit ?monitor_connections ~port ()
>>| function t ->
{ sub = t
; shakes = ClientSet.empty
; switches = ClientMap.empty
; clients = SwitchMap.empty
}

let _send t c_id m =
send t c_id (0l, m) >>| function
| `Drop exn -> raise exn
| `Sent _ -> ()

let openflow0x01 t evt =
match evt with
| `Connect (c_id, version) ->
Expand All @@ -104,15 +113,24 @@ module Controller = struct
let features t evt =
match evt with
| `Connect (c_id) ->
assert (not (ClientSet.mem t.shakes c_id));
t.shakes <- ClientSet.add t.shakes c_id;
send t c_id (0l, M.SwitchFeaturesRequest) >>| ChunkController.ensure
send t c_id (0l, M.SwitchFeaturesRequest)
(* XXX(seliopou): This swallows any errors that might have occurred
* while attemping the handshake. Any such error should not be raised,
* since as far as the user is concerned the connection never existed.
* At the very least, the exception should be logged, which it will be
* as long as the log_disconnects option is not disabled when creating
* the controller.
* *)
>>| (function _ -> [])
| `Message (c_id, (xid, msg)) when ClientSet.mem t.shakes c_id ->
begin match msg with
| M.SwitchFeaturesReply fs ->
let switch_id = fs.OpenFlow0x01.SwitchFeatures.switch_id in
t.switches <- ClientMap.add t.switches c_id switch_id;
t.clients <- SwitchMap.add t.clients switch_id c_id;
t.shakes <- ClientSet.remove t.shakes c_id;
t.clients <- SwitchMap.add t.clients switch_id c_id;
t.shakes <- ClientSet.remove t.shakes c_id;
return [`Connect(c_id, fs)]
| _ ->
Log.of_lazy ~tags ~level:`Debug (lazy
Expand All @@ -126,11 +144,12 @@ module Controller = struct
let m_sw_id = ClientMap.find t.switches c_id in
match m_sw_id with
| None -> (* features request did not complete *)
assert (ClientSet.mem t.shakes c_id);
t.shakes <- ClientSet.remove t.shakes c_id;
return []
| Some(sw_id) -> (* features request did complete *)
t.clients <- SwitchMap.remove t.clients sw_id;
t.switches <- ClientMap.remove t.switches c_id;
t.clients <- SwitchMap.remove t.clients sw_id;
return [`Disconnect(c_id, sw_id, exn)]

let listen t =
Expand Down
6 changes: 4 additions & 2 deletions async/Async_OpenFlow0x04.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ module Controller = struct
let create ?max_pending_connections
?verbose
?log_disconnects
?buffer_age_limit ~port () =
?buffer_age_limit
?monitor_connections ~port () =
ChunkController.create ?max_pending_connections ?verbose ?log_disconnects
?buffer_age_limit ~port ()
?buffer_age_limit ?monitor_connections ~port ()
>>| function t -> { sub = t }

let listen t =
Expand All @@ -77,6 +78,7 @@ module Controller = struct
let close t = ChunkController.close t.sub
let has_client_id t = ChunkController.has_client_id t.sub
let send t s_id msg = ChunkController.send t.sub s_id (Message.marshal' msg)
let send_ignore_errors t s_id msg = ChunkController.send_ignore_errors t.sub s_id (Message.marshal' msg)
let send_to_all t msg = ChunkController.send_to_all t.sub (Message.marshal' msg)
let client_addr_port t = ChunkController.client_addr_port t.sub
let listening_port t = ChunkController.listening_port t.sub
Expand Down
Loading

0 comments on commit baea625

Please sign in to comment.