Skip to content

Commit

Permalink
Merge pull request #10 from esmero/ISSUE-9
Browse files Browse the repository at this point in the history
ISSUE-9: Implement immediate processing via Batch
  • Loading branch information
DiegoPino committed Feb 17, 2021
2 parents 2a006a4 + 23581d3 commit ee93a15
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 213 deletions.
147 changes: 147 additions & 0 deletions src/AmiBatchQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
<?php


namespace Drupal\ami;

use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Render\Markup;

/**
* Batch Class to process a AMI Sets
*
* Class AmiBatchQueue
*
* @package Drupal\ami
*/
class AmiBatchQueue {

/**
* Batch processes on Queue item at the time for an AMI Set.
* @param string $queue_name
* @param string $set_id
* @param array $context
*
* @throws \Drupal\Component\Plugin\Exception\PluginException
*/
public static function takeOne(string $queue_name, string $set_id, array &$context) {
/** @var $queue_manager \Drupal\Core\Queue\QueueWorkerManagerInterface */
$queue_manager = \Drupal::service('plugin.manager.queue_worker');
/** @var \Drupal\Core\Queue\QueueFactory $queue_factory */
$queue_factory = \Drupal::service('queue');

$queue_factory->get($queue_name)->createQueue();
// The actual queue worker is the one from the general AMI ingest Queue
// That way this "per set" queue does not appear in any queue_ui listings
// not can be processed out of the context of a UI facing batch.
$queue_worker = $queue_manager->createInstance('ami_ingest_ado');
$queue = $queue_factory->get($queue_name);

$num_of_items = $queue->numberOfItems();
if (!array_key_exists('max', $context['sandbox'])
|| $context['sandbox']['max'] < $num_of_items
) {
$context['sandbox']['max'] = $num_of_items;
}

$context['finished'] = 0;
$context['results']['queue_name'] = $queue_name;
$context['results']['queue_label'] = 'AMI Set '. ($set_id ?? '');



try {
// Only process Items of this Set if $context['set_id'] is set.
if ($item = $queue->claimItem()) {
$ado_title = isset($item->data->info['row']['uuid']) ? 'ADO with UUID '.$item->data->info['row']['uuid'] : 'Unidentifed ADO without UUID';
$title = t('For %name processing %adotitle, <b>%count</b> items remaining', [
'%name' => $context['results']['queue_label'],
'%adotitle' => $ado_title,
'%count' => $num_of_items,
]);
$context['message'] = $title;

// Process and delete item
$queue_worker->processItem($item->data);
$queue->deleteItem($item);

$num_of_items = $queue->numberOfItems();

// Update context
$context['results']['processed'][] = $item->item_id;
$context['finished'] = ($context['sandbox']['max'] - $num_of_items) / $context['sandbox']['max'];
}
else {
// Done processing if can not claim.
$context['finished'] = 1;
}
} catch (RequeueException $e) {
if (isset($item)) {
$queue->releaseItem($item);
}
} catch (SuspendQueueException $e) {
if (isset($item)) {
$queue->releaseItem($item);
}

watchdog_exception('ami', $e);
$context['results']['errors'][] = $e->getMessage();

// Marking the batch job as finished will stop further processing.
$context['finished'] = 1;
} catch (\Exception $e) {
// In case of any other kind of exception, log it and leave the item
// in the queue to be processed again later.
watchdog_exception('ami', $e);
$context['results']['errors'][] = $e->getMessage();
}
}

/**
* Callback when finishing a batch job.
*
* @param $success
* @param $results
* @param $operations
*/
public static function finish($success, $results, $operations) {
// Display success of no results.
if (!empty($results['processed'])) {
\Drupal::messenger()->addMessage(
\Drupal::translation()->formatPlural(
count($results['processed']),
'%queue: One item successfully processed.',
'%queue: @count items successfully processed.',
['%queue' => $results['queue_label']]
)
);
}
elseif (!isset($results['processed'])) {
\Drupal::messenger()->addMessage(\Drupal::translation()
->translate("Items were not processed. Try to release existing items or add new items to the queues."),
'warning'
);
}

if (!empty($results['errors'])) {
\Drupal::messenger()->addError(
\Drupal::translation()->formatPlural(
count($results['errors']),
'Queue %queue error: @errors',
'Queue %queue errors: <ul><li>@errors</li></ul>',
[
'%queue' => $results['queue_label'],
'@errors' => Markup::create(implode('</li><li>', $results['errors'])),
]
)
);
}
// Cleanup and remove the queue. This is a live batch operation.
/** @var \Drupal\Core\Queue\QueueFactory $queue_factory */
$queue_name = $results['queue_name'];
$queue_factory = \Drupal::service('queue');
$queue_factory->get($queue_name)->deleteQueue();
}

}

4 changes: 0 additions & 4 deletions src/AmiUtilityService.php
Original file line number Diff line number Diff line change
Expand Up @@ -1246,15 +1246,11 @@ public function processWebform($data, array $row) {

}


public function ingestAdo($data, array $row) {

}





public function updateAdo($data) {

}
Expand Down
68 changes: 53 additions & 15 deletions src/Form/amiSetEntityProcessForm.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ public function submitForm(array &$form, FormStateInterface $form_state) {

$info = $this->AmiUtilityService->preprocessAmiSet($file, $data);
$SetURL = $this->entity->toUrl('canonical', ['absolute' => TRUE])->toString();
$notprocessnow = $form_state->getValue('not_process_now', NULL);
$queue_name = 'ami_ingest_ado';
if (!$notprocessnow) {
// This queues have no queue workers. That is intended since they
// are always processed by the ami_ingest_ado one manually.
$queue_name = 'ami_ingest_ado_set_'.$this->entity->id();
\Drupal::queue($queue_name, TRUE)->createQueue();
// @TODO acquire a Lock that is renewed for each queue item processing
// To avoid same batch to be send to processing by different users at
// the same time.
}
$added = [];
foreach($info as $item) {
// We set current User here since we want to be sure the final owner of
// the object is this and not the user that runs the queue
Expand All @@ -95,16 +107,26 @@ public function submitForm(array &$form, FormStateInterface $form_state) {
'set_url' => $SetURL,
'attempt' => 1
];
\Drupal::queue('ami_ingest_ado')
$added[] = \Drupal::queue($queue_name)
->createItem($data);
}
$this->messenger()->addMessage(
$this->t('Set @label enqueued and processed .',
[
'@label' => $this->entity->label(),
]
)
);

if ($notprocessnow) {
$this->messenger()->addMessage(
$this->t('Set @label enqueued and processed .',
[
'@label' => $this->entity->label(),
]
)
);
$form_state->setRedirectUrl($this->getCancelUrl());
} else {
$count = count(array_filter($added));
// TODO check if count($info) == $count
if ($count) {
$this->submitBatch($form_state, $queue_name, $count);
}
}
} else {
$this->messenger()->addError(
$this->t('So Sorry. This Ami Set has incorrect Metadata and/or has its CSV file missing. Please correct or delete and generate a new one.',
Expand All @@ -113,29 +135,45 @@ public function submitForm(array &$form, FormStateInterface $form_state) {
]
)
);
$form_state->setRebuild();
}

$form_state->setRedirectUrl($this->getCancelUrl());
}


/**
* {@inheritdoc}
*/
public function buildForm(array $form, FormStateInterface $form_state) {
$processnow = $form_state->getValue('process_now', NULL);
$form['process_now'] = [
$notprocessnow = $form_state->getValue('not_process_now', NULL);
$form['not_process_now'] = [
'#type' => 'checkbox',
'#title' => $this->t('Enqueue but do not process Batch'),
'#title' => $this->t('Enqueue but do not process Batch in realtime.'),
'#description' => $this->t(
'Check this to only enqueue but not trigger an interactive Batch processing. Cron or any other mechanism you have enabled will do the actual operation'
'Check this to enqueue but not trigger the interactive Batch processing. Cron or any other mechanism you have enabled will do the actual operation. This queue is shared by all AMI Sets in this repository and will be processed on a First-In First-Out basis.'
),
'#required' => FALSE,
'#default_value' => !empty($processnow) ? $processnow : TRUE,
'#default_value' => !empty($notprocessnow) ? $notprocessnow : FALSE,
];

return $form + parent::buildForm($form, $form_state);
}

/*
* Process queue(s) with batch.
*
* @param \Drupal\Core\Form\FormStateInterface $form_state
* @param $queue
*/
public function submitBatch(FormStateInterface $form_state, $queue_name) {
$batch = [
'title' => $this->t('Batch processing your Set'),
'operations' => [],
'finished' => ['\Drupal\ami\AmiBatchQueue', 'finish'],
'progress_message' => t('Processing Set @current of @total.'),
];
$batch['operations'][] = ['\Drupal\ami\AmiBatchQueue::takeOne', [$queue_name, $this->entity->id()]];
batch_set($batch);
}

}

Loading

0 comments on commit ee93a15

Please sign in to comment.