/
functions.php
171 lines (143 loc) · 5.29 KB
/
functions.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
<?php
namespace Amp;
use Amp\Internal\FutureState;
use Revolt\EventLoop;
use Revolt\EventLoop\UnsupportedFeatureException;
/**
 * Creates a new fiber asynchronously using the given closure, returning a Future that is completed with the
 * eventual return value of the passed function or will fail if the closure throws an exception.
 *
 * @template T
 *
 * @param \Closure(mixed...):T $closure Function to invoke; receives $args as its arguments.
 * @param mixed ...$args Arguments forwarded to the closure when starting the fiber.
 *
 * @return Future<T> Completed with the closure's return value, or errored with the Throwable it threw.
 */
function async(\Closure $closure, mixed ...$args): Future
{
    static $run = null;

    // The runner is created once and shared by every call; it captures nothing, so it is safe to reuse.
    $run ??= static function (FutureState $state, \Closure $closure, array $args): void {
        $futureState = $state;
        $callable = $closure;

        // Null the parameters before invoking the closure: stack traces record the *current* value of each
        // argument, so an exception thrown by the closure would otherwise reference the FutureState that ends
        // up storing that very exception, creating a cycle that delays garbage collection.
        $state = $closure = null;

        try {
            $futureState->complete($callable(...$args));
        } catch (\Throwable $exception) {
            $futureState->error($exception);
        }
    };

    $state = new FutureState();

    EventLoop::queue($run, $state, $closure, $args);

    return new Future($state);
}
/**
 * Reads the high-resolution clock and reports it in seconds.
 *
 * The value is measured from an arbitrary starting point, so it is only meaningful when compared with
 * another value returned by this function.
 *
 * @return float Time in seconds.
 */
function now(): float
{
    $nanoseconds = \hrtime(true);

    // Dividing by a float literal always yields a float, matching the declared return type.
    return $nanoseconds / 1e9;
}
/**
 * Suspends the current execution context for the given number of seconds without blocking the event loop.
 *
 * @param float $timeout Number of seconds to wait.
 * @param bool $reference If false, unreference the underlying watcher.
 * @param Cancellation|null $cancellation Cancel waiting if cancellation is requested.
 */
function delay(float $timeout, bool $reference = true, ?Cancellation $cancellation = null): void
{
    $waiter = EventLoop::createSuspension();

    // Timer that wakes the suspension once the timeout elapses.
    $timerId = EventLoop::delay($timeout, static function () use ($waiter): void {
        $waiter->resume();
    });

    // When a cancellation is supplied, forward its exception into the suspension.
    $subscriptionId = null;
    if ($cancellation !== null) {
        $subscriptionId = $cancellation->subscribe(
            static function (CancelledException $exception) use ($waiter): void {
                $waiter->throw($exception);
            }
        );
    }

    if ($reference === false) {
        EventLoop::unreference($timerId);
    }

    try {
        $waiter->suspend();
    } finally {
        // Always release the timer and the subscription, whichever way the wait ended.
        EventLoop::cancel($timerId);

        if ($cancellation !== null) {
            /** @psalm-suppress PossiblyNullArgument $subscriptionId is set whenever $cancellation is not null. */
            $cancellation->unsubscribe($subscriptionId);
        }
    }
}
/**
 * Wait for signal(s) in a non-blocking way.
 *
 * All signal watchers and the cancellation subscription are released before this function returns or throws,
 * including when registering one of the signals fails part-way through the list.
 *
 * @param int|int[] $signals Signal number or array of signal numbers.
 * @param bool $reference If false, unreference the underlying watcher.
 * @param Cancellation|null $cancellation Cancel waiting if cancellation is requested.
 *
 * @return int Caught signal number.
 * @throws UnsupportedFeatureException
 */
function trapSignal(int|array $signals, bool $reference = true, ?Cancellation $cancellation = null): int
{
    $suspension = EventLoop::createSuspension();
    $callback = static fn (string $watcher, int $signal) => $suspension->resume($signal);

    $callbackIds = [];
    $id = null;

    // Registration happens inside the try block so that a failure while registering a later signal still
    // cancels the watchers (and cancellation subscription) that were already set up.
    try {
        $id = $cancellation?->subscribe(
            static fn (CancelledException $exception) => $suspension->throw($exception)
        );

        foreach ((array) $signals as $signo) {
            $callbackIds[] = $callbackId = EventLoop::onSignal($signo, $callback);
            if (!$reference) {
                EventLoop::unreference($callbackId);
            }
        }

        return $suspension->suspend();
    } finally {
        foreach ($callbackIds as $callbackId) {
            EventLoop::cancel($callbackId);
        }

        // $id stays null when no cancellation was given or when subscribing itself failed.
        if ($cancellation !== null && $id !== null) {
            $cancellation->unsubscribe($id);
        }
    }
}
/**
 * Returns a Closure that maintains a weak reference to any $this object held by the Closure (a weak-Closure).
 * This allows a class to hold a self-referencing Closure without creating a circular reference that would
 * prevent or delay automatic garbage collection.
 * Invoking the returned Closure after the object is destroyed will throw an instance of Error.
 *
 * A Closure that has no bound object is returned unchanged.
 *
 * @param \Closure $closure Closure to weaken.
 *
 * @return \Closure Closure forwarding its arguments to $closure while only weakly referencing its object.
 */
function weakClosure(\Closure $closure): \Closure
{
    $reflection = new \ReflectionFunction($closure);

    $that = $reflection->getClosureThis();
    if ($that === null) {
        // Nothing is bound, so there is no strong reference to weaken.
        return $closure;
    }

    // Closure::call() rebinds the scope to the runtime class of the object. When the closure was declared in a
    // different user-defined class (e.g. a parent class touching its private members), the declared scope must
    // be kept instead, which only Closure::bindTo() can do.
    $scope = $reflection->getClosureScopeClass();
    $preserveScope = $scope !== null
        && $scope->isUserDefined()
        && $scope->getName() !== \get_class($that);

    // Anonymous functions are named "{closure}" before PHP 8.4 and "{closure:...}" from PHP 8.4 on; a real
    // function or method name can never contain "{".
    if (\str_contains($reflection->getName(), '{closure')) {
        // Rebind to remove reference to $that
        $closure = $closure->bindTo(new \stdClass());
    } else {
        // Closure from first-class callable or \Closure::fromCallable(), declare an anonymous closure to rebind.
        $method = $reflection->getShortName();

        /** @psalm-suppress InvalidScope Closure is bound before being invoked. */
        $closure = fn (mixed ...$args) => $this->{$method}(...$args);

        if ($preserveScope) {
            // Give the forwarding closure the scope of the class declaring the method so private methods of
            // a parent class remain callable.
            $closure = $closure->bindTo(new \stdClass(), $scope->getName());
        }
    }

    // For internal classes use \Closure::bindTo() without scope.
    $useBindTo = $preserveScope || !(new \ReflectionClass($that))->isUserDefined();

    $reference = \WeakReference::create($that);

    return static function (mixed ...$args) use ($reference, $closure, $useBindTo): mixed {
        $that = $reference->get();
        if ($that === null) {
            throw new \Error('Weakened closure invoked after referenced object destroyed');
        }

        if (!$useBindTo) {
            return $closure->call($that, ...$args);
        }

        $bound = $closure->bindTo($that);
        if (!$bound) {
            throw new \RuntimeException('Unable to rebind function to object of type ' . \get_class($that));
        }

        return $bound(...$args);
    };
}