Skip to content

Commit

Permalink
Use Producer in Body and BodyParser
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Aug 23, 2016
1 parent 52f7ec6 commit 5fe5f68
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 132 deletions.
90 changes: 28 additions & 62 deletions lib/Body.php
Expand Up @@ -2,9 +2,7 @@

namespace Aerys;

use Amp\Observable;
use Amp\Observer;
use Amp\Postponed;
use Amp\{ Internal\Producer, Observable, Observer, Postponed };

/**
* An API allowing responders to buffer or stream request entity bodies
Expand Down Expand Up @@ -33,74 +31,42 @@
* };
*/
class Body extends Observer implements Observable {
private $whens = [];
private $watchers = [];
private $string;
private $error;
use Producer;

public function __construct(Observable $observable) {
$observable->subscribe(function($data) {
foreach ($this->watchers as $func) {
$func($data);
}
$observable->subscribe(function ($data) {
return $this->emit($data);
});

parent::__construct($observable); // DO NOT MOVE - preserve order in which things happen
$when = static function ($e, $bool) use (&$continue) {
$continue = $bool;
};
$observable->when(function($e, $result) use (&$continue, $when) {

$observable->when(function($e, $result) {
if ($e) {
$this->fail($e);
return;
}

$when = static function ($e, $bool) use (&$continue) {
$continue = $bool;
};
$string = "";
$this->next()->when($when);
while ($continue) {
$string[] = $this->getCurrent();
$string .= $this->getCurrent();
$this->next()->when($when);
}

$this->next()->when(function ($ex) use (&$e) {
$e = $ex;
});

if (isset($string)) {
if (isset($string[1])) {
$string = implode($string);
} else {
$string = $string[0];
}

// way to restart, so that even after the success, the next() / getCurrent() API will still work
$postponed = new Postponed;
parent::__construct($postponed->getObservable());
$postponed->emit($string);
if ($e) {
$postponed->fail($e);
} else {
$postponed->resolve($result);
}

// way to restart, so that even after the success, the next() / getCurrent() API will still work
$postponed = new Postponed;
parent::__construct($postponed->getObservable());
$postponed->emit($string);
if ($e) {
$postponed->fail($e);
} else {
$string = "";
}
$this->string = $string;
$this->error = $e;

foreach ($this->whens as $when) {
$when($e, $string);
$postponed->resolve($result);
}
$this->whens = $this->watchers = [];


$this->resolve($string);
});
}

public function when(callable $func) {
if (isset($this->string)) {
$func($this->error, $this->string);
} else {
$this->whens[] = $func;
}
return $this;
}

public function subscribe(callable $func) {
if (!isset($this->string)) {
$this->watchers[] = $func;
}
}
}
}
114 changes: 47 additions & 67 deletions lib/BodyParser.php
Expand Up @@ -2,22 +2,17 @@

namespace Aerys;

use Amp\Coroutine;
use Amp\Deferred;
use Amp\Observable;
use Amp\Postponed;
use Amp\Success;
use Amp\{ Coroutine, Deferred, Internal\Producer, Observable, Postponed, Success };

class BodyParser implements Observable {
use Producer {
subscribe as watch;
}

private $req;
private $body;
private $boundary = null;

private $whens = [];
private $watchers = [];
private $error = null;
private $result = null;

private $bodyDeferreds = [];
private $bodies = [];
private $parsing = false;
Expand Down Expand Up @@ -50,7 +45,7 @@ public function __construct(Request $req, array $options = []) {
if (!preg_match('#^\s*multipart/(?:form-data|mixed)(?:\s*;\s*boundary\s*=\s*("?)([^"]*)\1)?$#', $type, $m)) {
$this->req = null;
$this->parsing = true;
$this->result = new ParsedBody([]);
$this->resolve(new ParsedBody([]));
return;
}

Expand All @@ -67,27 +62,21 @@ public function __construct(Request $req, array $options = []) {
if ($e instanceof ClientSizeException) {
$e = new ClientException("", 0, $e);
}
$this->error = $e;
$this->error($e);
} else {
$this->result = $this->end($data);
}

if (!$this->parsing) {
$this->parsing = 2;
foreach ($this->result->getNames() as $field) {
foreach ($this->result->getArray($field) as $_) {
foreach ($this->watchers as $cb) {
$cb($field);
$result = $this->end($data);

if (!$this->parsing) {
$this->parsing = 2;
foreach ($result->getNames() as $field) {
foreach ($result->getArray($field) as $_) {
$this->emit($field);
}
}
}

$this->resolve($result);
}

foreach ($this->whens as $cb) {
$cb($this->error, $this->result);
}

$this->whens = $this->watchers = [];
});
});
}
Expand Down Expand Up @@ -164,27 +153,17 @@ private function end($data) {
return new ParsedBody($fields, array_filter($metadata));
}
}

public function when(callable $cb) {
if ($this->req || !$this->parsing) {
$this->whens[] = $cb;
} else {
$cb($this->error, $this->result);
}

return $this;
}

public function subscribe(callable $cb) {

public function subscribe(callable $onNext) {
if ($this->req) {
$this->watchers[] = $cb;
$this->watch($onNext);

if (!$this->parsing) {
$this->parsing = true;
\Amp\defer(function() { return $this->initIncremental(); });
}
} elseif (!$this->parsing) {
$this->watchers[] = $cb;
$this->watch($onNext);
}
}

Expand Down Expand Up @@ -216,9 +195,7 @@ public function stream(string $name, int $size = 0): FieldBody {
return new FieldBody($body->getObservable(), $metadata->getAwaitable());
}
} elseif (empty($this->bodies[$name])) {
$postponed = new Postponed;
$postponed->resolve();
return new FieldBody($postponed->getObservable(), new Success([]));
return new FieldBody(new Success, new Success([]));
}

$key = key($this->bodies[$name]);
Expand All @@ -229,14 +206,14 @@ public function stream(string $name, int $size = 0): FieldBody {

private function initField($field, $metadata = []) {
if ($this->inputVarCount++ == $this->maxInputVars || \strlen($field) > $this->maxFieldLen) {
$this->fail();
$this->error();
return null;
}

$this->curSizes[$field] = 0;
$this->usedSize += \strlen($field);
if ($this->usedSize > $this->size) {
$this->fail();
$this->error();
return null;
}

Expand All @@ -250,43 +227,38 @@ private function initField($field, $metadata = []) {
$this->bodies[$field][] = new FieldBody($dataPostponed->getObservable(), new Success($metadata));
}

foreach ($this->watchers as $cb) {
$cb($field);
}
$this->emit($field);

return $dataPostponed;
}

private function updateFieldSize($field, $data) {
$this->curSizes[$field] += \strlen($data);
if (isset($this->sizes[$field])) {
if ($this->curSizes[$field] > $this->sizes[$field]) {
$this->fail();
$this->error();
return true;
}
} else {
$this->usedSize += \strlen($data);
if ($this->usedSize > $this->size) {
$this->fail();
$this->error();
return true;
}
}
return false;
}

private function fail($e = null) {
$this->error = $e ?? $e = new ClientSizeException;
private function error(\Throwable $e = null) {
$e = $e ?? new ClientSizeException;
foreach ($this->bodyDeferreds as list($deferred, $metadata)) {
$deferred->fail($e);
$metadata->fail($e);
}
$this->bodyDeferreds = [];
$this->req = null;

foreach ($this->whens as $cb) {
$cb($e, null);
}

$this->whens = $this->watchers = [];
$this->fail($e);
}

// this should be inside a defer (not direct Coroutine) to give user a chance to install watch() handlers
Expand All @@ -302,13 +274,15 @@ private function initIncremental() {
$sep = "--$this->boundary";
while (\strlen($buf) < \strlen($sep) + 4) {
if (!yield $this->body->next()) {
return $this->fail(new ClientException);
$this->error(new ClientException);
return;
}
$buf .= $this->body->getCurrent();
}
$off = \strlen($sep);
if (strncmp($buf, $sep, $off)) {
return $this->fail(new ClientException);
$this->error(new ClientException);
return;
}

$sep = "\r\n$sep";
Expand All @@ -318,7 +292,8 @@ private function initIncremental() {

while (($end = strpos($buf, "\r\n\r\n", $off)) === false) {
if (!yield $this->body->next()) {
return $this->fail(new ClientException);
$this->error(new ClientException);
return;
}
$off = \strlen($buf);
$buf .= $this->body->getCurrent();
Expand All @@ -329,13 +304,15 @@ private function initIncremental() {
foreach (explode("\r\n", substr($buf, $off, $end - $off)) as $header) {
$split = explode(":", $header, 2);
if (!isset($split[1])) {
return $this->fail(new ClientException);
$this->error(new ClientException);
return;
}
$headers[strtolower($split[0])] = trim($split[1]);
}

if (!preg_match('#^\s*form-data(?:\s*;\s*(?:name\s*=\s*"([^"]+)"|filename\s*=\s*"([^"]+)"))+\s*$#', $headers["content-disposition"] ?? "", $m) || !isset($m[1])) {
return $this->fail(new ClientException);
$this->error(new ClientException);
return;
}
$field = $m[1];

Expand All @@ -357,7 +334,8 @@ private function initIncremental() {
if (!yield $this->body->next()) {
$e = new ClientException;
$dataPostponed->fail($e);
return $this->fail($e);
$this->error($e);
return;
}

$buf .= $this->body->getCurrent();
Expand All @@ -384,7 +362,8 @@ private function initIncremental() {

while (\strlen($buf) < 4) {
if (!yield $this->body->next()) {
return $this->fail(new ClientException);
$this->error(new ClientException);
return;
}
$buf .= $this->body->getCurrent();
}
Expand Down Expand Up @@ -457,11 +436,12 @@ private function initIncremental() {
$buf = substr($buf, \strlen($new) + 1);
$noData = true;
} elseif (\strlen($buf) > $this->maxFieldLen) {
return $this->fail();
$this->error();
return;
}
}

if ($field !== null && $buf != "" && (\strlen($buf > 2) || $buf[0] !== "%")) {
if ($field !== null && $buf != "" && (\strlen($buf) > 2 || $buf[0] !== "%")) {
if (\strlen($buf) > 1 ? false !== $percent = strrpos($buf, "%", -2) : !($percent = $buf[0] !== "%")) {
if ($percent) {
if ($this->updateFieldSize($field, $data)) {
Expand Down
5 changes: 2 additions & 3 deletions test/BodyParsingTest.php
Expand Up @@ -24,9 +24,8 @@ function testDecoding($header, $data, $fields, $metadata) {
$postponed->resolve();

\Amp\execute(function() use ($ireq, &$result) {
yield \Aerys\parseBody(new StandardRequest($ireq))->when(function ($e, $parsedBody) use (&$result) {
$result = $parsedBody->getAll();
});
$parsedBody = yield \Aerys\parseBody(new StandardRequest($ireq));
$result = $parsedBody->getAll();
});

$this->assertEquals($fields, $result["fields"]);
Expand Down

0 comments on commit 5fe5f68

Please sign in to comment.