Skip to content

Commit

Permalink
Rename things named Import (#3961)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-m committed May 10, 2023
1 parent 66cd8e3 commit 1fd9d1a
Show file tree
Hide file tree
Showing 16 changed files with 685 additions and 503 deletions.
2 changes: 1 addition & 1 deletion modules/datastore/datastore.services.yml
Expand Up @@ -53,7 +53,7 @@ services:
- [setLoggerFactory, ['@logger.factory']]

dkan.datastore.service.factory.import:
class: \Drupal\datastore\Service\Factory\Import
class: \Drupal\datastore\Service\Factory\ImportServiceFactory
arguments:
- '@dkan.common.job_store'
- '@dkan.datastore.database_table_factory'
Expand Down
Expand Up @@ -4,7 +4,7 @@

use Drupal\common\Storage\JobStoreFactory;
use Drupal\datastore\Service\Factory\ImportFactoryInterface;
use Drupal\datastore\Service\Import as Instance;
use Drupal\datastore\Service\ImportService;
use Drupal\datastore\Storage\DatabaseTableFactory;
use Drupal\datastore_mysql_import\Service\MysqlImport;

Expand Down Expand Up @@ -58,7 +58,7 @@ public function getInstance(string $identifier, array $config = []) {
$resource = $config['resource'];

if (!isset($this->services[$identifier])) {
$this->services[$identifier] = new Instance($resource, $this->jobStoreFactory, $this->databaseTableFactory);
$this->services[$identifier] = new ImportService($resource, $this->jobStoreFactory, $this->databaseTableFactory);
}

$this->services[$identifier]->setImporterClass(MysqlImport::class);
Expand Down
Expand Up @@ -11,7 +11,7 @@
use Drupal\common\Storage\JobStore;
use Drupal\common\Storage\JobStoreFactory;
use Drupal\Component\EventDispatcher\ContainerAwareEventDispatcher;
use Drupal\datastore\Service\Import as Service;
use Drupal\datastore\Service\ImportService;
use Drupal\datastore\Storage\DatabaseTableFactory;
use Drupal\datastore\Storage\DatabaseTable;
use Drupal\datastore_mysql_import\Service\MysqlImport;
Expand Down Expand Up @@ -86,7 +86,7 @@ public function testMysqlImporter() {
$databaseTableFactory = $this->getDatabaseTableFactoryMock();
$jobStoreFactory = $this->getJobstoreFactoryMock();

$service = new Service($resource, $jobStoreFactory, $databaseTableFactory);
$service = new ImportService($resource, $jobStoreFactory, $databaseTableFactory);
$service->setImporterClass(MysqlImport::class);
$service->import();

Expand Down
191 changes: 5 additions & 186 deletions modules/datastore/src/Plugin/QueueWorker/Import.php
Expand Up @@ -4,66 +4,17 @@

use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;

use Drupal\common\LoggerTrait;
use Drupal\common\Storage\DatabaseConnectionFactoryInterface;
use Drupal\datastore\DatastoreService;
use Drupal\metastore\Reference\ReferenceLookup;

use Procrastinator\Result;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
* Processes resource import.
*
* @QueueWorker(
* id = "datastore_import",
* title = @Translation("Queue to process datastore import"),
* cron = {
* "time" = 180,
* "lease_time" = 10800
* }
* )
* @deprecated
* @see \Drupal\datastore\Plugin\QueueWorker\ImportQueueWorker
*/
class Import extends QueueWorkerBase implements ContainerFactoryPluginInterface {
use LoggerTrait;

/**
* This queue worker's corresponding database queue instance.
*
* @var \Drupal\Core\Queue\DatabaseQueue
*/
protected $databaseQueue;

/**
* DKAN datastore service instance.
*
* @var \Drupal\datastore\DatastoreService
*/
protected $datastore;

/**
* Reference lookup service.
*
* @var \Drupal\metastore\Reference\ReferenceLookup
*/
protected $referenceLookup;

/**
* Datastore config settings.
*
* @var \Drupal\Core\Config\Config
*/
protected $datastoreConfig;

/**
* File system service.
*
* @var \Drupal\Core\File\FileSystemInterface
*/
protected $fileSystem;
class Import extends ImportQueueWorker {

/**
* Constructs a \Drupal\Component\Plugin\PluginBase object.
Expand Down Expand Up @@ -98,140 +49,8 @@ public function __construct(
DatabaseConnectionFactoryInterface $defaultConnectionFactory,
DatabaseConnectionFactoryInterface $datastoreConnectionFactory
) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->datastore = $datastore;
$this->referenceLookup = $referenceLookup;
$this->datastoreConfig = $configFactory->get('datastore.settings');
$this->databaseQueue = $datastore->getQueueFactory()->get($plugin_id);
$this->fileSystem = $datastore->getResourceLocalizer()->getFileSystem();
$this->setLoggerFactory($loggerFactory, 'datastore');
// Set the timeout for database connections to the queue lease time.
// This ensures that database connections will remain open for the
// duration of the time the queue is being processed.
$timeout = (int) $plugin_definition['cron']['lease_time'];
$defaultConnectionFactory->setConnectionTimeout($timeout);
$datastoreConnectionFactory->setConnectionTimeout($timeout);
}

/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static(
$configuration,
$plugin_id,
$plugin_definition,
$container->get('config.factory'),
$container->get('dkan.datastore.service'),
$container->get('logger.factory'),
$container->get('dkan.metastore.reference_lookup'),
$container->get('dkan.common.database_connection_factory'),
$container->get('dkan.datastore.database_connection_factory')
);
}

/**
* {@inheritdoc}
*/
public function processItem($data) {
if (is_object($data) && isset($data->data)) {
$data = $data->data;
}

try {
$this->importData($data);
}
catch (\Exception $e) {
$this->error("Import for {$data['identifier']} returned an error: {$e->getMessage()}");
}
}

/**
* Perform the actual data import.
*
* @param array $data
* Resource identifier information.
*/
protected function importData(array $data) {
$identifier = $data['identifier'];
$version = $data['version'];
$results = $this->datastore->import($identifier, FALSE, $version);

$queued = FALSE;
foreach ($results as $label => $result) {
$queued = isset($result) ? $this->processResult($result, $data, $queued, $label) : FALSE;
}

// Delete local resource file if enabled in datastore settings config.
if ($this->datastoreConfig->get('delete_local_resource')) {
$this->fileSystem->deleteRecursive("public://resources/{$identifier}_{$version}");
}
}

/**
* Process the result of the import operation.
*
* @param \Procrastinator\Result $result
* The result object.
* @param mixed $data
* The resource data for import.
* @param bool $queued
* Whether the import job is currently queued.
* @param string $label
* A label to distinguish types of jobs in status messages.
*
* @return bool
* The updated value for $queued.
*/
protected function processResult(Result $result, $data, bool $queued = FALSE, string $label = 'Import') {
$uid = "{$data['identifier']}__{$data['version']}";
$status = $result->getStatus();
switch ($status) {
case Result::STOPPED:
if (!$queued) {
$newQueueItemId = $this->requeue($data);
$this->notice("$label for {$uid} is requeueing. (ID:{$newQueueItemId}).");
$queued = TRUE;
}
break;

case Result::IN_PROGRESS:
case Result::ERROR:
$this->error("$label for {$uid} returned an error: {$result->getError()}");
break;

case Result::DONE:
$this->notice("$label for {$uid} completed.");
$this->invalidateCacheTags("{$uid}__source");
break;
}

return $queued;
}

/**
* Invalidate all appropriate cache tags for this resource.
*
* @param mixed $resourceId
* A resource ID.
*/
protected function invalidateCacheTags($resourceId) {
$this->referenceLookup->invalidateReferencerCacheTags('distribution', $resourceId, 'downloadURL');
}

/**
* Requeues the job with extra state information.
*
* @param array $data
* Queue data.
*
* @return mixed
* Queue ID or false if unsuccessful.
*
* @todo Clarify return value. Documentation suggests it should return ID.
*/
protected function requeue(array $data) {
return $this->databaseQueue->createItem($data);
parent::__construct($configuration, $plugin_id, $plugin_definition, $configFactory, $datastore, $loggerFactory, $referenceLookup, $defaultConnectionFactory, $datastoreConnectionFactory);
@trigger_error(__NAMESPACE__ . '\Import is deprecated. Use \Drupal\datastore\Plugin\QueueWorker\ImportQueueWorker instead.', E_USER_DEPRECATED);
}

}

0 comments on commit 1fd9d1a

Please sign in to comment.