6
6
use MongoDB \Driver \Manager ;
7
7
use MongoDB \Driver \ReadConcern ;
8
8
use MongoDB \Driver \ReadPreference ;
9
- use Tequila \MongoDB \Command \Options \AggregateOptions ;
9
+ use Symfony \Component \OptionsResolver \Options ;
10
+ use Tequila \MongoDB \Exception \InvalidArgumentException ;
11
+ use Tequila \MongoDB \Exception \UnexpectedResultException ;
12
+ use Tequila \MongoDB \Options \Driver \DriverOptions ;
13
+ use Tequila \MongoDB \Options \Driver \TypeMapOptions ;
14
+ use Tequila \MongoDB \Options \OptionsResolver ;
10
15
11
16
class Aggregate implements CommandInterface
12
17
{
@@ -30,6 +35,21 @@ class Aggregate implements CommandInterface
30
35
*/
31
36
private $ options ;
32
37
38
+ /**
39
+ * @var ReadPreference|null
40
+ */
41
+ private $ readPreference ;
42
+
43
+ /**
44
+ * @var array
45
+ */
46
+ private $ typeMap ;
47
+
48
+ /**
49
+ * @var bool
50
+ */
51
+ private $ useCursor ;
52
+
33
53
/**
34
54
* @param string $databaseName
35
55
* @param string $collectionName
@@ -41,38 +61,118 @@ public function __construct($databaseName, $collectionName, array $pipeline, arr
41
61
$ this ->databaseName = (string )$ databaseName ;
42
62
$ this ->collectionName = (string )$ collectionName ;
43
63
$ this ->pipeline = $ pipeline ;
44
- $ this ->options = AggregateOptions:: resolve ($ options );
64
+ $ this ->options = $ this -> resolve ($ options );
45
65
}
46
66
47
67
public function execute (Manager $ manager )
48
68
{
49
- if (isset ($ this ->options ['readConcern ' ])) {
50
- /** @var ReadConcern $readConcern */
51
- $ readConcern = $ this ->options ['readConcern ' ];
52
- if ($ this ->hasOutStage () && ReadConcern::MAJORITY === $ readConcern ->getLevel ()) {
53
- unset($ this ->options ['readConcern ' ]);
54
- } else {
55
- $ this ->options ['readConcern ' ] = ['level ' => $ readConcern ->getLevel ()];
56
- }
69
+ $ options = ['aggregate ' => $ this ->collectionName , 'pipeline ' => $ this ->pipeline ];
70
+ $ options += $ this ->options ;
71
+ $ command = new Command ($ options );
72
+
73
+ $ cursor = $ manager ->executeCommand ($ this ->databaseName , $ command , $ this ->readPreference );
74
+ if ($ this ->useCursor ) {
75
+ $ cursor ->setTypeMap ($ this ->typeMap );
76
+
77
+ return $ cursor ;
78
+ }
79
+
80
+ $ cursor ->setTypeMap (TypeMapOptions::getArrayTypeMap ());
81
+ $ resultDocument = current ($ cursor ->toArray ());
82
+ if (!isset ($ resultDocument ['result ' ]) || !is_array ($ resultDocument ['result ' ])) {
83
+ throw new UnexpectedResultException (
84
+ 'Command "aggregate" did not return expected "result" array '
85
+ );
57
86
}
58
87
59
- if ($ this ->hasOutStage ()) {
60
- $ readPreference = new ReadPreference (ReadPreference::RP_PRIMARY );
61
- } else {
62
- if (isset ($ this ->options ['readPreference ' ])) {
63
- $ readPreference = $ this ->options ['readPreference ' ];
88
+ return $ resultDocument ['result ' ];
89
+ }
90
+
91
+ /**
92
+ * @param array $options
93
+ * @return array
94
+ */
95
+ private function resolve (array $ options )
96
+ {
97
+ $ resolver = new OptionsResolver ();
98
+ $ this ->configureOptions ($ resolver );
99
+ $ options = $ resolver ->resolve ($ options );
100
+
101
+ $ this ->readPreference = isset ($ this ->options ['readPreference ' ]) ? $ this ->options ['readPreference ' ] : null ;
102
+ unset($ options ['readPreference ' ]);
103
+
104
+ $ this ->typeMap = $ options ['typeMap ' ];
105
+ unset($ options ['typeMap ' ]);
106
+
107
+ $ this ->useCursor = $ options ['useCursor ' ];
108
+ unset($ options ['useCursor ' ]);
109
+
110
+ if ($ this ->useCursor ) {
111
+ if (isset ($ options ['batchSize ' ])) {
112
+ $ options ['cursor ' ] = ['batchSize ' => $ options ['batchSize ' ]];
113
+ unset ($ options ['batchSize ' ]);
64
114
} else {
65
- $ readPreference = null ;
115
+ $ options [ ' cursor ' ] = new \ stdClass () ;
66
116
}
67
117
}
68
118
69
- unset($ this ->options ['readPreference ' ]);
119
+ return $ options ;
120
+ }
70
121
71
- $ options = ['aggregate ' => $ this ->collectionName , 'pipeline ' => $ this ->pipeline ];
72
- $ options += $ this ->options ;
73
- $ command = new Command ($ options );
122
+ private function configureOptions (OptionsResolver $ resolver )
123
+ {
124
+ DriverOptions::configureOptions ($ resolver );
125
+
126
+ $ resolver ->setDefined ([
127
+ 'allowDiskUse ' ,
128
+ 'batchSize ' ,
129
+ 'bypassDocumentValidation ' ,
130
+ 'maxTimeMS ' ,
131
+ 'readConcern ' ,
132
+ 'readPreference ' ,
133
+ 'useCursor ' ,
134
+ ]);
135
+
136
+ $ resolver
137
+ ->setAllowedTypes ('allowDiskUse ' , 'bool ' )
138
+ ->setAllowedTypes ('batchSize ' , 'integer ' )
139
+ ->setAllowedTypes ('bypassDocumentValidation ' , 'bool ' )
140
+ ->setAllowedTypes ('maxTimeMS ' , 'integer ' )
141
+ ->setAllowedTypes ('readConcern ' , ReadConcern::class)
142
+ ->setAllowedTypes ('readPreference ' , ReadPreference::class)
143
+ ->setAllowedTypes ('useCursor ' , 'bool ' );
144
+
145
+ $ resolver ->setDefault ('useCursor ' , true );
74
146
75
- return $ manager ->executeCommand ($ this ->databaseName , $ command , $ readPreference );
147
+ $ resolver ->setNormalizer ('batchSize ' , function (Options $ options , $ batchSize ) {
148
+ if (!isset ($ options ['useCursor ' ]) || false === $ options ['useCursor ' ]) {
149
+ throw new InvalidArgumentException (
150
+ 'Option "batchSize" is meaningless unless option "useCursor" is set to true '
151
+ );
152
+ }
153
+
154
+ return $ batchSize ;
155
+ });
156
+
157
+ $ resolver ->setNormalizer ('readConcern ' , function (Options $ options , ReadConcern $ readConcern ) {
158
+ if (null === $ readConcern ->getLevel () ) {
159
+ return null ;
160
+ }
161
+
162
+ if ($ this ->hasOutStage () && ReadConcern::MAJORITY === $ readConcern ->getLevel ()) {
163
+ return null ;
164
+ }
165
+
166
+ return ['level ' => $ readConcern ->getLevel ()];
167
+ });
168
+
169
+ $ resolver ->setNormalizer ('readPreference ' , function (Options $ options , ReadPreference $ readPreference ) {
170
+ if ($ this ->hasOutStage ()) {
171
+ return new ReadPreference (ReadPreference::RP_PRIMARY );
172
+ }
173
+
174
+ return $ readPreference ;
175
+ });
76
176
}
77
177
78
178
private function hasOutStage ()
@@ -81,4 +181,5 @@ private function hasOutStage()
81
181
82
182
return '$out ' === key ($ lastStage );
83
183
}
184
+
84
185
}
0 commit comments