Permalink
Browse files

strip out the remote syncing and user code entirely, to simplify the …

…server. it'll return later in a better form..
  • Loading branch information...
1 parent e5c57e7 commit 0989f7ca0918b339d0dd0fbae290e94448a7d84a Anil Madhavapeddy committed Jul 30, 2009
Showing with 0 additions and 1,329 deletions.
  1. +0 −1 db_thread.ml
  2. +0 −2 db_thread_access.ml
  3. +0 −56 lifedb_dispatch.ml
  4. +0 −60 lifedb_filter.ml
  5. +0 −345 lifedb_out_tasks.ml
  6. +0 −17 lifedb_schema_generator.ml
  7. +0 −351 lifedb_user.ml
  8. +0 −5 server.ml
  9. +0 −399 sync_schema.ml
  10. +0 −93 sync_schema.mli
View
1 db_thread.ml
@@ -42,7 +42,6 @@ let db_thread () =
|`Lifedb -> Sql_mirror.do_scan lifedb syncdb throttle_check
|`Plugins -> Lifedb_plugin.do_scan lifedb'
|`Tasks -> Lifedb_tasks.do_scan ()
- |`Out_tasks -> Lifedb_out_tasks.do_scan ()
end;
maybe_signal copt;
done
View
2 db_thread_access.ml
@@ -22,7 +22,6 @@ type scan_request = [
|`Plugins
|`Lifedb
|`Tasks
- |`Out_tasks
]
let q = Queue.create ()
@@ -33,7 +32,6 @@ let string_of_scan_request = function
|`Plugins -> "plugins"
|`Lifedb -> "lifedb"
|`Tasks -> "in_tasks"
- |`Out_tasks -> "out_tasks"
let dump_q () =
printf "DB Queue: [";
View
56 lifedb_dispatch.ml
@@ -55,18 +55,6 @@ let dispatch (lifedb : Lifedb_schema.Init.t) (syncdb : Sync_schema.Init.t) env (
(* not authenticated *)
|false -> begin
match cgi#request_method, url_hd with
- |`POST, "sync" ->
- let username = if List.length url_list < 2 then "unknown" else List.nth url_list 1 in
- let arg = mark_post_rpc cgi in
- Lifedb_user.dispatch_sync lifedb syncdb cgi username arg
- |`PUT arg, "sync" -> begin
- match url_list with
- |["sync";useruid;"_att";fileuid] ->
- Lifedb_user.dispatch syncdb env cgi (`Attachment (arg, useruid, fileuid))
- |["sync";useruid;"_entry";fileuid] ->
- Lifedb_user.dispatch syncdb env cgi (`Entry (arg, useruid, fileuid))
- |_ -> raise (Lifedb_rpc.Resource_not_found "unknown PUT request")
- end
|_ ->
return_need_auth cgi
end
@@ -96,21 +84,6 @@ let dispatch (lifedb : Lifedb_schema.Init.t) (syncdb : Sync_schema.Init.t) env (
|"_all" -> `List
|name -> `Get name)
- |`POST, "outtask" ->
- let arg = mark_post_rpc cgi in
- let name = if List.length url_list < 2 then "_unknown" else List.nth url_list 1 in
- Lifedb_out_tasks.dispatch cgi (`Create (name,arg))
- |`DELETE, "outtask" ->
- mark_delete_rpc cgi;
- let name = if List.length url_list < 2 then "_unknown" else List.nth url_list 1 in
- Lifedb_out_tasks.dispatch cgi (`Destroy name)
- |`GET, "outtask" ->
- let tasksel = if List.length url_list < 2 then "_all" else List.nth url_list 1 in
- mark_get_rpc cgi;
- Lifedb_out_tasks.dispatch cgi (match tasksel with
- |"_all" -> `List
- |name -> `Get name)
-
|`GET, "plugin" ->
let tasksel = if List.length url_list < 2 then "_all" else List.nth url_list 1 in
mark_get_rpc cgi;
@@ -141,35 +114,6 @@ let dispatch (lifedb : Lifedb_schema.Init.t) (syncdb : Sync_schema.Init.t) env (
mark_get_rpc cgi;
Lifedb_query.dispatch lifedb syncdb env cgi (`Mtype (List.tl url_list))
- |`POST, "user" ->
- let arg = mark_post_rpc cgi in
- Lifedb_user.dispatch syncdb env cgi (`Create arg)
- |`DELETE, "user" ->
- mark_delete_rpc cgi;
- let name = if List.length url_list < 2 then "unknown" else List.nth url_list 1 in
- Lifedb_user.dispatch syncdb env cgi (`Delete name)
- |`GET, "user" ->
- let usersel = if List.length url_list < 2 then "_all" else List.nth url_list 1 in
- mark_get_rpc cgi;
- Lifedb_user.dispatch syncdb env cgi (match usersel with
- |"_all" -> `List
- |name -> `Get name)
-
- |`POST, "filter" ->
- let arg = mark_post_rpc cgi in
- let uid = if List.length url_list < 2 then "unknown" else List.nth url_list 1 in
- Lifedb_user.dispatch syncdb env cgi (`Create_filter (uid,arg))
- |`DELETE, "filter" ->
- let uid,name = match url_list with |[_;uid;name] -> uid,name |_ -> "","" in
- Lifedb_user.dispatch syncdb env cgi (`Delete_filter (uid,name))
- |`GET, "filter" -> begin
- mark_get_rpc cgi;
- match url_list with
- |["filter";useruid] -> Lifedb_user.dispatch syncdb env cgi (`List_filters useruid)
- |["filter";useruid;fname] -> Lifedb_user.dispatch syncdb env cgi (`Get_filter (useruid, fname))
- |_ -> raise (Resource_not_found "filter")
- end
-
|_ -> raise (Invalid_rpc "Unknown request")
end
View
60 lifedb_filter.ml
@@ -1,60 +0,0 @@
-(* Copyright (C) 2009 Anil Madhavapeddy <anil@recoil.org>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along
- with this program; if not, write to the Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-*)
-
-open Utils
-open Printf
-module LS=Lifedb_schema
-module SS=Sync_schema
-
-(* return the set of uids which the remote user doesnt have *)
-let filter_new (user:SS.User.t) es =
- let has_uids = (guids_of_blob user#has_guids) @ (guids_of_blob user#sent_guids) in
- let f = List.filter (fun e -> not (List.mem e#uid has_uids)) es in
- Log.logmod "Filter" "Filtering new entries -> %s (%d orig, %d results)" user#uid (List.length es) (List.length f);
- f
-
-(* return the set of uids which are addressed to the remote user *)
-let filter_recipients user es =
- let f = List.filter (fun e ->
- List.length (
- List.find_all (fun s ->
- s#name = "email" && s#uid = user#uid
- ) e#recipients
- ) > 0
- ) es in
- Log.logmod "Filter" "Filtering entries addressed to -> %s (%d results)" user#uid (List.length f);
- f
-
-(* apply a single filter and return a set of entries *)
-let apply_filter lifedb syncdb (user:SS.User.t) (entries:LS.Entry.t list) (filter:SS.Filter_rule.t) =
- match filter#body with
- |"add *" -> begin
- (* no need to preserve incoming uids as this just adds them all to output *)
- filter_new user (LS.Entry.get lifedb)
- end
- |"add * where #remote in recipients" -> begin
- filter_recipients user (filter_new user (LS.Entry.get lifedb)) @ entries
- end
- |_ -> failwith "unknown filter rule"
-
-(* given a user record, return a list of entries which need to go to the user *)
-let apply_filters lifedb syncdb (user:SS.User.t) =
- (* by default, we do not send any data to the remote user *)
- let entries = [] in
- (* retrieve filters in descending zorder to apply *)
- let filters = List.sort (fun a b -> compare a#zorder b#zorder) user#filters in
- List.fold_left (apply_filter lifedb syncdb user) entries filters
View
345 lifedb_out_tasks.ml
@@ -1,345 +0,0 @@
-(* Copyright (C) 2009 Anil Madhavapeddy <anil@recoil.org>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along
- with this program; if not, write to the Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-*)
-
-(* Task scheduling *)
-
-open Printf
-open Utils
-module LS=Lifedb_schema
-module SS=Sync_schema
-
-exception Task_error of string
-exception Internal_task_error of string
-
-let m = Mutex.create ()
-
-type task_state = {
- cmd: string;
- plugin: string;
- mtype: string;
- cwd: string;
- start_time: float;
- secret: (string * string) option;
- args : string list option;
- mutable outfd: Unix.file_descr option;
- mutable errfd: Unix.file_descr option;
- mutable running: Fork_helper.task;
- mutable uids: string list;
- mutable files: string list;
-}
-
-let task_list = Hashtbl.create 1
-let task_table_limit = 20
-let task_poll_period = ref 20.
-let task_throttle () = Thread.delay 0.1
-
-let json_of_task name t : Lifedb.Rpc.Task.out_r =
- let secret = match t.secret with |None -> None
- |Some (s,u) -> Some (object method service=s method username=u end) in
- object
- method name=name
- method plugin=t.plugin
- method pltype=t.mtype
- method secret=secret
- method args=t.args
- method duration=Unix.gettimeofday () -. t.start_time
- method pid=Fork_helper.pid_of_task t.running
- end
-
-let string_of_task t =
- let running = Fork_helper.string_of_task t.running in
- sprintf ": `%s` %s" t.cmd running
-
-let log_task_table () =
- Hashtbl.iter (fun name state ->
- Log.logmod "Tasks" "%s: %s" name (string_of_task state)
- ) task_list
-
-let find_task name =
- try
- Some (Hashtbl.find task_list name)
- with
- Not_found -> None
-
-let find_task_by_mtype mtype =
- let f = ref None in
- Hashtbl.iter (fun n t ->
- if t.mtype = mtype then
- f := Some (n,t)
- ) task_list;
- !f
-
-(* create task descriptor and leave it unstarted *)
-let create_task task_name (p:Lifedb.Rpc.Task.out_t) =
- assert(not (Mutex.try_lock m));
- if Hashtbl.length task_list >= task_table_limit then
- raise (Task_error "too many tasks already registered");
- if String.contains task_name '.' || (String.contains task_name '/') then
- raise (Task_error "task name cant contain . or /");
- let pl = match Lifedb_plugin.find_plugin p#plugin with
- |None -> raise (Task_error (sprintf "plugin %s not found" p#plugin))
- |Some x -> x in
- let secret = match p#secret with
- |None -> None
- |Some s -> Some (s#service, s#username) in
- let now_time = Unix.gettimeofday () in
- let task = { cmd=pl#cmd; outfd=None; errfd=None; cwd=pl#dir; plugin=pl#name; secret=secret; start_time=now_time; running=(Fork_helper.blank_task ()); args=p#args; uids=[]; files=[]; mtype=p#pltype } in
- Hashtbl.add task_list task_name task;
- Log.logmod "Tasks" "Created outbound task '%s' %s" task_name (string_of_task task)
-
-let find_or_create_task name (t:Lifedb.Rpc.Task.out_t) =
- match find_task name with
- |Some _ -> ()
- |None -> create_task name t
-
-(* close any logging fds, reset fields *)
-let reset_task name =
- assert(not (Mutex.try_lock m));
- let closeopt task = function
- |None -> ()
- |Some fd ->
- let lg = sprintf "[%s] Log closing: %s\n" (current_datetime()) (Fork_helper.string_of_task task.running) in
- ignore(Unix.handle_unix_error (Unix.write fd lg 0) (String.length lg));
- Unix.handle_unix_error Unix.close fd;
- in
- match find_task name with
- |Some task ->
- let time_taken = (Unix.gettimeofday ()) -. task.start_time in
- let exit_code = Fork_helper.exit_code_of_task task.running in
- Log.push (`Plugin (name, time_taken, exit_code));
- closeopt task task.outfd;
- closeopt task task.errfd;
- task.outfd <- None;
- task.errfd <- None;
- task.running <- Fork_helper.blank_task ();
- task.uids <- [];
- task.files <- [];
- |None -> ()
-
-(* remove the task entirely *)
-let destroy_task name =
- assert(not (Mutex.try_lock m));
- match find_task name with
- |Some task -> begin
- let final_status = Fork_helper.destroy task.running in
- reset_task name;
- Hashtbl.remove task_list name;
- Log.logmod "Tasks" "Outbound task %s destroyed: %s" name
- (Fork_helper.string_of_status final_status);
- end
- |None -> raise (Task_error "task not found")
-
-(* split up a list of entris into a hashtable of their respective mtypes *)
-let partition_entries_into_mtypes lifedb es =
- let h = Hashtbl.create 1 in
- List.iter (fun e ->
- if not (Hashtbl.mem h e#mtype#name) then
- Hashtbl.add h e#mtype#name [];
- Hashtbl.replace h e#mtype#name (e :: (Hashtbl.find h e#mtype#name))
- ) es;
- h
-
-let start_task name =
- let t = Hashtbl.find task_list name in
- assert(not (Mutex.try_lock m));
- let env = match t.secret with
- |None -> [||]
- |Some (s,u) -> begin
- match Lifedb_passwd.lookup_passwd s u with
- |Some p -> [| ("LIFEDB_PASSWORD=" ^ p); ("LIFEDB_USERNAME="^u) |]
- |None -> Log.logmod "Tasks" "WARNING: unable to find passwd for task '%s'" name; [||]
- end in
- (* add environment arguments *)
- let args = match t.args with None -> [||] | Some a -> Array.of_list a in
- let env = Array.append env args in
- let logdir = Lifedb_config.Dir.log() in
- let logfile = sprintf "%s/%s.log" logdir name in
- let errlogfile = sprintf "%s/%s.err" logdir name in
- let openfdfn f = Unix.handle_unix_error (Unix.openfile f [ Unix.O_APPEND; Unix.O_CREAT; Unix.O_WRONLY]) 0o600 in
- let outfd = openfdfn logfile in
- let errfd = openfdfn errlogfile in
- let logfn fd s = ignore(Unix.write fd s 0 (String.length s)) in
- let tmstr = current_datetime () in
- logfn outfd (sprintf "[%s] Stdout log started\n" tmstr);
- logfn errfd (sprintf "[%s] Stderr log started\n" tmstr);
- let env = Array.append env [| "LIFEDB_SYNC_DIR=out"; (sprintf "LIFEDB_UID_MAP=%s" (Lifedb_config.Dir.uidmap ()));
- (sprintf "HOME=%s" (Sys.getenv "HOME"));
- (sprintf "USER=%s" (Sys.getenv "USER")) |] in
- let cmd =
- if Lifedb_config.test_mode () then
- sprintf "sleep %d" (Random.int 5 + 3)
- else
- (* XXX check shell escaping here!! *)
- sprintf "%s %s" t.cmd (String.concat " " (List.map String.escaped t.files))
- in
- let ts = Fork_helper.create cmd env t.cwd (logfn outfd) (logfn errfd) in
- Log.logmod "Tasks" "Executing outbound command '%s' (%s)" name cmd;
- task_throttle ();
- t.running <- ts;
- t.outfd <- Some outfd;
- t.errfd <- Some errfd
-
-(* Look for items in the INBOX with a pltype matching an active plugin, and schedule it
- if so *)
-let task_sweep lifedb syncdb () =
- (* for each user, look for entries in the inbox to them *)
- List.iter (fun (user:SS.User.t) ->
- let es = LS.Entry.get_by_inbox_delivered ~inbox:(Some user#uid) ~delivered:0L lifedb in
- match es with
- |[] -> ()
- |es -> begin
- (* constrain es to only 50 entries at a time to avoid overloading output plugin *)
- let es = list_max_size 50 es in
- (* we have inbox entries, look for a plugin to handle each mtype *)
- let h = partition_entries_into_mtypes lifedb es in
- Hashtbl.iter (fun mtype_name es ->
- (* look for an output task to handle this mtype name *)
- match find_task_by_mtype mtype_name with
- |None ->
- Log.logmod "Task" "Unable to find output task for <- %s : %s" mtype_name user#uid
- |Some (name,t) -> begin
- match Fork_helper.status_of_task t.running with
- |Fork_helper.Not_started ->
- (* set the entry UIDs and kick the command off *)
- t.uids <- List.map (fun e -> e#uid) es;
- t.files <- List.map (fun e -> e#file_name) es;
- start_task name
- |_ ->
- Log.logmod "Task" "Pending INBOX items, but already running %s" name
- end
- ) h
- end
- ) (SS.User.get syncdb);
- Hashtbl.iter (fun name task ->
- let td = string_of_task task in
- match Fork_helper.status_of_task task.running with
- |Fork_helper.Running pid ->
- Log.logmod "Sweep" "%s ... %s" name td
- |Fork_helper.Not_started -> ()
- |Fork_helper.Done exit_code ->
- Log.logmod "Sweep" "%s ... finished %s" name td;
- if exit_code = 0 then begin
- (* successfully delivered msgs, so mark them in the DB as delivered *)
- List.iter (fun uid ->
- match LS.Entry.get_by_uid ~uid lifedb with
- |[e] ->
- Log.logmod "Task" "Successfully delivered: %s" e#file_name;
- e#set_delivered 1L;
- ignore(e#save)
- |_ -> ()
- ) task.uids;
- end;
- reset_task name;
- |Fork_helper.Killed signal ->
- Log.logmod "Sweep" "%s ... crashed %s" name td;
- reset_task name;
- ) task_list
-
-let dispatch cgi = function
- |`Create (name,p) ->
- let params = Lifedb.Rpc.Task.out_t_of_json (Json_io.json_of_string p) in
- with_lock m (fun () ->
- match find_task name with
- |Some state ->
- Lifedb_rpc.return_error cgi `Bad_request "Task already exists" "Use a different id"
- |None -> begin
- try
- create_task name params
- with
- |Task_error err ->
- Lifedb_rpc.return_error cgi `Bad_request "Task error" err
- end
- )
- |`Get name ->
- with_lock m (fun () ->
- match find_task name with
- |Some state ->
- cgi#output#output_string (Json_io.string_of_json (Lifedb.Rpc.Task.json_of_out_r (json_of_task name state)))
- |None ->
- Lifedb_rpc.return_error cgi `Not_found "Task error" "Task not found"
- )
- |`List ->
- with_lock m (fun () ->
- let r = Hashtbl.fold (fun name state a -> json_of_task name state :: a) task_list [] in
- let res = object method results=List.length r method rows=r end in
- cgi#output#output_string (Json_io.string_of_json (Lifedb.Rpc.Task.json_of_out_rs res))
- )
- |`Destroy name ->
- with_lock m (fun () ->
- try
- destroy_task name
- with |Task_error err ->
- Lifedb_rpc.return_error cgi `Bad_request "Task error" err
- )
-
-(* task thread which waits on a condition to do a sweep. is signalled regularly
- or via a process exiting and delivering a SIGCHLD *)
-let c = Condition.create ()
-let cm = Mutex.create ()
-let task_thread () =
- let lifedb = LS.Init.t (Lifedb_config.Dir.lifedb_db ()) in
- let syncdb = SS.Init.t (Lifedb_config.Dir.sync_db ()) in
- while true do
- with_lock cm (fun () ->
- Condition.wait c cm;
- with_lock m (task_sweep lifedb syncdb);
- )
- done
-
-(* thread to kick the sweeping thread regularly to update task status. *)
-let task_regular_kick () =
- while true do
- with_lock cm (fun () ->
- Condition.signal c
- );
- Thread.delay !task_poll_period
- done
-
-(* scan the config directory and spawn tasks *)
-let config_file_extension = ".outconf"
-let scan_config_file config_file =
- Log.logmod "Tasks" "Scanning config file %s" config_file;
- let task = Lifedb.Rpc.Task.out_t_of_json (Json_io.load_json config_file) in
- let task_name = Filename.chop_suffix (Filename.basename config_file) config_file_extension in
- match Lifedb_plugin.find_plugin task#plugin with
- |None -> Log.logmod "Tasks" "Plugin '%s' not found for task '%s', skipping it" task#plugin task_name;
- |Some _ ->
- Log.logmod "Tasks" "Added '%s' (plugin %s)" task_name task#plugin;
- let task : Lifedb.Rpc.Task.out_t = object
- method plugin=task#plugin
- method secret=task#secret
- method args=task#args
- method pltype=task#pltype
- end in
- with_lock m (fun () -> find_or_create_task task_name task)
-
-let do_scan () =
- let config_dir = Lifedb_config.Dir.config () in
- let dh = Unix.opendir config_dir in
- try_final (fun () ->
- repeat_until_eof (fun () ->
- let next_entry = Unix.readdir dh in
- if Filename.check_suffix next_entry config_file_extension then
- scan_config_file (Filename.concat config_dir next_entry)
- )
- ) (fun () -> Unix.closedir dh)
-
-let init () =
- let _ = Thread.create task_thread () in
- let _ = Thread.create task_regular_kick () in
- Sys.set_signal Sys.sigchld (Sys.Signal_handle (fun _ ->
- with_lock cm (fun () -> Condition.signal c)))
View
17 lifedb_schema_generator.ml
@@ -75,23 +75,6 @@ let sync = make [
date "mtime";
],[], default_opts;
- "filter_rule", [
- text "name";
- text "body";
- integer "zorder";
- ],[], default_opts;
-
- "user", [
- text ~flags:[`Unique; `Index] "uid";
- text "ip";
- integer "port";
- text "key";
- date "last_sync";
- blob "has_guids";
- blob "sent_guids";
- foreign_many "filter_rule" "filters";
- ], [], default_opts;
-
]
let log = make [
View
351 lifedb_user.ml
@@ -1,351 +0,0 @@
-(* Copyright (C) 2009 Anil Madhavapeddy <anil@recoil.org>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along
- with this program; if not, write to the Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-*)
-
-open Utils
-open Printf
-open Lifedb
-module LS=Lifedb_schema
-module SS=Sync_schema
-open Http_client.Convenience
-
-let process fn =
- let string_of_chan cin =
- let buf = Buffer.create 2048 in
- repeat_until_eof (fun () ->
- Buffer.add_string buf (cin#input_line ());
- );
- Buffer.contents buf
- in
- try
- let res = fn () in
- let cin = res#response_body#open_value_rd () in
- Netchannels.with_in_obj_channel cin (fun cin ->
- match res#response_status with
- |`Ok -> `Success (string_of_chan cin)
- |_ -> `Failure (string_of_chan cin)
- )
- with
- |Http_client.Http_protocol _ -> `Failure "unknown"
-
-(* send an RPC to a remote user with the specified json string *)
-let send_rpc (user:SS.User.t) json =
- let uri = sprintf "http://%s:%Lu/sync/%s" user#ip user#port (Lifedb_config.Dir.username ()) in
- let post_raw body =
- process (fun () ->
- http_post_raw_message ~callfn:(fun p ->
- let rh = p#request_header `Base in
- rh#update_field "content-type" "application/json";
- p#set_request_header rh)
- uri body
- ) in
- match post_raw json with
- |`Success res -> Log.logmod "RPC" "-> %s: success (res: %s)" user#uid res
- |`Failure res -> Log.logmod "RPC" "-> %s: epic fail (res: %s)" user#uid res
-
-let succ_sent_guids = Hashtbl.create 1
-
-(* HTTP PUT some content to a remote user *)
-let put_rpc syncdb (p:Http_client.pipeline) (user:SS.User.t) (entry:LS.Entry.t) =
- let uri = sprintf "http://%s:%Lu/sync/%s/_entry/%s" user#ip user#port (Lifedb_config.Dir.username ()) entry#uid in
- let atturi att = sprintf "http://%s:%Lu/sync/%s/_att/%s" user#ip user#port (Lifedb_config.Dir.username ()) (Filename.basename att#file_name) in
- let failure = ref false in
- (* drop the attachments in first *)
- List.iter (fun att ->
- let res = process (fun () ->
- let sz = (Unix.stat att#file_name).Unix.st_size in
- let buf = Buffer.create sz in
- let fin = open_in att#file_name in
- (try
- Buffer.add_channel buf fin sz;
- with err ->
- close_in fin;
- raise err
- );
- close_in fin;
- let http_call = new Http_client.put (atturi att) (Buffer.contents buf) in
- let hdr = http_call#request_header `Base in
- hdr#update_field "Content-type" att#mime_type;
- hdr#update_field "Content-length" (sprintf "%d" sz);
- http_call#set_request_header hdr;
- p#reset ();
- p#add http_call;
- p#run ();
- http_call
- ) in
- match res with
- |`Success res -> Log.logmod "User" "Success uploading attachment %s" att#file_name
- |`Failure res -> Log.logmod "User" "FAILED uploading attachment %s" att#file_name; failure := true
- ) entry#atts;
- if !failure then
- Log.logmod "User" "Had failures uploading attachments, so not doing entry %s" entry#uid
- else begin
- let fin = open_in entry#file_name in
- try_final (fun () ->
- let res = process (fun () ->
- let buf = Buffer.create 2048 in
- repeat_until_eof (fun () -> Buffer.add_string buf (input_line fin));
- let json = Buffer.contents buf in
- let http_call = new Http_client.put uri json in
- let hdr = http_call#request_header `Base in
- hdr#update_field "Content-type" "application/json";
- hdr#update_field "Content-length" (sprintf "%d" (String.length json));
- http_call#set_request_header hdr;
- p#reset ();
- p#add http_call;
- p#run ();
- http_call
- ) in
- match res with
- |`Success res -> Log.logmod "Upload" "Success to %s (%s): %s" user#uid entry#file_name res
- |`Failure res -> Log.logmod "Upload" "Failure to %s (%s): %s" user#uid entry#file_name res)
- (fun () -> close_in fin);
- user#set_sent_guids (add_guids_to_blob user#sent_guids [entry#uid]);
- Hashtbl.replace succ_sent_guids (user#uid, entry#uid) (Unix.gettimeofday ());
- ignore(user#save);
- end
-
-(* Lookup a user UID and apply the function over it *)
-let find_user db useruid fn =
- match SS.User.get ~uid:(Some useruid) db with
- |[user] -> fn user
- |_ -> raise (Lifedb_rpc.Resource_not_found "unknown user")
-
-(* Netchannel convenience function to make sure in/out channels are both cleaned up *)
-let with_in_and_out_obj_channel cin cout fn =
- Netchannels.with_out_obj_channel cout (fun cout ->
- Netchannels.with_in_obj_channel cin (fun cin ->
- fn cin cout
- )
- )
-
-(* User handling fn, to deal with incoming user create/delete and entry create/delete from
- remote sources *)
-let dispatch db env cgi = function
-|`Create arg -> begin
- Log.logmod "DebugUser" "%s" arg;
- let u = Rpc.User.t_of_json (Json_io.json_of_string arg) in
- match SS.User.get ~uid:(Some u#uid) db with
- |[] ->
- Log.logmod "DebugUser" "adding ";
- let user = SS.User.t ~uid:u#uid ~ip:u#ip ~port:(Int64.of_int u#port) ~key:u#key ~sent_guids:(blob_of_guids []) ~has_guids:(blob_of_guids []) ~filters:[] ~last_sync:0. db in
- ignore(user#save);
- |[user] ->
- Log.logmod "DebugUser" "already exists so edit";
- user#set_uid u#uid;
- user#set_ip u#ip;
- user#set_port (Int64.of_int u#port);
- user#set_key u#key;
- ignore(user#save);
- |_ ->
- Lifedb_rpc.return_error cgi `Bad_request "Internal error" "Multiple users found with same uid";
-end
-|`Delete uid -> begin
- find_user db uid (fun user -> user#delete)
-end
-|`Create_filter (useruid,arg) ->
- let f = Rpc.User.filter_of_json (Json_io.json_of_string arg) in
- find_user db useruid (fun user ->
- (* get existing filter list without the currently created one *)
- let f = SS.Filter_rule.t ~name:f#name ~body:f#body ~zorder:(Int64.of_int f#zorder) db in
- ignore(f#save);
- let fs = f :: (List.filter (fun x -> x#name <> f#name) user#filters) in
- user#set_filters fs;
- ignore(user#save);
- )
-|`Delete_filter (useruid,name) ->
- find_user db useruid (fun user ->
- let pos,neg = List.partition (fun x -> x#name = name) user#filters in
- match pos with
- |[] -> raise (Lifedb_rpc.Resource_not_found "unknown filter")
- |_ -> user#set_filters neg; ignore(user#save)
- )
-|`Entry (arg, useruid, fileuid) -> begin
- find_user db useruid (fun user ->
- let entry_dir = String.concat "/" [Lifedb_config.Dir.inbox (); user#uid; "entries"] in
- let fname = Filename.concat entry_dir (fileuid ^ ".lifeentry") in
- if String.contains fileuid '/' then raise (Lifedb_rpc.Invalid_rpc "bad filename uid");
- if Sys.file_exists fname then raise (Lifedb_rpc.Resource_conflict "attachment already exists");
- make_dirs entry_dir;
- let cout = new Netchannels.output_channel (open_out fname) in
- let cin = arg#open_value_rd () in
- with_in_and_out_obj_channel cin cout (fun cin cout -> cout#output_channel cin);
- Db_thread_access.push `Lifedb;
- )
-end
-|`Attachment (arg,useruid,fileuid) -> begin
- find_user db useruid (fun user ->
- let att_dir = String.concat "/" [Lifedb_config.Dir.inbox (); user#uid; "_att"] in
- let fname = Filename.concat att_dir fileuid in
- if String.contains fileuid '/' then raise (Lifedb_rpc.Invalid_rpc "bad filename uid");
- make_dirs att_dir;
- let cout = new Netchannels.output_channel (open_out fname) in
- let cin = arg#open_value_rd () in
- with_in_and_out_obj_channel cin cout (fun cin cout -> cout#output_channel cin)
- )
-end
-|`Get useruid -> begin
- find_user db useruid (fun user ->
- let t = object method uid=user#uid method ip=user#ip method port=Int64.to_int user#port method key=user#key end in
- cgi#output#output_string (Json_io.string_of_json (Rpc.User.json_of_t t))
- )
-end
-|`List -> begin
- let r = List.map (fun user ->
- object method uid=user#uid method ip=user#ip method port=Int64.to_int user#port method key=user#key end) (SS.User.get db) in
- let tr = object method results=List.length r method rows=r end in
- cgi#output#output_string (Json_io.string_of_json (Rpc.User.json_of_ts tr))
-end
-|`List_filters useruid -> begin
- find_user db useruid (fun user ->
- let r = List.map (fun f -> object method name=f#name method body=f#body method zorder=(Int64.to_int f#zorder) end) user#filters in
- cgi#output#output_string (Json_io.string_of_json (Rpc.User.json_of_filters (results_of_search r)));
- )
-end
-|`Get_filter (useruid, filtername) -> begin
- find_user db useruid (fun user ->
- match List.filter (fun f -> f#name = filtername) user#filters with
- |[] -> raise (Lifedb_rpc.Resource_not_found ("unknown filter " ^ filtername))
- |[f] ->
- let r = object method name=f#name method body=f#body method zorder=(Int64.to_int f#zorder) end in
- cgi#output#output_string (Json_io.string_of_json (Rpc.User.json_of_filter r))
- |_ -> raise (Lifedb_rpc.Invalid_rpc "internal error: multiple filters with same name found")
- )
-end
-
-(* upload channel, send it a username/file to upload sequentially *)
-let uploadreq = Event.new_channel ()
-
-(* upload contents on the upload queue to remote hosts via HTTP PUT *)
-let upload_thread () =
- let lifedb = LS.Init.t (Lifedb_config.Dir.lifedb_db ()) in
- let syncdb = SS.Init.t (Lifedb_config.Dir.sync_db ()) in
- let p = new Http_client.pipeline in
- (*
- let set_verbose_pipeline () =
- let opt = p#get_options in
- p#set_options { opt with
- Http_client.verbose_status = true;
- verbose_request_header = true;
- verbose_response_header = true;
- verbose_response_contents = true;
- verbose_connection = false
- } in
- set_verbose_pipeline (); *)
- p#set_proxy_from_environment ();
- p#reset ();
- while true do
- let useruid, fileuid = Event.sync (Event.receive uploadreq) in
- Log.logmod "Upload" "Upload request for %s to %s" fileuid useruid;
- match (SS.User.get ~uid:(Some useruid) syncdb), (LS.Entry.get ~uid:(Some fileuid) lifedb) with
- |[user],[entry] -> begin
- try
- put_rpc syncdb p user entry
- with err ->
- Log.logmod "Sync" "Encountered error syncing %s->%s: %s" user#uid entry#uid (Printexc.to_string err)
- end
- |_ -> Log.logmod "Sync" "WARNING: User %s or entry %s not found" useruid fileuid
- done
-
-(* given a user object, synchronize any entries not present on the remote user host,
- by adding them to the upload thread. *)
-let sync_our_entries_to_user lifedb syncdb user =
- Log.logmod "Sync" "Our entries -> %s" user#uid;
- let uids = Lifedb_filter.apply_filters lifedb syncdb user in
- (* filter out recently sent GUIDs from the memory hash *)
- let uids = List.filter (fun e ->
- try
- let tm = Hashtbl.find succ_sent_guids (user#uid,e#uid) in
- Unix.gettimeofday () -. tm > 86400.
- with _ -> true
- ) uids in
- List.iter (fun x -> Log.logmod "Sync" "added upload -> %s: %s" x#uid x#file_name) uids;
- List.iter (fun x -> Event.sync (Event.send uploadreq (user#uid, x#uid))) uids
-
-(* given a user object, send it all the GUIDs we already have to keep it up to date with
- what we might need *)
-let sync_our_guids_to_user lifedb syncdb user =
- Log.logmod "Sync" "Our GUIDS -> %s" user#uid;
- let all_guids = LS.Entry.get_uid lifedb in
- let json = Rpc.User.json_of_sync (object method guids=all_guids end) in
- send_rpc user (Json_io.string_of_json json)
-
-(* thread to regularly iterate over all users and send off our guids *)
-let sync_guids_to_remote_users_thread lifedb syncdb =
- let sync_interval = 60. in
- let now = Unix.gettimeofday () in
- Db_thread_access.throttle_request "sync_guids_to_remote_users" (fun () ->
- List.iter (fun user ->
- if now -. user#last_sync > sync_interval then begin
- sync_our_guids_to_user lifedb syncdb user;
- user#set_last_sync (Unix.gettimeofday());
- ignore(user#save);
- end
- ) (SS.User.get syncdb)
- )
-
-(* thread to listen to received syncs from users and look for entries they
- need to add to the upload thread *)
-let sq = Queue.create ()
-let sm = Mutex.create ()
-let sc = Condition.create ()
-let sync_entries_to_remote_users_thread lifedb syncdb =
- with_lock sm (fun () ->
- if Queue.is_empty sq then
- Condition.wait sc sm;
- let useruid = Queue.take sq in
- Db_thread_access.throttle_request "sync_entries_to_remote" (fun () ->
- find_user syncdb useruid (sync_our_entries_to_user lifedb syncdb)
- )
- )
-
-(* received a sync request from another user, so update our has_guids list
- * for that user *)
-let dispatch_sync lifedb syncdb cgi uid arg =
- match SS.User.get ~uid:(Some uid) syncdb with
- |[] -> Lifedb_rpc.return_error cgi `Forbidden "Unknown user" ""
- |[user] ->
- let sync = Rpc.User.sync_of_json (Json_io.json_of_string arg) in
- Log.logmod "Sync" "Received GUID update <- %s (%d UIDs)" uid (List.length sync#guids);
- user#set_has_guids (blob_of_guids sync#guids);
- (* XXX reset the sent guids here? what if remote user has deleted and doesnt want them back *)
- user#set_sent_guids (blob_of_guids []);
- ignore(user#save);
- with_lock sm (fun () ->
- Queue.push user#uid sq;
- Condition.signal sc;
- )
- |_ -> assert false
-
-let thread_with_dbs name fn =
- Thread.delay 5.;
- let lifedb = LS.Init.t (Lifedb_config.Dir.lifedb_db ()) in
- let syncdb = SS.Init.t (Lifedb_config.Dir.sync_db ()) in
- while true do
- (try
- fn lifedb syncdb
- with exn ->
- Log.logmod "Sync" "Got exception in thread '%s': %s" name (Printexc.to_string exn)
- );
- Thread.delay 20.
- done
-
-let init () =
- let _ = Thread.create (thread_with_dbs "remote_guids") sync_guids_to_remote_users_thread in
- let _ = Thread.create (thread_with_dbs "remote_entries") sync_entries_to_remote_users_thread in
- let _ = Thread.create upload_thread () in
- ()
-
View
5 server.ml
@@ -49,7 +49,6 @@ let _ =
(* the task manager thread *)
Lifedb_tasks.init ();
- Lifedb_out_tasks.init ();
(* make and display various directories used by the server *)
List.iter (fun (a,b) ->
@@ -62,10 +61,6 @@ let _ =
Db_thread.start ();
Db_thread_access.push `Plugins;
Db_thread_access.push `Tasks;
- Db_thread_access.push `Out_tasks;
-
- (* start the p2p sync thread *)
- Lifedb_user.init ();
(* start listening to HTTP connections *)
Http_server.init ()
View
399 sync_schema.ml
@@ -228,411 +228,12 @@ module Dircache = struct
end
-module Filter_rule = struct
- type t = <
- id : int64 option;
- set_id : int64 option -> unit;
- name : string;
- set_name : string -> unit;
- body : string;
- set_body : string -> unit;
- zorder : int64;
- set_zorder : int64 -> unit;
- save: int64; delete: unit
- >
-
- let init db =
- let sql = "create table if not exists filter_rule (id integer primary key autoincrement,name text,body text,zorder integer);" in
- db_must_ok db (fun () -> Sqlite3.exec db.db sql);
- ()
-
- (* object definition *)
- let t ?(id=None) ~name ~body ~zorder db : t = object
- (* get functions *)
- val mutable _id = id
- method id : int64 option = _id
- val mutable _name = name
- method name : string = _name
- val mutable _body = body
- method body : string = _body
- val mutable _zorder = zorder
- method zorder : int64 = _zorder
-
- (* set functions *)
- method set_id v =
- _id <- v
- method set_name v =
- _name <- v
- method set_body v =
- _body <- v
- method set_zorder v =
- _zorder <- v
-
- (* admin functions *)
- method delete =
- match _id with
- |None -> ()
- |Some id ->
- let sql = "DELETE FROM filter_rule WHERE id=?" in
- let stmt = Sqlite3.prepare db.db sql in
- db_must_ok db (fun () -> Sqlite3.bind stmt 1 (Sqlite3.Data.INT id));
- ignore(step_fold db stmt (fun _ -> ()));
- _id <- None
-
- method save = transaction db (fun () ->
- (* insert any foreign-one fields into their table and get id *)
- let _curobj_id = match _id with
- |None -> (* insert new record *)
- let sql = "INSERT INTO filter_rule VALUES(NULL,?,?,?)" in
- let stmt = Sqlite3.prepare db.db sql in
- db_must_ok db (fun () -> Sqlite3.bind stmt 1 (let v = _name in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 2 (let v = _body in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 3 (let v = _zorder in Sqlite3.Data.INT v));
- db_must_done db (fun () -> Sqlite3.step stmt);
- let __id = Sqlite3.last_insert_rowid db.db in
- _id <- Some __id;
- __id
- |Some id -> (* update *)
- let sql = "UPDATE filter_rule SET name=?,body=?,zorder=? WHERE id=?" in
- let stmt = Sqlite3.prepare db.db sql in
- db_must_ok db (fun () -> Sqlite3.bind stmt 1 (let v = _name in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 2 (let v = _body in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 3 (let v = _zorder in Sqlite3.Data.INT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 4 (Sqlite3.Data.INT id));
- db_must_done db (fun () -> Sqlite3.step stmt);
- id
- in
- _curobj_id
- )
- end
-
- (* General get function for any of the columns *)
- let get ?(id=None) ?(name=None) ?(body=None) ?(zorder=None) ?(custom_where=("",[])) db =
- (* assemble the SQL query string *)
- let q = "" in
- let _first = ref true in
- let f () = match !_first with |true -> _first := false; " WHERE " |false -> " AND " in
- let q = match id with |None -> q |Some b -> q ^ (f()) ^ "filter_rule.id=?" in
- let q = match name with |None -> q |Some b -> q ^ (f()) ^ "filter_rule.name=?" in
- let q = match body with |None -> q |Some b -> q ^ (f()) ^ "filter_rule.body=?" in
- let q = match zorder with |None -> q |Some b -> q ^ (f()) ^ "filter_rule.zorder=?" in
- let q = match custom_where with |"",_ -> q |w,_ -> q ^ (f()) ^ "(" ^ w ^ ")" in
- let sql="SELECT filter_rule.id, filter_rule.name, filter_rule.body, filter_rule.zorder FROM filter_rule " ^ q in
- let stmt=Sqlite3.prepare db.db sql in
- (* bind the position variables to the statement *)
- let bindpos = ref 1 in
- ignore(match id with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.INT v));
- incr bindpos
- );
- ignore(match name with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.TEXT v));
- incr bindpos
- );
- ignore(match body with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.TEXT v));
- incr bindpos
- );
- ignore(match zorder with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.INT v));
- incr bindpos
- );
- ignore(match custom_where with |_,[] -> () |_,eb ->
- List.iter (fun b ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos b);
- incr bindpos
- ) eb);
- (* convert statement into an ocaml object *)
- let of_stmt stmt =
- t
- (* native fields *)
- ~id:(
- (match Sqlite3.column stmt 0 with
- |Sqlite3.Data.NULL -> None
- |x -> Some (match x with |Sqlite3.Data.INT i -> i |x -> (try Int64.of_string (Sqlite3.Data.to_string x) with _ -> failwith "error: filter_rule id")))
- )
- ~name:(
- (match Sqlite3.column stmt 1 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> Sqlite3.Data.to_string x)
- )
- ~body:(
- (match Sqlite3.column stmt 2 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> Sqlite3.Data.to_string x)
- )
- ~zorder:(
- (match Sqlite3.column stmt 3 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> match x with |Sqlite3.Data.INT i -> i |x -> (try Int64.of_string (Sqlite3.Data.to_string x) with _ -> failwith "error: filter_rule zorder"))
- )
- (* foreign fields *)
- db
- in
- (* execute the SQL query *)
- step_fold db stmt of_stmt
-
-end
-
-module User = struct
- type t = <
- id : int64 option;
- set_id : int64 option -> unit;
- uid : string;
- set_uid : string -> unit;
- ip : string;
- set_ip : string -> unit;
- port : int64;
- set_port : int64 -> unit;
- key : string;
- set_key : string -> unit;
- last_sync : float;
- set_last_sync : float -> unit;
- has_guids : string;
- set_has_guids : string -> unit;
- sent_guids : string;
- set_sent_guids : string -> unit;
- filters : Filter_rule.t list;
- set_filters : Filter_rule.t list -> unit;
- save: int64; delete: unit
- >
-
- let init db =
- let sql = "create table if not exists user (id integer primary key autoincrement,uid text,ip text,port integer,key text,last_sync real,has_guids blob,sent_guids blob);" in
- db_must_ok db (fun () -> Sqlite3.exec db.db sql);
- let sql = "create table if not exists map_filters_user_filter_rule (user_id integer, filter_rule_id integer, primary key(user_id, filter_rule_id));" in
- db_must_ok db (fun () -> Sqlite3.exec db.db sql);
- let sql = "CREATE UNIQUE INDEX IF NOT EXISTS user_uid_idx ON user (uid) " in
- db_must_ok db (fun () -> Sqlite3.exec db.db sql);
- ()
-
- (* object definition *)
- let t ?(id=None) ~uid ~ip ~port ~key ~last_sync ~has_guids ~sent_guids ~filters db : t = object
- (* get functions *)
- val mutable _id = id
- method id : int64 option = _id
- val mutable _uid = uid
- method uid : string = _uid
- val mutable _ip = ip
- method ip : string = _ip
- val mutable _port = port
- method port : int64 = _port
- val mutable _key = key
- method key : string = _key
- val mutable _last_sync = last_sync
- method last_sync : float = _last_sync
- val mutable _has_guids = has_guids
- method has_guids : string = _has_guids
- val mutable _sent_guids = sent_guids
- method sent_guids : string = _sent_guids
- val mutable _filters = filters
- method filters : Filter_rule.t list = _filters
-
- (* set functions *)
- method set_id v =
- _id <- v
- method set_uid v =
- _uid <- v
- method set_ip v =
- _ip <- v
- method set_port v =
- _port <- v
- method set_key v =
- _key <- v
- method set_last_sync v =
- _last_sync <- v
- method set_has_guids v =
- _has_guids <- v
- method set_sent_guids v =
- _sent_guids <- v
- method set_filters v =
- _filters <- v
-
- (* admin functions *)
- method delete =
- match _id with
- |None -> ()
- |Some id ->
- let sql = "DELETE FROM user WHERE id=?" in
- let stmt = Sqlite3.prepare db.db sql in
- db_must_ok db (fun () -> Sqlite3.bind stmt 1 (Sqlite3.Data.INT id));
- ignore(step_fold db stmt (fun _ -> ()));
- _id <- None
-
- method save = transaction db (fun () ->
- (* insert any foreign-one fields into their table and get id *)
- let _curobj_id = match _id with
- |None -> (* insert new record *)
- let sql = "INSERT INTO user VALUES(NULL,?,?,?,?,?,?,?)" in
- let stmt = Sqlite3.prepare db.db sql in
- db_must_ok db (fun () -> Sqlite3.bind stmt 1 (let v = _uid in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 2 (let v = _ip in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 3 (let v = _port in Sqlite3.Data.INT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 4 (let v = _key in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 5 (let v = _last_sync in Sqlite3.Data.FLOAT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 6 (let v = _has_guids in Sqlite3.Data.BLOB v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 7 (let v = _sent_guids in Sqlite3.Data.BLOB v));
- db_must_done db (fun () -> Sqlite3.step stmt);
- let __id = Sqlite3.last_insert_rowid db.db in
- _id <- Some __id;
- __id
- |Some id -> (* update *)
- let sql = "UPDATE user SET uid=?,ip=?,port=?,key=?,last_sync=?,has_guids=?,sent_guids=? WHERE id=?" in
- let stmt = Sqlite3.prepare db.db sql in
- db_must_ok db (fun () -> Sqlite3.bind stmt 1 (let v = _uid in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 2 (let v = _ip in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 3 (let v = _port in Sqlite3.Data.INT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 4 (let v = _key in Sqlite3.Data.TEXT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 5 (let v = _last_sync in Sqlite3.Data.FLOAT v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 6 (let v = _has_guids in Sqlite3.Data.BLOB v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 7 (let v = _sent_guids in Sqlite3.Data.BLOB v));
- db_must_ok db (fun () -> Sqlite3.bind stmt 8 (Sqlite3.Data.INT id));
- db_must_done db (fun () -> Sqlite3.step stmt);
- id
- in
- List.iter (fun f ->
- let _refobj_id = f#save in
- let sql = "INSERT OR IGNORE INTO map_filters_user_filter_rule VALUES(?,?)" in
- let stmt = Sqlite3.prepare db.db sql in
- db_must_ok db (fun () -> Sqlite3.bind stmt 1 (Sqlite3.Data.INT _curobj_id));
- db_must_ok db (fun () -> Sqlite3.bind stmt 2 (Sqlite3.Data.INT _refobj_id));
- ignore(step_fold db stmt (fun _ -> ()));
- ) _filters;
- let ids = String.concat "," (List.map (fun x -> match x#id with |None -> assert false |Some x -> Int64.to_string x) _filters) in
- let sql = "DELETE FROM map_filters_user_filter_rule WHERE user_id=? AND (filter_rule_id NOT IN (" ^ ids ^ "))" in
- let stmt = Sqlite3.prepare db.db sql in
- db_must_ok db (fun () -> Sqlite3.bind stmt 1 (Sqlite3.Data.INT _curobj_id));
- ignore(step_fold db stmt (fun _ -> ()));
- _curobj_id
- )
- end
-
- (* General get function for any of the columns *)
- let get ?(id=None) ?(uid=None) ?(ip=None) ?(port=None) ?(key=None) ?(last_sync=None) ?(has_guids=None) ?(sent_guids=None) ?(custom_where=("",[])) db =
- (* assemble the SQL query string *)
- let q = "" in
- let _first = ref true in
- let f () = match !_first with |true -> _first := false; " WHERE " |false -> " AND " in
- let q = match id with |None -> q |Some b -> q ^ (f()) ^ "user.id=?" in
- let q = match uid with |None -> q |Some b -> q ^ (f()) ^ "user.uid=?" in
- let q = match ip with |None -> q |Some b -> q ^ (f()) ^ "user.ip=?" in
- let q = match port with |None -> q |Some b -> q ^ (f()) ^ "user.port=?" in
- let q = match key with |None -> q |Some b -> q ^ (f()) ^ "user.key=?" in
- let q = match last_sync with |None -> q |Some b -> q ^ (f()) ^ "user.last_sync=?" in
- let q = match has_guids with |None -> q |Some b -> q ^ (f()) ^ "user.has_guids=?" in
- let q = match sent_guids with |None -> q |Some b -> q ^ (f()) ^ "user.sent_guids=?" in
- let q = match custom_where with |"",_ -> q |w,_ -> q ^ (f()) ^ "(" ^ w ^ ")" in
- let sql="SELECT user.id, user.uid, user.ip, user.port, user.key, user.last_sync, user.has_guids, user.sent_guids FROM user " ^ q in
- let stmt=Sqlite3.prepare db.db sql in
- (* bind the position variables to the statement *)
- let bindpos = ref 1 in
- ignore(match id with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.INT v));
- incr bindpos
- );
- ignore(match uid with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.TEXT v));
- incr bindpos
- );
- ignore(match ip with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.TEXT v));
- incr bindpos
- );
- ignore(match port with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.INT v));
- incr bindpos
- );
- ignore(match key with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.TEXT v));
- incr bindpos
- );
- ignore(match last_sync with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.FLOAT v));
- incr bindpos
- );
- ignore(match has_guids with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.BLOB v));
- incr bindpos
- );
- ignore(match sent_guids with |None -> () |Some v ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos (Sqlite3.Data.BLOB v));
- incr bindpos
- );
- ignore(match custom_where with |_,[] -> () |_,eb ->
- List.iter (fun b ->
- db_must_ok db (fun () -> Sqlite3.bind stmt !bindpos b);
- incr bindpos
- ) eb);
- (* convert statement into an ocaml object *)
- let of_stmt stmt =
- t
- (* native fields *)
- ~id:(
- (match Sqlite3.column stmt 0 with
- |Sqlite3.Data.NULL -> None
- |x -> Some (match x with |Sqlite3.Data.INT i -> i |x -> (try Int64.of_string (Sqlite3.Data.to_string x) with _ -> failwith "error: user id")))
- )
- ~uid:(
- (match Sqlite3.column stmt 1 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> Sqlite3.Data.to_string x)
- )
- ~ip:(
- (match Sqlite3.column stmt 2 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> Sqlite3.Data.to_string x)
- )
- ~port:(
- (match Sqlite3.column stmt 3 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> match x with |Sqlite3.Data.INT i -> i |x -> (try Int64.of_string (Sqlite3.Data.to_string x) with _ -> failwith "error: user port"))
- )
- ~key:(
- (match Sqlite3.column stmt 4 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> Sqlite3.Data.to_string x)
- )
- ~last_sync:(
- (match Sqlite3.column stmt 5 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> match x with |Sqlite3.Data.FLOAT i -> i|x -> (try float_of_string (Sqlite3.Data.to_string x) with _ -> failwith "error: user last_sync"))
- )
- ~has_guids:(
- (match Sqlite3.column stmt 6 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> Sqlite3.Data.to_string x)
- )
- ~sent_guids:(
- (match Sqlite3.column stmt 7 with
- |Sqlite3.Data.NULL -> failwith "null of_stmt"
- |x -> Sqlite3.Data.to_string x)
- )
- (* foreign fields *)
- ~filters:(
- (* foreign many-many mapping field *)
- let sql' = "select filter_rule_id from map_filters_user_filter_rule where user_id=?" in
- let stmt' = Sqlite3.prepare db.db sql' in
- let user__id = Sqlite3.column stmt 0 in
- db_must_ok db (fun () -> Sqlite3.bind stmt' 1 user__id);
- List.flatten (step_fold db stmt' (fun s ->
- let i = match Sqlite3.column s 0 with |Sqlite3.Data.INT i -> i |_ -> assert false in
- Filter_rule.get ~id:(Some i) db)
- ))
- db
- in
- (* execute the SQL query *)
- step_fold db stmt of_stmt
-
-end
-
module Init = struct
type t = state
type transaction_mode = [`Exclusive |`Deferred |`Immediate ]
let t ?(busyfn=default_busyfn) ?(mode=`Immediate) db_name =
let db = {db=Sqlite3.db_open db_name; in_transaction=0; mode=mode; busyfn=busyfn } in
Dircache.init db;
- Filter_rule.init db;
- User.init db;
db
let db handle = handle.db
View
93 sync_schema.mli
@@ -49,96 +49,3 @@ module Dircache : sig
*)
end
-module Filter_rule : sig
- type t = <
- id : int64 option;
- set_id : int64 option -> unit;
- name : string;
- set_name : string -> unit;
- body : string;
- set_body : string -> unit;
- zorder : int64;
- set_zorder : int64 -> unit;
- save: int64; delete: unit
- >
-
- (** An object which can be stored in the database with the [save] method call, or removed by calling [delete]. Fields can be accessed via the approriate named method and set via the [set_] methods. Changes are not committed to the database until [save] is invoked.
- *)
-
- val t :
- ?id:int64 option ->
- name:string ->
- body:string ->
- zorder:int64 ->
- Init.t -> t
- (** Can be used to construct a new object. If [id] is not specified, it will be automatically assigned the first time [save] is called on the object. The object is not committed to the database until [save] is invoked. The [save] method will also return the [id] assigned to the object.
- @raise Sql_error if a database error is encountered
- *)
-
- val get :
- ?id:int64 option ->
- ?name:string option ->
- ?body:string option ->
- ?zorder:int64 option ->
- ?custom_where:string * Sqlite3.Data.t list -> Init.t -> t list
- (** Used to retrieve objects from the database. If an argument is specified, it is included in the search criteria (all fields are ANDed together).
- @raise Sql_error if a database error is encountered
- *)
-
-end
-module User : sig
- type t = <
- id : int64 option;
- set_id : int64 option -> unit;
- uid : string;
- set_uid : string -> unit;
- ip : string;
- set_ip : string -> unit;
- port : int64;
- set_port : int64 -> unit;
- key : string;
- set_key : string -> unit;
- last_sync : float;
- set_last_sync : float -> unit;
- has_guids : string;
- set_has_guids : string -> unit;
- sent_guids : string;
- set_sent_guids : string -> unit;
- filters : Filter_rule.t list;
- set_filters : Filter_rule.t list -> unit;
- save: int64; delete: unit
- >
-
- (** An object which can be stored in the database with the [save] method call, or removed by calling [delete]. Fields can be accessed via the approriate named method and set via the [set_] methods. Changes are not committed to the database until [save] is invoked.
- *)
-
- val t :
- ?id:int64 option ->
- uid:string ->
- ip:string ->
- port:int64 ->
- key:string ->
- last_sync:float ->
- has_guids:string ->
- sent_guids:string ->
- filters:Filter_rule.t list ->
- Init.t -> t
- (** Can be used to construct a new object. If [id] is not specified, it will be automatically assigned the first time [save] is called on the object. The object is not committed to the database until [save] is invoked. The [save] method will also return the [id] assigned to the object.
- @raise Sql_error if a database error is encountered
- *)
-
- val get :
- ?id:int64 option ->
- ?uid:string option ->
- ?ip:string option ->
- ?port:int64 option ->
- ?key:string option ->
- ?last_sync:float option ->
- ?has_guids:string option ->
- ?sent_guids:string option ->
- ?custom_where:string * Sqlite3.Data.t list -> Init.t -> t list
- (** Used to retrieve objects from the database. If an argument is specified, it is included in the search criteria (all fields are ANDed together).
- @raise Sql_error if a database error is encountered
- *)
-
-end

0 comments on commit 0989f7c

Please sign in to comment.