Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added missing functions #6

Merged
merged 5 commits into from
Oct 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
sudo: false
sudo: required

language: php

php:
- 7.0
- 7.1
- 7.2
- nightly

matrix:
Expand All @@ -13,6 +14,8 @@ matrix:
fast_finish: true

before_script:
- sudo apt-get install -qq --force-yes beanstalkd
- beanstalkd >/dev/null 2>&1 &
# --ignore-platform-reqs, because https://github.com/FriendsOfPHP/PHP-CS-Fixer/pull/2722
- composer update -n --prefer-dist --ignore-platform-reqs
- composer require satooshi/php-coveralls dev-master --ignore-platform-reqs
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"require": {
"amphp/amp": "^2",
"amphp/socket": "^0.10",
"amphp/uri": "^0.1"
"amphp/uri": "^0.1",
"symfony/yaml": "^3.3"
},
"require-dev": {
"amphp/phpunit-util": "^1",
Expand Down
85 changes: 72 additions & 13 deletions src/BeanstalkClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Amp\Deferred;
use Amp\Promise;
use Amp\Uri\Uri;
use Symfony\Component\Yaml\Yaml;
use Throwable;
use function Amp\call;

Expand Down Expand Up @@ -86,6 +87,25 @@ public function use(string $tube) {
});
}

public function pause(string $tube, int $delay): Promise {
$payload = "pause-tube $tube $delay\r\n";

return $this->send($payload, function (array $response) use ($tube) {
list($type) = $response;

switch ($type) {
case "PAUSED":
return null;

case "NOT_FOUND":
throw new NotFoundException("Tube with name $tube is not found");

default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}

public function put(string $payload, int $timeout = 60, int $delay = 0, $priority = 0): Promise {
$payload = "put $priority $delay $timeout " . strlen($payload) . "\r\n$payload\r\n";

Expand Down Expand Up @@ -248,6 +268,10 @@ public function ignore(string $tube): Promise {
});
}

public function quit() {
$this->send("quit\r\n");
}

public function getJobStats(int $id): Promise {
$payload = "stats-job $id\r\n";

Expand All @@ -256,7 +280,7 @@ public function getJobStats(int $id): Promise {

switch ($type) {
case "OK":
return new Job($this->getStatsFromString($response[1]));
return new Job(Yaml::parse($response[1]));

case "NOT_FOUND":
throw new NotFoundException("Job with $id is not found");
Expand All @@ -275,7 +299,7 @@ public function getTubeStats(string $tube): Promise {

switch ($type) {
case "OK":
return new Tube($this->getStatsFromString($response[1]));
return new Tube(Yaml::parse($response[1]));

case "NOT_FOUND":
throw new NotFoundException("Tube $tube is not found");
Expand All @@ -294,24 +318,59 @@ public function getSystemStats(): Promise {

switch ($type) {
case "OK":
return new System($this->getStatsFromString($response[1]));
return new System(Yaml::parse($response[1]));

default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}

private function getStatsFromString(string $stats): array {
$result = [];
$source = explode("\n", $stats);
foreach ($source as $stat) {
if ($stat == '---' || empty($stat)) {
continue;
public function listTubes(): Promise {
$payload = "list-tubes\r\n";

return $this->send($payload, function (array $response): array {
list($type) = $response;

switch ($type) {
case "OK":
return Yaml::parse($response[1]);

default:
throw new BeanstalkException("Unknown response: " . $type);
}
list($key, $value) = explode(':', $stat);
$result[$key] = trim($value);
}
return $result;
});
}

public function listWatchedTubes(): Promise {
$payload = "list-tubes-watched\r\n";

return $this->send($payload, function (array $response): array {
list($type) = $response;

switch ($type) {
case "OK":
return Yaml::parse($response[1]);

default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}

public function getUsedTube(): Promise {
$payload = "list-tube-used\r\n";

return $this->send($payload, function (array $response): string {
list($type) = $response;

switch ($type) {
case "USING":
return $response[1];

default:
throw new BeanstalkException("Unknown response: " . $type);
}
});
}
}