Skip to content

Commit

Permalink
Fix handling of missing source rows in migrate:import (#4778)
Browse files Browse the repository at this point in the history
Co-authored-by: Vojislav Jovanovic <vaish@vjovanovic.com>
  • Loading branch information
claudiu-cristea and vaish committed Jun 19, 2021
1 parent e436955 commit c722ca9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/Drupal/Commands/core/MigrateRunnerCommands.php
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ protected function prepareTableRow(array $row, ?bool $namesOnly): array
* @option timestamp Show progress ending timestamp in progress messages
* @option total Show total processed item number in progress messages
* @option progress Show progress bar
* @option delete Delete destination records missed from the source
* @option delete Delete destination records missed from the source. Not compatible with --limit and --idlist options, and high_water_property source configuration key.
*
* @usage migrate:import --all
* Perform all migrations
Expand Down
69 changes: 26 additions & 43 deletions src/Drupal/Migrate/MigrateExecutable.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ class MigrateExecutable extends MigrateExecutableBase
*/
protected $idlist;

/**
* List of all source IDs that are found in source during this migration.
*
* @var array
*/
protected $allSourceIdValues = [];

/**
* Count of number of items processed so far in this migration.
*
Expand Down Expand Up @@ -133,15 +140,6 @@ class MigrateExecutable extends MigrateExecutableBase
*/
protected $progressBar;

/**
* Used to bypass the prepare row feedback.
*
* @var bool
*
* @see \Drush\Drupal\Migrate\MigrateExecutable::handleMissingSourceRows()
*/
protected $showPrepareRowFeedback = true;

/**
* Constructs a new migrate executable instance.
*
Expand Down Expand Up @@ -176,7 +174,13 @@ public function __construct(MigrationInterface $migration, MigrateMessageInterfa
$this->feedback = $options['feedback'];
$this->showTimestamp = $options['timestamp'];
$this->showTotal = $options['total'];
$this->deleteMissingSourceRows = $options['delete'];
// Deleting the missing source rows is not compatible with options that
// limit number of source rows that will be processed. It should be
// ignored when:
// - `--idlist` option is used,
// - `--limit` option is used,
// - The migration source plugin has high_water_property set.
$this->deleteMissingSourceRows = $options['delete'] && !($this->limit || !empty($this->idlist) || !empty($migration->getSourceConfiguration()['high_water_property']));
// Cannot use the progress bar when:
// - `--no-progress` option is used,
// - `--feedback` option is used,
Expand Down Expand Up @@ -241,7 +245,6 @@ public function onMapDelete(MigrateMapDeleteEvent $event): void
public function onPreImport(MigrateImportEvent $event): void
{
$migration = $event->getMigration();
$this->handleMissingSourceRows($migration);
$this->initProgressBar($migration);
}

Expand All @@ -267,38 +270,14 @@ public function onPreImport(MigrateImportEvent $event): void
*/
protected function handleMissingSourceRows(MigrationInterface $migration): void
{
// Clone so that any generators aren't initialized prematurely.
$source = clone $migration->getSourcePlugin();

$idMap = $migration->getIdMap();
if (empty($this->idlist)) {
$idMap->prepareUpdate();
} else {
$keys = array_keys($migration->getSourcePlugin()->getIds());
foreach ($this->idlist as $sourceIdValues) {
$idMap->setUpdate(array_combine($keys, $sourceIdValues));
}
}

// As $source->next() is preparing the row, don't want to show the
// feedback in such circumstances.
// @see \Drush\Drupal\Migrate\MigrateExecutable::onPrepareRow()
$this->showPrepareRowFeedback = false;
$source->rewind();
$sourceIdValues = [];
while ($source->valid()) {
$sourceIdValues[] = $source->current()->getSourceIdValues();
$source->next();
}
$this->showPrepareRowFeedback = true;

$idMap->rewind();

// Collect the destination IDs no more present in source.
$destinationIds = [];
while ($idMap->valid()) {
$mapSourceId = $idMap->currentSource();
if (!in_array($mapSourceId, $sourceIdValues)) {
if (!in_array($mapSourceId, $this->allSourceIdValues)) {
$destinationIds[] = $idMap->currentDestination();
}
$idMap->next();
Expand Down Expand Up @@ -334,7 +313,11 @@ public function onMissingSourceRows(MigrateMissingSourceRowsEvent $event): void
);
// Filter the map on destination IDs.
$this->idMap = new MigrateIdMapFilter(parent::getIdMap(), [], $event->getDestinationIds());

$status = $this->migration->getStatus();
$this->migration->setStatus(MigrationInterface::STATUS_IDLE);
$this->rollback();
$this->migration->setStatus($status);
// Reset the ID map filter.
$this->idMap = null;
}
Expand All @@ -352,6 +335,7 @@ public function onPostImport(MigrateImportEvent $event): void
$migrateLastImportedStore->set($event->getMigration()->id(), round(microtime(true) * 1000));
$this->progressFinish();
$this->importFeedbackMessage();
$this->handleMissingSourceRows($event->getMigration());
$this->unregisterListeners();
}

Expand Down Expand Up @@ -516,9 +500,10 @@ public function onPostRowDelete(MigrateRowDeleteEvent $event): void
*/
public function onPrepareRow(MigratePrepareRowEvent $event): void
{
$row = $event->getRow();
$sourceId = $row->getSourceIdValues();

if (!empty($this->idlist)) {
$row = $event->getRow();
$sourceId = $row->getSourceIdValues();
$skip = true;
foreach ($this->idlist as $id) {
if (array_values($sourceId) == $id) {
Expand All @@ -531,11 +516,9 @@ public function onPrepareRow(MigratePrepareRowEvent $event): void
}
}

// If we're preparing the row in context of ::handleMissingSourceRows(),
// we don't need any counters update.
if (!$this->showPrepareRowFeedback) {
return;
}
// Collect all Source ID values so that we can handle missing source
// rows post import.
$this->allSourceIdValues[] = $sourceId;

if ($this->feedback && $this->counter && $this->counter % $this->feedback === 0) {
$this->importFeedbackMessage(false);
Expand Down
5 changes: 3 additions & 2 deletions tests/functional/MigrateRunnerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public function testMissingSourceRows(): void
{
$this->drush('state:set', ['woot.test_migration_source_data_amount', 5]);
$this->drush('migrate:import', ['test_migration']);
$this->assertStringContainsString('[notice] Processed 5 items (5 created, 0 updated, 0 failed, 0 ignored)', $this->getErrorOutput());
$this->drush('sql:query', ['SELECT title FROM node_field_data']);
$this->assertSame(['Item 1', 'Item 2', 'Item 3', 'Item 4', 'Item 5'], $this->getOutputAsList());

Expand All @@ -220,9 +221,9 @@ public function testMissingSourceRows(): void

$this->assertStringContainsString('[notice] 2 items are missing from source and will be rolled back', $this->getErrorOutput());
$this->assertStringContainsString("[notice] Rolled back 2 items - done with 'test_migration'", $this->getErrorOutput());
$this->assertStringContainsString('[notice] Processed 3 items (0 created, 3 updated, 0 failed, 0 ignored)', $this->getErrorOutput());
$this->assertStringContainsString('[notice] Processed 0 items (0 created, 0 updated, 0 failed, 0 ignored)', $this->getErrorOutput());
$this->drush('sql:query', ['SELECT title FROM node_field_data']);
$this->assertEquals(['Item 1', 'Item 3', 'Item 5'], $this->getOutputAsList());
$this->assertSame(['Item 1', 'Item 3', 'Item 5'], $this->getOutputAsList());
}

/**
Expand Down

0 comments on commit c722ca9

Please sign in to comment.