Skip to content

Commit

Permalink
Multiprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
ctippler committed Jun 2, 2020
1 parent d5c3a4f commit ceb1eb1
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ are available (custom settings which the user can define for runtime execution)
* [Installation & updates](./doc/installationAndUpdates.md)
* [Configuration](./doc/configuration.md)
* [How to use](./doc/usage.md)
* [How to use - Parallization](./doc/usageParallelization.md)
* [Migration from Pimcore 4 to Pimcore 5](./doc/migration.md)

***First impressions:***
Expand Down
88 changes: 88 additions & 0 deletions doc/sample/src/AppBundle/Command/MultiprocessingSampleCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

namespace AppBundle\Command;

use Pimcore\Console\AbstractCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputOption;
use Elements\Bundle\ProcessManagerBundle;
use Elements\Bundle\ProcessManagerBundle\Model\MonitoringItem;

class MultiprocessingSampleCommand extends AbstractCommand
{
use ProcessManagerBundle\ExecutionTrait;

protected function configure()
{
$this
->setName('processmanager:multiplrocessing-sample')
->setDescription('Create a set of sample products for testing.')
->addOption(
'monitoring-item-id', null,
InputOption::VALUE_REQUIRED,
"Contains the monitoring item if executed via the Pimcore backend"
)
->addOption(
'monitoring-item-parent-id', null,
InputOption::VALUE_REQUIRED,
"Contains the parent monitoring item id. If present - it is the child process"
);
}


public function execute(InputInterface $input, OutputInterface $output)
{
$monitoringItem = $this->initProcessManager($input->getOption("monitoring-item-id"), ['autoCreate' => true]);
if($input->getOption("monitoring-item-parent-id")){
$this->executeChild($input,$output,$monitoringItem); //child process
}else{
$this->executeParent($input,$output,$monitoringItem); //main process
}
}

/**
* @param InputInterface $input
* @param OutputInterface $output
* @param MonitoringItem $monitoringItem
* @throws \Exception
*/
protected function executeParent(InputInterface $input, OutputInterface $output, MonitoringItem $monitoringItem){
$monitoringItem->getLogger()->debug('Start collection data...');

$data = json_decode('[{"id":"84","fullpath":"\/PIM\/Product Tree\/Test Product 01","published":"1","creationDate":"1581108862","modificationDate":"1588164251","name":"Test Product 01","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/citroen\/auto-1515428.jpg"},{"id":"85","fullpath":"\/PIM\/Product Tree\/Test Product 02","published":"1","creationDate":"1581108863","modificationDate":"1588164259","name":"Test Product 02","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/citroen\/transport-546819.jpg"},{"id":"86","fullpath":"\/PIM\/Product Tree\/Test Product 03","published":"1","creationDate":"1581108863","modificationDate":"1588164265","name":"Test Product 03","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/dodge\/Dodge.383.magnum-black.front.view-sstvwf.JPG"},{"id":"87","fullpath":"\/PIM\/Product Tree\/Test Product 04","published":"1","creationDate":"1581108863","modificationDate":"1588164268","name":"Test Product 04","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/dodge\/dodge-charger-78260.jpg"},{"id":"88","fullpath":"\/PIM\/Product Tree\/Test Product 05","published":"1","creationDate":"1581108863","modificationDate":"1588164276","name":"Test Product 05","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/ferrari\/1962_Ferrari_250_GTE.jpg"},{"id":"89","fullpath":"\/PIM\/Product Tree\/Test Product 06","published":"1","creationDate":"1581108864","modificationDate":"1588164402","name":"Test Product 06","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/ferrari\/Inetrior_Ferrari_250_GTB_Berlinetta_SWB.jpg"},{"id":"90","fullpath":"\/PIM\/Product Tree\/Test Product 07","published":"1","creationDate":"1581108864","modificationDate":"1588164413","name":"Test Product 07 (Motor)","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/ferrari\/1990_Ferrari_Testarossa_engine.jpg"},{"id":"91","fullpath":"\/PIM\/Product Tree\/Test Product 08","published":"1","creationDate":"1581108864","modificationDate":"1588164419","name":"Test Product 08","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/citroen\/Citroen_2CV_1X7A7979.jpg"},{"id":"92","fullpath":"\/PIM\/Product Tree\/Test Product 09","published":"1","creationDate":"1581108864","modificationDate":"1588164426","name":"Test Product 09","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/citroen\/Citroen_DS_and_elephant.jpg"},{"id":"93","fullpath":"\/PIM\/Product Tree\/Test Product 10","published":"1","creationDate":"1581108864","modificationDate":"1588164433","name":"Test Product 10","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/citroen\/red-89219.jpg"},{"id":"94","fullpath":"\/PIM\/Product Tree\/Test Product 11","published":"1","creationDate":"1581108865","modificationDate":"1588164438","name":"Test Product 11","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 2","additionalCategories":"","mainImage":"\/Car Images\/citroen\/car-1679015.jpg"},{"id":"95","fullpath":"\/PIM\/Product Tree\/Test Product 12","published":"1","creationDate":"1581108865","modificationDate":"1588164443","name":"Test Product 12","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 2","additionalCategories":"","mainImage":"\/Car Images\/ferrari\/Ferrari_250_GT_Berlinetta_SWB.jpg"},{"id":"96","fullpath":"\/PIM\/Product Tree\/Test Product 13","published":"1","creationDate":"1581108865","modificationDate":"1588164448","name":"Test Product 13","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 2","additionalCategories":"","mainImage":"\/Car Images\/dodge\/field-1833347.jpg"},{"id":"97","fullpath":"\/PIM\/Product Tree\/Test Product 14","published":"1","creationDate":"1581108865","modificationDate":"1588164454","name":"Test Product 14","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/ferrari\/asphalt-automobile-automotive-1085174.jpg"},{"id":"98","fullpath":"\/PIM\/Product Tree\/Test Product 15","published":"1","creationDate":"1581108865","modificationDate":"1588164463","name":"Test Product 15","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/dodge\/charger-1833346.jpg"},{"id":"99","fullpath":"\/PIM\/Product Tree\/Test Product 16","published":"1","creationDate":"1581108866","modificationDate":"1588164476","name":"Test Product 16","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/dodge\/Charger_photo5.JPG"},{"id":"100","fullpath":"\/PIM\/Product Tree\/Test Product 17","published":"1","creationDate":"1581108866","modificationDate":"1588164539","name":"Test Product 17","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/ferrari\/Ferrari_Testarossa_-_Flickr_-_Alexandre_Pr-C3-A9vot_-285-29_-28cropped-29.jpg"},{"id":"101","fullpath":"\/PIM\/Product Tree\/Test Product 18","published":"1","creationDate":"1581108866","modificationDate":"1588164523","name":"Test Product 18","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/dodge\/car-978088.jpg"},{"id":"102","fullpath":"\/PIM\/Product Tree\/Test Product 19","published":"1","creationDate":"1581108866","modificationDate":"1588164528","name":"Test Product 19","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/citroen\/auto-1515426.jpg"},{"id":"103","fullpath":"\/PIM\/Product Tree\/Test Product 20","published":"1","creationDate":"1581108866","modificationDate":"1588164533","name":"Test Product 20","mainCategory":"object:\/PIM\/Product Category Tree\/Test Category 1","additionalCategories":"","mainImage":"\/Car Images\/dodge\/classic-car-2726200.png"}]',true);

$callback = function (MonitoringItem $childMonitoringItem){ //do some fancy stuff with the monitoring item before the job is executed
$childMonitoringItem->setActions([])->setName('PM import child :-)');
};
$this->executeChildProcesses($monitoringItem,$data,2,4,$callback);
$monitoringItem->setMessage('Finished')->save();
}

protected function executeChild(InputInterface $input, OutputInterface $output, ProcessManagerBundle\Model\MonitoringItem $monitoringItem){
$monitoringItem->setMessage('Starting child process');
$monitoringItem->getLogger()->info('Workload' . $monitoringItem->getMetaData());

$workload = json_decode($monitoringItem->getMetaData(),true);

$monitoringItem->setCurrentWorkload(0)->setTotalWorkload(count($workload))->setMessage('Processing Data')->save();
foreach($workload as $i => $data){
$object = \AppBundle\Model\DataObject\Product::getById($data['id']);

if($data['id'] == 88){
# throw new \Exception('Oh something happened with 88');
}
if($object){
$monitoringItem->setMessage('Updating object ID:' . $object->getId())->setCurrentWorkload($i+1)->save();
$object->setName($data['name'].' MID: ' . $monitoringItem->getId() ,'en');
$object->save();
sleep(1); //just for demo
}
}

$monitoringItem->setMessage('Workload processed');
}



}
21 changes: 21 additions & 0 deletions doc/usageParallelization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
### How to use - Parallelization / Multiprocessing

The Processmanager allows you to execute multiple child processes. Before you use the multiprocessing option please make sure
that you have read the [Basic usage guide](/doc/usage.md).

To get started please take a look at the [Sample Command](/doc/sample/src/AppBundle/Command/MultiprocessingSampleCommand.php) which shows how to use the feature.

When a child process is executed the parameter "--monitoring-item-parent-id" is passed so you have to support this parameter in your command.
Depending on the paremeter you can execute different methods... it's up to you.

To execute the Child processes you can use the method of the trait (or implement your own logic...)
````php
$this->executeChildProcesses($monitoringItem,$data,2,4,$callback);
````
Parameter | Description |
| ------------- |-------------|
| $monitoringItem | The monitoring item of the main process |
| $data | is the workload to process - most of the time it is just a array with entries. It is saved to the "metadata" field of the child process so you can access it later on |
| $numberOfChildProcesses | defines how much child processes can be run in parallel |
| $batchSize | defines how much entries should be processed in each child process |
| $callback | A function that can be used to modify the monitoringItem settings of the child process
11 changes: 9 additions & 2 deletions src/Controller/MonitoringItemController.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public function listAction(Request $request)
$ids = $db->fetchCol('SELECT id FROM users where name LIKE '.$db->quote('%'.$f->value.'%')) ?: [0];

return ' executedByUser IN( '.implode(',', $ids).') ';
},
}
];
if ($filterCondition = QueryParams::getFilterCondition(
$request->get('filter'),
Expand All @@ -80,6 +80,13 @@ public function listAction(Request $request)
}

$condition = $list->getCondition();
if($filters = $request->get('filter')){
foreach(json_decode($filters,true) as $e){
if($e['property'] == 'id'){
$condition .= ' OR `parentId` = ' . (int)$e['value'].' ';
}
}
}

if (!$request->get('showHidden') || $request->get('showHidden') == 'false') {
$filterConditionArray = QueryParams::getFilterCondition($request->get('filter'), ['id', 'o_id', 'pid'], false, $callbacks);
Expand All @@ -92,8 +99,8 @@ public function listAction(Request $request)
$condition .= ' published=1';
}
}
$list->setCondition($condition);
}
$list->setCondition($condition);

$total = $list->getTotalCount();

Expand Down
42 changes: 40 additions & 2 deletions src/ExecutionTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ trait ExecutionTrait
{
protected $commandObject;

protected $childProcessCheckInterval = 500000; //microseconds

protected static function getCommand($options)
{
global $argv;
Expand Down Expand Up @@ -234,14 +236,50 @@ protected function executeChildProcesses(MonitoringItem $monitoringItem,array $w
$result = Helper::executeJob($monitoringItem->getConfigurationId(), $monitoringItem->getCallbackSettings(), 0,$package,$monitoringItem->getId(),$callback);

while ($monitoringItem->getChildProcessesStatus()['summary']['active'] >= $numberOfchildProcesses){ //run x processes parrallel
$monitoringItem->getLogger()->info('Waiting -> status: ' . print_r($monitoringItem->getChildProcessesStatus()['summary'],true));
sleep(1);
$monitoringItem->getLogger()->info('Waiting to start child processes -> status: ' . print_r($monitoringItem->getChildProcessesStatus()['summary'],true));
$this->childProcessCheck($monitoringItem);
usleep($this->childProcessCheckInterval);
}

if($monitoringItem->getChildProcessesStatus()['failed']){
throw new \Exception('Childs failed');
}
}

while($monitoringItem->getChildProcessesStatus()['summary']['active']){
$monitoringItem->getLogger()->info('Waiting for child processes to be finished -> status: ' . print_r($monitoringItem->getChildProcessesStatus()['summary'],true));
$this->childProcessCheck($monitoringItem);
usleep($this->childProcessCheckInterval);
}
}

protected function childProcessCheck(MonitoringItem $monitoringItem){
$statuses = $monitoringItem->getChildProcessesStatus();
if($statuses['summary']['failed']){
foreach([MonitoringItem::STATUS_RUNNING,MonitoringItem::STATUS_INITIALIZING,MonitoringItem::STATUS_UNKNOWN] as $status){
$items = $statuses['details'][$status];
foreach((array)$items as $entry){
$mItem = MonitoringItem::getById($entry['id']);

//little hack to remove console loggers if they are defined in the child processes
$loggers = $mItem->getLoggers();
foreach($loggers as $i => $e){
if($e['class'] == '\Elements\Bundle\ProcessManagerBundle\Executor\Logger\Console'){
unset($loggers[$i]);
}
}
$mItem->setLoggers($loggers);


if($mItem){
$mItem->stopProcess();
$mItem->setMessage('Killed by MonitoringItem ID '. $monitoringItem->getId(). ' because child process failed')->save();
}
}
}

throw new \Exception('Exiting because child failed: ' .print_r($statuses['details'][MonitoringItem::STATUS_FAILED],true));
}

}
}
33 changes: 29 additions & 4 deletions src/Resources/public/js/panel/monitoringItem.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,19 @@ pimcore.plugin.processmanager.panel.monitoringItem = Class.create({

var gridColumns = [];

gridColumns.push({header: "ID", width: 70, sortable: true, dataIndex: 'id', filter: 'numeric'});
gridColumns.push({header: "ID", width: 70, sortable: true, dataIndex: 'id', filter: 'numeric',
renderer: function (v,meta, record) {
var data = record.getData();
if(data.parentId){
v += '/' + data.parentId;
}
return v;
}
});
gridColumns.push({header: "Parent ID", width: 70, sortable: true, dataIndex: 'parentId', filter: 'numeric',hidden: true});

gridColumns.push({
header: "CID",
header: "Configuration ID",
width: 40,
hidden: true,
sortable: true,
Expand Down Expand Up @@ -254,16 +263,32 @@ pimcore.plugin.processmanager.panel.monitoringItem = Class.create({
sortable: false,
width: 50
});
gridColumns.push({
header: t("plugin_pm_details"),
xtype: 'actioncolumn',
width: 50,
sortable: false,
items: [
{
icon : '/bundles/pimcoreadmin/img/flat-color-icons/about.svg', // Use a URL in the icon config
tooltip: 'Details',
handler: function(grid, rowIndex, colIndex,item,e) {
var rec = grid.store.getAt(rowIndex);
new pimcore.plugin.processmanager.window.detailwindow(rec.getData());
}
}
]
});

gridColumns.push({
header: t("plugin_pm_retry"),
width: 50,
sortable: false,
renderer: function (v, x, record) {
if (record.get('retry')) {
return '<a href="#" onClick="processmanagerPlugin.monitoringItemRestart(' + record.get('id') + ')"><img src="/bundles/pimcoreadmin/img/flat-color-icons/refresh.svg" height="18" /></a>';
return '<a href="#" onClick="processmanagerPlugin.monitoringItemRestart(' + record.get('id') + ')"><img src="/bundles/pimcoreadmin/img/flat-color-icons/refresh.svg" height="18" title="Restart" /></a>';
} else {
return '<a href="#" onClick="processmanagerPlugin.monitoringItemCancel(' + record.get('id') + ')"><img src="/bundles/pimcoreadmin/img/flat-color-icons/cancel.svg" height="18" /></a>';
return '<a href="#" onClick="processmanagerPlugin.monitoringItemCancel(' + record.get('id') + ')"><img src="/bundles/pimcoreadmin/img/flat-color-icons/cancel.svg" height="18" title="Stop"/></a>';
}
return '';
}
Expand Down
8 changes: 8 additions & 0 deletions src/Resources/public/js/window/detailwindow.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ pimcore.plugin.processmanager.window.detailwindow = Class.create({
value: this.data.id
});

items.push({
xtype: "numberfield",
fieldLabel: "Parent ID",
name: "parentId",
readOnly: true,
value: this.data.parentId
});

items.push({
xtype: "numberfield",
fieldLabel: "CID",
Expand Down
3 changes: 2 additions & 1 deletion src/Resources/translations/admin.en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,5 @@ plugin_pm_logger_emailSummary: "E-Mail"
plugin_pm_to: "To"
plugin_pm_to_tooltip: "Seperate multiple email addresses with &quot;;&quot;"
plugin_pm_subject: "Subject"
plugin_pm_text: "Text"
plugin_pm_text: "Text"
plugin_pm_details: "Details"

0 comments on commit ceb1eb1

Please sign in to comment.