-
Notifications
You must be signed in to change notification settings - Fork 0
/
Pipeline.php
203 lines (178 loc) · 5.15 KB
/
Pipeline.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
<?php
namespace Ant\Middleware;
use Closure;
use Generator;
use Exception;
use InvalidArgumentException;
/**
* Todo Server版
* Todo 尝试使用Context代替中间件传输参数
* Todo 中间件处理完异常后,继续回调剩下来的栈
*
* Class Pipeline
* @package Ant\Middleware
*/
class Pipeline
{
/**
* 默认加载的中间件
*
* @var array
*/
protected $nodes = [];
/**
* 执行时传递给每个中间件的参数
*
* @var array
*/
protected $arguments = [];
/**
* 7.0之前使用第二次协同返回数据,7.0之后通过getReturn返回数据
*
* @var bool
*/
protected $isPhp7 = false;
/**
* Middleware constructor.
*/
public function __construct()
{
$this->isPhp7 = version_compare(PHP_VERSION, '7.0.0', '>=');
}
/**
* 添加一个中间件到顶部
*
* @param callable $callback
* @return $this
*/
public function unShift(callable $callback)
{
array_unshift($this->nodes,$callback);
return $this;
}
/**
* 添加一个中间件到尾部
*
* @param callable $callback
* @return $this
*/
public function push(callable $callback)
{
array_push($this->nodes,$callback);
return $this;
}
/**
* 设置在中间件中传输的参数
*
* @return self $this
*/
public function send()
{
$this->arguments = func_get_args();
return $this;
}
/**
* 设置经过的中间件
*
* @param array|callable $nodes 经过的每个节点
* @return $this
*/
public function through($nodes)
{
$nodes = is_array($nodes) ? $nodes : func_get_args();
foreach ($nodes as $node) {
if (!is_callable($node)) {
throw new InvalidArgumentException('Pipeline must be a callback');
}
}
$this->nodes = $nodes;
return $this;
}
/**
* 设定中间件运行终点,并执行
*
* @param callable $destination
* @return mixed
* @throws Exception
*/
public function then(callable $destination)
{
// 初始化参数
$stack = [];
$arguments = $this->arguments;
try {
foreach ($this->nodes as $node) {
$generator = call_user_func_array($node, $arguments);
if ($generator instanceof Generator) {
// 将协同函数添加到函数栈
$stack[] = $generator;
$yieldValue = $generator->current();
if ($yieldValue === false) {
// 打断中间件执行流程
$status = false;
break;
} elseif ($yieldValue instanceof Arguments) {
// 替换传递参数
$arguments = $yieldValue->toArray();
}
}
}
$result = !isset($status) || $status !== false
? call_user_func_array($destination, $arguments)
: null;
// 回调函数栈
while ($generator = array_pop($stack)) {
$generator->send($result);
// 尝试用协同返回数据进行替换,如果无返回则继续使用之前结果
$result = $this->getResult($generator);
}
} catch (Exception $exception) {
$tryCatch = $this->exceptionHandle($stack, function ($e) {
// 如果无法处理,交给上层应用处理
throw $e;
});
$result = $tryCatch($exception);
}
return $result;
}
/**
* 处理异常
*
* @param $stack
* @param $throw
* @return Closure
*/
protected function exceptionHandle($stack, $throw)
{
// 出现异常之后开始回调中间件函数栈
// 如果内层中间件无法处理异常
// 那么外层中间件会尝试捕获这个异常
// 如果一直无法处理,异常将会抛到最顶层来处理
// 如果处理了这个异常,那么异常回调链将会被打断
return array_reduce($stack, function (Closure $stack, Generator $generator) {
return function (Exception $exception) use ($stack, $generator) {
try {
// 将异常交给内层中间件
$generator->throw($exception);
// 异常处理成功,将结果返回给应用程序
return $this->getResult($generator);
} catch (Exception $e) {
// 将异常交给外层中间件
return $stack($e);
}
};
},$throw);
}
/**
* 获取协程函数返回的数据,php7获取return数据,php7以下使用第二次yield
*
* @param Generator $generator
* @return mixed
*/
protected function getResult(Generator $generator)
{
return $this->isPhp7
? $generator->getReturn()
: $generator->current();
}
}