From b0d73b88f849431870b92c5c37e97c2bdb7c4550 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Fri, 22 Apr 2022 15:56:42 +0200 Subject: [PATCH] Nested pipeline (#225) --- src/Flow/ETL/DataFrame.php | 8 +++ .../ETL/Monitoring/Memory/Consumption.php | 18 ++++++- src/Flow/ETL/Monitoring/Memory/Unit.php | 5 ++ src/Flow/ETL/Pipeline/NestedPipeline.php | 49 +++++++++++++++++++ .../Unit/Pipeline/NestedPipelineTest.php | 46 +++++++++++++++++ 5 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 src/Flow/ETL/Pipeline/NestedPipeline.php create mode 100644 tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php diff --git a/src/Flow/ETL/DataFrame.php b/src/Flow/ETL/DataFrame.php index 0783941d5..34e4b8a7b 100644 --- a/src/Flow/ETL/DataFrame.php +++ b/src/Flow/ETL/DataFrame.php @@ -14,6 +14,7 @@ use Flow\ETL\Loader\SchemaValidationLoader; use Flow\ETL\Pipeline\CollectingPipeline; use Flow\ETL\Pipeline\GroupByPipeline; +use Flow\ETL\Pipeline\NestedPipeline; use Flow\ETL\Pipeline\ParallelizingPipeline; use Flow\ETL\Pipeline\VoidPipeline; use Flow\ETL\Row\Schema; @@ -229,6 +230,13 @@ public function parallelize(int $chunks) : self return $this; } + public function pipeline(Pipeline $pipeline) : self + { + $this->pipeline = new NestedPipeline($this->pipeline, $pipeline); + + return $this; + } + public function rename(string $from, string $to) : self { $this->pipeline->add(Transform::rename($from, $to)); diff --git a/src/Flow/ETL/Monitoring/Memory/Consumption.php b/src/Flow/ETL/Monitoring/Memory/Consumption.php index 55616f5a4..0c24fe4d4 100644 --- a/src/Flow/ETL/Monitoring/Memory/Consumption.php +++ b/src/Flow/ETL/Monitoring/Memory/Consumption.php @@ -8,14 +8,30 @@ final class Consumption { private readonly Unit $initial; + private Unit $max; + + private Unit $min; + public function __construct() { $this->initial = Unit::fromBytes(\memory_get_usage()); + $this->min = $this->initial; + $this->max = $this->initial; } public function current() : Unit { - return Unit::fromBytes(\memory_get_usage()); + $current = Unit::fromBytes(\memory_get_usage()); + + if ($current->isGreaterThan($this->max)) { + $this->max = $current; + } + + if ($current->isLowerThan($this->min)) { + $this->min = $current; + } + + return $current; } public function currentDiff() : Unit diff --git a/src/Flow/ETL/Monitoring/Memory/Unit.php b/src/Flow/ETL/Monitoring/Memory/Unit.php index 824467f7d..2ede82a0b 100644 --- a/src/Flow/ETL/Monitoring/Memory/Unit.php +++ b/src/Flow/ETL/Monitoring/Memory/Unit.php @@ -94,6 +94,11 @@ public function isGreaterThan(self $unit) : bool return $this->bytes > $unit->bytes; } + public function isLowerThan(self $unit) : bool + { + return $this->bytes < $unit->bytes; + } + public function percentage(int $value) : self { return new self((int) \round(($value / 100) * $this->bytes)); diff --git a/src/Flow/ETL/Pipeline/NestedPipeline.php b/src/Flow/ETL/Pipeline/NestedPipeline.php new file mode 100644 index 000000000..90e49a9b1 --- /dev/null +++ b/src/Flow/ETL/Pipeline/NestedPipeline.php @@ -0,0 +1,49 @@ +nextPipeline->add($pipe); + + return $this; + } + + public function cleanCopy() : Pipeline + { + return new self( + $this->currentPipeline->cleanCopy(), + $this->nextPipeline->cleanCopy(), + ); + } + + public function process(Config $config) : \Generator + { + foreach ($this->nextPipeline->source(new Extractor\PipelineExtractor($this->currentPipeline, $config))->process($config) as $rows) { + yield $rows; + } + } + + public function source(Extractor $extractor) : Pipeline + { + $this->currentPipeline->source($extractor); + + return $this; + } +} diff --git a/tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php b/tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php new file mode 100644 index 000000000..841880bb5 --- /dev/null +++ b/tests/Flow/ETL/Tests/Unit/Pipeline/NestedPipelineTest.php @@ -0,0 +1,46 @@ +add(Transform::add_boolean('active', true)), + new ParallelizingPipeline(new SynchronousPipeline(), 1) + ); + + $pipeline->source(new ProcessExtractor( + new Rows( + Row::create(Entry::integer('id', 1)), + Row::create(Entry::integer('id', 2)) + ) + )); + + $this->assertEquals( + [ + new Rows( + Row::create(Entry::integer('id', 1), Entry::boolean('active', true)), + ), + new Rows( + Row::create(Entry::integer('id', 2), Entry::boolean('active', true)) + ), + ], + \iterator_to_array($pipeline->process(Config::default())) + ); + } +}