Browse files

Changement de la gestion des threads (variables de condition)

  • Loading branch information...
1 parent 9eb7e8d commit a899624d349154f873e6d4f97e6def1c5b436704 @SylvainGBR committed Jun 21, 2012
Showing with 107 additions and 36 deletions.
  1. +6 −1 actorsGlobal.ml
  2. +94 −28 actorssg.ml
  3. +7 −7 test.ml
View
7 actorsGlobal.ml
@@ -48,5 +48,10 @@ let rs_mutex = Mutex.create()
let (functions : (string, arg list -> unit) Hashtbl.t) = Hashtbl.create 97
let nb_threads = ref 0
-let nb_threadmax = ref 4
+let active_threads = ref 0
+let nb_threadmax = 5
let nbt_mutex = Mutex.create()
+
+let receive_cond = Condition.create();;
+let rc_mutex = Mutex.create();;
+
View
122 actorssg.ml
@@ -169,33 +169,99 @@ let reacting a g =
in begin mutex_lock lac.r_mutex; reacting_aux(); mutex_unlock lac.r_mutex end
| Remote rac -> failwith "You cannot run a remote actor";;
-let rec receive_handler() =
- debug "In RH :%!";
- Printf.printf " %n %!" !nb_threads;
- let exec (a, f) =
- reacting a f; mutex_lock nbt_mutex; nb_threads:= !nb_threads - 1; mutex_unlock nbt_mutex in
- let rec eval a f b =
- mutex_lock nbt_mutex;
- if (!nb_threads >= !nb_threadmax)
- then if b then begin mutex_unlock nbt_mutex; Thread.delay 0.1; eval a f false end
- else begin nb_threadmax := (!nb_threadmax) * 3 / 2;
- mutex_unlock nbt_mutex;
- eval a f true end
- else
- (incr nb_threads;
- mutex_unlock nbt_mutex;
- ignore (Thread.create exec (a, f));
- debug "RH : number %d \n%!" a.actor_id) in
- let cont = ref true in begin
- (try
- mutex_lock rs_mutex;
+let receive_cond = Condition.create();;
+let rc_mutex = Mutex.create();;
+
+let rec receive_util() =
+ let say_wake_up() = mutex_lock rc_mutex;
+ mutex_lock nbt_mutex;
+ (* Printf.printf "Sierra : (%n %n))\n%!" !nb_threads !active_threads; *)
+ mutex_unlock nbt_mutex;
+ Condition.signal receive_cond in
+ let go_to_bed() =
+ mutex_lock nbt_mutex;
+ active_threads := !active_threads - 1;
+ (* Printf.printf "Whiskey : (%n %n)\n%!" !nb_threads !active_threads; *)
+ mutex_unlock nbt_mutex;
+ Condition.wait receive_cond rc_mutex in
+ let wake_up() = mutex_lock nbt_mutex;
+ incr active_threads;
+ (* Printf.printf "Kilo : (%n %n))\n%!" !nb_threads !active_threads; *)
+ mutex_unlock nbt_mutex;
+ mutex_unlock rc_mutex in
+ (try
+ begin mutex_lock rs_mutex;
let (a, f) = Queue.pop receive_scheduler in
mutex_unlock rs_mutex;
- eval a f true;
- with Queue.Empty -> mutex_unlock rs_mutex;
- let f a b c = c + Queue.length b.sleeping in
- let att = Hashtbl.fold f actors 0 in
- if (att = 0 && !nb_threads = 0) then begin Thread.delay 0.001; debug "\n Finex.\n%!"; cont := false end
- else begin debug "Attente : %n%!" att; Thread.delay 0.001;
- (* debug " En attente : %d \n%!" (Hashtbl.fold f actors 0) end*) end);
- if (!cont) then receive_handler() end;;
+ reacting a f;
+ end
+ with Queue.Empty -> mutex_unlock rs_mutex;
+ mutex_lock nbt_mutex;
+ if (!nb_threads * 2 <= !active_threads * 3 || (!active_threads < 2)) then begin
+ mutex_unlock nbt_mutex;
+ say_wake_up();
+ go_to_bed();
+ wake_up() end
+ else begin nb_threads:= !nb_threads - 1;
+ active_threads := !active_threads - 1;
+ (* Printf.printf "Charlie : (%n %n))\n%!" !nb_threads !active_threads; *)
+ mutex_unlock nbt_mutex;
+ say_wake_up();
+ mutex_unlock rc_mutex;
+ Thread.exit() end);
+ receive_util();;
+
+let receive_handler() =
+ let rec update() =
+ mutex_lock nbt_mutex;
+ debug "Threads : (%n)\n%!" !nb_threads;
+ debug "Active : (%n)\n%!" !active_threads;
+ if ((!nb_threads * 2 >= !active_threads * 3 && (!nb_threads > 1))||(!nb_threads >= nb_threadmax)) then begin mutex_unlock nbt_mutex;
+ Thread.delay 0.5; mutex_lock nbt_mutex end;
+ if (!nb_threads < nb_threadmax) then
+ (let _ = Thread.create receive_util () in begin
+ incr active_threads;
+ incr nb_threads;
+ debug "Alpha : (%n %n))\n%!" !nb_threads !active_threads end);
+ mutex_unlock nbt_mutex;
+ Thread.delay 0.01;
+ update() in
+ mutex_lock nbt_mutex;
+ for i = 0 to 1 do
+ ignore (Thread.create receive_util ());
+ nb_threads:= !nb_threads + 1;
+ incr active_threads;
+ done;
+ mutex_unlock nbt_mutex;
+ update();;
+
+(* let rec receive_handler() = *)
+(* debug "In RH :%!"; *)
+(* Printf.printf " %n %!" !nb_threads; *)
+(* let exec (a, f) = *)
+(* reacting a f; mutex_lock nbt_mutex; nb_threads:= !nb_threads - 1; mutex_unlock nbt_mutex in *)
+(* let rec eval a f b = *)
+(* mutex_lock nbt_mutex; *)
+(* if (!nb_threads >= !nb_threadmax) *)
+(* then if b then begin mutex_unlock nbt_mutex; Thread.delay 0.1; eval a f false end *)
+(* else begin nb_threadmax := (!nb_threadmax) * 3 / 2; *)
+(* mutex_unlock nbt_mutex; *)
+(* eval a f true end *)
+(* else *)
+(* (incr nb_threads; *)
+(* mutex_unlock nbt_mutex; *)
+(* ignore (Thread.create exec (a, f)); *)
+(* debug "RH : number %d \n%!" a.actor_id) in *)
+(* let cont = ref true in begin *)
+(* (try *)
+(* mutex_lock rs_mutex; *)
+(* let (a, f) = Queue.pop receive_scheduler in *)
+(* mutex_unlock rs_mutex; *)
+(* eval a f true; *)
+(* with Queue.Empty -> mutex_unlock rs_mutex; *)
+(* let f a b c = c + Queue.length b.sleeping in *)
+(* let att = Hashtbl.fold f actors 0 in *)
+(* if (att = 0 && !nb_threads = 0) then begin Thread.delay 0.001; debug "\n Finex.\n%!"; cont := false end *)
+(* else begin debug "Attente : %n%!" att; Thread.delay 0.001; *)
+(* (\* debug " En attente : %d \n%!" (Hashtbl.fold f actors 0) end*\) end); *)
+(* if (!cont) then receive_handler() end;; *)
View
14 test.ml
@@ -9,7 +9,7 @@ let ping_pong() =
react pig
and pig m =
let (s, l) = m in
- if (s = "ping") then begin Printf.printf "ping";
+ if (s = "ping") then begin Printf.printf "ping";
(match l with
| (Actor a) :: (I i) :: q -> Printf.printf " : %d\n%!" i;
send a ("pong", (Actor act1) :: (I (i + 1)) :: []);
@@ -21,7 +21,7 @@ let ping_pong() =
react pog
and pog m =
let (s, l) = m in
- if (s = "pong") then begin Printf.printf "pong";
+ if (s = "pong") then begin Printf.printf "pong";
(match l with
| (Actor a) :: (I i) :: q -> Printf.printf " : %n\n%!" i;
(* Printf.printf "Threads : %n\n%!" (!nb_threads); *)
@@ -67,7 +67,8 @@ let calcul_pi n s =
let (str, l) = m in
match l with
| (F r) :: q -> res := (!res) +. r;
- if (!compteur = s) then Printf.printf "*** Actors : %f ***\n %!" (4. *. !res /. float_of_int n)
+ if (!compteur = s) then begin Printf.printf "*** Actors : %f ***\n %!" (4. *. !res /. float_of_int n);
+ Printf.printf "*** Time : %f ***\n %!" (Sys.time() -. t) end
else begin debug "Recu %d%!" (!compteur);
incr compteur;
act() end;
@@ -82,10 +83,7 @@ let calcul_pi n s =
debug "Sent pi_request to %d \n %!" k
end
done;
- receive_handler();
- Printf.printf "*** Time : %f ***\n %!" (Sys.time() -. t);;
-
-calcul_pi 30000000 100;;
+ receive_handler();;
let calcul_picpu n =
let t = Sys.time() in
@@ -98,3 +96,5 @@ let calcul_picpu n =
Printf.printf "*** Time : %f ***\n %!" (Sys.time() -. t);;
calcul_picpu 30000000;;
+calcul_pi 30000000 100;;
+

0 comments on commit a899624

Please sign in to comment.