Skip to content

Commit

Permalink
Added Reindex and deprecated CrossIndex (ruflin#1311)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giovanni Albero committed May 23, 2017
1 parent d9c59b5 commit 497f298
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -16,13 +16,14 @@ All notable changes to this project will be documented in this file based on the
- Add support for Health parameters for Cluster\Health endpoint (new prop : delayed_unassigned_shards, number_of_pending_tasks, number_of_in_flight_fetch, task_max_waiting_in_queue_millis, active_shards_percent_as_number)
- Add support for querystring in Type. this allow to use `update_all_types` in type mapping in order to resolve conflicts between fields in different types. [Conflicts between fields in different types](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#merging-conflicts)
- Added `\Elastica\Query\ParentId` to avoid join with parent documents [#1287](https://github.com/ruflin/Elastica/issues/1287)
- Added `\Elastica\Reindex` for reindexing between indices [#1311](https://github.com/ruflin/Elastica/issues/1311)

### Improvements

- Added support for `other_bucket` and `other_bucket_key` paramters on `Elastica\Aggregation\Filters`

### Deprecated

- Deprecated `Tool\CrossIndex` use `\Elastica\Reindex` instead [#1311](https://github.com/ruflin/Elastica/issues/1311)

## [Unreleased](https://github.com/ruflin/Elastica/compare/5.1.0...5.2.0)

Expand Down
139 changes: 139 additions & 0 deletions lib/Elastica/Reindex.php
@@ -0,0 +1,139 @@
<?php
namespace Elastica;

use Elastica\Query\AbstractQuery;

class Reindex
{
const VERSION_TYPE = 'version_type';
const VERSION_TYPE_INTERNAL = 'internal';
const VERSION_TYPE_EXTERNAL = 'external';
const OPERATION_TYPE = 'op_type';
const OPERATION_TYPE_CREATE = 'create';
const CONFLICTS = 'conflicts';
const CONFLICTS_PROCEED = 'proceed';
const TYPE = 'type';
const SIZE = 'size';
const QUERY = 'query';

/**
* @var Index
*/
protected $_oldIndex;

/**
* @var Index
*/
protected $_newIndex;

/**
* @var array
*/
private $_options;

public function __construct(Index $oldIndex, Index $newIndex, array $options = [])
{
$this->_oldIndex = $oldIndex;
$this->_newIndex = $newIndex;
$this->_options = $options;
}

public function run()
{
$body = $this->getBody($this->_oldIndex, $this->_newIndex, $this->_options);

$reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();
$reindexEndpoint->setBody($body);

$this->_oldIndex->getClient()->requestEndpoint($reindexEndpoint);
$this->_newIndex->refresh();

return $this->_newIndex;
}

protected function getBody($oldIndex, $newIndex, $options)
{
$body = array_merge([
'source' => $this->getSourcePartBody($oldIndex, $options),
'dest' => $this->getDestPartBody($newIndex, $options)
], self::resolveBodyOptions($options));

return $body;
}

private function getSourcePartBody(Index $index, array $options)
{
$sourceBody = array_merge([
'index' => $index->getName(),
], $this->resolveSourceOptions($options));

$sourceBody = $this->setSourceQuery($sourceBody);
$sourceBody = $this->setSourceType($sourceBody);

return $sourceBody;
}

private function getDestPartBody(Index $index, array $options)
{
return array_merge([
'index' => $index->getName(),
], $this->resolveDestOptions($options));
}

private static function resolveSourceOptions(array $options)
{
return array_intersect_key($options, [
self::TYPE => null,
self::QUERY => null,
]);
}

private function resolveDestOptions(array $options)
{
return array_intersect_key($options, [
self::VERSION_TYPE => null,
self::OPERATION_TYPE => null,
]);
}

private function resolveBodyOptions(array $options)
{
return array_intersect_key($options, [
self::SIZE => null,
self::CONFLICTS => null,
]);
}

/**
* @param array $sourceBody
*
* @return array
*/
private function setSourceQuery(array $sourceBody)
{
if (isset($sourceBody[self::QUERY]) && $sourceBody[self::QUERY] instanceof AbstractQuery) {
$sourceBody[self::QUERY] = $sourceBody[self::QUERY]->toArray();
}
return $sourceBody;
}

/**
* @param array $sourceBody
*
* @return array
*/
private function setSourceType(array $sourceBody)
{
if (isset($sourceBody[self::TYPE]) && !is_array($sourceBody[self::TYPE])) {
$sourceBody[self::TYPE] = [$sourceBody[self::TYPE]];
}
if (isset($sourceBody[self::TYPE])) {
foreach ($sourceBody[self::TYPE] as $key => $type) {
if ($type instanceof Type) {
$sourceBody[self::TYPE][$key] = $type->getName();
}
}
}
return $sourceBody;
}
}
2 changes: 2 additions & 0 deletions lib/Elastica/Tool/CrossIndex.php
Expand Up @@ -12,6 +12,8 @@
* Functions to move documents and types between indices.
*
* @author Manuel Andreo Garcia <andreo.garcia@gmail.com>
*
* @deprecated use Reindex instead. This class will be removed in further Elastica releases.
*/
class CrossIndex
{
Expand Down
181 changes: 181 additions & 0 deletions test/Elastica/ReindexTest.php
@@ -0,0 +1,181 @@
<?php
namespace Elastica\Test;

use Elastica\Document;
use Elastica\Index;
use Elastica\Query\Match;
use Elastica\Reindex;
use Elastica\Type;

class ReindexTest extends Base
{
/**
* Test default reindex.
*
* @group functional
*/
public function testReindex()
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$this->_addDocs($oldIndex->getType('resetTest'), 10);

$newIndex = $this->_createIndex('idx2', true, 2);

$reindex = new Reindex($oldIndex, $newIndex);
$this->assertInstanceOf(
Index::class,
$newIndex
);
$newIndex = $reindex->run();

$this->assertEquals(10, $newIndex->count());

$oldResult = [];

foreach ($oldIndex->search()->getResults() as $result) {
$oldResult[] = $result->getData();
}

$newResult = [];

foreach ($newIndex->search()->getResults() as $result) {
$newResult[] = $result->getData();
}

$this->assertEquals($oldResult, $newResult);
}

/**
* Test reindex type option.
*
* @group functional
*/
public function testReindexTypeOption()
{
$oldIndex = $this->_createIndex('', true, 2);
$type1 = $oldIndex->getType('crossIndexTest_1');
$type2 = $oldIndex->getType('crossIndexTest_2');

$docs1 = $this->_addDocs($type1, 10);
$docs2 = $this->_addDocs($type2, 10);

$newIndex = $this->_createIndex(null, true, 2);

$reindex = new Reindex($oldIndex, $newIndex, [
Reindex::TYPE => 'crossIndexTest_1',
]);
$reindex->run();

$this->assertEquals(10, $newIndex->count());
$newIndex->deleteDocuments($docs1);

// string
$reindex = new Reindex($oldIndex, $newIndex, [
Reindex::TYPE => 'crossIndexTest_2',
]);
$reindex->run();
$this->assertEquals(10, $newIndex->count());
$newIndex->deleteDocuments($docs2);

// array
$reindex = new Reindex($oldIndex, $newIndex, [
Reindex::TYPE => [
$type1,
'crossIndexTest_2',
],
]);
$reindex->run();
$this->assertEquals(20, $newIndex->count());
}

/**
* @group functional
*/
public function testReindexOpTypeOptionWithProceedSetOnConflicts()
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$type1 = $oldIndex->getType('crossIndexTest_1');

$docs1 = $this->_addDocs($type1, 10);

$subDocs1 = array_splice($docs1, 0, 5);

$newIndex = $this->_createIndex('idx2', true, 2);
$newIndex->addDocuments($subDocs1);
$newIndex->refresh();

$this->assertEquals(5, $newIndex->count());

$reindex = new Reindex($oldIndex, $newIndex, [
Reindex::OPERATION_TYPE => Reindex::OPERATION_TYPE_CREATE,
Reindex::CONFLICTS => Reindex::CONFLICTS_PROCEED,
]);

$reindex->run();

$this->assertEquals(10, $newIndex->count());
}

/**
* @group functional
*/
public function testReindexWithQueryOption()
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$type1 = $oldIndex->getType('crossIndexTest_1');
$docs1 = $this->_addDocs($type1, 10);

$newIndex = $this->_createIndex('idx2', true, 2);

$query = new Match('id', 8);

$reindex = new Reindex($oldIndex, $newIndex, [
Reindex::QUERY => $query,
]);
$reindex->run();

$results = $newIndex->search()->getResults();
$this->assertEquals(1, $newIndex->count());
foreach ($results as $result) {
$this->assertEquals($docs1[7]->getData(), $result->getData());
}
}

/**
* @group functional
*/
public function testReindexWithSizeOption()
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$type1 = $oldIndex->getType('crossIndexTest_1');
$this->_addDocs($type1, 10);

$newIndex = $this->_createIndex('idx2', true, 2);

$reindex = new Reindex($oldIndex, $newIndex, [
Reindex::SIZE => 5,
]);
$reindex->run();

$this->assertEquals(5, $newIndex->count());
}

/**
* @param Type $type
* @param int $docs
*
* @return array
*/
private function _addDocs(Type $type, $docs)
{
$insert = [];
for ($i = 1; $i <= $docs; ++$i) {
$insert[] = new Document($i, ['id' => $i, 'key' => 'value']);
}

$type->addDocuments($insert);
$type->getIndex()->refresh();

return $insert;
}
}

0 comments on commit 497f298

Please sign in to comment.