# Übungsblatt 14

**Lernziele**

In den Übungen dieser Wochen lernen Sie:
* Nebenläufiges Programmieren mit den Modulen `Thread` und `Event`.
* Kanäle zur Kommunikation zwischen Threads einzusetzen.
* Einfache Algorithmen zu Parallelisieren.

### Threads

In utop können Threads mit `utop -I +threads` verwendet werden. Dann muss noch einmal `#thread;;` eingegeben werden.

In [1]:
#thread;;

Findlib has been successfully loaded. Additional directives:
  #require "package";;      to load a package
  #list;;                   to list the available packages
  #camlp4o;;                to load camlp4 (standard syntax)
  #camlp4r;;                to load camlp4 (revised syntax)
  #predicates "p,q,...";;   to set these predicates
  Topfind.reset();;         to force that packages will be reloaded
  #thread;;                 to enable threads



/home/vagrant/.opam/4.04.2/lib/ocaml/threads: added to search path
/home/vagrant/.opam/4.04.2/lib/ocaml/unix.cma: loaded
/home/vagrant/.opam/4.04.2/lib/ocaml/threads/threads.cma: loaded


No such package: camlp4


Hierdurch erhalten wir das `Thread` Modul, welches uns alle nötigen Funktionen im Umgang mit Threads zur Verfügung stellt. Im Folgenden stehen die wichtigsten Funktionen, die wir für benötigen.

In [2]:
Thread.create                           (* Erstellt einen neuen Thread *)

- : ('a -> 'b) -> 'a -> Thread.t = <fun>


In [3]:
Thread.join                             (* Joint dem gegebenen Thread *)

- : Thread.t -> unit = <fun>


In [4]:
Thread.self                             (* Gibt den aktuellen Thread zurück *)

- : unit -> Thread.t = <fun>


In [5]:
Thread.id                               (* Wandelt einen Thread ID in einen Integer um *)

- : Thread.t -> int = <fun>


In [6]:
Thread.delay                            (* Wartet die gegebene Anzahl Sekunden *)

- : float -> unit = <fun>


Hiermit lässt sich ein einfaches Beispiel konstruieren.

In [7]:
let print_thread msg =
  let thread_id = Thread.(id (self ())) in
    Printf.printf "Thread %d: %s\n%!" thread_id msg

val print_thread : string -> unit = <fun>


In [8]:
let create_thread () =
  let f () = Thread.delay (Random.float 1.); print_thread "Hallo :)" in
    Thread.create f ()

val create_thread : unit -> Thread.t = <fun>


Die `create_thread` Methode erstellt einen Thread, der nach einem zufälligen Zeitintervall die Methode `print_hello` aufruft, welche die ID des aktuellen Threads ausgibt. Wir erzeugen nun zwei Threads und nachdem diese terminiert sind, rufen wir `print_hello` noch aus dem Hauptthread auf.

In [9]:
let () =
  let t1 = create_thread () in
  let t2 = create_thread () in
    Thread.join t1;
    Thread.join t2;
    print_thread "Main thread"

Thread 1: Hallo :)
Thread 2: Hallo :)
Thread 0: Main thread


### Channels

Die Kommunikation zwischen verschiedenen Threads kann über sogenannte Channels erfolgen. In dem `Event` Modul finden wir die zugehörigen Typen und Methoden.

In [10]:
Event.new_channel                       (* Erzeugt einen neuen Channel *)

- : unit -> 'a Event.channel = <fun>


In [11]:
Event.send                              (* Schickt den gegebenen Wert über den Channel (non-blocking) *)

- : 'a Event.channel -> 'a -> unit Event.event = <fun>


In [12]:
Event.receive                           (* Empfängt ein Event vom Channel (non-blocking) *)

- : 'a Event.channel -> 'a Event.event = <fun>


In [13]:
Event.sync                              (* Wandelt ein Event in einen tatsächlichen Wert um (blocking) *)

- : 'a Event.event -> 'a = <fun>


Es gibt noch weitere fortgeschrittene Funktionen im Event Modul, die eventuell benötigt werden, wenn mehrere Events verarbeitet werden sollen.

In [14]:
Event.wrap                              (* Wendet eine Funktion auf die Werte im Event an *)

- : 'a Event.event -> ('a -> 'b) -> 'b Event.event = <fun>


In [15]:
Event.choose                            (* Wählt ein Event aus der Event-Liste aus (non-blocking) *)

- : 'a Event.event list -> 'a Event.event = <fun>


In [16]:
Event.select                            (* Wählt ein Event aus und synchronisiert direkt (blocking) *)

- : 'a Event.event list -> 'a = <fun>


Im Folgenden wieder ein Beispiel zu Channels.

In [17]:
let channel = Event.new_channel ()

val channel : '_a Event.channel = <abstr>


In [18]:
let send_string x =
  let f () =
    print_thread "sending to channel ...";
    Event.(sync (send channel x));
    print_thread ("sent '" ^ x ^ "' to channel")
  in
    Thread.create f ()

val send_string : string -> Thread.t = <fun>


In [19]:
let receive_string () =
  let f () =
    print_thread "receiving from channel ...";
    let x = Event.(sync (receive channel)) in
    print_thread ("received '" ^ x ^ "' from channel")
  in
    Thread.create f ()

val receive_string : unit -> Thread.t = <fun>


Die Methoden zum Lesen und Schreiben der Channels können wir jetzt verwenden.

In [20]:
send_string "abc"

- : Thread.t = <abstr>


In [21]:
send_string "def"

Thread 3: sending to channel ...


- : Thread.t = <abstr>


In [22]:
receive_string ()

- : Thread.t = <abstr>


In [23]:
receive_string ()

Thread 4: sending to channel ...
Thread 5: receiving from channel ...
Thread 5: received 'abc' from channel
Thread 3: sent 'abc' to channel


- : Thread.t = <abstr>


In [24]:
receive_string ()

Thread 6: receiving from channel ...
Thread 6: received 'def' from channel
Thread 4: sent 'def' to channel


- : Thread.t = <abstr>


In [25]:
send_string "ghi"

Thread 7: receiving from channel ...


- : Thread.t = <abstr>


## Aufgabe 14.1 (P) Future

Als vordefinierte Hilfsfunktion verwenden wir `forever` aus den Folien. Diese Methode bekommt eine Funktion `f` übergeben, die in einem separatem Thread durchgängig ausgeführt wird.

In [26]:
let forever f init =
  let rec loop x = loop (f x) in
    Thread.create loop init

Thread 8: sending to channel ...
Thread 8: sent 'ghi' to channel
Thread 7: received 'ghi' from channel


val forever : ('a -> 'a) -> 'a -> Thread.t = <fun>


In [27]:
(* let t = forever (fun () -> Thread.delay 1.; Printf.printf "Hallo\n%!") () *)

In [28]:
module Future :
  sig
    type 'a t
    val create : ('a -> 'b) -> 'a -> 'b t
    val get : 'a t -> 'a
  end
  =
  struct
    open Event
  
    type 'a t = 'a channel
    
    let create f a =
      let c = new_channel () in
      let task () =
         let b = f a in
         forever (fun () -> sync (send c b)) ()
      in
        ignore (Thread.create task ());
        c
    
    let get c = sync (receive c)
  end

module Future :
  sig
    type 'a t
    val create : ('a -> 'b) -> 'a -> 'b t
    val get : 'a t -> 'a
  end


Dieses Modul können wir jetzt folgendermaßen verwenden:

In [29]:
let f = Future.create (fun x -> Thread.delay 10.; 2 * x) 4

val f : int Future.t = <abstr>


In [30]:
Future.get f

- : int = 8


In [31]:
Future.get f

- : int = 8


## Aufgabe 14.2 (P) Parallel map

In [32]:
let comp2 f g x y = f (g x) (g y)
let compareBy f = comp2 compare f

val comp2 : ('a -> 'a -> 'b) -> ('c -> 'a) -> 'c -> 'c -> 'b = <fun>


val compareBy : ('a -> 'b) -> 'a -> 'a -> int = <fun>


In [33]:
module Parallel :
  sig
    val map : ('a -> 'b) -> 'a list -> 'b list
  end
  =
  struct
    let map f xs =
      let t f x =
        let c = Event.new_channel () in
        let _ = Thread.create (fun x -> Event.(sync (send c (f x)))) x in
        Event.receive c
      in
      let fs = List.mapi (fun i x -> Event.wrap (t f x) (fun y -> i,y)) xs in
      (* call select for each element... *)
      let ys = List.map (fun _ -> Event.select fs) fs in
      List.sort (compareBy fst) ys |> List.map snd
  end

module Parallel : sig val map : ('a -> 'b) -> 'a list -> 'b list end


## Aufgabe 14.3 (P) Server

In [34]:
module Server :
  sig
    type ('a, 'b) t
    val serve : ('a -> 'b) -> ('a, 'b) t
    val request : ('a, 'b) t -> 'a -> 'b
  end
  =
  struct
    open Event
  
    type ('a, 'b) t = ('a * 'b channel) channel
    
    let serve f =
      let s = new_channel () in
      let task () =
        let (a, r) = sync (receive s) in
        ignore (Thread.create (fun () -> sync (send r (f a))) ())
      in
        ignore (forever task ());
        s
    
    let request s a =
      let r = new_channel () in
        sync (send s (a, r));
        sync (receive r)

  end

module Server :
  sig
    type ('a, 'b) t
    val serve : ('a -> 'b) -> ('a, 'b) t
    val request : ('a, 'b) t -> 'a -> 'b
  end


Jetzt können wir einen Server starten und dann Anfragen an ihn stellen. Unser Server soll die Ackermannfunktion berechnen.

In [35]:
let rec ackermann = function
    (0, m) -> m + 1
  | (n, 0) -> ackermann (n - 1, 1)
  | (n, m) -> ackermann (n - 1, ackermann (n, m - 1))

val ackermann : int * int -> int = <fun>


In [36]:
let server = Server.serve ackermann

val server : (int * int, int) Server.t = <abstr>


In [37]:
Server.request server (1, 2)

- : int = 4


In [38]:
Server.request server (0, 1)

- : int = 2
