Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Gestion des acteurs par un pool de threads

  • Loading branch information...
commit 447eb6987fcc1afbd1b893de89902b042903ccc1 1 parent 377cf1f
@SylvainGBR authored
View
17 Makefile
@@ -1,6 +1,8 @@
-SOURCES = my_queue.ml actorsType.ml actorsGlobal.ml actorssg.ml
+SOURCES1 = my_queue.ml actorsType.ml actorsGlobal.ml actorssg.ml
+SOURCES2 = my_queue.ml actorsType.ml actorsGlobal.ml actorssg.ml server.ml
EXEC1 = client
EXEC2 = server
+EXEC3 = pingpong
CAMLC = ocamlc
CAMLOPT = ocamlopt
CAMLDEP = ocamldep
@@ -12,10 +14,11 @@ WITHTHREADS =-thread threads.cma
# OBJS = $(SOURCES2:.ml=.cmo)
# OPTOBJS = $(SOURCES:.ml=.cmx)
-CLIENT_OBJS=$(SOURCES:.ml=.cmo) client.cmo
-SERVER_OBJS=$(SOURCES:.ml=.cmo) server.cmo
+CLIENT_OBJS=$(SOURCES1:.ml=.cmo) client.cmo
+SERVER_OBJS=$(SOURCES2:.ml=.cmo) ircserver.cmo
+PINGPONG_OBJS=$(SOURCES1:.ml=.cmo) test.cmo
-all: $(EXEC1) $(EXEC2)
+all: $(EXEC1) $(EXEC2) $(EXEC3)
$(EXEC1): $(CLIENT_OBJS)
$(CAMLC) -o $(EXEC1) $(LIBS) $(CLIENT_OBJS)
@@ -23,6 +26,9 @@ $(EXEC1): $(CLIENT_OBJS)
$(EXEC2): $(SERVER_OBJS)
$(CAMLC) -o $(EXEC2) $(LIBS) $(SERVER_OBJS)
+$(EXEC3): $(PINGPONG_OBJS)
+ $(CAMLC) -o $(EXEC3) $(LIBS) $(PINGPONG_OBJS)
+
# $(EXEC1).opt: $(OPTOBJS)
# $(CAMLOPT) -o $(EXEC1) $(LIBS:.cma=.cmxa) $(OPTOBJS)
@@ -46,6 +52,9 @@ clean:
rm -f $(EXEC1).opt
rm -f $(EXEC2)
rm -f $(EXEC2).opt
+ rm -f $(EXEC3)
+ rm -f $(EXEC3).opt
+
depend: $(SOURCE)
$(CAMLDEP) *.mli *.ml > .depend
View
6 actorsGlobal.ml
@@ -44,3 +44,9 @@ let a_mutex = Mutex.create()
let (receive_scheduler : (actor * (message -> unit)) Queue.t) = Queue.create()
let rs_mutex = Mutex.create()
+
+let (functions : (string, arg list -> unit) Hashtbl.t) = Hashtbl.create 97
+
+let nb_threads = ref 0
+let nb_threadmax = ref 4
+let nbt_mutex = Mutex.create()
View
9 actorsType.ml
@@ -24,7 +24,8 @@ and message = string * arg list
and local_actor = {
mailbox : message My_queue.t;
- mutex : Mutex.t;
+ w_mutex : Mutex.t;
+ r_mutex : Mutex.t;
}
and remote_actor = {
@@ -43,7 +44,11 @@ and actor = {
actor_location : location;
}
-type actor_env = {actor: actor; sleeping : (message -> unit) Queue.t}
+type actor_env = {
+ actor: actor;
+ s_mutex : Mutex.t;
+ sleeping : (message -> unit) Queue.t
+}
type node = {
name : int;
View
79 actorssg.ml
@@ -10,16 +10,12 @@ let schedule_receive a f =
let awake aid =
debug "In Awake : %!";
let a_env = Hashtbl.find actors aid in
- match a_env.actor.actor_location with
- | Local lac ->
- begin mutex_lock lac.mutex;
- try
- let f = Queue.pop a_env.sleeping in
- begin schedule_receive a_env.actor f;
- mutex_unlock lac.mutex end
- with Queue.Empty -> mutex_unlock lac.mutex
- end
- | Remote o -> failwith "You cannot awake a remote actor";;
+ mutex_lock a_env.s_mutex;
+ try
+ let f = Queue.pop a_env.sleeping in
+ begin schedule_receive a_env.actor f;
+ mutex_unlock a_env.s_mutex end
+ with Queue.Empty -> mutex_unlock a_env.s_mutex;;
let react f = raise (React f);;
@@ -38,9 +34,9 @@ let create() =
Mutex.unlock a_mutex;
i end in
let id = new_aid() in
- let l_act = {mailbox = My_queue.create() ; mutex = Mutex.create()} in
+ let l_act = {mailbox = My_queue.create() ; w_mutex = Mutex.create(); r_mutex = Mutex.create()} in
let new_actor = {actor_id = id; actor_location = Local l_act} in
- let new_act_env = {actor = new_actor; sleeping = Queue.create()} in
+ let new_act_env = {actor = new_actor; sleeping = Queue.create(); s_mutex = Mutex.create()} in
Hashtbl.add actors new_actor.actor_id new_act_env;
new_actor;;
@@ -87,9 +83,9 @@ let rec sender o =
let rec send a m =
match a.actor_location with
| Local lac -> begin debug "Sending to the local actor %n : %!" a.actor_id;
- mutex_lock lac.mutex;
+ mutex_lock lac.w_mutex;
My_queue.add m lac.mailbox;
- mutex_unlock lac.mutex;
+ mutex_unlock lac.w_mutex;
awake a.actor_id end
| Remote rma -> debug "Sending to the remote actor %n from %s %n %!" a.actor_id rma.actor_host rma.actor_node;
let rmn = (try Hashtbl.find nodes rma.actor_node
@@ -130,8 +126,6 @@ and client server_name =
start ac (fun() -> sender o);
hst;;
-let functions = Hashtbl.create 97
-
let rec host_actor() =
react ca
and ca m =
@@ -159,32 +153,49 @@ let reacting a g =
match a.actor_location with
| Local lac ->
let rec reacting_aux() =
- mutex_lock lac.mutex;
+ mutex_lock lac.w_mutex;
try
let m = My_queue.take lac.mailbox in
try
- mutex_unlock lac.mutex; g m
+ mutex_unlock lac.w_mutex; g m;
with
| React f -> schedule_receive a f;
| NotHandled -> begin reacting_aux();
- mutex_lock lac.mutex;
+ mutex_lock lac.w_mutex;
My_queue.push m lac.mailbox;
- mutex_unlock lac.mutex end
- with My_queue.Empty -> let a_env = Hashtbl.find actors a.actor_id in begin
- Queue.add g a_env.sleeping; mutex_unlock lac.mutex end
- in reacting_aux()
+ mutex_unlock lac.w_mutex end
+ with My_queue.Empty -> let a_env = Hashtbl.find actors a.actor_id in begin mutex_lock a_env.s_mutex;
+ Queue.add g a_env.sleeping; mutex_unlock a_env.s_mutex; mutex_unlock lac.w_mutex end
+ in begin mutex_lock lac.r_mutex; reacting_aux(); mutex_unlock lac.r_mutex end
| Remote rac -> failwith "You cannot run a remote actor";;
-let rec receive_handler() =
- (* debug "RH : number %!"; *)
+let rec receive_handler() =
+ debug "In RH :%!";
+ Printf.printf " %n %!" !nb_threads;
+ let exec (a, f) =
+ reacting a f; mutex_lock nbt_mutex; nb_threads:= !nb_threads - 1; mutex_unlock nbt_mutex in
+ let rec eval a f b =
+ mutex_lock nbt_mutex;
+ if (!nb_threads >= !nb_threadmax)
+ then if b then begin mutex_unlock nbt_mutex; Thread.delay 0.1; eval a f false end
+ else begin nb_threadmax := (!nb_threadmax) * 3 / 2;
+ mutex_unlock nbt_mutex;
+ eval a f true end
+ else
+ (incr nb_threads;
+ mutex_unlock nbt_mutex;
+ ignore (Thread.create exec (a, f));
+ debug "RH : number %d \n%!" a.actor_id) in
let cont = ref true in begin
- (try
- let (a, f) = Queue.pop receive_scheduler in
- begin (* debug "%d \n%!" a.actor_id; *)
- reacting a f; end
- with Queue.Empty -> let f a b c = c + Queue.length b.sleeping in
- let att = Hashtbl.fold f actors 0 in
- if att = 0 then begin debug "\n Finex.\n%!"; cont := false end
- else begin Thread.delay 0.0001;
- (* debug " En attente : %d \n%!" (Hashtbl.fold f actors 0) end*) end);
+ (try
+ mutex_lock rs_mutex;
+ let (a, f) = Queue.pop receive_scheduler in
+ mutex_unlock rs_mutex;
+ eval a f true;
+ with Queue.Empty -> mutex_unlock rs_mutex;
+ let f a b c = c + Queue.length b.sleeping in
+ let att = Hashtbl.fold f actors 0 in
+ if (att = 0 && !nb_threads = 0) then begin Thread.delay 0.001; debug "\n Finex.\n%!"; cont := false end
+ else begin debug "Attente : %n%!" att; Thread.delay 0.001;
+ (* debug " En attente : %d \n%!" (Hashtbl.fold f actors 0) end*) end);
if (!cont) then receive_handler() end;;
View
8 client.ml
@@ -29,13 +29,6 @@ let rec irc_talk rem =
send rem ("post", [S s; S pseudo]);
irc_talk rem in
-(* let rec funtest rem = *)
-(* let s = input_line (in_channel_of_descr stdin) in *)
-(* Printf.printf "You Wrote : %s \n%!" s; *)
-(* print_actor rem; *)
-(* send rem ("post", [S s]); *)
-(* funtest rem in *)
-
let irc_connect() =
let rec display() =
@@ -49,7 +42,6 @@ let irc_connect() =
match m with
| ("connected", (Actor a) :: q) -> Printf.printf "Connected ! \n%!";
let _ = Thread.create irc_talk a in
- (* let _ = Thread.create funtest a in Printf.printf "fdvfezgfdzf\n%!"; *)
display()
| _ -> react wait_validation
View
117 server.ml
@@ -11,11 +11,11 @@ and bj m =
| ("bonjour", [S s]) -> Printf.printf "%s\n%!" s; bonjour [];
| _ -> Printf.printf "Wrong Message"; bonjour [];;
-let treat_connect i o client =
+let treat_connect i o clientset =
let s = input_value i in
- (* mutex_lock n_mutex; *)
+ mutex_lock n_mutex;
if (Hashtbl.mem nodes s) then begin Printf.printf "This node already exists\n%!"; close_out o;
- (* mutex_unlock n_mutex *) end
+ mutex_unlock n_mutex end
else begin output_value o local_node;
flush o;
let ac = create() in
@@ -25,38 +25,36 @@ let treat_connect i o client =
start ac (fun() -> sender o);
actors_display();
nodes_display();
- Printf.printf "Noeud distant : %n\n%!" hst.name end;;
-(* mutex_unlock n_mutex;; *)
+ Printf.printf "Noeud distant : %n\n%!" hst.name end;
+ mutex_unlock n_mutex;;
let rec restart_on_EINTR f x =
- try f x with Unix_error (EINTR, _, _) -> restart_on_EINTR f x;;
+ try f x with Unix.Unix_error (Unix.EINTR, _, _) -> restart_on_EINTR f x;;
let install_tcp_server_socket addr =
- let s = socket PF_INET SOCK_STREAM 0 in
+ let s = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
try
- bind s addr;
- listen s 10;
+ Unix.bind s addr;
+ Unix.listen s 10;
s
- with z -> close s; raise z;;
+ with z -> Unix.close s; raise z;;
let tcp_server treat_connection addr =
let rec run s =
- let client = restart_on_EINTR accept s in
- let _ = Thread.create treat_connection client in
+ let clientset = restart_on_EINTR Unix.accept s in
+ let _ = Thread.create treat_connection clientset in
run s in
- ignore (signal sigpipe Signal_ignore);
+ ignore (Sys.signal Sys.sigpipe Sys.Signal_ignore);
let server_sock = install_tcp_server_socket addr in
run server_sock;;
let server () =
- (* Random.init (int_of_float (Unix.time())); *)
- (* local_node := (!local_node) ^ string_of_int (Random.int 1024); *)
Printf.printf "Local Node : %n\n%!" local_node;
let port = 4242 in
Printf.printf "%s\n%!" (string_of_inet_addr ((gethostbyname(gethostname())).h_addr_list.(0)));
(* let host = (gethostbyname(gethostname())).h_addr_list.(0) in *)
(* let host = (gethostbyname "127.0.0.1").h_addr_list.(0) in *)
- let host = (gethostbyname "193.55.250.242").h_addr_list.(0) in
+ let host = (gethostbyname "193.55.250.232").h_addr_list.(0) in
let addr = ADDR_INET (host, port) in
let treat (client_sock, client_addr as client) =
(* log information *)
@@ -70,8 +68,8 @@ let server () =
treat_connect (in_channel_of_descr client_sock) (out_channel_of_descr client_sock) client in
Thread.create (tcp_server treat) addr;;
-let _ = handle_unix_error server () in
-Hashtbl.add functions "bonjour" bonjour ;
+(* let _ = handle_unix_error server () in *)
+Hashtbl.add functions "bonjour" bonjour ;;
let rec pipong l =
let gs ar m =
@@ -83,85 +81,4 @@ let rec pipong l =
| ( _ , (s, _)) -> Printf.printf "pas bon : %s\n%!" s; pipong [ar] in
react (gs (List.hd l)) in
-Hashtbl.add functions "pipong" pipong ;
-
-let irc_mutex = Mutex.create() in
-let irc_on = ref false in
-let irc_act = ref creator in
-
-let irc_connections l =
- let connec = Hashtbl.create 13 in
-
- (* let users_display() = *)
- (* Printf.printf "Users : "; *)
- (* let f a b c = Printf.printf "%s; %!" a; c in *)
- (* Hashtbl.fold f connec (); *)
- (* Printf.printf "\n%!" in *)
-
- let ac = create() in
- Mutex.unlock irc_mutex;
- Hashtbl.add connec "server" ("server", ac);
- let rec irc_server() =
- (* users_display(); *)
- let ircm (h, l) =
- let spread a b =
- match b with
- | ("server", _ ) -> (match l with
- | [S s; S g] -> Printf.printf "<%s> : %s \n%!" g s
- | _ -> failwith "Wrong Message Type Error in irc_connections");
- | ( s , a ) -> (* Printf.printf "Talking to : %s, " s; print_actor a; *) send a ("post", l) in
- match (h, l) with
- | ("post", [S s; S st]) -> Hashtbl.iter spread connec; irc_server();
- (* | ("post", [S s; S st]) -> let (nam, orig) = Hashtbl.find connec st in *)
- (* send orig ("post", [S s; S st]); irc_server(); *)
- | _ -> irc_server() in
- react ircm in
- start ac irc_server;
- let rec connexion_handler() =
- let connexion_request m =
-
- (* let send_connected a b lst = (\*Creates a list containing all the people connected*\) *)
- (* match b with *)
- (* | (s, a) -> (S s) :: (Actor a) :: lst in *)
-
- match m with
- | ("join", [S s; Actor a]) -> (* let lis = Hashtbl.fold send_connected connec [] in *)
- Hashtbl.add connec s (s, a);
- (* send a ("connected", lis); *)
- send a ("connected", [Actor ac]);
- connexion_handler();
- | _ -> Printf.printf "Wrong Message in connexion_request\n%!";
- connexion_handler() in
- react connexion_request in
- connexion_handler() in
-
-Hashtbl.add functions "irc_connections" irc_connections;
-
-let transit_act = create() in
-
-let trans a =
- let tran m =
- match m with
- | ("retour", [S s; Actor ac]) -> irc_act:= ac; send a ("retour", [S s; Actor ac]);
- | _ -> () in
- react tran in
-
-let irc q =
- Mutex.lock irc_mutex;
- match q with
- | [Actor ac] -> Printf.printf "Irc_on : %b\n%!" (!irc_on);
- if (!irc_on = false) then begin
- irc_on := true;
- start transit_act (fun ()-> trans ac);
- send creator ("start", [S "irc_connections"; Actor transit_act]) end
- else begin
- print_actor (!irc_act);
- send ac ("retour", [S "irc_connections"; Actor (!irc_act)]);
- Mutex.unlock irc_mutex end
- | _ -> Mutex.unlock irc_mutex in
-
-Hashtbl.add functions "irc" irc;
-
-start creator host_actor;
-actors_display();
-receive_handler();;
+Hashtbl.add functions "pipong" pipong;;
View
39 test.ml
@@ -1,40 +1,45 @@
open ActorsType
+open ActorsGlobal
open Actorssg
let ping_pong() =
let act1 = create() in
- (* let act2 = create() in *)
+ let act2 = create() in
let rec ping() =
react pig
and pig m =
- let (s, l) = m in
- if (s = "ping") then begin print_string "ping";
+ let (s, l) = m in
+ if (s = "ping") then begin Printf.printf "ping";
(match l with
| (Actor a) :: (I i) :: q -> Printf.printf " : %d\n%!" i;
send a ("pong", (Actor act1) :: (I (i + 1)) :: []);
| _ -> raise NotHandled) ;
- ping() end in
+ ping() end
+ else raise NotHandled
+ in
let rec pong() =
react pog
and pog m =
- let (s, l) = m in
- if (s = "pong") then begin print_string "pong";
+ let (s, l) = m in
+ if (s = "pong") then begin Printf.printf "pong";
(match l with
- | (Actor a) :: (I i) :: q -> Printf.printf " : %d\n%!" i;
- (* send a ("ping", (Actor act2) :: (I (i + 1)) :: []); *)
- send a ("ping", (Actor act1) :: (I (i + 1)) :: []);
+ | (Actor a) :: (I i) :: q -> Printf.printf " : %n\n%!" i;
+ (* Printf.printf "Threads : %n\n%!" (!nb_threads); *)
+ send a ("ping", (Actor act2) :: (I (i + 1)) :: []);
+ (* send a ("ping", (Actor act1) :: (I (i + 1)) :: []); *)
| _ -> raise NotHandled) ;
pong() end
+ else raise NotHandled
in begin
start act1 ping;
- (* start act2 pong; *)
- start act1 pong;
- (* send act1 ("ping", (Actor act2) :: (I 1) :: []); *)
- send act1 ("ping", (Actor act1) :: (I 1) :: []);
+ start act2 pong;
+ (* start act1 pong; *)
+ send act1 ("ping", (Actor act2) :: (I 1) :: []);
+ (* send act1 ("ping", (Actor act1) :: (I 1) :: []); *)
receive_handler();
end;;
-ping_pong();;
+(* ping_pong();; *)
let calcul_pi n s =
let t = Sys.time() in
@@ -62,7 +67,7 @@ let calcul_pi n s =
let (str, l) = m in
match l with
| (F r) :: q -> res := (!res) +. r;
- if (!compteur = s) then Printf.printf "*** %f ***\n %!" (4. *. !res /. float_of_int n)
+ if (!compteur = s) then Printf.printf "*** Actors : %f ***\n %!" (4. *. !res /. float_of_int n)
else begin debug "Recu %d%!" (!compteur);
incr compteur;
act() end;
@@ -80,7 +85,7 @@ let calcul_pi n s =
receive_handler();
Printf.printf "*** Time : %f ***\n %!" (Sys.time() -. t);;
-(* calcul_pi 30000000 1;; *)
+calcul_pi 30000000 100;;
let calcul_picpu n =
let t = Sys.time() in
@@ -92,4 +97,4 @@ let calcul_picpu n =
Printf.printf "*** CPU : %f ***\n%!" (4. *. !res /. float_of_int n);
Printf.printf "*** Time : %f ***\n %!" (Sys.time() -. t);;
-(* calcul_picpu 30000000;; *)
+calcul_picpu 30000000;;
Please sign in to comment.
Something went wrong with that request. Please try again.