forked from Qbix/Platform
/
Mysql.php
2914 lines (2707 loc) · 92.2 KB
/
Mysql.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<?php
/**
* @module Db
*/
class Db_Mysql implements Db_Interface
{
/**
* This class lets you create and use PDO database connections.
* @class Db_Mysql
* @extends Db_Interface
* @constructor
*
* @param {string} $connectionName The name of the connection out of the connections added with Db::setConnection()
* This is required for actually connecting to the database.
* @param {PDO} [$pdo=null] Existing PDO connection. Only accepts connections to MySQL.
*/
function __construct ($connectionName, PDO $pdo = null)
{
	$this->connectionName = $connectionName;
	if (!$pdo) {
		// No existing connection supplied; reallyConnect() creates one on demand.
		return;
	}
	// getAttribute() may throw; that exception is allowed to propagate.
	$driver = strtolower($pdo->getAttribute(PDO::ATTR_DRIVER_NAME));
	if ($driver != 'mysql') {
		throw new Exception("the PDO object is not for mysql", -1);
	}
	$this->pdo = $pdo;
}
/**
* The PDO connection that this object uses.
* Either passed to the constructor or created by reallyConnect().
* @property $pdo
* @type PDO
*/
public $pdo;
/**
* The shard info recorded by reallyConnect:
* an array with the keys dsn, prefix, username, password and driver_options.
* @property $shardInfo
* @type array
*/
public $shardInfo;
/**
* The name of the connection
* @property $connectionName
* @type string
* @protected
*/
protected $connectionName;
/**
* The name of the shard currently selected with reallyConnect, if any
* @property $shardName
* @type string
* @protected
*/
protected $shardName;
/**
* The database name of the shard currently selected with reallyConnect, if any
* @property $dbname
* @type string
*/
public $dbname;
/**
* The prefix of the shard currently selected with reallyConnect, if any
* @property $prefix
* @type string
*/
public $prefix;
/**
* The cutoff after which strlen gets too expensive to check automatically
* @property $maxCheckStrlen
* @type integer
*/
public $maxCheckStrlen = 1000000;
/**
* Records, per dsn string, whether the timezone was already set.
* reallyConnect() uses it as an array of dsn => true, shared by all instances.
* @property $setTimezoneDone
* @type array
* @static
* @protected
*/
protected static $setTimezoneDone;
/**
* Actually makes a connection to the database (by creating a PDO instance)
* @method reallyConnect
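* @param {string} [$shardName=null] A shard name that was added using Db::setShard.
* This modifies how we connect to the database.
* @param {array} [&$shardInfo=null] Passed by reference; receives the shard info
* (dsn, prefix, username, password, driver_options) used for the connection.
* @return {PDO} The PDO object for connection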
*/
function reallyConnect($shardName = null, &$shardInfo = null)
{
// Reuse the existing PDO if there is one. Note that $shardName is ignored
// in that case: the caller simply gets the previously recorded shard info.
if ($this->pdo) {
$shardInfo = $this->shardInfo;
return $this->pdo;
}
$connectionName = $this->connectionName;
$connectionInfo = Db::getConnection($connectionName);
if (empty($connectionInfo)) {
throw new Exception("database connection \"$connectionName\" wasn't registered with Db.", -1);
}
// Any empty shard name is normalized to the empty string.
if (empty($shardName)) {
$shardName = '';
}
// Shard-specific overrides of the connection settings; may be unset.
$modifications = Db::getShard($connectionName, $shardName);
if (!isset($modifications)) {
$modifications = array();
}
if (class_exists('Q')) {
/**
* Occurs before a real connection to the database is made
* @event Db/reallyConnect {before}
* @param {Db_Mysql} db
* @param {string} shardName
* @param {array} modifications
* @return {array}
* Extra modifications
*/
$more = Q::event('Db/reallyConnect', array(
'db' => $this,
'shardName' => $shardName,
'modifications' => $modifications
), 'before');
// Whatever the "before" handlers return is merged over the shard modifications.
if ($more) {
$modifications = array_merge($modifications, $more);
}
}
// Each setting is taken from the modifications when present,
// otherwise from the connection info registered with Db.
$dsn = isset($modifications['dsn']) ? $modifications['dsn'] : $connectionInfo['dsn'];
$prefix = isset($modifications['prefix']) ? $modifications['prefix'] : $connectionInfo['prefix'];
$username = isset($modifications['username']) ? $modifications['username'] : $connectionInfo['username'];
$password = isset($modifications['password']) ? $modifications['password'] : $connectionInfo['password'];
$driver_options = isset($modifications['driver_options'])
? $modifications['driver_options']
: (isset($connectionInfo['driver_options']) ? $connectionInfo['driver_options'] : null);
// NOTE(review): shardInfo is recorded before the per-field dsn overrides below
// are applied, so shardInfo['dsn'] holds the dsn without those overrides.
$this->shardInfo = $shardInfo = compact('dsn', 'prefix', 'username', 'password', 'driver_options');
// More dsn changes
$dsn_fields = array();
foreach (array('host', 'port', 'dbname', 'unix_socket', 'charset') as $f) {
if (isset($modifications[$f])) {
$dsn_fields[$f] = $modifications[$f];
}
}
if ($dsn_fields) {
// Rebuild the dsn string from the parsed parts plus the overrides.
// NOTE(review): http_build_query() URL-encodes values, so a value such as
// a unix_socket path would end up percent-encoded -- confirm this is intended.
$dsn_array = array_merge(Db::parseDsnString($dsn), $dsn_fields);
$dsn = 'mysql:'.http_build_query($dsn_array, '', ';');
} else {
$dsn_array = Db::parseDsnString($dsn);
}
// The connection may have already been made with these parameters,
// in which case we will just retrieve the existing connection.
$this->pdo = Db::pdo($dsn, $username, $password, $driver_options, $connectionName, $shardName);
$this->pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$this->shardName = $shardName;
// NOTE(review): assumes the parsed dsn contains a 'dbname' entry.
$this->dbname = $dsn_array['dbname'];
$this->prefix = $prefix;
if (class_exists('Q')) {
/**
* Occurs when a real connection to the database has been made
* @event Db/reallyConnect {after}
* @param {Db_Mysql} db
* @param {string} shardName
* @param {array} modifications
*/
Q::event('Db/reallyConnect', array(
'db' => $this,
'shardName' => $shardName,
'modifications' => $modifications
), 'after');
}
// Use native prepared statements and buffered result sets.
$this->pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
$this->pdo->setAttribute(PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, true);
// Set the timezone only once per dsn string; the flag is static,
// so it is shared by every Db_Mysql instance.
if (empty(self::$setTimezoneDone[$dsn])) {
$this->setTimezone();
self::$setTimezoneDone[$dsn] = true;
}
return $this->pdo;
}
/**
* Sets the timezone in the database to match the one in PHP
* @param {integer} [$offset=date('Z')] Offset from UTC in seconds. Defaults to PHP's current offset.
* @method setTimezone
*/
function setTimezone($offset = null)
{
	if (!isset($offset)) {
		// date('Z') is PHP's current offset from UTC, in seconds
		$offset = (int)date('Z');
	}
	if (!$offset) {
		$offset = 0;
	}
	$abs = abs($offset);
	$hours = sprintf("%02d", floor($abs / 3600));
	$minutes = sprintf("%02d", floor(($abs % 3600) / 60));
	// A zero offset must be written as '+00:00'. Testing with "> 0" produced
	// '-00:00' for UTC, which is not a canonical offset.
	$sign = ($offset >= 0) ? '+' : '-';
	// $hours and $minutes are formatted with %02d, so only digits are interpolated.
	$this->rawQuery("SET time_zone = '$sign$hours:$minutes';")->execute();
}
/**
* Returns the name of the shard currently selected with reallyConnect, if any
* @method shardName
* @return {string}
*/
function shardName()
{
	// Assigned by reallyConnect(); stays null until then.
	$current = $this->shardName;
	return $current;
}
/**
* Forwards all other calls to the PDO object
* @method __call
* @param {string} $name The function name
* @param {array} $arguments The arguments
* @return {mixed} The result of method call
*/
function __call ($name, array $arguments)
{
	// Make sure $this->pdo exists before delegating to it.
	$this->reallyConnect();
	$target = array($this->pdo, $name);
	if (is_callable($target)) {
		return call_user_func_array($target, $arguments);
	}
	throw new Exception("neither Db_Mysql nor PDO supports the $name function");
}
/**
* Returns the name of the connection with which this Db object was created.
* @method connectionName
* @return {string}
*/
function connectionName ()
{
	if (!isset($this->connectionName)) {
		return null;
	}
	return $this->connectionName;
}
/**
* Returns the connection info with which this Db object was created.
* @method connection
* @return {array|null} The connection info from Db::getConnection(), or null if no connection name is set
*/
function connection()
{
	// Without a connection name there is nothing to look up.
	return isset($this->connectionName)
		? Db::getConnection($this->connectionName)
		: null;
}
/**
* Returns an associative array representing the dsn
* @method dsn
* @return {array}
*/
function dsn()
{
	$name = $this->connectionName;
	$info = Db::getConnection($name);
	if (!empty($info['dsn'])) {
		// Parse the registered dsn string into its component parts.
		return Db::parseDsnString($info['dsn']);
	}
	throw new Exception('No dsn string found for the connection ' . $name);
}
/**
* Returns the lowercase name of the dbms (e.g. "mysql")
* @method dbms
* @return {string}
*/
function dbms()
{
	// This adapter only ever talks to MySQL.
	$name = 'mysql';
	return $name;
}
/**
* Returns the name of the database used
* @method dbName
* @return {string}
*/
function dbName()
{
	// Taken from the connection's registered dsn, not from any shard override.
	$dsn = $this->dsn();
	// dbname is optional in a DSN, so return null rather than
	// reading an undefined array key.
	if (empty($dsn) || !isset($dsn['dbname'])) {
		return null;
	}
	return $dsn['dbname'];
}
/**
* Creates a query to select fields from a table. Needs to be used with Db_Query::from().
* @method select
* @param {string|array} [$fields='*'] The fields as strings, or "*", or array of alias=>field
* @param {string|array} [$tables=''] The tables as strings, or array of alias=>table
* @return {Db_Query_Mysql} The resulting Db_Query object
*/
function select ($fields = '*', $tables = '')
{
	if (!$fields) {
		throw new Exception("fields not specified in call to 'select'.");
	}
	// An empty string is accepted for $tables; only null is rejected.
	if ($tables === null) {
		throw new Exception("tables not specified in call to 'select'.");
	}
	$selectQuery = new Db_Query_Mysql($this, Db_Query::TYPE_SELECT);
	return $selectQuery->select($fields, $tables);
}
/**
* Creates a query to insert a row into a table
* @method insert
* @param {string} $table_into The name of the table to insert into
* @param {array} $fields=array() The fields as an array of column=>value pairs
* @return {Db_Query_Mysql} The resulting Db_Query_Mysql object
*/
function insert ($table_into, array $fields = array())
{
	if (!$table_into) {
		throw new Exception("table not specified in call to 'insert'.");
	}
	// An empty $fields array is allowed; the INSERT query is still built.
	$columns = array();
	$placeholders = array();
	foreach ($fields as $name => $val) {
		$columns[] = Db_Query_Mysql::column($name);
		// Expressions are inlined as SQL; plain values become named placeholders.
		$placeholders[] = ($val instanceof Db_Expression) ? "$val" : ":$name";
	}
	$clauses = array(
		'INTO' => $table_into . ' (' . implode(', ', $columns) . ')',
		'VALUES' => implode(', ', $placeholders)
	);
	return new Db_Query_Mysql($this, Db_Query::TYPE_INSERT, $clauses, $fields, $table_into);
}
/**
* Inserts multiple rows into a single table, preparing the statement only once,
* and executes all the queries.
* @method insertManyAndExecute
* @param {string} $table_into The name of the table to insert into
* @param {array} [$rows=array()] The array of rows to insert.
* Each row should be an array of ($field => $value) pairs, with the exact
* same set of keys (field names) in each array. It can also be a Db_Row.
* @param {array} [$options=array()] An associative array of options, including:
* @param {array} [$options.columns] Pass an array of column names, otherwise
* they are automatically taken from the first row being inserted.
* @param {string} [$options.className]
* If you provide the class name, the system will be able to use any sharding
* indexes under that class name in the config.
* @param {integer} [$options.chunkSize]
* The number of rows to insert at a time. Defaults to 20.
* You can also put 0 here, which means unlimited chunks, but it's not recommended.
* @param {array} [$options.onDuplicateKeyUpdate]
* You can put an array of fieldname => value pairs here,
* which will add an ON DUPLICATE KEY UPDATE clause to the query.
*/
function insertManyAndExecute ($table_into, array $rows = array(), $options = array())
{
	// Validate and get options
	if (empty($table_into)) {
		throw new Exception("table not specified in call to 'insertManyAndExecute'.");
	}
	if (empty($rows)) {
		return false;
	}
	$chunkSize = isset($options['chunkSize']) ? $options['chunkSize'] : 20;
	if ($chunkSize < 0) {
		return false;
	}
	$onDuplicateKeyUpdate = isset($options['onDuplicateKeyUpdate'])
		? $options['onDuplicateKeyUpdate'] : null;
	$className = isset($options['className']) ? $options['className'] : null;

	// Get the columns list.
	// $rawColumns maps every entry of $columnsList back to the key
	// under which the value is stored in each row.
	$columnsList = array();
	$rawColumns = array();
	if (isset($options['columns'])) {
		$columnsList = $options['columns'];
		foreach ($columnsList as $c) {
			$rawColumns[$c] = $c;
		}
	} else {
		// Take the columns from the first row
		$row = reset($rows);
		$record = ($row instanceof Db_Row) ? $row->fields : $row;
		foreach ($record as $column => $value) {
			$columnsList[] = $c = Db_Query_Mysql::column($column);
			$rawColumns[$c] = $column;
		}
	}
	$columnsString = implode(', ', $columnsList);
	$into = "$table_into ($columnsString)";

	// On duplicate key update clause (optional)
	$update_fields = array();
	$odku_clause = '';
	if (isset($onDuplicateKeyUpdate)) {
		$odku_clause = "\n\t ON DUPLICATE KEY UPDATE ";
		$parts = array();
		foreach ($onDuplicateKeyUpdate as $k => $v) {
			if ($v instanceof Db_Expression) {
				$part = "= $v";
			} else {
				$part = " = :__update_$k";
				$update_fields["__update_$k"] = $v;
			}
			$parts[] = Db_Query_Mysql::column($k) . $part;
		}
		$odku_clause .= implode(",\n\t", $parts);
	}

	// Start filling. All of the following arrays are keyed by shard name.
	$queries = array();      // SQL accumulated so far for the current chunk
	$queryCounts = array();  // number of rows in the current chunk
	$bindings = array();     // placeholder => value for the current chunk
	$last_q = array();       // SQL of the last executed chunk
	$last_queries = array(); // query object of the last executed chunk
	foreach ($rows as $row) {
		if ($row instanceof Db_Row) {
			if (isset($className) and class_exists('Q') and class_exists($className)) {
				Q::event("Db/Row/$className/save", array(
					'row' => $row
				), 'before');
			}
			$callback = array($row, "beforeSave");
			if (is_callable($callback)) {
				call_user_func(
					$callback, $row->fields, false, false
				);
			}
			$fieldNames = method_exists($row, 'fieldNames')
				? $row->fieldNames()
				: null;
			$record = array();
			if (is_array($fieldNames)) {
				foreach ($fieldNames as $name) {
					$record[$name] = $row->fields[$name];
				}
			} else {
				foreach ($row->fields as $name => $value) {
					$record[$name] = $value;
				}
			}
		} else {
			$record = $row;
		}
		$query = new Db_Query_Mysql($this, Db_Query::TYPE_INSERT);

		// get shard, if any
		$shard = '';
		if (isset($className)) {
			$query->className = $className;
			$sharded = $query->shard(null, $record);
			// Resolve the shard key BEFORE validating it. Previously the
			// '*' test ran while $shard was still '', so it could never fire.
			$shard = key($sharded);
			if (count($sharded) > 1 or $shard === '*') { // should be only one shard
				throw new Exception("Db_Mysql::insertManyAndExecute row should be stored on exactly one shard: " . Q::json_encode($record));
			}
		}

		// start filling out the query data
		$qc = empty($queryCounts[$shard]) ? 1 : $queryCounts[$shard] + 1;
		if (!isset($bindings[$shard])) {
			$bindings[$shard] = array();
		}
		$valuesList = array();
		$index = 0;
		foreach ($columnsList as $column) {
			++$index;
			$raw = $rawColumns[$column];
			$value = isset($record[$raw]) ? $record[$raw] : null;
			if ($value instanceof Db_Expression) {
				$valuesList[] = "$value";
			} else {
				// placeholders are named by row-in-chunk and column position
				$valuesList[] = ':_'.$qc.'_'.$index;
				$bindings[$shard]['_'.$qc.'_'.$index] = $value;
			}
		}
		$valuesString = implode(', ', $valuesList);
		if (empty($queryCounts[$shard])) {
			$q = $queries[$shard] = "INSERT INTO $into\nVALUES ($valuesString) ";
			$queryCounts[$shard] = 1;
		} else {
			$q = $queries[$shard] .= ",\n ($valuesString) ";
			++$queryCounts[$shard];
		}

		// if chunk filled up for this shard, execute it
		// (a chunkSize of 0 never matches, so everything is sent at the end)
		if ($qc === $chunkSize) {
			if ($onDuplicateKeyUpdate) {
				$q .= $odku_clause;
			}
			$query = $this->rawQuery($q)->bind($bindings[$shard]);
			if ($onDuplicateKeyUpdate) {
				$query = $query->bind($update_fields);
			}
			if (isset($last_q[$shard]) and $last_q[$shard] === $q) {
				// re-use the prepared statement, save round-trips to the db
				$query->reuseStatement($last_queries[$shard]);
			}
			$query->execute(true, $shard);
			$last_q[$shard] = $q;
			$last_queries[$shard] = $query; // save for re-use
			// reset the chunk for this shard
			$bindings[$shard] = $queries[$shard] = array();
			$queryCounts[$shard] = 0;
		}
	}

	// Now execute the remaining queries, if any
	foreach ($queries as $shard => $q) {
		if (!$q) continue;
		if ($onDuplicateKeyUpdate) {
			$q .= $odku_clause;
		}
		$query = $this->rawQuery($q)->bind($bindings[$shard]);
		if ($onDuplicateKeyUpdate) {
			$query = $query->bind($update_fields);
		}
		if (isset($last_q[$shard]) and $last_q[$shard] === $q) {
			// re-use the prepared statement, save round-trips to the db
			$query->reuseStatement($last_queries[$shard]);
		}
		$query->execute(true, $shard);
	}

	// Mark every Db_Row as inserted and retrieved
	foreach ($rows as $row) {
		if ($row instanceof Db_Row) {
			$row->wasInserted(true);
			$row->wasRetrieved(true);
		}
	}
}
/**
* Creates a query to update rows. Needs to be used with {@link Db_Query::set}
* @method update
* @param {string} $table The table to update
* @return {Db_Query_Mysql} The resulting Db_Query object
*/
function update ($table)
{
	if (!$table) {
		throw new Exception("table not specified in call to 'update'.");
	}
	// The SET clause is added later by the caller on the returned query.
	return new Db_Query_Mysql(
		$this,
		Db_Query::TYPE_UPDATE,
		array('UPDATE' => "$table"),
		array(),
		$table
	);
}
/**
* Creates a query to delete rows.
* @method delete
* @param {string} $table_from The table to delete from
* @param {string} [$table_using=null] If set, adds a USING clause with this table. You can then use ->join() with the resulting Db_Query.
* @return {Db_Query_Mysql}
*/
function delete ($table_from, $table_using = null)
{
	if (!$table_from) {
		throw new Exception("table not specified in call to 'delete'.");
	}
	// Optional USING part, appended to the FROM clause when a table is given.
	$usingClause = '';
	if ($table_using !== null) {
		if (!is_string($table_using)) {
			throw new Exception("table_using field must be a string");
		}
		$usingClause = " USING $table_using";
	}
	$clauses = array('FROM' => $table_from . $usingClause);
	return new Db_Query_Mysql($this, Db_Query::TYPE_DELETE, $clauses, array(), $table_from);
}
/**
* Creates a query from raw SQL
* @method rawQuery
* @param {string|null} $sql May contain one or more SQL statements.
* Pass null here for an empty query that you can add other clauses to, e.g. ->commit().
* @param {array} [$bind=array()] An array of parameters to bind to the query, using
* the Db_Query_Mysql->bind method. They are used to replace foo=:foo and bar=?
* @return {Db_Query_Mysql}
*/
function rawQuery ($sql = null, $bind = array())
{
	$rawQuery = new Db_Query_Mysql($this, Db_Query::TYPE_RAW, array('RAW' => $sql));
	if (!$bind) {
		// Nothing to bind.
		return $rawQuery;
	}
	$rawQuery->bind($bind);
	return $rawQuery;
}
/**
* Creates a query to rollback a previously started transaction.
* @method rollback
* @param {array} $criteria The criteria to use, for sharding
* @return {Db_Query_Mysql} The resulting Db_Query object
*/
function rollback ($criteria = null)
{
	$clauses = array('ROLLBACK' => true);
	$rollbackQuery = new Db_Query_Mysql($this, Db_Query::TYPE_ROLLBACK, $clauses);
	// The criteria are handed to the query so it can pick the shard(s).
	$rollbackQuery->rollback($criteria);
	return $rollbackQuery;
}
/**
* Sorts a table in chunks
* @method rank
* @param {string} $table The name of the table in the database
* @param {string} $pts_field The name of the field to rank by.
* @param {string} $rank_field The rank field to update in all the rows
* @param {integer} [$start=1] The value of the first rank
* @param {integer} [$chunk_size=1000] The number of rows to process at a time. Default is 1000.
* This is so the queries don't tie up the database server for very long,
* letting it service website requests and other things.
* @param {integer} [$rank_level2=0] Since the ranking is done in chunks, the function must know
* which rows have not been processed yet. If this field is empty (default)
* then the function sets the rank_field to 0 in all the rows, before
* starting the ranking process.
* (That might be a time consuming operation.)
* Otherwise, if $rank_level2 is a nonzero integer, then the function alternates
* between the ranges
* $start to $rank_level2, and $rank_level2 + $start to $rank_level2 * 2.
* That is, after it is finished, all the ratings will be in one of these
* two ranges.
* If not empty, this should be a very large number, like a billion.
* @param {array} [$order_by] The order clause to use when calculating ranks.
* Default is array($pts_field, false)
* @param {array} [$where=null] Any additional criteria to filter the table by.
* The ranking algorithm will do its work within the results that match this criteria.
* If your table is sharded, then all the work must be done within one shard.
*/
function rank(
$table,
$pts_field,
$rank_field,
$start = 1,
$chunk_size = 1000,
$rank_level2 = 0,
$order_by = null,
$where = array())
{
// Default ordering is by the points field. The second element is passed
// through to orderBy() -- presumably "not ascending"; confirm against Db_Query.
if (!isset($order_by)) {
$order_by = array($pts_field, false);
}
// $where defaults to array(), so this only applies when null is passed explicitly.
if (!isset($where)) {
$where = '1';
}
// Count all the rows
$query = $this->select('COUNT(1) _count', $table)->where($where);
$sharded = $query->shard();
$shard = key($sharded);
if (count($sharded) > 1 or $shard === '*') { // should be only one shard
throw new Exception("Db_Mysql::rank can work within at most one shard");
}
$row = $query->execute()->fetch(PDO::FETCH_ASSOC);
// The row count bounds the number of chunked UPDATE passes below.
$count = $row['_count'];
if (empty($rank_level2)) {
// Single-level mode: reset every matching row's rank to 0 first,
// so that "unprocessed" means rank 0 or NULL.
$this->update($table)
->set(array($rank_field => 0))
->where($where)
->execute();
$rank_base = 0;
$condition = "$rank_field = 0 OR $rank_field IS NULL";
} else {
// Two-level mode: probe for any matching row whose rank is below $rank_level2.
$rows = $this->select($pts_field, $table)
->where("$rank_field < $rank_level2")
->andWhere($where)
->limit(1)
->fetchAll();
if (!empty($rows)) {
// At least one rank is below $rank_level2. Create ranks on level 2;
// the rows still below $rank_level2 are the unprocessed ones.
$rank_base = $rank_level2;
$condition = "$rank_field < $rank_level2";
} else {
// No rank is below $rank_level2. Create ranks on level 1;
// the rows still at or above $rank_level2 are the unprocessed ones.
$rank_base = 0;
$condition = "$rank_field >= $rank_level2";
}
}
// Here comes the magic:
$offset = 0;
$rank_base += $start;
// @rank starts at -1, so the first increment yields 0
// and the first row receives exactly $rank_base.
// NOTE(review): relies on the UPDATEs below running on the same
// connection as this SET, since @rank is a MySQL user variable -- confirm.
$this->rawQuery("set @rank = $offset - 1")->execute(false, $shard);
do {
// Each pass ranks up to $chunk_size of the rows still matching $condition.
$query = $this->update($table)->set(array(
$rank_field => new Db_Expression("$rank_base + (@rank := @rank + 1)")
))->where($condition);
if ($where) {
$query = $query->andWhere($where);
}
if ($order_by) {
$query = call_user_func_array(array($query, 'orderBy'), $order_by);
}
$query->limit($chunk_size)->execute();
$offset += $chunk_size;
} while ($count-$offset > 0);
}
/**
* Generate an ID that is unique in a table
* @method uniqueId
* @param {string} $table The name of the table
* @param {string} $field The name of the field to check for uniqueness.
* You should probably have an index starting with this field.
* @param {array} [$where=array()] You can indicate conditions here to limit the search for
* an existing value. The result is an id that is unique within a certain partition.
* @param {array} [$options=array()] Optional array used to override default options:
* @param {integer} [$options.length=8] The length of the ID to generate, after the prefix.
* @param {string} [$options.characters='abcdefghijklmnopqrstuvwxyz'] All the characters from which to construct the id
* @param {string} [$options.prefix=''] The prefix to prepend to the unique id.
* @param {callable} [$options.filter]
* The name of a function that will take the generated string and
* check it. The filter function can modify the string by returning another string,
* or simply reject the string by returning false, in which case another string will be
* generated and checked.
* @return {string} The generated id, including the prefix
*/
function uniqueId(
	$table,
	$field,
	$where = array(),
	$options = array())
{
	// Read the documented options explicitly instead of extract($options),
	// which would let arbitrary keys overwrite $table, $field or $where.
	$length = isset($options['length']) ? $options['length'] : 8;
	$characters = isset($options['characters'])
		? $options['characters']
		: 'abcdefghijklmnopqrstuvwxyz';
	$prefix = isset($options['prefix']) ? $options['prefix'] : '';
	$count = strlen($characters);
	if ($length > 0 and $count === 0) {
		throw new Exception("characters not specified in call to 'uniqueId'.");
	}
	do {
		// Build a random candidate: the prefix followed by $length characters.
		// mt_rand() is not cryptographically secure, so these ids are
		// unique but should not be treated as unguessable.
		$id = $prefix;
		for ($i=0; $i<$length; ++$i) {
			$id .= $characters[mt_rand(0, $count-1)];
		}
		if (!empty($options['filter'])) {
			$ret = Q::call($options['filter'], array(compact('id', 'table', 'field', 'where', 'options')));
			if ($ret === false) {
				// Rejected by the filter. "continue" jumps straight to the
				// while-condition, so $rows must be truthy to force another round.
				// Before, $rows was undefined on the first pass and the
				// rejected id was returned.
				$rows = true;
				continue;
			} else if ($ret) {
				$id = $ret;
			}
		}
		// Look for an existing row with this id, within the optional partition
		$q = $this->select($field, $table)
			->where(array($field => $id));
		if ($where) {
			$q->andWhere($where);
		}
		$rows = $q->limit(1)->fetchAll();
	} while ($rows);
	return $id;
}
/**
* Returns a timestamp from a Date string
* @method fromDate
* @param {string} $date The Date string that comes from the db
* @return {integer} The timestamp
*/
function fromDate ($date)
{
	// Year, month and day are cut out at fixed offsets: 0-3, 5-6 and 8-9.
	$parts = array();
	foreach (array(array(0, 4), array(5, 2), array(8, 2)) as $span) {
		$parts[] = (int)substr($date, $span[0], $span[1]);
	}
	list($y, $m, $d) = $parts;
	// Midnight of that day, in PHP's current timezone
	return mktime(0, 0, 0, $m, $d, $y);
}
/**
* Returns a timestamp from a DateTime string
* @method fromDateTime
* @param {string} $datetime The DateTime string that comes from the db
* @return {integer} The timestamp
*/
function fromDateTime ($datetime)
{
	// Numeric input is taken to be a timestamp already and returned untouched.
	if (is_numeric($datetime)) {
		return $datetime;
	}
	// Each component is cut out at a fixed offset of the string.
	$spans = array(
		'y' => array(0, 4),
		'm' => array(5, 2),
		'd' => array(8, 2),
		'h' => array(11, 2),
		'i' => array(14, 2),
		's' => array(17, 2)
	);
	$p = array();
	foreach ($spans as $key => $span) {
		$p[$key] = (int)substr($datetime, $span[0], $span[1]);
	}
	return mktime($p['h'], $p['i'], $p['s'], $p['m'], $p['d'], $p['y']);
}
/**
* Returns a Date string to store in the database
* @method toDate
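* @param {integer|string} $timestamp The UNIX timestamp, e.g. from a strtotime function (values above 10000000000 are treated as milliseconds), or a string that strtotime() can parse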
* @return {string}
*/
function toDate ($timestamp)
{
	// Non-numeric input is parsed as a date/time string
	if (!is_numeric($timestamp)) {
		$timestamp = strtotime($timestamp);
	}
	// Values this large cannot be seconds; treat them as milliseconds
	if ($timestamp > 10000000000) {
		$timestamp = $timestamp / 1000;
	}
	// The division can leave a fractional float. date() takes an integer,
	// and implicit lossy float-to-int conversion is deprecated since PHP 8.1.
	return date('Y-m-d', (int)$timestamp);
}
/**
* Returns a DateTime string to store in the database
* @method toDateTime
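* @param {integer|string} $timestamp The UNIX timestamp, e.g. from a strtotime function (values above 10000000000 are treated as milliseconds), or a string that strtotime() can parse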
* @return {string}
*/
function toDateTime ($timestamp)
{
	// Non-numeric input is parsed as a date/time string
	if (!is_numeric($timestamp)) {
		$timestamp = strtotime($timestamp);
	}
	// Values this large cannot be seconds; treat them as milliseconds
	if ($timestamp > 10000000000) {
		$timestamp = $timestamp / 1000;
	}
	// The division can leave a fractional float. date() takes an integer,
	// and implicit lossy float-to-int conversion is deprecated since PHP 8.1.
	return date('Y-m-d H:i:s', (int)$timestamp);
}
/**
* Returns the timestamp the db server would have, based on synchronization
* @method getCurrentTimestamp
* @return {integer}
*/
function getCurrentTimestamp()
{
	// Both values are computed on the first call and then kept in static variables.
	static $dbtime = null, $phptime = null;
	if ($dbtime === null) {
		// Sample PHP's clock before and after asking the database for its time,
		// and pair the database time with the midpoint of the two samples.
		$before = time();
		$result = $this->select('CURRENT_TIMESTAMP', '')->execute();
		$row = $result->fetch(PDO::FETCH_NUM);
		$dbtime = $this->fromDateTime($row[0]);
		$after = time();
		$phptime = round(($before + $after) / 2);
	}
	// Database time at synchronization plus the PHP time elapsed since then
	$elapsed = time() - $phptime;
	return $dbtime + $elapsed;
}
/**
* Takes a MySQL script and returns an array of queries.
* When DELIMITER is changed, respects that too.
* @method scriptToQueries
* @param {string} $script The text of the script
* @param {callable} [$callback=null] Optional callback to call for each query.
* @return {array} An array of the SQL queries.
*/
function scriptToQueries($script, $callback = null)
{
	// Connect up front, as before.
	$this->reallyConnect();
	// The server version used to be read and formatted here, but the result
	// was thrown away, and a version string with fewer than three
	// dot-separated parts raised undefined-offset warnings.
	// scriptToQueries_internal() computes the version itself.
	return $this->scriptToQueries_internal($script, $callback);
}
/**
* Takes stripped MySQL script and returns an array of queries.
* When DELIMITER is changed, respects that too.
* @method scriptToQueries_internal
* @protected
* @param {string} $script The text of the script
* @param {callable} [$callback=null] Optional callback to call for each query.
* @return {array} An array of the SQL queries.
*/
protected function scriptToQueries_internal($script, $callback = null)
{
$queries = array();
$script_len = strlen($script);
$this->reallyConnect();
$version_string = $this->pdo->getAttribute(PDO::ATTR_SERVER_VERSION);
$version_parts = explode('.', $version_string);
$version = sprintf("%1d%02d%02d", $version_parts[0], $version_parts[1], $version_parts[2]);
//$mode_n = 0; // normal
$mode_c = 1; // comments
$mode_sq = 2; // single quotes
$mode_dq = 3; // double quotes
$mode_bt = 4; // backticks
$mode_lc = 5; // line comment (hash or double-dash)
$mode_ds = 6; // delimiter statement
$cur_pos = 0;
$d = ';'; // delimiter
$d_len = strlen($d);
$query_start_pos = 0;
$del_start_pos_array = array();
$del_end_pos_array = array();
if (class_exists('Q_Config')) {
$separator = Q_Config::expect('Db', 'sql', 'querySeparator');
} else {
$separator = "-------- NEXT QUERY STARTS HERE --------";
}
$found = strpos($script, $separator);
if ($found !== false) {
// This script was specially crafted for quick parsing
$queries = explode($separator, $script);
foreach ($queries as $i => $query) {
if (!trim($query)) {
unset($queries[$i]);
}
}
return $queries;
}
while (1) {
$c_pos = strpos($script, "/*", $cur_pos);
$sq_pos = strpos($script, "'", $cur_pos);
$dq_pos = strpos($script, "\"", $cur_pos);
$bt_pos = strpos($script, "`", $cur_pos);
$c2_pos = strpos($script, "--", $cur_pos);
$c3_pos = strpos($script, "#", $cur_pos);
$ds_pos = stripos($script, "\nDELIMITER ", $cur_pos);
if ($cur_pos === 0 and substr($script, 0, 9) === 'DELIMITER') {
$ds_pos = 0;
}
$next_pos = false;
if ($c_pos !== false) {
$next_mode = $mode_c;
$next_pos = $c_pos;
$next_end_str = "*/";
$next_end_str_len = 2;
}
if ($sq_pos !== false and ($next_pos === false or $sq_pos < $next_pos)) {
$next_mode = $mode_sq;
$next_pos = $sq_pos;
$next_end_str = "'";
$next_end_str_len = 1;
}
if ($dq_pos !== false and ($next_pos === false or $dq_pos < $next_pos)) {
$next_mode = $mode_dq;
$next_pos = $dq_pos;
$next_end_str = "\"";
$next_end_str_len = 1;
}
if ($bt_pos !== false and ($next_pos === false or $bt_pos < $next_pos)) {
$next_mode = $mode_bt;
$next_pos = $bt_pos;
$next_end_str = "`";
$next_end_str_len = 1;
}
if ($c2_pos !== false and ($next_pos === false or $c2_pos < $next_pos)
and ($script[$c2_pos+2] == " " or $script[$c2_pos+2] == "\t")) {
$next_mode = $mode_lc;
$next_pos = $c2_pos;
$next_end_str = "\n";
$next_end_str_len = 1;
}
if ($c3_pos !== false and ($next_pos === false or $c3_pos < $next_pos)) {
$next_mode = $mode_lc;
$next_pos = $c3_pos;
$next_end_str = "\n";
$next_end_str_len = 1;
}
if ($ds_pos !== false and ($next_pos === false or $ds_pos < $next_pos)) {
$next_mode = $mode_ds;
$next_pos = $ds_pos;
$next_end_str = "\n";
$next_end_str_len = 1;
}
// If at this point, $next_pos === false, then
// we are in the final stretch.
// Until the end of the string, we have normal mode.
// Right now, we are in normal mode.
$d_pos = strpos($script, $d, $cur_pos);
while ($d_pos !== false and ($next_pos === false or $d_pos < $next_pos)) {
$query = substr($script, $query_start_pos, $d_pos - $query_start_pos);
// remove parts of the query string based on the "del_" arrays
$del_pos_count = count($del_start_pos_array);
if ($del_pos_count == 0) {
$query2 = $query;
} else {
$query2 = substr($query, 0, $del_start_pos_array[0] - $query_start_pos);
for ($i=1; $i < $del_pos_count; ++$i) {
$query2 .= substr($query, $del_end_pos_array[$i-1] - $query_start_pos,