Skip to content

Commit

Permalink
Added async support.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bilge committed Nov 19, 2019
1 parent dc8c207 commit f127e71
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 10 deletions.
2 changes: 2 additions & 0 deletions composer.json
Expand Up @@ -9,6 +9,8 @@
],
"require": {
"ext-simplexml": "*",
"amphp/amp": "^2.1",
"amphp/parallel": "^1",
"connectors/http": "dev-5.0-rc",
"connectors/ssl": "dev-5.0-rc",
"scriptfusion/porter": "dev-master as 5"
Expand Down
15 changes: 13 additions & 2 deletions src/Provider/EuropeanCentralBankProvider.php
Expand Up @@ -3,21 +3,32 @@

namespace ScriptFUSION\Porter\Provider\EuropeanCentralBank\Provider;

use ScriptFUSION\Porter\Connector\AsyncConnector;
use ScriptFUSION\Porter\Connector\Connector;
use ScriptFUSION\Porter\Net\Http\AsyncHttpConnector;
use ScriptFUSION\Porter\Net\Http\HttpConnector;
use ScriptFUSION\Porter\Provider\AsyncProvider;
use ScriptFUSION\Porter\Provider\Provider;

final class EuropeanCentralBankProvider implements Provider
final class EuropeanCentralBankProvider implements Provider, AsyncProvider
{
private $connector;

public function __construct(HttpConnector $connector = null)
private $asyncConnector;

public function __construct(Connector $connector = null, AsyncConnector $asyncConnector = null)
{
$this->connector = $connector ?: new HttpConnector;
$this->asyncConnector = $asyncConnector ?: new AsyncHttpConnector;
}

public function getConnector(): Connector
{
return $this->connector;
}

public function getAsyncConnector(): AsyncConnector
{
return $this->asyncConnector;
}
}
44 changes: 39 additions & 5 deletions src/Provider/Resource/DailyForexRates.php
Expand Up @@ -3,13 +3,20 @@

namespace ScriptFUSION\Porter\Provider\EuropeanCentralBank\Provider\Resource;

use Amp\Deferred;
use Amp\Iterator;
use Amp\Producer;
use ScriptFUSION\Porter\Connector\ImportConnector;
use ScriptFUSION\Porter\Net\Http\AsyncHttpDataSource;
use ScriptFUSION\Porter\Net\Http\HttpDataSource;
use ScriptFUSION\Porter\Net\Http\HttpResponse;
use ScriptFUSION\Porter\Provider\EuropeanCentralBank\Provider\EuropeanCentralBankProvider;
use ScriptFUSION\Porter\Provider\EuropeanCentralBank\Records\AsyncCurrencyRecords;
use ScriptFUSION\Porter\Provider\EuropeanCentralBank\Records\CurrencyRecords;
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;
use ScriptFUSION\Porter\Provider\Resource\ProviderResource;

class DailyForexRates implements ProviderResource
class DailyForexRates implements ProviderResource, AsyncResource
{
private const URL = 'http://www.ecb.europa.eu/stats/eurofxref/eurofxref-daily.xml';

Expand All @@ -20,11 +27,38 @@ public function getProviderClassName(): string

public function fetch(ImportConnector $connector): \Iterator
{
$xmlString = $connector->fetch(new HttpDataSource(self::URL));
$xml = simplexml_load_string((string)$xmlString);
$response = $connector->fetch(new HttpDataSource(self::URL));
[$date, $rates, $currencies] = self::parseResponse($response);

return new CurrencyRecords($currencies(), $date, count($rates), $this);
}

public function fetchAsync(ImportConnector $connector): Iterator
{
$dateDeferred = new Deferred;
$totalRatesDeferred = new Deferred;

return new AsyncCurrencyRecords(new Producer(
static function (\Closure $emit) use ($connector, $dateDeferred, $totalRatesDeferred): \Generator {
$response = yield $connector->fetchAsync(new AsyncHttpDataSource(self::URL));
[$date, $rates, $currencies] = self::parseResponse($response);

$dateDeferred->resolve($date);
$totalRatesDeferred->resolve(count($rates));

foreach ($currencies() as $currency) {
yield $emit($currency);
}
}
), $dateDeferred->promise(), $totalRatesDeferred->promise(), $this);
}

private static function parseResponse(HttpResponse $response): array
{
$xml = simplexml_load_string((string)$response);

$ratesContainer = $xml->Cube->Cube;
$date = (string)$ratesContainer['time'];
$date = new \DateTimeImmutable((string)$ratesContainer['time']);

$rates = $ratesContainer->Cube;
$currencies = static function () use ($rates): \Generator {
Expand All @@ -37,6 +71,6 @@ public function fetch(ImportConnector $connector): \Iterator
}
};

return new CurrencyRecords($currencies(), new \DateTimeImmutable($date), count($rates), $this);
return [$date, $rates, $currencies];
}
}
63 changes: 63 additions & 0 deletions src/Records/AsyncCurrencyRecords.php
@@ -0,0 +1,63 @@
<?php
declare(strict_types=1);

namespace ScriptFUSION\Porter\Provider\EuropeanCentralBank\Records;

use Amp\Iterator;
use Amp\Promise;
use ScriptFUSION\Porter\Collection\AsyncProviderRecords;
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;
use function Amp\call;

class AsyncCurrencyRecords extends AsyncProviderRecords
{
private $date;

private $count;

/**
* @param Iterator $providerRecords
* @param Promise<\DateTimeImmutable> $date
* @param Promise<int> $count
* @param AsyncResource $resource
*/
public function __construct(
Iterator $providerRecords,
Promise $date,
Promise $count,
AsyncResource $resource
) {
parent::__construct($providerRecords, $resource);

$this->date = $date;
$this->count = $count;
}

/**
* @return Promise<\DateTimeImmutable>
*/
public function getDate(): Promise
{
return $this->date;
}

/**
* @return Promise<int>
*/
public function getCount(): Promise
{
return $this->count;
}

/**
* Converts the records to an associative array.
*
* @return Promise<float[]> Currency code as key and exchange rate as value.
*/
public function toAssociativeArray(): Promise
{
return call(function (): \Generator {
return array_column(yield Iterator\toArray($this), 'rate', 'currency');
});
}
}
46 changes: 43 additions & 3 deletions test/DailyForexRatesTest.php
Expand Up @@ -3,19 +3,27 @@

namespace ScriptFUSIONTest\Porter\Provider\EuropeanCentralBank;

use Amp\Loop;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use ScriptFUSION\Porter\Porter;
use ScriptFUSION\Porter\Provider\EuropeanCentralBank\Provider\EuropeanCentralBankProvider;
use ScriptFUSION\Porter\Provider\EuropeanCentralBank\Provider\Resource\DailyForexRates;
use ScriptFUSION\Porter\Provider\EuropeanCentralBank\Records\AsyncCurrencyRecords;
use ScriptFUSION\Porter\Provider\EuropeanCentralBank\Records\CurrencyRecords;
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
use ScriptFUSION\Porter\Specification\ImportSpecification;

final class DailyForexRatesTest extends TestCase
{
public function test()
/** @var Porter */
private $porter;

protected function setUp()
{
$porter = new Porter(
parent::setUp();

$this->porter = new Porter(
\Mockery::mock(ContainerInterface::class)
->shouldReceive('has')
->with(EuropeanCentralBankProvider::class)
Expand All @@ -25,7 +33,11 @@ public function test()
->andReturn(new EuropeanCentralBankProvider)
->getMock()
);
$fxRates = $porter->import(new ImportSpecification(new DailyForexRates));
}

public function testSync(): void
{
$fxRates = $this->porter->import(new ImportSpecification(new DailyForexRates));

/** @var CurrencyRecords $currencyRecords */
$currencyRecords = $fxRates->findFirstCollection();
Expand All @@ -48,4 +60,32 @@ public function test()
self::assertGreaterThan(0, $rate);
}
}

public function testAsync(): void
{
Loop::run(function (): \Generator {
$fxRates = $this->porter->importAsync(new AsyncImportSpecification(new DailyForexRates));

/** @var AsyncCurrencyRecords $currencyRecords */
$currencyRecords = $fxRates->findFirstCollection();

self::assertInstanceOf(\DateTimeImmutable::class, yield $currencyRecords->getDate());

// There must be at least 25 exchange rates.
self::assertGreaterThan(25, yield $currencyRecords->getCount());

$rates = yield $currencyRecords->toAssociativeArray();

// Ensure major world currencies are available.
foreach (['USD', 'GBP', 'JPY'] as $currency) {
self::assertArrayHasKey($currency, $rates);
}

// Each rate must be a non-zero, positive float.
foreach ($rates as $rate) {
self::assertInternalType('float', $rate);
self::assertGreaterThan(0, $rate);
}
});
}
}

0 comments on commit f127e71

Please sign in to comment.