Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Post Import Resource Processor #3799

Merged
merged 7 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions modules/common/tests/src/Kernel/ConfigFormTestBase.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ abstract class ConfigFormTestBase extends KernelTestBase {
* );
* @endcode
*
* @return array[]
* @return array[]
* Form test data.
*/
abstract public function provideFormData(): array;
Expand Down Expand Up @@ -69,7 +69,7 @@ public function testConfigForm(array $form_values) {
$this->assertTrue($valid_form, new FormattableMarkup('Input values: %values<br/>Validation handler errors: %errors', $args));

foreach ($form_values as $data) {
$this->assertEquals($this->config($data['#config_name'])->get($data['#config_key']), $data['#value']);
$this->assertEquals($data['#value'], $this->config($data['#config_name'])->get($data['#config_key']));
}
}

Expand Down
14 changes: 14 additions & 0 deletions modules/datastore/datastore.services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ services:
- '@dkan.common.drupal_files'
- '@dkan.common.job_store'

dkan.datastore.service.resource_processor_collector:
class: \Drupal\datastore\Service\ResourceProcessorCollector
tags:
- { name: service_collector, tag: resource_processor, call: addResourceProcessor }

dkan.datastore.service.resource_processor.dictionary_enforcer:
class: \Drupal\datastore\Service\ResourceProcessor\DictionaryEnforcer
arguments:
- '@dkan.datastore.data_dictionary.alter_table_query_factory.mysql'
- '@dkan.metastore.service'
- '@dkan.metastore.data_dictionary_discovery'
tags:
- { name: resource_processor, priority: 25 }

dkan.datastore.service.resource_purger:
class: \Drupal\datastore\Service\ResourcePurger
arguments:
Expand Down
205 changes: 0 additions & 205 deletions modules/datastore/src/Plugin/QueueWorker/DictionaryEnforcer.php

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
<?php

namespace Drupal\datastore\Plugin\QueueWorker;

use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Logger\LoggerChannelInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;

use Drupal\common\Resource;
use Drupal\datastore\DataDictionary\AlterTableQueryFactoryInterface;
use Drupal\datastore\Service\ResourceProcessorCollector;
use Drupal\metastore\ResourceMapper;

use Symfony\Component\DependencyInjection\ContainerInterface;

/**
* Apply specified data-dictionary to datastore belonging to specified dataset.
*
* @QueueWorker(
* id = "post_import",
* title = @Translation("Pass along new resources to resource processors"),
* cron = {
* "time" = 180,
* "lease_time" = 10800
* }
* )
*/
class PostImportResourceProcessor extends QueueWorkerBase implements ContainerFactoryPluginInterface {

/**
* A logger channel for this plugin.
*
* @var \Drupal\Core\Logger\LoggerChannelInterface
*/
protected LoggerChannelInterface $logger;

/**
* The metastore resource mapper service.
*
* @var \Drupal\metastore\ResourceMapper
*/
protected ResourceMapper $resourceMapper;

/**
* The resource processor collector service.
*
* @var \Drupal\datastore\Service\ResourceProcessorCollector
*/
protected ResourceProcessorCollector $resourceProcessorCollector;

/**
* Build queue worker.
*
* @param array $configuration
* A configuration array containing information about the plugin instance.
* @param string $plugin_id
* The plugin_id for the plugin instance.
* @param mixed $plugin_definition
* The plugin implementation definition.
* @param \Drupal\datastore\DataDictionary\AlterTableQueryFactoryInterface $alter_table_query_factory
* The alter table query factory service.
* @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
* A logger channel factory instance.
* @param \Drupal\metastore\ResourceMapper $resource_mapper
* The metastore resource mapper service.
* @param \Drupal\datastore\Service\ResourceProcessorCollector $processor_collector
* The resource processor collector service.
*/
public function __construct(
array $configuration,
$plugin_id,
$plugin_definition,
AlterTableQueryFactoryInterface $alter_table_query_factory,
LoggerChannelFactoryInterface $logger_factory,
ResourceMapper $resource_mapper,
ResourceProcessorCollector $processor_collector
) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->logger = $logger_factory->get('datastore');
$this->resourceMapper = $resource_mapper;
$this->resourceProcessorCollector = $processor_collector;
// Set the timeout for database connections to the queue lease time.
// This ensures that database connections will remain open for the
// duration of the time the queue is being processed.
$timeout = (int) $plugin_definition['cron']['lease_time'];
$alter_table_query_factory->setConnectionTimeout($timeout);
}

/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static(
$configuration,
$plugin_id,
$plugin_definition,
$container->get('dkan.datastore.data_dictionary.alter_table_query_factory.mysql'),
$container->get('logger.factory'),
$container->get('dkan.metastore.resource_mapper'),
$container->get('dkan.datastore.service.resource_processor_collector'),
);
}

/**
* {@inheritdoc}
*/
public function processItem($data) {
// Catch and log any exceptions thrown when processing the queue item to
// prevent the item from being requeued.
try {
$this->doProcessItem($data);
}
catch (\Exception $e) {
$this->logger->error($e->getMessage());
}
}

/**
* Pass along new resource to resource processors.
*
* @param \Drupal\common\Resource $resource
* DKAN Resource.
*/
public function doProcessItem(Resource $resource): void {
$identifier = $resource->getIdentifier();
$version = $resource->getVersion();

$latest_resource = $this->resourceMapper->get($identifier);
// Stop if resource no longer exists.
if (!isset($latest_resource)) {
$this->logger->notice('Cancelling resource processing; resource no longer exists.');
return;
}
// Stop if resource has changed.
if ($version !== $latest_resource->getVersion()) {
$this->logger->notice('Cancelling resource processing; resource has changed.');
return;
}
// Run all tagged resource processors.
$processors = $this->resourceProcessorCollector->getResourceProcessors();
array_map(fn ($processor) => $processor->process($resource), $processors);
}

}
Loading