-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
MapReduce.php
182 lines (166 loc) · 4.72 KB
/
MapReduce.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
<?php
/**
* PHP Version 5.4
*
* CakePHP(tm) : Rapid Development Framework (http://cakephp.org)
* Copyright (c) Cake Software Foundation, Inc. (http://cakefoundation.org)
*
* Licensed under The MIT License
* For full copyright and license information, please see the LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @copyright Copyright (c) Cake Software Foundation, Inc. (http://cakefoundation.org)
* @link http://cakephp.org CakePHP(tm) Project
* @since CakePHP(tm) v 3.0.0
* @license MIT License (http://www.opensource.org/licenses/mit-license.php)
*/
namespace Cake\ORM;
use \IteratorAggregate;
use \ArrayIterator;
/**
* Implements a simplistic version of the popular Map-Reduce algorithm. Acts
* like an iterator for the original passed data after each result has been
* processed, thus offering a transparent wrapper for results coming from any
* source.
*/
class MapReduce implements IteratorAggregate {

    /**
     * Holds the shuffled results that were emitted from the map
     * phase, grouped by bucket name
     *
     * @var array
     */
    protected $_intermediate = [];

    /**
     * Holds the results as emitted during the reduce phase
     *
     * @var array
     */
    protected $_result = [];

    /**
     * Whether the Map-Reduce routine has been executed already on the data
     *
     * @var boolean
     */
    protected $_executed = false;

    /**
     * Holds the original data that needs to be processed. It is released
     * (set to null) once the map phase has consumed it.
     *
     * @var \Traversable
     */
    protected $_data;

    /**
     * A callable that will be executed for each record in the original data
     *
     * @var callable
     */
    protected $_mapper;

    /**
     * A callable that will be executed for each intermediate bucket emitted
     * during the Map phase. May be null when the mapper emits final results
     * directly.
     *
     * @var callable
     */
    protected $_reducer;

    /**
     * Count of elements emitted during the Reduce phase. Also used as the
     * key for results emitted without an explicit key.
     *
     * @var integer
     */
    protected $_counter = 0;

    /**
     * Constructor
     *
     * ## Example:
     *
     * Separate all unique odd and even numbers in an array
     *
     * {{{
     *  $data = new \ArrayObject([1, 2, 3, 4, 5, 3]);
     *  $mapper = function ($key, $value, $mr) {
     *      $type = ($value % 2 === 0) ? 'even' : 'odd';
     *      $mr->emitIntermediate($type, $value);
     *  };
     *
     *  $reducer = function ($type, $numbers, $mr) {
     *      $mr->emit(array_unique($numbers), $type);
     *  };
     *  $results = new MapReduce($data, $mapper, $reducer);
     * }}}
     *
     * Previous example will generate the following result:
     *
     * {{{
     *  ['odd' => [1, 3, 5], 'even' => [2, 4]]
     * }}}
     *
     * @param \Traversable $data the original data to be processed
     * @param callable $mapper the mapper callback. This function will receive 3 arguments.
     * The first one is the current results key, second the current value and third is
     * this class instance so you can call the result emitters.
     * @param callable $reducer the reducer callback. This function will receive 3 arguments.
     * The first one is a bucket name that was mapped before, second one is the list
     * of values inside the bucket and third one is an instance of this class.
     * It may be omitted only when the mapper never calls emitIntermediate().
     * @return void
     */
    public function __construct(\Traversable $data, callable $mapper, callable $reducer = null) {
        $this->_data = $data;
        $this->_mapper = $mapper;
        $this->_reducer = $reducer;
    }

    /**
     * Returns an iterator with the end result of running the Map and Reduce
     * phases on the original data. The phases are run lazily, the first time
     * an iterator is requested.
     *
     * @return \ArrayIterator
     * @throws \LogicException if the mapper emitted intermediate records but
     * no reducer was provided
     */
    public function getIterator() {
        if (!$this->_executed) {
            $this->_execute();
        }
        return new ArrayIterator($this->_result);
    }

    /**
     * Appends a new record to the bucket labelled with $bucket, usually as a result
     * of mapping a single record from the original data.
     *
     * @param string $bucket the name of the bucket where to put the record
     * @param mixed $value the record itself to store in the bucket
     * @return void
     */
    public function emitIntermediate($bucket, $value) {
        $this->_intermediate[$bucket][] = $value;
    }

    /**
     * Appends a new record to the final list of results and optionally assign a key
     * for this record.
     *
     * @param mixed $value The value to be appended to the final list of results
     * @param string $key an optional key to assign to the value. When omitted,
     * the running count of emitted records is used as the key.
     * @return void
     */
    public function emit($value, $key = null) {
        $this->_result[$key === null ? $this->_counter : $key] = $value;
        // The counter advances on every emit, keyed or not.
        $this->_counter++;
    }

    /**
     * Runs the actual Map-Reduce algorithm. This iterates the original data
     * and calls the mapper function for each record, then calls the reducer
     * function for each intermediate bucket created during the Map phase.
     *
     * @return void
     * @throws \LogicException if the mapper emitted intermediate records but
     * no reducer was provided
     */
    protected function _execute() {
        $mapper = $this->_mapper;
        foreach ($this->_data as $key => $value) {
            $mapper($key, $value, $this);
        }
        // The source data is no longer needed once mapped; release it.
        $this->_data = null;

        // A reducer is only optional when the mapper emitted final results
        // directly. Fail with a clear message instead of calling null.
        if (!empty($this->_intermediate) && $this->_reducer === null) {
            throw new \LogicException('No reducer function was provided');
        }

        $reducer = $this->_reducer;
        foreach ($this->_intermediate as $key => $list) {
            $reducer($key, $list, $this);
        }
        $this->_intermediate = [];
        $this->_executed = true;
    }

}