Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of async transitions. Require enqueue lib. #1

Merged
merged 4 commits into from
Jun 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,44 @@
# Process virtual machine

The library provides us with a frame to build a [workflow](https://en.wikipedia.org/wiki/Workflow), business processes such as [BPMN](http://www.bpmn.org/).
It works like this: you build a process. It contains nodes and transitions between them. The process could be saved and used later.
The process could be executed with process engine. The processes could be visualized.

It is backed up by [workflow nets (WF-nets)](https://en.wikipedia.org/wiki/Petri_net) mathematical model and [graphs theory](https://en.wikipedia.org/wiki/Graph_theory).

## The "Hello world!" example

Let's build our first process and execute it. It'll contain one single task that prints "Hello world" to the output.

```php
<?php
namespace Acme;

use Formapro\Pvm\DefaultBehaviorRegistry;
use Formapro\Pvm\CallbackBehavior;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\Process;

$registry = new DefaultBehaviorRegistry();
$registry->register('hello_world', new CallbackBehavior(function() {
echo 'Hello world';
}));

$process = new Process();
$node = $process->createNode();
$node->setLabel('Shouts "Hello world!"');
$node->setBehavior('hello_world');

$transition = $process->createTransition(null, $node);
$token = $process->createToken($transition);

(new ProcessEngine($registry))->proceed($token);
```

Here's the process diagram:

![Hello world graph](docs/images/hello_world_example.png)

## Developed by Forma-Pro

Forma-Pro is a full stack development company which interests also spread to open source development.
Expand Down
6 changes: 5 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
},
"require": {
"php": "^5.5.0|^7.0",
"makasim/yadm": "@dev",
"makasim/values": "@dev",
"mongodb/mongodb": "^1",
"symfony/console": "^3.1",
"graphp/graphviz": "^0.2.1",
"ramsey/uuid": "^3"
},
"suggest": {
"enqueue/enqueue": "Install the Enqueue library to be able to process tasks async (by using MQ).",
"makasim/yadm": "Install the Yadm library to be able to persist store in MongoDB"
},
"extra": {
"branch-alias": {
"dev-master": "0.1.x-dev"
Expand Down
Binary file added docs/images/hello_world_example.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion examples/fork-join.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use Formapro\Pvm\DefaultBehaviorRegistry;
use Formapro\Pvm\EchoBehavior;
use Formapro\Pvm\Exception\InterruptExecutionException;
use Formapro\Pvm\MongoProcessStorage;
use Formapro\Pvm\Yadm\MongoProcessStorage;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\Token;
use Formapro\Pvm\Process;
Expand Down
2 changes: 1 addition & 1 deletion examples/fork-join2.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use Formapro\Pvm\DefaultBehaviorRegistry;
use Formapro\Pvm\EchoBehavior;
use Formapro\Pvm\Exception\InterruptExecutionException;
use Formapro\Pvm\MongoProcessStorage;
use Formapro\Pvm\Yadm\MongoProcessStorage;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\Token;
use Formapro\Pvm\Process;
Expand Down
10 changes: 10 additions & 0 deletions src/AsyncTransitionIsNotConfigured.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php
namespace Formapro\Pvm;

class AsyncTransitionIsNotConfigured implements AsyncTransition
{
public function transition(array $tokens)
{
throw new \LogicException('The async transitions is not configured. To be able to start using async transition you have to use enqueue library and its bridge classes or implement AsyncTransition interface yourself and provide it to the process engine.');
}
}
36 changes: 36 additions & 0 deletions src/Enqueue/AsyncTransition.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
namespace Formapro\Pvm\Enqueue;

use Enqueue\Client\ProducerInterface;
use Formapro\Pvm\Token;

class AsyncTransition implements \Formapro\Pvm\AsyncTransition
{
/**
* @var ProducerInterface
*/
private $producer;

/**
* @param ProducerInterface $producer
*/
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}

/**
* {@inheritdoc}
*/
public function transition(array $tokens)
{
foreach ($tokens as $token) {
/** @var Token $token */

$this->producer->send(HandleAsyncTransitionProcessor::TOPIC, [
'process' => $token->getProcess()->getId(),
'token' => $token->getId(),
]);
}
}
}
75 changes: 75 additions & 0 deletions src/Enqueue/HandleAsyncTransitionProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php
namespace Formapro\Pvm\Enqueue;

use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
use Enqueue\Util\JSON;
use Formapro\Pvm\Process;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\ProcessStorage;
use Psr\Log\NullLogger;

class HandleAsyncTransitionProcessor implements PsrProcessor, TopicSubscriberInterface
{
const TOPIC = 'pvm_handle_async_transition';

/**
* @var ProcessEngine
*/
private $processEngine;

/**
* @var ProcessStorage
*/
private $processExecutionStorage;

/**
* @param ProcessEngine $processEngine
* @param ProcessStorage $processExecutionStorage
*/
public function __construct(ProcessEngine $processEngine, ProcessStorage $processExecutionStorage)
{
$this->processEngine = $processEngine;
$this->processExecutionStorage = $processExecutionStorage;
}

/**
* {@inheritdoc}
*/
public function process(PsrMessage $psrMessage, PsrContext $psrContext)
{
if ($psrMessage->isRedelivered()) {
return Result::reject('The message failed. Remove it');
}

$data = JSON::decode($psrMessage->getBody());

/** @var Process $process */
if (false == $process = $this->processExecutionStorage->get($data['process'])) {
return Result::reject('Process was not found');
}

if (false == $token = $process->getToken($data['token'])) {
return Result::reject('No such token');
}

try {
$this->processEngine->proceed($token, new NullLogger());
} finally {
$this->processExecutionStorage->persist($process);
}

return self::ACK;
}

/**
* {@inheritdoc}
*/
public static function getSubscribedTopics()
{
return [static::TOPIC];
}
}
56 changes: 0 additions & 56 deletions src/MongoProcessStorage.php

This file was deleted.

2 changes: 1 addition & 1 deletion src/Node.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Node

public function __construct()
{
$this->setId(UUID::generate());
$this->setId(Uuid::generate());
}

/**
Expand Down
20 changes: 20 additions & 0 deletions src/NullProcessStorage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php
namespace Formapro\Pvm;

class NullProcessStorage implements ProcessStorage
{
/**
* {@inheritdoc}
*/
public function persist(Process $process)
{
}

/**
* {@inheritdoc}
*/
public function get($id)
{
throw new \LogicException(sprintf('The process with id "%s" could not be found', $id));
}
}
19 changes: 11 additions & 8 deletions src/ProcessEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ProcessEngine
/**
* @var ProcessStorage
*/
private $processStorage;
private $processExecutionStorage;

/**
* @var AsyncTransition
Expand All @@ -40,17 +40,17 @@ class ProcessEngine

/**
* @param BehaviorRegistry $behaviorRegistry
* @param ProcessStorage $processStorage
* @param ProcessStorage $processExecutionStorage
* @param AsyncTransition $asyncTransition
*/
public function __construct(
BehaviorRegistry $behaviorRegistry,
ProcessStorage $processStorage,
AsyncTransition $asyncTransition
ProcessStorage $processExecutionStorage = null,
AsyncTransition $asyncTransition = null
) {
$this->behaviorRegistry = $behaviorRegistry;
$this->processStorage = $processStorage;
$this->asyncTransition = $asyncTransition;
$this->processExecutionStorage = $processExecutionStorage ?: new NullProcessStorage();
$this->asyncTransition = $asyncTransition ?: new AsyncTransitionIsNotConfigured();
$this->asyncTokens = [];
$this->waitTokens = [];
}
Expand All @@ -73,8 +73,11 @@ public function proceed(Token $token, LoggerInterface $logger = null)
try {
$this->log('Start execution: process: %s, token: %s', $token->getProcess()->getId(), $token->getId());
$this->doProceed($token);
$this->processStorage->saveExecution($token->getProcess());
$this->asyncTransition->transition($this->asyncTokens);
$this->processExecutionStorage->persist($token->getProcess());

if ($this->asyncTokens) {
$this->asyncTransition->transition($this->asyncTokens);
}

return $this->waitTokens;
} catch (\Exception $e) {
Expand Down
10 changes: 10 additions & 0 deletions src/ProcessStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,14 @@ interface ProcessStorage
* @param Process $process
*/
public function persist(Process $process);

/**
* @param string $id
*
* @throw \LogicException if there is no such process
*
* @return Process
*/
public function get($id);

}
10 changes: 0 additions & 10 deletions src/SyncAsyncTransition.php

This file was deleted.

2 changes: 1 addition & 1 deletion src/Token.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Token

public function __construct()
{
$this->setId(UUID::generate());
$this->setId(Uuid::generate());
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Transition.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Transition

public function __construct()
{
$this->setId(UUID::generate());
$this->setId(Uuid::generate());
$this->setWeight(1);
$this->setAsync(false);
$this->setActive(true);
Expand Down
3 changes: 1 addition & 2 deletions src/UUID.php → src/Uuid.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
<?php
namespace Formapro\Pvm;


class UUID
class Uuid
{
public static function generate()
{
Expand Down
Loading