/
MapReduce.php
193 lines (176 loc) · 5.59 KB
/
MapReduce.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
<?php
/**
* CakePHP(tm) : Rapid Development Framework (http://cakephp.org)
* Copyright (c) Cake Software Foundation, Inc. (http://cakefoundation.org)
*
* Licensed under The MIT License
* For full copyright and license information, please see the LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @copyright Copyright (c) Cake Software Foundation, Inc. (http://cakefoundation.org)
* @link http://cakephp.org CakePHP(tm) Project
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Cake\Collection\Iterator;
use ArrayIterator;
use IteratorAggregate;
use LogicException;
use Traversable;
/**
 * Implements a simplistic version of the popular Map-Reduce algorithm. Acts
 * like an iterator over the results produced from the original data, thus
 * offering a transparent wrapper for results coming from any source.
 *
 * The Map and Reduce phases are not run at construction time: they execute on
 * the first call to getIterator(), and the results are kept for later calls.
 */
class MapReduce implements IteratorAggregate
{
    /**
     * Holds the shuffled results that were emitted from the map
     * phase, grouped by bucket name.
     *
     * @var array
     */
    protected $_intermediate = [];

    /**
     * Holds the results as emitted during the reduce phase (or directly
     * from the map phase via emit()).
     *
     * @var array
     */
    protected $_result = [];

    /**
     * Whether the Map-Reduce routine has been executed already on the data
     *
     * @var bool
     */
    protected $_executed = false;

    /**
     * Holds the original data that needs to be processed. Set to null once
     * the Map phase has consumed it.
     *
     * @var \Traversable|null
     */
    protected $_data;

    /**
     * A callable that will be executed for each record in the original data
     *
     * @var callable
     */
    protected $_mapper;

    /**
     * A callable that will be executed for each intermediate bucket emitted
     * during the Map phase
     *
     * @var callable|null
     */
    protected $_reducer;

    /**
     * Number of times emit() has been called. Used as the result key when
     * emit() is called without an explicit key.
     *
     * @var int
     */
    protected $_counter = 0;

    /**
     * Constructor
     *
     * ### Example:
     *
     * Separate all unique odd and even numbers in an array
     *
     * ```
     * $data = new \ArrayObject([1, 2, 3, 4, 5, 3]);
     * $mapper = function ($value, $key, $mr) {
     *     $type = ($value % 2 === 0) ? 'even' : 'odd';
     *     $mr->emitIntermediate($value, $type);
     * };
     *
     * $reducer = function ($numbers, $type, $mr) {
     *     $mr->emit(array_unique($numbers), $type);
     * };
     * $results = new MapReduce($data, $mapper, $reducer);
     * ```
     *
     * Previous example will generate the following result:
     *
     * ```
     * ['odd' => [1, 3, 5], 'even' => [2, 4]]
     * ```
     *
     * NOTE(review): `callable $reducer = null` relies on implicit nullability,
     * which is deprecated as of PHP 8.4. Switching to `?callable` requires
     * PHP >= 7.1.
     *
     * @param \Traversable $data the original data to be processed
     * @param callable $mapper the mapper callback. This function will receive 3 arguments.
     *   The first one is the current value, second the current results key and third is
     *   this class instance so you can call the result emitters.
     * @param callable|null $reducer the reducer callback. This function will receive 3 arguments.
     *   The first one is the list of values inside a bucket, second one is the name
     *   of the bucket that was created during the mapping phase and third one is an
     *   instance of this class.
     */
    public function __construct(Traversable $data, callable $mapper, callable $reducer = null)
    {
        $this->_data = $data;
        $this->_mapper = $mapper;
        $this->_reducer = $reducer;
    }

    /**
     * Returns an iterator with the end result of running the Map and Reduce
     * phases on the original data.
     *
     * The phases only run on the first call. Later calls iterate over the
     * results that were already computed.
     *
     * The attribute below suppresses the PHP 8.1+ deprecation notice about
     * the missing `Traversable` return type. It keeps the signature usable on
     * PHP versions without return types, which read the line as a `#` comment.
     *
     * @return \ArrayIterator
     * @throws \LogicException if emitIntermediate() was called during the Map
     *   phase but no reducer function was provided
     */
    #[\ReturnTypeWillChange]
    public function getIterator()
    {
        if (!$this->_executed) {
            $this->_execute();
        }

        return new ArrayIterator($this->_result);
    }

    /**
     * Appends a new record to the bucket labelled with $bucket, usually as a
     * result of mapping a single record from the original data.
     *
     * @param mixed $value The record itself to store in the bucket
     * @param string $bucket the name of the bucket where to put the record
     * @return void
     */
    public function emitIntermediate($value, $bucket)
    {
        $this->_intermediate[$bucket][] = $value;
    }

    /**
     * Appends a new record to the final list of results and optionally assigns
     * a key to this record.
     *
     * When no key is given, the record is stored under the current value of
     * the emit counter. The counter advances on every call, including keyed
     * ones, so auto-generated keys reflect the total number of prior emits.
     *
     * @param mixed $value The value to be appended to the final list of results
     * @param string|null $key an optional key to assign to the value
     * @return void
     */
    public function emit($value, $key = null)
    {
        $this->_result[$key === null ? $this->_counter : $key] = $value;
        $this->_counter++;
    }

    /**
     * Runs the actual Map-Reduce algorithm. It iterates over the original data
     * and calls the mapper function for each record, then calls the reducer
     * function once for each intermediate bucket created during the Map phase.
     *
     * @return void
     * @throws \LogicException if emitIntermediate was called but no reducer function
     *   was provided
     */
    protected function _execute()
    {
        // Map phase: the mapper can fill buckets via emitIntermediate()
        // and/or write final results directly via emit().
        $mapper = $this->_mapper;
        foreach ($this->_data as $key => $value) {
            $mapper($value, $key, $this);
        }
        // The original data has been consumed; drop this object's reference to it.
        $this->_data = null;

        // Buckets were produced but there is nothing to reduce them with.
        if (!empty($this->_intermediate) && $this->_reducer === null) {
            throw new LogicException('No reducer function was provided');
        }

        // Reduce phase: one reducer call per bucket, in insertion order.
        $reducer = $this->_reducer;
        foreach ($this->_intermediate as $bucket => $list) {
            $reducer($list, $bucket, $this);
        }
        $this->_intermediate = [];
        $this->_executed = true;
    }
}