Skip to content

Commit

Permalink
Experimenting with result formatters and MapReduce
Browse files Browse the repository at this point in the history
  • Loading branch information
lorenzo committed Aug 15, 2013
1 parent 4b00a01 commit 8e27bae
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 2 deletions.
71 changes: 71 additions & 0 deletions lib/Cake/ORM/MapReduce.php
@@ -0,0 +1,71 @@
<?php
/**
* PHP Version 5.4
*
* CakePHP(tm) : Rapid Development Framework (http://cakephp.org)
* Copyright (c) Cake Software Foundation, Inc. (http://cakefoundation.org)
*
* Licensed under The MIT License
* For full copyright and license information, please see the LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @copyright Copyright (c) Cake Software Foundation, Inc. (http://cakefoundation.org)
* @link http://cakephp.org CakePHP(tm) Project
* @since CakePHP(tm) v 3.0.0
* @license MIT License (http://www.opensource.org/licenses/mit-license.php)
*/
namespace Cake\ORM;

use \IteratorAggregate;
use \ArrayIterator;

class MapReduce implements IteratorAggregate {

protected $_intermediate = [];

protected $_result = [];

protected $_executed = false;

protected $_data;

protected $_mapper;

protected $_reducer;

protected $_counter = 0;

public function __construct($data, callable $mapper, callable $reducer) {
$this->_data = $data;
$this->_mapper = $mapper;
$this->_reducer = $reducer;
}

public function getIterator() {
if (!$this->_executed) {
$this->_execute();
}
return new ArrayIterator($this->_result);
}

public function emitIntermediate($key, $value) {
$this->_intermediate[$key][] = $value;
}

public function emit($value, $slot = null) {
$this->_result[$slot === null ? $this->_counter : $slot] = $value;
}

protected function _execute() {
foreach ($this->_data as $key => $value) {
$this->_mapper->__invoke($key, $value, $this);
}

foreach ($this->_intermediate as $key => $list) {
$this->_reducer->__invoke($key, $list, $this);
$this->_counter++;
}
$this->_execute = true;
}

}
33 changes: 31 additions & 2 deletions lib/Cake/ORM/Query.php
Expand Up @@ -67,6 +67,10 @@ class Query extends DatabaseQuery {
*/
protected $_loadEagerly = [];

protected $_mapper;

protected $_reducer;

/**
* List of options accepted by associations in contain()
* index by key for faster access
Expand Down Expand Up @@ -100,6 +104,8 @@ class Query extends DatabaseQuery {
*/
protected $_useBufferedResults = false;

protected $_formatters = [];

/**
* @param Cake\Database\Connection $connection
* @param Cake\ORM\Table $table
Expand Down Expand Up @@ -349,6 +355,11 @@ public function bufferResults() {
return $this;
}

public function formatResults($current = null, $key = null) {
$this->_formatters[] = compact('current', 'key');
return $this;
}

/**
* Set the result set for a query.
*
Expand Down Expand Up @@ -382,9 +393,9 @@ public function execute() {
return $this->_results;
}
if ($this->_useBufferedResults) {
return new BufferedResultSet($this, parent::execute());
return $this->_applyFormatters(new BufferedResultSet($this, parent::execute()));
}
return new ResultSet($this, parent::execute());
return $this->_applyFormatters(new ResultSet($this, parent::execute()));
}

/**
Expand Down Expand Up @@ -518,6 +529,24 @@ public function applyOptions(array $options) {
return $this;
}

public function mapReduce(callable $mapper, callable $reducer) {
$this->_mapper = $mapper;
$this->_reducer = $reducer;
return $this;
}

protected function _applyFormatters($result) {
foreach ($this->_formatters as $formatter) {
$result = new ResultSetDecorator($result, $formatter);
}
if (!empty($this->_mapper) && !empty($this->_reducer)) {
$result = new ResultSetDecorator(
new MapReduce($result, $this->_mapper, $this->_reducer)
);
}
return $result;
}

/**
* Auxiliary function used to wrap the original statement from the driver with
* any registered callbacks. This will also setup the correct statement class
Expand Down
83 changes: 83 additions & 0 deletions lib/Cake/ORM/ResultSetDecorator.php
@@ -0,0 +1,83 @@
<?php
/**
* PHP Version 5.4
*
* CakePHP(tm) : Rapid Development Framework (http://cakephp.org)
* Copyright (c) Cake Software Foundation, Inc. (http://cakefoundation.org)
*
* Licensed under The MIT License
* For full copyright and license information, please see the LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @copyright Copyright (c) Cake Software Foundation, Inc. (http://cakefoundation.org)
* @link http://cakephp.org CakePHP(tm) Project
* @since CakePHP(tm) v 3.0.0
* @license MIT License (http://www.opensource.org/licenses/mit-license.php)
*/
namespace Cake\ORM;

use \IteratorIterator;
use \Iterator;

class ResultSetDecorator implements Iterator {

protected $_keyCallback;

protected $_currentCallback;

protected $_results;

protected $_current;

public function __construct(\Traversable $results, $callbacks = []) {
if (!empty($callbacks['key'])) {
$this->_keyCallback = $callbacks['key'];
}
if (!empty($callbacks['current'])) {
$this->_currentCallback = $callbacks['current'];
}
$this->_results = $results;

if ($results instanceOf \IteratorAggregate) {
$this->_results = $results->getIterator();
}
}

public function key() {
$key = $this->_results->key();
if ($this->_keyCallback) {
$current = $this->_results->current();
$key = $this->_keyCallback->__invoke($current, $key);
}
return $key;
}

public function current() {
if ($this->_current) {
return $this->_current;
}
$current = $this->_results->current();
if ($current && $this->_currentCallback) {
$current = $this->_currentCallback->__invoke($current);
}
return $current;
}

public function next() {
$this->_current = null;
$this->_results->next();
}

public function rewind() {
$this->_current = null;
$this->_results->rewind();
}

public function valid() {
return $this->_results->valid();
}

public function toArray() {
return iterator_to_array($this);
}
}
26 changes: 26 additions & 0 deletions lib/Cake/ORM/Table.php
Expand Up @@ -524,6 +524,32 @@ public function findAll(Query $query, array $options = []) {
return $query;
}

public function findList(Query $query, array $options = []) {
$mapper = function($key, $row, $mr) use ($query, &$columns) {
if (empty($columns)) {
$columns = array_slice(array_keys($row), 0, 3);
}

$key = isset($columns[2]) ? $row[$columns[2]] : $key;
list($rowKey, $rowVal) = $columns;
$mr->emitIntermediate($key, [$row[$rowKey] => $row[$rowVal]]);
};

$reducer = function($key, $values, $mr) use (&$columns) {
if (!isset($columns[2])) {
$mr->emit(current(current($values)), key(current($values)));
return;
}
$result = [];
foreach ($values as $value) {
$result += $value;
}
$mr->emit($result, $key);
};

return $query->mapReduce($mapper, $reducer);
}

/**
* Creates a new Query instance for this table
*
Expand Down
33 changes: 33 additions & 0 deletions lib/Cake/Test/TestCase/ORM/TableTest.php
Expand Up @@ -506,4 +506,37 @@ public function testFindApplyOptions() {
->with($options);
$table->find('all', $options);
}

/**
* Tests find('list')
*
* @return void
*/
public function testFindList() {
$table = new Table(['table' => 'users', 'connection' => $this->connection]);
$query = $table->find('list', ['fields' => ['id', 'username']])->order('id');
$expected = [
1 => 'mariano',
2 => 'nate',
3 => 'larry',
4 => 'garrett'
];
$this->assertSame($expected, $query->toArray());

$query = $table->find('list')
->select(['id', 'username', 'odd' => 'id % 2 = 0'])
->order('id');
$expected = [
0 => [
1 => 'mariano',
3 => 'larry'
],
1 => [
2 => 'nate',
4 => 'garret'
]
];
$this->assertSame($expected, $query->toArray());
}

}

0 comments on commit 8e27bae

Please sign in to comment.