Skip to content

Commit

Permalink
Nested pipeline (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Apr 22, 2022
1 parent 3fdf072 commit b0d73b8
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 1 deletion.
8 changes: 8 additions & 0 deletions src/Flow/ETL/DataFrame.php
Expand Up @@ -14,6 +14,7 @@
use Flow\ETL\Loader\SchemaValidationLoader;
use Flow\ETL\Pipeline\CollectingPipeline;
use Flow\ETL\Pipeline\GroupByPipeline;
use Flow\ETL\Pipeline\NestedPipeline;
use Flow\ETL\Pipeline\ParallelizingPipeline;
use Flow\ETL\Pipeline\VoidPipeline;
use Flow\ETL\Row\Schema;
Expand Down Expand Up @@ -229,6 +230,13 @@ public function parallelize(int $chunks) : self
return $this;
}

public function pipeline(Pipeline $pipeline) : self
{
$this->pipeline = new NestedPipeline($this->pipeline, $pipeline);

return $this;
}

public function rename(string $from, string $to) : self
{
$this->pipeline->add(Transform::rename($from, $to));
Expand Down
18 changes: 17 additions & 1 deletion src/Flow/ETL/Monitoring/Memory/Consumption.php
Expand Up @@ -8,14 +8,30 @@ final class Consumption
{
private readonly Unit $initial;

private Unit $max;

private Unit $min;

public function __construct()
{
$this->initial = Unit::fromBytes(\memory_get_usage());
$this->min = $this->initial;
$this->max = $this->initial;
}

public function current() : Unit
{
return Unit::fromBytes(\memory_get_usage());
$current = Unit::fromBytes(\memory_get_usage());

if ($current->isGreaterThan($this->max)) {
$this->max = $current;
}

if ($current->isLowerThan($this->min)) {
$this->min = $current;
}

return $current;
}

public function currentDiff() : Unit
Expand Down
5 changes: 5 additions & 0 deletions src/Flow/ETL/Monitoring/Memory/Unit.php
Expand Up @@ -94,6 +94,11 @@ public function isGreaterThan(self $unit) : bool
return $this->bytes > $unit->bytes;
}

public function isLowerThan(self $unit) : bool
{
return $this->bytes < $unit->bytes;
}

public function percentage(int $value) : self
{
return new self((int) \round(($value / 100) * $this->bytes));
Expand Down
49 changes: 49 additions & 0 deletions src/Flow/ETL/Pipeline/NestedPipeline.php
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Pipeline;

use Flow\ETL\Config;
use Flow\ETL\Extractor;
use Flow\ETL\Loader;
use Flow\ETL\Pipeline;
use Flow\ETL\Transformer;

final class NestedPipeline implements Pipeline
{
public function __construct(
private readonly Pipeline $currentPipeline,
private readonly Pipeline $nextPipeline
) {
}

public function add(Loader|Transformer $pipe) : Pipeline
{
$this->nextPipeline->add($pipe);

return $this;
}

public function cleanCopy() : Pipeline
{
return new self(
$this->currentPipeline->cleanCopy(),
$this->nextPipeline->cleanCopy(),
);
}

public function process(Config $config) : \Generator
{
foreach ($this->nextPipeline->source(new Extractor\PipelineExtractor($this->currentPipeline, $config))->process($config) as $rows) {
yield $rows;
}
}

public function source(Extractor $extractor) : Pipeline
{
$this->currentPipeline->source($extractor);

return $this;
}
}
46 changes: 46 additions & 0 deletions tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Unit\Pipeline;

use Flow\ETL\Config;
use Flow\ETL\DSL\Entry;
use Flow\ETL\DSL\Transform;
use Flow\ETL\Extractor\ProcessExtractor;
use Flow\ETL\Pipeline\NestedPipeline;
use Flow\ETL\Pipeline\ParallelizingPipeline;
use Flow\ETL\Pipeline\SynchronousPipeline;
use Flow\ETL\Row;
use Flow\ETL\Rows;
use PHPUnit\Framework\TestCase;

final class NestedPipelineTest extends TestCase
{
public function test_nested_pipelines() : void
{
$pipeline = new NestedPipeline(
(new SynchronousPipeline())->add(Transform::add_boolean('active', true)),
new ParallelizingPipeline(new SynchronousPipeline(), 1)
);

$pipeline->source(new ProcessExtractor(
new Rows(
Row::create(Entry::integer('id', 1)),
Row::create(Entry::integer('id', 2))
)
));

$this->assertEquals(
[
new Rows(
Row::create(Entry::integer('id', 1), Entry::boolean('active', true)),
),
new Rows(
Row::create(Entry::integer('id', 2), Entry::boolean('active', true))
),
],
\iterator_to_array($pipeline->process(Config::default()))
);
}
}

0 comments on commit b0d73b8

Please sign in to comment.