## Proxy

| Field | Content |
| --- | --- |
| Class | Proxy |
| Collaborator | Method Request, Scheduler, Future |
| Responsibility | Defines the active object's interface to clients; Creates Method Requests; Runs in the client's thread |

## Method Request

| Field | Content |
| --- | --- |
| Class | Method Request |
| Collaborator | Servant, Future |
| Responsibility | Represents a method call on the active object; Provides guards to check when the method request becomes runnable |

## Concrete Method Request

| Field | Content |
| --- | --- |
| Class | Concrete Method Request |
| Collaborator | Servant, Future |
| Responsibility | Implements the representation of a specific method call; Implements guards |

## Active Object Collaboration Diagram

```mermaid
classDiagram
  direction LR

  class Client
  class Proxy {
    method_1()
    method_N()
  }
  class Scheduler {
    dispatch()
    insert()
  }
  class ActivationQueue {
    enqueue()
    dequeue()
  }
  class MethodRequest {
    can_run()
    call()
  }
  class ConcreteMethodRequest1
  class ConcreteMethodRequestN
  class Servant {
    method_1()
    method_N()
  }
  class Future

  Client ..> Proxy : invoke
  Proxy "1" -- "1" Scheduler : schedule
  Scheduler "1" -- "1" ActivationQueue : owns
  Proxy ..> Future : create
  Proxy ..> MethodRequest : create
  ActivationQueue "1" --> "*" MethodRequest : maintain
  MethodRequest --> Servant : execute
  MethodRequest --> Future : write to
  Client --> Future : obtain result from
  MethodRequest <|-- ConcreteMethodRequest1 : inherits
  MethodRequest <|-- ConcreteMethodRequestN : inherits
```


## Active Object Sequence Diagram

```mermaid
sequenceDiagram
  participant Client
  participant Proxy
  participant Future
  participant Scheduler
  participant ActivationList
  participant MethodRequest
  participant Servant

  Client->>Proxy: method()
  Proxy->>Future: create
  Proxy->>MethodRequest: create
  Proxy->>Scheduler: insert()
  Scheduler->>ActivationList: insert()
  Scheduler-->>Proxy: MethodRequest
  Proxy-->>Client: Future

  Scheduler->>ActivationList: dispatch()
  ActivationList-->>Scheduler: remove()
  Scheduler->>MethodRequest: can_run()
  MethodRequest->>Servant: call()
  Servant->>Servant: method()
  Servant-->>MethodRequest: Result
  MethodRequest-->>Future: write to future

  Client->>Future: read from future
  Future-->>Client: Result
```


Come si vede qui il client proxy (che gira in un thread) richiama un metodo sincronizzato. Questo metodo sincronizzato:
1. crea un `MethodRequest` (simile a un command)
2. chiama il metodo `insert()`

Il MethodRequest è una classe Facade che verrà poi specializzata dai ConcreteRequestMethod col solito giochetto puntatore padre = new puntatore figlio. Lo scheduler è in un altro thread, è parte del ActiveObject. Monitora col thread ed esegue. Esegue dispacciando la richiesta al servant. 

## Implementazione
Implementiamo un `ActiveObject` per sincronizzare un MQServant che farà semplicemente un'operazione lenta.

### Dipendenze

Poiché stiamo parlando di thread usiamo il supporto C++ al threading.

In [11]:
#include <iostream>
#include <string>
#include <thread>

using namespace std::chrono_literals;


In [12]:

static void log_line(const std::string& s) {
  std::cout << "[" << std::this_thread::get_id() << "] " << s << "\n";
}


In [13]:

  std::cout << "Hello, World of concurrency\n";


Hello, World of concurrency


In [14]:
  const std::string s = "This is a log message from the main thread";
  log_line(s);


[133606501812800] This is a log message from the main thread


Qui lancio in un altro thread e mi aspetto un TID diverso.

In [15]:

  const std::string s = "This is a log message from thread 1";
  std::thread t1(log_line, s);
  t1.join();



[133606342850240] This is a log message from thread 1


### Attore 1: `MQ_Servant`

Questo implementa le funzionalità core da sincronizzare. Qui dovremmo usare la solita tecnica corretta del C++ con la *Rule of 5* ma non lo farò... Andrò diretto al cuore della classe.
Ho 2 predicati che indicano 3 stati:
```mermaid
stateDiagram-v2
    [*] --> Empty

    Empty --> Neither: put() / not full()
    Neither --> Full: put() / becomes full
    Neither --> Neither: put() / still not full

    Full --> Neither: get() / not empty()
    Neither --> Empty: get() / becomes empty
    Neither --> Neither: get() / still not empty

    Empty --> Empty: get() blocked by empty()
    Full --> Full: put() blocked by full()

```

In [None]:
class MQ_Servant{
    public:
        MQ_Servant(size_t mq_size) : mq_size_(mq_size) {}
        ~MQ_Servant() = default;
        
    // Message Queue implementation
    void put(const Message& msg);
    Message get();

    // Predicates
    bool is_full() const;
    bool is_empty() const;

    private:
        std::queue<Message> mq_;
        size_t mq_size_;

}

### Actor 2: Invocation infrastructure

Qui scriviamo tutto il circo necessario al client per eseguire il ActiveObject methods.

#### Proxy

Interfaccia verso il servant. Se tiserve un metodo, lo trovi nerl proxy. Il metodo del proxy può esser econsiderato una *closure* del metodo del Servant. Nel contesto io ci metto:
1. I parametri del metodo da eseguire
2. binding al metodo del servant (vanno agganciati...)
3. il `future` in cui appoggiare il risultato (verrà interperllato dal client)
4. la scaffalatura di codice che eseguirà il `MethodRequest`

##### Implemento `MQ_Proxy`

Questo non è altro che un `Factory` che costruisce istanze dei method request e li passa allo scehduler.


In [None]:
class MQ_Proxy{
    public:
        enum {
            MQ_SIZE = 10
        };
        // I pass size and let proxy own everything!!
        MQ_Proxy(size_t size = MQ_SIZE) : scheduler_(size), servant_(size) {}

        //Schedule <put> to execute on the active object
        void put(const Message& msg) {
            //This is the third line in the graph
            MethodRequest* mr = new Put(servant_, msg);
            //This is the fourth line in the graph
            scheduler_.insert(mr);
        }

        //Return a <Message_Future> as the "future" result of the asynchronous 
        //<get> method call on the active object
        Message_Future get() {
            // Lo uso per metterci il risultato del get
            Message_Future result;
            // Get uses the result which is injected and managed by function
            MethodRequest* mr = new Get(servant_, result);
            // mr is passed as a pointer onto the heap. When I exit the function, 
            // it is owned by scheduler
            scheduler_.insert(mr);
            // Return by copy
            return result;
        }

        bool is_full() const {
            return servant_.is_full();
        }
        bool is_empty() const {
            return servant_.is_empty();
        }

        // Private state holders injected via constructor injection
        private:
            MQ_Scheduler scheduler_;
            MQ_Servant servant_;
}

#### Implemento `method request`

Questi sono dei command object [[GoF95](https://up.curiousprogrammer.dev/docs/skills/system-design/design-patterns/gang-of-four/behavioral-patterns/command/)]. Disaccoppio il scheduler dai dettagli interni di come fare la `MQ_Servant.get()`, ci penserà lui. Tipicamente poi provvediamo un `can_run()` guard method per proteggere l'esecuzione ed un `call()` method usato come hook per metterci il metodo eseguito poi dal servant (eg. tramite delegate o lambda).

 Essendo una interfaccia, andrà poi concretizzata, le classi figlio implementano il command tipizzato sulla GET o la PUT.

In [None]:
class Method_Request {
public:
    // Evaluate the synchronization constraint.
    virtual bool can_run () const = 0;

    // Execute the method.
    virtual void call () = 0;
};

Essendo una interfaccia, andrà poi concretizzata, le classi figlio implementano il command tipizzato sulla GET o la PUT. Ecco un esempio:

In [None]:
class Get : public Method_Request {
    public:
        // I took out const otherwise is ill-typed
        // I pass by value a MQ_Servant
        Get(MQ_Servant* rep, Message_Future& f) : servant_(rep), result_(f) {}
        virtual bool can_run() const override {
            //Synchronization constraint: cannot call if the message queue is empty
            //Here we use the MQ_Servant's is_empty() predicate
            return !servant_->is_empty();
        }
        // keyword virtual, hooks the right call at runtime. If I use the trick 
        // of father  pointer child implementation, I need to set the good one, 
        // otherwise mr->call() will not call correct Get::call()
        virtual void call() override{
            // Bind dequeued message to the future result.
            result_ = servant_->get();
        }
    private:
        // I do not own the servant
        MQ_Servant* servant_;
        Message_Future result_;
};

```mermaid
sequenceDiagram
actor Client
participant Ref as ref: AbstractClass
participant Obj as runtime: SubClassA

Note over Ref,Obj: Same instance, shown as base-type reference + runtime subtype
Client->>Ref: templateMethod()
activate Ref
Ref->>Obj: primitive1() (runtime binding)
Obj-->>Ref: overridden implementation

alt hook1() overridden
  Ref->>Obj: hook1() (runtime binding)
  Obj-->>Ref: custom hook logic
else hook1() not overridden
  Ref->>Ref: hook1() default no-op
end

Ref->>Obj: primitive2() (runtime binding)
Obj-->>Ref: overridden implementation
Ref-->>Client: done
deactivate Ref

```

#### Implemento `Activation_List`

Questa ActivationList è un bounded buffer da usare secondo il pattern Producer-Consumer per garantire la sincronizzazioen (in questo caso serializzazione). 
In questo caso il `Client_Thread` gioca il ruolo del producer inserendo `Method_Reqeuest` nella `Activation_List` attraverso i metodi sincronizzati del `MQ_Proxy`. Dall'altra parte lo scheduler thread gioca il ruolo del consumer prelevando col metodo `Activation_List::remove()` i `Method_Request` e richiamando il hook del `Servant` "bindato". 

In [None]:
class Activation_List{
    public:
        enum { INFINITE = -1 };
        //define a trait
        typedef Activation_List_Iterator iterator;
        Activation_List(size_t high_water_mark);

        void insert(Method_Request* mr);
        void remove(Method_Request* mr);
    private:
        std::queue<Method_Request*> al_;
        std::mutex mtx_; //unused in this implementation, but needed for a thread-safe implementation
};

### Implemento lo `Scheduler`

Questo scheduler è un [Command Processor][Command Processor]

[Command Processor]: https://www.google.it
ciao

In [None]:
class MQ_Scheduler {
public:
   // Initialize the <Activation_List> to have
   // the specified capacity and make <MQ_Scheduler>
   // run in its own thread of control.
   MQ_Scheduler (size_t high_water_mark);
 
   // … Other constructors/destructors, etc.
 
   // Put <Method_Request> into <Activation_List>. This
   // method runs in the thread of its client, i.e.
   // in the proxy’s thread.
   void insert (Method_Request *mr) {
      act_list_.insert (mr);
   }
 
   // Dispatch the method requests on their servant
   // in its scheduler’s thread of control.
   virtual void dispatch dispatch () {
       // Iterate continuously in a separate thread.
       for (;;) {
          Activation_List::iterator request;
          // The iterator’s <begin> method blocks
          // when the <Activation_List> is empty.
          for (request = act_list_.begin ();
             request != act_list_.end ();
             ++request) {
             // Select a method request whose
             // guard evaluates to true.
             if ((*request).can_run ()) {
                // Take <request> off the list.
                act_list_.remove (*request);
                (*request).call ();
                delete *request;
             }
             // Other scheduling activities can go here,
             // e.g., to handle when no <Method_Request>s
             // in the <Activation_List> have <can_run>
             // methods that evaluate to true.
          }
       }
    }
        // Entry point into the new thread.
    void *svc_run (void *args) {
        MQ_Scheduler *this_obj =
            static_cast<MQ_Scheduler *> (args);
        
        this_obj->dispatch ();
    }
    private:
    // List of pending Method_Requests.
    Activation_List act_list_;
};

Lo scheduler esegue in un thread di controllo diverso dai client. Ricordiamoci che il client thread usa il suo proxy per inserire i MethodRequest nel activationList. Inoltre qui si usa ThreadManager che non è altro che un Facde per la parte thread. 
Ultimo punto da determinare è il rendezvous del value (risultato dell'operazione) e return value policy. In sostanza dobbiamo capire come si restituisce il valore della computazione di un metodo invocato su un active object. 
1. *synchronous waiting*: block finché non c'è un valore
2. *Synchronous TIMED wait*: aspetta un intervallo finito di tempo
3. *Asincrono*: non c'è bisogno di spiegare

Il Future mi permette di avere un meccanismo a doppia viadi comunicazione. Evita disastri acqiusendo un lock e poi scrivendo il valore del risultato. 

In [None]:
class Message_Future {
public:
   // Binds <this> and <pre> to the same <Msg._Future_Imp.>
   Message_Future (const Message_Future &f);
 
   // Initializes <Message_Future_Implementation> to
   // point to <message> m immediately.
   Message_Future (const Message &message);
 
   // Creates a <Msg._Future_Imp.>
   Message_Future ();
 
   // Binds <this> and <pre> to the same
   // <Msg._Future_Imp.>, which is created if necessary. Copy ctor
   void operator= (const Message_Future &f);
 
    //block indefinitely, to be refactored
   Message result () const;
private:
   // <Message_Future_Implementation> uses the Counted
   // Pointer idiom.
   Message_Future_Implementation *future_impl_;
};

Attenzione: col fatto che accedo al *body* del future solo attraverso il handle, io posso fare tutto il bookekeping di chi lo usa e quindo posso usare counter reference e usare il *Counted Pointer Idiom* che poi viene usato più spesso in C++11.

A questo punto un client che vuole usare tutto questo circo fa questo:

In [None]:
MQ_Proxy message_queue;
 
// Obtain future and block thread until message arrives.
Message_Future future = message_queue.get ();
Message msg = future.result ();
 
// Transmit message to the consumer.
send (msg);

Succesivamente, dato che quasi mai avrai il future pronto:

In [None]:
// Obtain a future (does not block the client).
Message_Future future = message_queue.get ();
 
// Do something else here…
 
// Evaluate future and block if result is not available.
Message msg = future.result ();
send (msg);

Ecco un esempio di client:

In [None]:
class Consumer_Handler {
public:
   // Constructor spawns the active object’s thread.
   Consumer_Handler ();
 
   // Put the message into the queue.
   void put (const Message &msg) { msg_q_.put (msg); }
private:
   MQ_Proxy msg_q_; // Proxy to the Active Object.
   SOCK_Stream connection_; // Connection to consumer.
 
   // Entry point into the new thread.
   static void *svc_run (void *arg);
};

Il POSA crea tutto questo per un ipotetico router di messaggi che viene implementato così nel libro:

In [None]:
void Supplier_Handler::route_message (const Message &msg)
{
   // Locate the appropriate consumer based on the
   // address information in <Message>.
   Consumer_Handler *consumer_handler =
      routing_table_.find (msg.address ());
 
   // Put the Message into the Consumer Handler’s queue.
   consumer_handler->put (msg);
}

Attenzione ch è responsabilità del client fare spawning dei thread da usare e quindi fa anche qualcosa di simile:

In [None]:
Consumer_Handler::Consumer_Handler () {
   // Spawn a separate thread to get messages from the
   // message queue and send them to the consumer.
   Thread_Manager::instance ()->spawn (&svc_run, this);
   // …
}