Skip to content

Commit

Permalink
Refactored entire SMTP session and transaction flow
Browse files Browse the repository at this point in the history
We now have more sanity, maintainability and testability with 100% less smell
  • Loading branch information
PeeHaa committed Jan 14, 2019
1 parent cac8517 commit d9c8d02
Show file tree
Hide file tree
Showing 37 changed files with 3,359 additions and 1,547 deletions.
167 changes: 23 additions & 144 deletions src/Transaction/Processor/ExtensionNegotiation.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,18 @@
use Amp\Promise;
use HarmonyIO\SmtpClient\Buffer;
use HarmonyIO\SmtpClient\ClientAddress\Address;
use HarmonyIO\SmtpClient\Exception\Smtp\TransmissionChannelClosed;
use HarmonyIO\SmtpClient\Log\Output;
use HarmonyIO\SmtpClient\Socket;
use HarmonyIO\SmtpClient\Transaction\Command\Ehlo;
use HarmonyIO\SmtpClient\Transaction\Command\Helo;
use HarmonyIO\SmtpClient\Transaction\Extension\Collection;
use HarmonyIO\SmtpClient\Transaction\Processor\ExtensionNegotiation\ProcessEhlo;
use HarmonyIO\SmtpClient\Transaction\Processor\ExtensionNegotiation\ProcessExtensions;
use HarmonyIO\SmtpClient\Transaction\Processor\ExtensionNegotiation\ProcessHelo;
use HarmonyIO\SmtpClient\Transaction\Reply\Factory;
use HarmonyIO\SmtpClient\Transaction\Reply\PermanentNegativeCompletion;
use HarmonyIO\SmtpClient\Transaction\Reply\PositiveCompletion;
use HarmonyIO\SmtpClient\Transaction\Reply\Reply;
use HarmonyIO\SmtpClient\Transaction\Reply\TransientNegativeCompletion;
use HarmonyIO\SmtpClient\Transaction\Status\ExtensionNegotiation as Status;
use function Amp\call;

final class ExtensionNegotiation implements Processor
{
private const AVAILABLE_COMMANDS = [
Status::SENT_EHLO => [
PositiveCompletion::class,
TransientNegativeCompletion::class,
PermanentNegativeCompletion::class,
],
Status::PROCESSING_EXTENSION_LIST => [
PositiveCompletion::class,
TransientNegativeCompletion::class,
PermanentNegativeCompletion::class,
],
Status::SENT_HELO => [
PositiveCompletion::class,
TransientNegativeCompletion::class,
PermanentNegativeCompletion::class,
],
];

/** @var Status */
private $currentStatus;

/** @var Factory */
private $replyFactory;

Expand Down Expand Up @@ -74,123 +49,27 @@ public function __construct(
public function process(Buffer $buffer): Promise
{
return call(function () use ($buffer) {
$this->sendEhlo();

while ($this->currentStatus->getValue() !== Status::COMPLETED) {
$this->processReply(yield $buffer->readLine());
$status = new Status(Status::START_PROCESS);

$processors = [
new ProcessEhlo($this->replyFactory, $this->logger, $this->connection, $this->clientAddress),
new ProcessHelo($this->replyFactory, $this->logger, $this->connection, $this->clientAddress),
new ProcessExtensions($this->replyFactory, $this->logger, $this->extensionCollection),
];

/** @var Processor $processor */
foreach ($processors as $processor) {
if (get_class($processor) === ProcessHelo::class && $status->getValue() !== Status::SEND_HELO) {
continue;
}

/** @var Status $status */
$status = yield $processor->process($buffer);

if ($status->getValue() === Status::COMPLETED) {
return;
}
}
});
}

private function processReply(string $line): void
{
$reply = $this->replyFactory->build($line, self::AVAILABLE_COMMANDS[$this->currentStatus->getValue()]);

$this->logger->debug('Server reply object: ' . get_class($reply));

switch ($this->currentStatus->getValue()) {
case Status::SENT_EHLO:
$this->processSentEhloReply($reply);
return;

case Status::PROCESSING_EXTENSION_LIST:
$this->processExtensionListReply($reply);
return;

case Status::SENT_HELO:
$this->processSentHeloReply($reply);
return;
}
}

private function processSentEhloReply(Reply $reply): void
{
switch (get_class($reply)) {
case PositiveCompletion::class:
$this->processEhloSupported($reply);
return;

case TransientNegativeCompletion::class:
$this->processClosingConnection();
return;

case PermanentNegativeCompletion::class:
$this->fallbackToHelo();
return;
}
}

private function processExtensionListReply(Reply $reply): void
{
switch (get_class($reply)) {
case PositiveCompletion::class:
$this->addExtensionIfSupported($reply);
return;

case TransientNegativeCompletion::class:
case PermanentNegativeCompletion::class:
$this->processClosingConnection();
return;
}
}

private function processSentHeloReply(Reply $reply): void
{
switch (get_class($reply)) {
case PositiveCompletion::class:
$this->processHeloSupported();
return;

case TransientNegativeCompletion::class:
case PermanentNegativeCompletion::class:
$this->processClosingConnection();
return;
}
}

private function sendEhlo(): void
{
$this->currentStatus = new Status(Status::SENT_EHLO);

$this->connection->write((string) new Ehlo($this->clientAddress));
}

private function processEhloSupported(Reply $reply): void
{
if ($reply->isLastLine()) {
$this->currentStatus = new Status(Status::COMPLETED);

return;
}

$this->currentStatus = new Status(Status::PROCESSING_EXTENSION_LIST);
}

private function processClosingConnection(): void
{
throw new TransmissionChannelClosed();
}

private function fallbackToHelo(): void
{
$this->currentStatus = new Status(Status::SENT_HELO);

$this->connection->write((string) new Helo($this->clientAddress));
}

private function processHeloSupported(): void
{
$this->currentStatus = new Status(Status::COMPLETED);
}

private function addExtensionIfSupported(Reply $reply): void
{
if ($reply->isLastLine()) {
$this->currentStatus = new Status(Status::COMPLETED);

return;
}

$this->extensionCollection->enable($reply);
}
}
118 changes: 118 additions & 0 deletions src/Transaction/Processor/ExtensionNegotiation/ProcessEhlo.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
<?php declare(strict_types=1);

namespace HarmonyIO\SmtpClient\Transaction\Processor\ExtensionNegotiation;

use Amp\Promise;
use Amp\Success;
use HarmonyIO\SmtpClient\Buffer;
use HarmonyIO\SmtpClient\ClientAddress\Address;
use HarmonyIO\SmtpClient\Exception\Smtp\TransmissionChannelClosed;
use HarmonyIO\SmtpClient\Log\Output;
use HarmonyIO\SmtpClient\Socket;
use HarmonyIO\SmtpClient\Transaction\Command\Ehlo;
use HarmonyIO\SmtpClient\Transaction\Processor\Processor;
use HarmonyIO\SmtpClient\Transaction\Reply\Factory;
use HarmonyIO\SmtpClient\Transaction\Reply\PermanentNegativeCompletion;
use HarmonyIO\SmtpClient\Transaction\Reply\PositiveCompletion;
use HarmonyIO\SmtpClient\Transaction\Reply\Reply;
use HarmonyIO\SmtpClient\Transaction\Reply\TransientNegativeCompletion;
use HarmonyIO\SmtpClient\Transaction\Status\ExtensionNegotiation as Status;
use function Amp\call;

class ProcessEhlo implements Processor
{
private const ALLOWED_REPLIES = [
PositiveCompletion::class,
TransientNegativeCompletion::class,
PermanentNegativeCompletion::class,
];

/** @var Status */
private $currentStatus;

/** @var Factory */
private $replyFactory;

/** @var Output */
private $logger;

/** @var Socket */
private $connection;

/** @var Address */
private $clientAddress;

public function __construct(Factory $replyFactory, Output $logger, Socket $connection, Address $clientAddress)
{
$this->replyFactory = $replyFactory;
$this->logger = $logger;
$this->connection = $connection;
$this->clientAddress = $clientAddress;
}

public function process(Buffer $buffer): Promise
{
return call(function () use ($buffer) {
yield $this->sendEhlo();

while ($this->currentStatus->getValue() === Status::SENT_EHLO) {
$line = yield $buffer->readLine();
$reply = $this->replyFactory->build($line, self::ALLOWED_REPLIES);

$this->logger->debug('Server reply object: ' . get_class($reply));

yield $this->processReply($reply);
}

return $this->currentStatus;
});
}

private function sendEhlo(): Promise
{
$this->currentStatus = new Status(Status::SENT_EHLO);

$this->connection->write((string) new Ehlo($this->clientAddress));

return new Success();
}

private function processReply(Reply $reply): Promise
{
switch (get_class($reply)) {
case PositiveCompletion::class:
return $this->processEhloSupported($reply);

case TransientNegativeCompletion::class:
return $this->processClosingConnection();

case PermanentNegativeCompletion::class:
return $this->fallbackToHelo();
}
}

private function processEhloSupported(Reply $reply): Promise
{
if ($reply->isLastLine()) {
$this->currentStatus = new Status(Status::COMPLETED);

return new Success();
}

$this->currentStatus = new Status(Status::PROCESSING_EXTENSION_LIST);

return new Success();
}

private function processClosingConnection(): Promise
{
throw new TransmissionChannelClosed();
}

private function fallbackToHelo(): Promise
{
$this->currentStatus = new Status(Status::SEND_HELO);

return new Success();
}
}
Loading

0 comments on commit d9c8d02

Please sign in to comment.