Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.0]Add gRPC extensibility #5047

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 49 additions & 28 deletions src/grpc-client/src/BaseClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
namespace Hyperf\GrpcClient;

use Google\Protobuf\Internal\Message;
use Hyperf\Grpc\Parser;
use Hyperf\Grpc\StatusCode;
use Hyperf\GrpcClient\Exception\GrpcClientException;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\ChannelPool;
use InvalidArgumentException;
use Swoole\Http2\Response;

/**
* @method int send(Request $request)
Expand All @@ -27,11 +25,13 @@
*/
class BaseClient
{
private ?GrpcClient $grpcClient = null;
protected ?GrpcClient $grpcClient = null;

private bool $initialized = false;
protected bool $initialized = false;

public function __construct(private string $hostname, private array $options = [])
protected string $service = '';

public function __construct(protected string $hostname, protected array $options = [])
{
}

Expand Down Expand Up @@ -62,6 +62,30 @@ public function _getGrpcClient(): GrpcClient
return $this->grpcClient;
}

public function request(string $method, Message $argument, string $class, array $headers = []): Response
{
$streamId = retry($this->options['retry_attempts'] ?? 3, function () use ($method, $argument, $headers) {
$streamId = $this->send($this->buildRequest($this->path($method), $argument, $headers));
if ($streamId <= 0) {
$this->init();
// The client should not be used after this exception
throw new GrpcClientException('Failed to send the request to server', StatusCode::INTERNAL);
}
return $streamId;
}, $this->options['retry_interval'] ?? 100);
return Parser::parseResponse($this->recv($streamId), [$class, 'decode']);
}

public function path(string $method): string
{
return $this->service . $method;
}

public function url(string $method): string
{
return $this->hostname . $this->path($method);
}

/**
* Call a remote method that takes a single argument and has a
* single output.
Expand All @@ -70,26 +94,24 @@ public function _getGrpcClient(): GrpcClient
* @param Message $argument The argument to the method
* @param callable $deserialize A function that deserializes the response
* @throws GrpcClientException
* @return array|\Google\Protobuf\Internal\Message[]|Response[]
* @return array<int, null|int|Message|string|\Swoole\Http2\Response>
*/
protected function _simpleRequest(
string $method,
Message $argument,
$deserialize,
array $metadata = [],
array $options = []
) {
$options['headers'] = ($options['headers'] ?? []) + $metadata;
$streamId = retry($this->options['retry_attempts'] ?? 3, function () use ($method, $argument, $options) {
$streamId = $this->send($this->buildRequest($method, $argument, $options));
if ($streamId <= 0) {
$this->init();
// The client should not be used after this exception
throw new GrpcClientException('Failed to send the request to server', StatusCode::INTERNAL);
): array {
try {
$response = $this->request($method, $argument, $deserialize[0], ($options['headers'] ?? []) + $metadata);
return [$response->message, 0, $response->rawResponse];
} catch (GrpcClientException $exception) {
if ($exception->getMessage() === 'Failed to send the request to server') {
throw $exception;
}
return $streamId;
}, $this->options['retry_interval'] ?? 100);
return Parser::parseResponse($this->recv($streamId), $deserialize);
return [$exception->getMessage(), $exception->getCode(), null];
}
}

/**
Expand All @@ -109,7 +131,7 @@ protected function _clientStreamRequest(
): ClientStreamingCall {
$call = new ClientStreamingCall();
$call->setClient($this->_getGrpcClient())
->setMethod($method)
->setMethod($this->path($method))
->setDeserialize($deserialize)
->setMetadata($metadata);

Expand Down Expand Up @@ -163,13 +185,7 @@ protected function _bidiRequest(
return $call;
}

private function start()
{
$client = $this->grpcClient;
return $client->isRunning() || $client->start();
}

private function init()
protected function init()
{
if (! empty($this->options['client'])) {
if (! ($this->options['client'] instanceof GrpcClient)) {
Expand All @@ -192,9 +208,14 @@ private function init()
$this->initialized = true;
}

private function buildRequest(string $method, Message $argument, array $options): Request
protected function buildRequest(string $path, Message $argument, array $headers): Request
{
$headers = $options['headers'] ?? [];
return new Request($method, $argument, $headers);
return new Request($path, $argument, $headers);
}

private function start()
{
$client = $this->grpcClient;
return $client->isRunning() || $client->start();
}
}
36 changes: 36 additions & 0 deletions src/grpc-client/src/Parser.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcClient;

use Hyperf\Grpc\StatusCode;
use Hyperf\GrpcClient\Exception\GrpcClientException;
use Swoole\Http2\Response as SwooleResponse;

class Parser extends \Hyperf\Grpc\Parser
{
public static function parseResponse(?SwooleResponse $response, mixed $deserialize): Response
{
if (! $response || empty($response->data)) {
throw new GrpcClientException('No Response', StatusCode::INTERNAL);
}
if ($response->statusCode !== 200) {
throw new GrpcClientException('Http Code ' . $response->statusCode, StatusCode::HTTP_GRPC_STATUS_MAPPING[$response->statusCode] ?? StatusCode::UNKNOWN);
}
$code = (int) ($response->headers['grpc-status'] ?? 0);
if ($code !== 0) {
throw new GrpcClientException($response->headers['grpc-message'] ?? '', $code);
}
$data = $response->data;
$reply = static::deserializeMessage($deserialize, $data);
return new Response($reply, $response);
}
}
6 changes: 3 additions & 3 deletions src/grpc-client/src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ class Request extends BaseRequest
*/
public $usePipelineRead;

public function __construct(string $method, Message $argument = null, $headers = [])
public function __construct(string $path, Message $argument = null, $headers = [])
{
$this->method = 'POST';
$this->headers = array_replace($this->getDefaultHeaders(), $headers);
$this->path = $method;
$this->path = $path;
$argument && $this->data = Parser::serializeMessage($argument);
}

Expand All @@ -42,7 +42,7 @@ public function getDefaultHeaders(): array
];
}

private function buildDefaultUserAgent(): string
protected function buildDefaultUserAgent(): string
{
$userAgent = 'grpc-php-hyperf/1.0';
$grpcClientVersion = Package::getPrettyVersion('hyperf/grpc-client');
Expand Down
33 changes: 33 additions & 0 deletions src/grpc-client/src/Response.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\GrpcClient;

use Google\Protobuf\Internal\Message;
use Swoole\Http2\Response as RawResponse;

class Response
{
public Message $message;

public RawResponse $rawResponse;

public function __construct(Message $message, RawResponse $rawResponse)
{
$this->message = $message;
$this->rawResponse = $rawResponse;
}

public function __call($name, $arguments)
{
return $this->message->{$name}(...$arguments);
}
}
42 changes: 15 additions & 27 deletions src/grpc-client/src/StreamingCall.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,22 @@
*/
namespace Hyperf\GrpcClient;

use Hyperf\Grpc\Parser;
use Google\Protobuf\GPBEmpty;
use Hyperf\Grpc\StatusCode;
use Hyperf\GrpcClient\Exception\GrpcClientException;
use RuntimeException;

class StreamingCall
{
/**
* @var GrpcClient
*/
protected $client;

/**
* @var string
*/
protected $method = '';

/**
* @var mixed
*/
protected $deserialize;

/**
* @var int
*/
protected $streamId = 0;

/**
* @var array
*/
protected $metadata;
protected ?GrpcClient $client;

protected string $method = '';

protected mixed $deserialize;

protected int $streamId = 0;

protected array $metadata;

public function setClient(GrpcClient $client): self
{
Expand Down Expand Up @@ -132,10 +117,13 @@ public function recv(float $timeout = -1.0)
// server ended the stream
if ($recv->pipeline === false) {
$this->streamId = 0;
return [null, 0, $recv];
return new Response(new GPBEmpty(), $recv);
}

return Parser::parseResponse($recv, $this->deserialize);
if ($recv->statusCode !== 0) {
return Parser::parseResponse($recv, $this->deserialize);
}
return new Response(Parser::deserializeMessage($this->deserialize, $recv->data), $recv);
}

public function end(): void
Expand Down
33 changes: 20 additions & 13 deletions src/grpc-client/tests/RouteGuideClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
*/
namespace HyperfTest\GrpcClient;

use Google\Protobuf\GPBEmpty;
use Google\Protobuf\Internal\Message;
use Hyperf\Di\Container;
use Hyperf\GrpcClient\Exception\GrpcClientException;
use Hyperf\GrpcClient\StreamingCall;
Expand Down Expand Up @@ -67,17 +69,19 @@ public function testGrpcRouteGuideListFeatures()
/** @var StreamingCall $call */
$call = $client->listFeatures();
$call->send($rect);
[$feature,] = $call->recv();
$this->assertEquals('Patriots Path, Mendham, NJ 07945, USA', $feature->getName());
[$feature,, $response] = $call->recv();
$this->assertEquals('101 New Jersey 10, Whippany, NJ 07981, USA', $feature->getName());
[,$status] = $call->recv();
$this->assertEquals(0, $status);
$result[0] = true;
while ($result[0] !== null) {
$result = $call->recv();
$response = $call->recv();
$this->assertEquals('Patriots Path, Mendham, NJ 07945, USA', $response->message->getName());
$response = $call->recv();
$this->assertEquals('101 New Jersey 10, Whippany, NJ 07981, USA', $response->message->getName());
$response = $call->recv();
$this->assertTrue($response->message instanceof Message);
while (true) {
$response = $call->recv();
if ($response->message instanceof GPBEmpty) {
break;
}
}
$this->assertFalse($result[2]->pipeline);
$this->assertFalse($response->rawResponse->pipeline);
}

public function testGrpcRouteGuideRecordRoute()
Expand All @@ -97,7 +101,8 @@ public function testGrpcRouteGuideRecordRoute()
$call->push($second);
$call->end();
/** @var RouteSummary $summary */
[$summary,] = $call->recv();
$response = $call->recv();
$summary = $response->message;
$this->assertEquals(2, $summary->getPointCount());
}

Expand Down Expand Up @@ -129,12 +134,14 @@ public function testGrpcRouteGuideRouteChat()
$call->recv(1);
$call->push($firstNote);
/** @var RouteNote $note */
[$note,] = $call->recv();
$response = $call->recv();
$note = $response->message;
$this->assertEquals($first->getLatitude(), $note->getLocation()->getLatitude());

$call->push($secondNote);
$call->push($secondNote);
[$note,] = $call->recv();
$response = $call->recv();
$note = $response->message;
$this->assertEquals($second->getLatitude(), $note->getLocation()->getLatitude());
}
}