# Ports and adapters with command handler patterns

## Part 1: Command Handler

In [1]:
from typing import NamedTuple

Architecture style: ports and adapters
Design pattern: Command handler

keeping business logic in model objects, no bleeding into controllers, no fat manager classes. 

complexity upfront to avoid accidental complexity down the road

building an issue management system

> First story: as a user I want to be able to report a new issue

### 3 architechture principles

1. we will always define where our use case begins and ends. no business processes strewn all over the codebase
2. depend on abstractions, not concrete implementations
3. glue code is distinct from business logic. put it in the right place

define the domain model. encapsulates our shared understanding of the problem. uses agreed terminology. create a separate python package for our domain model, no dependencies on other layers.

outside domain model put srvices - stateless objects that do stuff to the domain.

finally adapter layer. code that drives the service layer. eg concrete implementation for talking to the db. connect our app to the world

`DB -> (Adapters ( Services ( Domain ) ) ) <- User`

a command handler is an object that orchestrates a business process. similar to a controller in MVC

create a **command object** - small object that represents a state-changing action that can happen in the system. no behavior, pure data structures. Commands are instructions from an external agent. they have imperative names. avoid 'create' 'update' 'delete' - these are technical terms. use the business languange.

In [2]:
class ReportIssueCommand(NamedTuple):
    reporter_name: str
    reporter_email: str
    problem_description: str

the command objects are part of the domain. they express the api of your domain. The only way to change state is through a command. they can ve created in many ways: POST, celery, etc.

Create a **command handler** for your command. a stateless object that orchestrate system behaviour. kind of like glue code. fetching and saving objects, notifying other areas of the system.

Each command has exactly one handler.

In [3]:
class ReportIsseCommandHandler:
    def __init__(self, issue_log):
        self.issue_log = issue_log
        
    def __call__(self, cmd):
        reported_by = IssueReporter(
            cmd.reporter_name,
            cmd.reporter_email
        )
        issue = Issue(reported_by, cmd.problem_description)
        self.issue_log.add(issue)

the structure of CHs is consistent:
1. fetch the current state from persistent storage
2. update current state
3. persist the new state
4. notify any external systems the state has changed.

Boring code, no ifs, loops etc. stick to a single line of execution. no business logic. The following would be a bad CH:

In [4]:
class MarkIssueAsResolvedHandler:
    def __init__(self, issue_log):
        self.issue_log = issue_log
        
    def __call__(self, cmd):
        issue = self.issue_log.get(cmd.issue_id)
        # following is business logic
        if (issue.state != IssueStatus.Resolved):
            issue.mark_as_resolved(cmd.resolution)

here the if statement belongs in the domain model, probably in `mark_as_resolved` method of our issue logic

1. commands are logic free structures - just a name and bunch of values
2. they are a simple stable API. they don't depend on implementation
3. commands are handled by exactly 1 handler
4. each command instructs run through 1 use case
5. a handler: fetches state, updates state, persists new state, notifies about change.

In [5]:
# domain model

class IssueReporter:
    def __init__(self, name, email):
        self.name = name
        self.email = email
        

class Issue:
    def __init__(self,reporter,description):
        self.description = description
        self.reporter = reporter
        

class IssueLog:
    def add(self, issue):
        pass


class ReportIssueCommand(NamedTuple):
    reporter_name: str
    reporter_email: str
    problem_description: str
        
# service layer

class ReportIsseCommandHandler:
    def __init__(self, issue_log):
        self.issue_log = issue_log
        
    def __call__(self, cmd):
        reported_by = IssueReporter(
            cmd.reporter_name,
            cmd.reporter_email
        )
        issue = Issue(reported_by, cmd.problem_description)
        self.issue_log.add(issue)
        
# adapter

class FakeIssueLog(IssueLog):
    def __init__(self):
        self.issues = []
        
    def add(self, issue):
        self.issues.append(issue)
        
    def get(self, id):
        return self.issues[id]
    
    def __len__(self):
        return len(self.issues)
    
    def __getitem__(self, idx):
        return self.issues[idx]

## Part 2: repository and unit of work pattern

add persistent data access.

in our handler we had

```
reporter = IssueReporter(cmd.reporter_name, cmd.reporter_email)
issue = Issue(reporter, cmd.problem_description)
issue_log.add(issue)
```

the issue log comes from the business terminology. So it belongs in the domain. but its also the ideal abstraction for our data store. but we don't want our issue log to depend on our database. That leads us to ports and adapters.

in P&A, a domain exposes ports. a port gets data in or out of the domain. IssueLog was a port. Ports are connected to the world by adapters (FakeIssueLog)

a circuit which detects current over a threshold. if the threshold exceeds, output a signal. two ports, in and out.

In [6]:
class ReadablePort:
    pass

class WriteablePort:
    pass

class ThresholdDetectionCircuit:
    arbitrary_threshold = 4
    
    def __init__(self, input: ReadablePort, 
                 output: WriteablePort):
        self.input = input
        self.output = output
        
    def read_from_input(self):
        next_value = self.input.read()
        if next_value > self.arbitrary_threshold:
            self.output.write(1)

because your ports are standardised you can plug in different devices

In [7]:
class LightDetector(ReadablePort):
    def read(self):
        return self.get_light_amplitude()
    
class Buzzer(WriteablePort):
    def write(self, value):
        if value > 0:
            self.make-noise()
            
class Dial(ReadablePort):
    def read(self):
        return self.current_value
    
class Light(WriteablePort):
    def write(self, value):
        if value > 0:
            self.on = True
        else:
            self.on = False

Back to our project: IssueLog is like WriteablePort a way to get data in and out. below we plug in sqlalchemy and text. 

In [8]:
from sqlalchemy.orm import sessionmaker
Session = sessionmaker()

class SqlAlchemyIssueLog(IssueLog):
    def __init__(self, session: Session):
        self.session = session
        
    def add(self, issue):
        self.session.add(issue)
        
class TextFileIssueLog(IssueLog):
    def __init__(self, path):
        self.path = path
    
    def add(self, issue):
        with open(self.path, 'w') as f:
            json.dump(f)

In fact IssueLog is a **repository**, an object that hides the detail of persistent storage by giving us an interface that looks like a collection. you can add things to a repository, or get things out. thats it.

A simple repo pattern

In [9]:
class FooRepository:
    def __init__(self, db_session):
        self.db_session = db_session
        
    def add_new_item(self, item):
        self.db_session.add(item)
        
    def get_item(self, id):
        return self.db_session.get(Foo, id)
    
    def find_foos_by_latitude(self, latitude):
        return self.session.query(Foo).filter(
            foo.latitude == latitude)

here we expose methods to add, to get, and to find by some criteria. its using an sqlalchemy session so it's an adapter layer. 

you could define a different adapter for unit tests

In [10]:
class FooRespository:
    def __init__(self, db_session):
        self.items = []
    
    def add_new_items(self, item):
        self.items.append(item)
        
# etc.

this works the same as the sql alchemy, but you don't need to spin up and tear down a test DB.

The repo is the read/write to our data store. it's commonly used with a **unit of work** pattern. a UOW represents a bunch of things that happend together. all the stuff in a UOW gets cached in memory and only R/W to datastore when UOW is complete, when everything is flushed

In [13]:
# stubs
class UnitOfWorkManager:
    pass

class UnitOfWork:
    pass

#

class SqlAlchemyUnitOfWorkManager(UnitOfWorkManager):
    """The unit of work manager returns a new unit of work
    Our UOW is backed by a sql alchemy session whose lifetime 
    can be scoped to a web request, or a long lived background
    job"""
    
    def __init__(self, session_maker):
        self.session_maker = session_maker
        
    def start(self):
        return SqlAlchemyUnitOfWork(self.session_maker)
    

class SqlAlchemyUnitOfWork(UnitOfWork):
    """the unit of work captures the idea of a set of things
    that need to happen together
    
    usually, in a relational db, one UOW = one DB transaction"""
    
    def __init__(self, sessionfactory):
        self.sessionfactory = sessionfactory
        
    def __enter__(self):
        self.session = self.sessionfactory()
        return seld
    
    def __exit__(self, type, value, traceback):
        self.session.close()
        
    def commit(self):
        self.session.commit()
        
    def rollback(self):
        self.session.rollback()
    
    # put repositories on UOW for convenient access
    @property
    def issues(self):
        return IssueRepository(self.session)

This is missing logging and error handling in the commit method but otherwise production ready.

Our UOW manager creates a UOW

Our CH will need to be adjusted to work with this, it should now start a unit of work, and commiting the uow when it's finshed. This is principle 1: clear beginning and end of business process.

Our CH works on abstractions - it doesn't care whether it's working with a test double or a sqlalchemy session. (P2)

And this is just glue code.

In [15]:
class ReportIssueHandler:
    def __init__(self, uowm:UnitOfWorkManager):
        self.uowm = uowm
        
    def handle(self, cmd):
        with uowm.start() as unit_of_work:
            reporter = IssueReporter(cms.reporter_name, 
                                     cmd.reporter_email)
            issue = Issue(reporter, cmd.problem_description)
            unit_of_work.issues.add(issue)
            unit_of_work.commit()

In the code sample for C1 there was
```
issue_logger_app_services
    |- __init__.py
    |- handlers.py (ReportIssueHandler)
issue_logger_model
    |- __init__.py
    |- commands.py (ReportIssueCommand)
    |- domain.py (IssueReporter, Issue, IssueLog)
issue_logger_unit_tests
requirements.txt
setup.py
```

Now it's
```
issues
    |- services
        |- __init__.py
    |- domain
        |- __init__.py
        |- commands.py (ReportIssueCommand)
        |- model.py (IssueReporter, Issue)
        |- ports.py (IssueLog, UnitOfWork, UnitOfWorkManager)
    |- adapters
        |- __init__.py
        |- orm.py (SqlAlchemyUnitOfWorkManager(UnitOfWorkMgr),
                   IssueRepository, 
                   SqlAlchemyUnitOfWork(UnitOfWork),
                   SqlAlchemy, SqlAlchemySessionContext, 
                   IssueViewBuilder) 
     |- slow-tests
         |- __init__.py
         |- issue_repository_tests.py (bunch of tests)
     |- quick_tests
         |- adapters.py (FakeIssueLog(IssueLog),
                        FakeUnitOfWork(UnitOfWork, 
                                    UnitOfWorkManager)
         |- test_issue_reporting.py (bunch of tests)
     |- __init__.py
requirements.txt
setup.py
```

Note the ORM code is in one module and it depends on the domain model, not the other way round.

Our unit tests would continue to work if we deleted SQLAlchemy

## Part 3: Command-Query separation Principle

every method should either be a command that performs an action, or a query that returns data to the caller, but not both. *Asking a question should not change the answer*

### Referential Transparency
a function is referentially transparent if you could replace it with a static value

In [16]:
class LightSwitch:
    def toggle_light(self):
        self.light_is_on = not self.light_is_on
        return self.light_is_on
    
    @property
    def is_on(self):
        return self.light_is_on

`is_on` is RT. it can be replaced by true or false without loss of functionality. `toggle_light` is side-effectual. to be CQS compliant We shouldn't return a value from the method.

if commands and queries are completely separate it makes your code much more readable.

### getting data out of a command-handler architecture

Commands and handlers perform state changes. what is the equivalent port for queries?

Low cost: just re-use your respositories

In [19]:
# @app.route("/issues") <- would be your flask
def list_issues():
    with unit_of_work_manager.start() as unit_of_work:
        open_issues = unit_of_work.issue.find_by_status('open')
        return json.dumps(open_issues)

but it's a slippery slope: soon you'll start including business logic

To stay pure, define your views explicitly

In [21]:
class OpenIssuesList:
    def __init__(self, sessionmaker):
        self.sessionmaker = sessionmaker
        
    def fetch(self):
        with self.sessionmaker() as session:
            result = session.execute(
                'SELECT reporter_name, timestamp, title'
                'FROM issues WHERE state = "open"'
            )
            return [dict(r) for r in result.fetchall()]
        
#@api.route('/issues/')
def list_issues():
    view_builder = OpenIssuesList(session_maker)
    return jsonify(view_builder.fetch())

Note the raw SQL - this is OK because your read model is super easy and lightweight, and completely separate from your much heavier write model. Abandoning the ORM is OK here

### Why a separate read-model?

mistake 1: making too make calls to the DB because you don't realise what you have in memory - makes for bad performance

In [None]:
# find all users who are assigned this task
# [[and]] notify them and their line manager
# then move the task to their in-queue
notification = task.as_notification()
for assignee in task.assignees:
    assignee.manager.notifications.add(notifications)
    assignee.notifications.add(notifications)
    assignee.queues.inbox.add(task)

### CQRS is CQS at a system level

Command-Query Responsibility Segregation.

just means that you separate the write model (the domain model) and the read model (lightweight simple model for showing things on the UI, or answering questions about the domain state)

![title](6.1.read_write.png)

This is why using raw SQL is fine for read. use whatever is fast and convenient.

### Application Controlled Identifiers

OK, so your commands can't return values, how do you get an ID back from my save method? say if I've just written an issue and want to point the user to that issue on the UI?

Answer: choose the IDs yourself instead of letting the db choose them. You can use uuid for this. but there's also hilo for sqlalchemy (google it)

Check in on the app model

![model](6.2.model.png)

## Part 4: why use domain events?
we have a skeleton for an app, and we can add new issues into the DB, and fetch them from a Flask API.

So far we don't have any domain logic. Let's look at more use-cases

> As an IT manager I want to assign priority to new items
> As an IT manager I want to assign categories to new items (both part of the 'triage'
> As an IT manager I want to assign an engineer to an issue
> as an engineer I want to pick up an unassigned issue
> As an IT user I want a queue of issues determined by priority and category so I know what to work on.

look at the Triaging. issues need to have a `state`, with a new issue having the state `AwaitingTriage`. So we add a Triage handler and command

In [22]:
class TriageIssueHandler:
    def __init__(seld, uowm: UnitOfWorkManager):
        self.uowm = uowm
        
    def handle(self, cmd):
        with self.uowm.start() as uow:
            issue = uow.issues.get(cmd.issue_id)
            issue.triage(cmd.priority, cmd.category)
            # issue method should also put the issue into
            # AwaitingAssignment state
            uow.commit()
            
class PickIssueHandler:
    def __init__(self, uowm: UnitOfWorkManager):
        self.uowm = uowm
        
    def handle(self, cmd):
        with self.uowm.start() as uow:
            issue = uow.issues.get(cmd.issue_id)
            issue.assign_to(cmd.picked_by)
            uow.commit()

So far each handler has had the steps
1. fetch current state
2. mutae the state by calling a method on our domain model
3. persist the new state

but we haven't done step 4 yet
4. notify other parts of the system the state has changed

> as an engineer I want to get an email when an issue is assigned to me

### Single Responsibility Principle


In [25]:
# dummy class
class EmailBuilder:
    pass
class EmailSender:
    pass

# code
class AssignIssueHandler:
    def __init__(self, uowm: UnitOfWorkManager,
                email_builder: EmailBuilder,
                email_sender: EmailSender):
        self.uowm = uowm
        self.email_builder = email_builder
        self.email_sender = email_sender
        
    def handle(self,cmd):
        # assign issue
        with self.uowm.start() as uow:
            issue = uow.issues.get(cmd.issue_id)
            issue.assign_to(
                cmd.assigned_to,
                assigned_by = cmd.assigned_by
            )
            uow.commit()
            
        
        # send email
        email = self.email_builder.build(
            cmd.assigned_to,
            cmd.assigned_by,
            issue.problem_description
        )
        self.email_sender.send(email)

So this is responsible for 2 things: assigning the task and sending the email. this violates SRP

> describe the behavior of you class. if you use the word 'and' or 'then' you might be breaking SRP 

In practical terms, how would you error handle this? what if you can't assign to a particular engineer? should you assign to another engineer? What if the assign part suceeds but the email fails? what action should be taken in response? what state will the system be in?

Assgining the issue is the thing we care about here - it should either suceed or fail completely. sending the email is secondary and the sysadmins can deal with it later

So lets fix it

In [26]:
class AssignIssueHandler:
    def __init__(self, uowm: UnitOfWorkManager):
        self.uowm = uowm
        
    def handle(self, cmd):
        with self.uowm.start() as uow:
            issue - uow.issues.get(cmd.issue_id)
            issue.assign_to(cmd.assignee_address,
                           assigned_by=cmd.assigner_address)
            uow.commit()
            

class SendAssignmentEmailHandler:
    def __init__(self, uowm: UnitOfWorkManager,
                email_builder: EmailBuilder,
                email_sender: EmailSender):
        self.uowm = uowm
        self.email_builder = email_builder
        self.email_sender = email_sender
        
    def handle(self,cmd):
        with self.uowm.start() as uow:
            issue = uow.issues.get(cmd.issue_id)
            
            email = self.email_builder.build(
                cmd.assignee_address,
                cmd.assigner_address,
                issue.problem_description
            )
            self.email_sender.send(email)

The email handler doesn't actually need a uow here, because the state isn't changing. So use a view builder instead

In [28]:
# dummy class
class IssueViewBuilder:
    pass

# code
class SendAssignmentEmailHandler:
    def __init__(self, 
                view: IssueViewBuilder,
                email_builder: EmailBuilder,
                email_sender: EmailSender):
        self.view = view
        self.email_builder = email_builder
        self.email_sender = email_sender
        
    def handle(self,cmd):
        issue = self.view.fetch(cmd.issue_id)
            
        email = self.email_builder.build(
            cmd.assignee_address,
            cmd.assigner_address,
            issue['problem description']
        )
        self.email_sender.send(email)

But how do you invoke the handler? Calling the email handler from the assigner handler violates principle 1

> 1. we will always define where our use case begins and ends. no business processes strewn all over the codebase

we need a way to signal between handlers. we need **Domain Events**

### message bus
Events are like commands, they're both types of message - chunks of data between entities. they differ in intent:

1. commands are named in the imperative (DoThisThing), events in past (ThingWasDone)
2. Commands have exactly one Handler. Events have 0 to many handlers
3. an error in a command results in the entire request failing. an event error fails gracefully

Use a domain event to *trigger workflows that fall outside our immediate use case boundary* - here the UCB is 'assign the issue' and the secondary requirement is 'send the email'. Notifcations are commoon reasons to trigger events, others are clear the cache, regenerate a view model.

we need to raise a DE when we assign an issue. we don't want to know about the subscibers to the event, or we are coupled. we need a mediator, a **message bus**

![bus](6.3.bus.png)

In [30]:
from collections import defaultdict

class MessageBus:
    def __init__(self):
        """our message bus is just a mapping
        from message type to a list of handlers"""
        self.subscribers = defaultdict(list)
    
    def handle(self, msg):
        """the handle method invokes each handler in
        turn with our event"""
        msg_name = type(msg).__name__
        subscribers = self.subscribers[msg_name]
        for subscriber in subscribers:
            subscriber.handle(cmd)
            
    def subscribe_to(self, msg, handler):
        """subscribe sets up a new mapping, we make
        sure not to allow > 1 handler per command"""
        subscribers = [msg.__name__]
        if msg.is_cmd and len(subscribers) > 0:
            raise CommandAlreadySubscribedException(msg.__name__)
        subscribers.append(handler)

usage example
```
bus = MessageBus()
bus.subscribe_to(ReportIssueCommand, 
                 ReportIssueHandler(db.unit_of_work_manager))
bus.handle(cmd)
```

To put this in our API handlers would look like:

```
@api.route('/issues', methods=['POST'])
def create_issue(self):
    issue_id = uuid.uuid4()
    cmd = ReportIssueCommand(issue_id = issue_id,
            **request.get_json())
    bus.handle(cmd)
    return "", 201, {"Location": "/issues/" + str(issue_id)}
```

So instead of directly constucting a handler and telling it to handle the command, we're passing it to the bus.

Raising an event? do it from command handlers

In [35]:
class AssignIssueHandler:
    def handle(self, cmd):
        with self.uowm.start() as uow:
            issue = uow.issues.get(cmd.id)
            issue.assign_to(cmd.assigned_to, cmd.assigned_by)
            uow.commit()
        
        self.bus.raise(IssueAssignedToEngineer(
            cmd.issue_id,
            cmd.assigned_to,
            cmd.assigned_by
        ))

SyntaxError: invalid syntax (<ipython-input-35-a8f9355d98b0>, line 8)

Or do it from your model objects, treat them as part of domain model proper

In [36]:
class Issue:
    def assign_to(self, assigned_to, assigned_by):
        self.assigned_to = assigned_to
        self.assigned_by = assigned_by
        
        self.events.add(IssueAssignedToEngineer(
            self.id, self.assigned_to, self.assigned_by
        ))

This means you can have business logic for deciding when to raise an event.