Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed memory exhausted for rabbitmq caused by confirm channel. #3657

Merged
merged 6 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG-2.2.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
- [#3606](https://github.com/hyperf/hyperf/pull/3606) Added RPN component.
- [#3629](https://github.com/hyperf/hyperf/pull/3629) Added `Hyperf\Utils\Channel\ChannelManager` which used to manage channels.
- [#3631](https://github.com/hyperf/hyperf/pull/3631) Support multiplexing for AMQP component.
- [#3639](https://github.com/hyperf/hyperf/pull/3639) Close push channel and socket when worker exited.
- [#3640](https://github.com/hyperf/hyperf/pull/3640) Optimized log level for SwooleIO.
- [#3657](https://github.com/hyperf/hyperf/pull/3657) Fixed memory exhausted for rabbitmq caused by confirm channel.
- [#3635](https://github.com/hyperf/hyperf/pull/3635) Added `Hyperf\Utils\CodeGen\PhpParser` which used to generate AST for reflection.
- [#3648](https://github.com/hyperf/hyperf/pull/3648) Added `Hyperf\Utils\CodeGen\PhpDocReaderManager` to manage `PhpDocReader`.

Expand Down
1 change: 1 addition & 0 deletions src/amqp/publish/amqp.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'heartbeat' => 3,
'channel_rpc_timeout' => 0.0,
'close_on_destruct' => false,
'max_idle_channels' => 10,
],
],
];
145 changes: 128 additions & 17 deletions src/amqp/src/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
*/
namespace Hyperf\Amqp;

use Hyperf\Amqp\IO\SwooleIO;
use Hyperf\Engine\Channel;
use Hyperf\Utils\Channel\ChannelManager;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Hyperf\Utils\Coroutine;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\IO\AbstractIO;
use Psr\Log\LoggerInterface;

Expand All @@ -26,11 +30,6 @@ class AMQPConnection extends AbstractConnection

public const CONFIRM_CHANNEL_POOL_LENGTH = 10000;

/**
* @var bool
*/
public $isBroken = false;

/**
* @var Channel
*/
Expand All @@ -52,9 +51,25 @@ class AMQPConnection extends AbstractConnection
protected $lastChannelId = 0;

/**
* @param null $login_response @deprecated
* @param AbstractIO $io
* @var null|Params
*/
protected $params;

/**
* @var bool
*/
protected $loop = false;

/**
* @var bool
*/
protected $enableHeartbeat = false;

/**
* @var ChannelManager
*/
protected $channelManager;

public function __construct(
string $user,
string $password,
Expand All @@ -68,15 +83,20 @@ public function __construct(
int $connection_timeout = 0,
float $channel_rpc_timeout = 0.0
) {
$this->channelManager = new ChannelManager(16);
$this->channelManager->get(0, true);

parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io, $heartbeat, $connection_timeout, $channel_rpc_timeout);

$this->pool = new Channel(static::CHANNEL_POOL_LENGTH);
$this->confirmPool = new Channel(static::CONFIRM_CHANNEL_POOL_LENGTH);
Coroutine::create(function () {
if ($this->io instanceof SwooleIO) {
$this->isBroken = $this->io->isBroken();
}
});
}

public function write($data)
{
$this->loop();

parent::write($data);
}

/**
Expand All @@ -85,9 +105,16 @@ public function __construct(
public function setLogger(?LoggerInterface $logger)
{
$this->logger = $logger;
if ($this->io instanceof SwooleIO) {
$this->io->setLogger($logger);
}

return $this;
}

/**
* @return static
*/
public function setParams(Params $params)
{
$this->params = $params;
return $this;
}

Expand All @@ -110,11 +137,19 @@ public function getChannel(): AMQPChannel
return $this->channel($id);
}

public function channel($channel_id = null)
{
$this->channelManager->close($channel_id);
$this->channelManager->get($channel_id, true);

return parent::channel($channel_id); // TODO: Change the autogenerated stub
}

public function getConfirmChannel(): AMQPChannel
{
$id = 0;
$confirm = false;
if (! $this->pool->isEmpty()) {
if (! $this->confirmPool->isEmpty()) {
$id = (int) $this->confirmPool->pop(0.001);
}

Expand All @@ -131,6 +166,14 @@ public function getConfirmChannel(): AMQPChannel

public function releaseChannel(AMQPChannel $channel, bool $confirm = false): void
{
if ($this->params) {
$length = $confirm ? $this->confirmPool->getLength() : $this->pool->getLength();
if ($length > $this->params->getMaxIdleChannels()) {
$channel->close();
return;
}
}

if ($confirm) {
$this->confirmPool->push($channel->getChannelId());
} else {
Expand All @@ -149,4 +192,72 @@ protected function makeChannelId(): int

throw new AMQPRuntimeException('No free channel ids');
}

protected function loop(): void
{
$this->heartbeat();

if ($this->loop) {
return;
}

$this->loop = true;

Coroutine::create(function () {
try {
while (true) {
[$frame_type, $channel, $payload] = $this->wait_frame(0);
$this->channelManager->get($channel)->push([$frame_type, $payload], 0.001);
}
} catch (\Throwable $exception) {
$this->logger && $this->logger->error('Recv loop broken. The reason is ' . (string) $exception);
} finally {
$this->loop = false;
$this->close();
}
});
}

protected function wait_channel($channel_id, $timeout = 0)
{
$chan = $this->channelManager->get($channel_id);

$data = $chan->pop($timeout);
if ($data === false) {
if ($chan->isTimeout()) {
throw new AMQPTimeoutException('Timeout waiting on channel');
}
}

return $data;
}

protected function heartbeat(): void
{
if (! $this->enableHeartbeat && $this->getHeartbeat() > 0) {
$this->enableHeartbeat = true;

Coroutine::create(function () {
while (true) {
if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield($this->getHeartbeat())) {
break;
}

try {
// PING
if ($this->isConnected()) {
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$this->getIO()->write($pkt->getvalue());
}
} catch (\Throwable $exception) {
$this->logger && $this->logger->error((string) $exception);
}
}
});
}
}
}
33 changes: 19 additions & 14 deletions src/amqp/src/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ public function refresh(string $pool)
$count = $config['pool']['connections'] ?? 1;

if (Locker::lock(static::class)) {
for ($i = 0; $i < $count; ++$i) {
$connection = $this->make($config);
$this->connections[$pool][] = $connection;
try {
for ($i = 0; $i < $count; ++$i) {
$connection = $this->make($config);
$this->connections[$pool][] = $connection;
}
} finally {
Locker::unlock(static::class);
}
Locker::unlock(static::class);
}
}

Expand All @@ -61,13 +64,16 @@ public function getConnection(string $pool): AMQPConnection
if (! empty($this->connections[$pool])) {
$index = array_rand($this->connections[$pool]);
$connection = $this->connections[$pool][$index];
if ($connection->isBroken) {
if (! $connection->isConnected()) {
if (Locker::lock(static::class . 'getConnection')) {
unset($this->connections[$pool][$index]);
$connection->close();
$connection = $this->make($this->getConfig($pool));
$this->connections[$pool][] = $connection;
Locker::unlock(static::class . 'getConnection');
try {
unset($this->connections[$pool][$index]);
$connection->close();
$connection = $this->make($this->getConfig($pool));
$this->connections[$pool][] = $connection;
} finally {
Locker::unlock(static::class . 'getConnection');
}
} else {
return $this->getConnection($pool);
}
Expand All @@ -92,9 +98,7 @@ public function make(array $config): AMQPConnection
$io = new SwooleIO(
$host,
$port,
$params->getConnectionTimeout(),
$params->getReadWriteTimeout(),
$params->getHeartbeat()
$params->getConnectionTimeout()
);

$connection = new AMQPConnection(
Expand All @@ -111,7 +115,8 @@ public function make(array $config): AMQPConnection
$params->getChannelRpcTimeout()
);

return $connection->setLogger($this->container->get(StdoutLoggerInterface::class));
return $connection->setParams($params)
->setLogger($this->container->get(StdoutLoggerInterface::class));
}

protected function getConfig(string $pool): array
Expand Down
Loading