Skip to content
This repository
tree: dc2b130b72
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 138 lines (114 sloc) 4.628 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
(*
Copyright © 2011 MLstate

This file is part of OPA.

OPA is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License, version 3, as published by
the Free Software Foundation.

OPA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
more details.

You should have received a copy of the GNU Affero General Public License
along with OPA. If not, see <http://www.gnu.org/licenses/>.
*)
(*
@author Louis Gesbert
**)

module D = Badop_lib
module Dialog = Badop_lib.Dialog
open Cps.Ops

include Badop_protocol.F
  (struct
     type spoken = Dialog.query
     type understood = Dialog.response
     type revision (* Abstract *)
   end)

module N = Hlnet

let open_database options k =
  let scheduler, remote, on_disconnect = match options with
    | Badop.Options_Client (scheduler,(addr,port), on_disconnect) -> scheduler, N.Tcp (addr,port), on_disconnect
    | _ -> assert false
  in
  let on_disconnect () =
    Logger.error "Disconnected from %s" (N.endpoint_to_string remote);
    on_disconnect()
  in
  N.open_channel scheduler remote ~on_disconnect database_channel_spec @> k

let close_database db k = N.close_channel db |> k

let status db k =
  match N.local_of_channel db, N.remote_of_channel db with
  | N.Tcp (local_addr,_), N.Tcp (remote_addr,remote_port) ->
      (N.sendreceive' db (Status (Dialog.query ()))
         (fun _ -> Badop.Client (local_addr, (remote_addr, remote_port), Badop.Other "disconnected") |> k)
       @> function
       | Status (Dialog.Response st) ->
           Badop.Client (local_addr, (remote_addr, remote_port), st) |> k
       | _ -> assert false)
  | _ -> assert false

let tr_next tr k =
  if tr.last then
    let tr' = { tr with version = succ tr.version } in tr.last <- false; tr' |> k
  else
    N.sendreceive tr.channel (Fork (Dialog.query tr.version))
    @> function
    | Fork (Dialog.Response tr_channel) ->
        { channel = tr_channel; version = succ tr.version; last = true } |> k
    | _ -> N.panic tr.channel

module Tr = struct

  let start db errk k =
    N.sendreceive' db (Transaction (Dialog.query ())) errk
    @> function
    | Transaction (Dialog.Response tr_channel) ->
        N.on_disconnect tr_channel (fun () -> N.Disconnected (N.remote_of_channel db) |> errk);
        { channel = tr_channel; version = 0; last = true } |> k
    | _ -> N.panic db

  let start_at_revision db rev errk k =
    N.sendreceive' db (Transaction_at (Dialog.query rev)) errk
    @> function
    | Transaction_at (Dialog.Response tr_channel) ->
        N.on_disconnect tr_channel (fun () -> N.Disconnected (N.remote_of_channel db) |> errk);
        { channel = tr_channel; version = 0; last = true } |> k
    | _ -> N.panic db

  let prepare tr k =
    tr_next tr
    @> fun tr ->
      N.sendreceive tr.channel (Prepare (Dialog.query tr.version))
    @> function
    | Prepare (D.Response success) -> (tr, success) |> k
    | _ -> N.panic tr.channel

  let commit tr k =
    N.sendreceive tr.channel (Commit (Dialog.query tr.version))
    @> function
    | Commit (D.Response success) -> success |> k
    | _ -> N.panic tr.channel

  let abort tr k =
    N.sendreceive tr.channel (Abort (Dialog.query tr.version))
    @> function
    | Abort (D.Response ()) -> () |> k
    | _ -> N.panic tr.channel

end

let read tr path query k =
  N.sendreceive tr.channel (Read (path, Dialog.query (tr.version, query)))
  @> function
  | Read (path', D.Response result) -> assert (path = path'); result |> k
  | _ -> N.panic tr.channel

let write tr path query k =
  Badop.Aux.map_write_op ~transaction:(fun _tr k -> () |> k) ~revision:(fun x k -> x |> k) query
  @> fun query1 -> tr_next tr
  @> fun tr ->
    N.send tr.channel (Write (path, tr.version, query1));
    Badop.Aux.respond_set_transaction query tr |> k

let write_list tr l_path_query k =
  let paths,queries = List.split l_path_query in
  Badop.Aux.map_write_list_op ~transaction:(fun _tr k -> () |> k) ~revision:(fun x k -> x |> k) queries
  @> fun queries1 -> tr_next tr
  @> fun tr ->
    N.send tr.channel (WriteList (tr.version, Dialog.query (List.combine paths queries1)));
    tr |> k

let node_properties _db _config k =
  #<If:TESTING> () |> k #<Else>
  Printf.eprintf " Can't set node properties on client \n%!"; () |> k #<End>

module Debug = struct
  let revision_to_string r = DebugPrint.print r
  let path_to_string = Path.to_string
end
Something went wrong with that request. Please try again.