Fsck.java
// This file is part of OpenTSDB.
// Copyright (C) 2014 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.tools;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.hbase.async.Bytes;
import org.hbase.async.Bytes.ByteMap;
import org.hbase.async.DeleteRequest;
import org.hbase.async.KeyValue;
import org.hbase.async.PutRequest;
import org.hbase.async.Scanner;
import com.stumbleupon.async.Deferred;
import net.opentsdb.core.Const;
import net.opentsdb.core.IllegalDataException;
import net.opentsdb.core.Internal;
import net.opentsdb.core.Internal.Cell;
import net.opentsdb.core.Query;
import net.opentsdb.core.RowKey;
import net.opentsdb.core.TSDB;
import net.opentsdb.core.Tags;
import net.opentsdb.meta.Annotation;
import net.opentsdb.uid.NoSuchUniqueId;
import net.opentsdb.uid.UniqueId;
import net.opentsdb.utils.Config;
/**
* Tool to look for and fix corrupted data in a TSDB. FSCK can be used to
* recover space, resolve duplicate data points, remove orphaned time series and
* remove data errors. If one or more command line queries are provided, only
* rows matching the query will be FSCK'd. Alternatively a full table scan can
* be performed.
* <p>
* Scanning is done in three stages:
* 1) Each row key is parsed to make sure it's a valid OpenTSDB row. If it isn't
* then the user can decide to delete it. If one or more UIDs cannot be resolved
* to names (metric or tags) then the user can decide to purge it.
* 2) All key value pairs in a row are parsed to determine the type of object.
* If it's a single data point, it's added to a tree map based on the data point
* timestamp. If it's a compacted column, the data points are exploded and
* added to the data point map. If it's some other object it may be purged if
* told to, or if it's a known type (e.g. annotations) simply ignored.
* 3) If any data points were found, we iterate over each one looking for
* duplicates, malformed encodings or potential value-length-encoding savings.
* At the end, if told to, FSCK will fix up the values and optionally write a
* new compacted cell, deleting all of the old values.
* <p>
* A number of metrics are tracked during the run and a report will be dumped
* to the log at the end.
* <p>
* When iterating over the datapoints in step 3, the workers will usually compile
* a set of compacted qualifiers and values so that at the end, if necessary, a
* new compacted cell can be written and the old cells purged.
* <p>
* Note: some fields are package private so that we can easily unit test.
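* <p>
* A minimal programmatic sketch of a full-table FSCK run. The construction of
* {@link FsckOptions} is elided here since its parsing lives elsewhere; the
* usual entry point is the fsck CLI tool rather than this snippet.
* <pre>{@code
* final TSDB tsdb = new TSDB(new Config(true)); // auto-load the config
* final FsckOptions options = ...;              // parsed command line flags
* final Fsck fsck = new Fsck(tsdb, options);
* fsck.runFullTable();                          // or fsck.runQueries(queries)
* tsdb.shutdown().joinUninterruptibly();
* }</pre>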
*/
final class Fsck {
private static final Logger LOG = LoggerFactory.getLogger(Fsck.class);
/** The TSDB to use for access */
private final TSDB tsdb;
/** Options to use while iterating over rows */
private final FsckOptions options;
/** Counters incremented during processing. They have to be atomic counters
* as we may be running multiple fsck threads. */
final AtomicLong kvs_processed = new AtomicLong();
final AtomicLong rows_processed = new AtomicLong();
final AtomicLong valid_datapoints = new AtomicLong();
final AtomicLong annotations = new AtomicLong();
final AtomicLong bad_key = new AtomicLong();
final AtomicLong bad_key_fixed = new AtomicLong();
final AtomicLong duplicates = new AtomicLong();
final AtomicLong duplicates_fixed = new AtomicLong();
final AtomicLong duplicates_fixed_comp = new AtomicLong();
final AtomicLong orphans = new AtomicLong();
final AtomicLong orphans_fixed = new AtomicLong();
final AtomicLong future = new AtomicLong();
final AtomicLong unknown = new AtomicLong();
final AtomicLong unknown_fixed = new AtomicLong();
final AtomicLong bad_values = new AtomicLong();
final AtomicLong bad_values_deleted = new AtomicLong();
final AtomicLong value_encoding = new AtomicLong();
final AtomicLong value_encoding_fixed = new AtomicLong();
final AtomicLong fixable_compacted_columns = new AtomicLong();
final AtomicLong bad_compacted_columns = new AtomicLong();
final AtomicLong bad_compacted_columns_deleted = new AtomicLong();
final AtomicLong vle = new AtomicLong();
final AtomicLong vle_bytes = new AtomicLong();
final AtomicLong vle_fixed = new AtomicLong();
/** Length of the metric + timestamp for key validation */
private static int key_prefix_length = TSDB.metrics_width() +
Const.TIMESTAMP_BYTES;
/** Length of a tagk + tagv pair for key validation */
private static int key_tags_length = TSDB.tagk_width() + TSDB.tagv_width();
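// With the default UID widths (3 bytes each for metrics, tag names and tag
// values) and the 4 byte base timestamp, a row key for a series with one tag
// pair is laid out as:
//   <metric:3><base_time:4><tagk:3><tagv:3>
// i.e. a 7 byte prefix plus 6 bytes per tag pair. The widths are read from
// TSDB above so non-default UID widths are handled as well.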
/** How often to report progress */
private static long report_rows = 10000;
/**
* Default Ctor
* @param tsdb The TSDB to use for access
* @param options Options to use when iterating over rows
*/
public Fsck(final TSDB tsdb, final FsckOptions options) {
this.tsdb = tsdb;
this.options = options;
}
/**
* Fetches the max metric ID and splits the data table up amongst threads on
* a naive split. By default we execute cores * 2 threads but the user can
* specify more or fewer.
* @throws Exception If something goes pear shaped.
*/
public void runFullTable() throws Exception {
LOG.info("Starting full table scan");
final long start_time = System.currentTimeMillis() / 1000;
final long max_id = CliUtils.getMaxMetricID(tsdb);
final int workers = options.threads() > 0 ? options.threads() :
Runtime.getRuntime().availableProcessors() * 2;
final double quotient = (double)max_id / (double)workers;
LOG.info("Max metric ID is [" + max_id + "]");
LOG.info("Spooling up [" + workers + "] worker threads");
long index = 1;
final Thread[] threads = new Thread[workers];
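// Naive split example: with a max_id of 100 and 4 workers the quotient is
// 25.0, so the workers start at metric UIDs 1, 27, 53 and 79. Each
// FsckWorker covers quotient + 1 UIDs, giving a tiny overlap between
// neighbours instead of leaving gaps from integer rounding.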
for (int i = 0; i < workers; i++) {
threads[i] = new FsckWorker(index, quotient, i);
threads[i].setName("Fsck #" + i);
threads[i].start();
index += quotient;
if (index < max_id) {
index++;
}
}
final Thread reporter = new ProgressReporter();
reporter.start();
for (int i = 0; i < workers; i++) {
threads[i].join();
LOG.info("Thread [" + i + "] Finished");
}
reporter.interrupt();
logResults();
final long duration = (System.currentTimeMillis() / 1000) - start_time;
LOG.info("Completed fsck in [" + duration + "] seconds");
}
/**
* Scans the rows matching one or more standard queries. An aggregator is still
* required though it's ignored.
* @param queries The queries to execute
* @throws Exception If something goes pear shaped.
*/
public void runQueries(final List<Query> queries) throws Exception {
final long start_time = System.currentTimeMillis() / 1000;
// TODO - threadify it. We *could* have hundreds of queries and we don't
// want to create that many threads. For now we'll just execute each one
// serially
final Thread reporter = new ProgressReporter();
reporter.start();
for (final Query query : queries) {
final FsckWorker worker = new FsckWorker(query, 0);
worker.run();
}
reporter.interrupt();
logResults();
final long duration = (System.currentTimeMillis() / 1000) - start_time;
LOG.info("Completed fsck in [" + duration + "] seconds");
}
/** @return The total number of errors detected during the run */
long totalErrors() {
return bad_key.get() + duplicates.get() + orphans.get() + unknown.get() +
bad_values.get() + bad_compacted_columns.get() +
fixable_compacted_columns.get() + value_encoding.get();
}
/** @return The total number of errors fixed during the run */
long totalFixed() {
return bad_key_fixed.get() + duplicates_fixed.get() + orphans_fixed.get() +
unknown_fixed.get() + value_encoding_fixed.get() +
bad_values_deleted.get();
}
/** @return The total number of errors that could be (or may have been) fixed */
long correctable() {
return bad_key.get() + duplicates.get() + orphans.get() + unknown.get() +
bad_values.get() + bad_compacted_columns.get() +
fixable_compacted_columns.get() + value_encoding.get();
}
/**
* A worker thread that takes a query or a chunk of the main data table and
* performs the actual FSCK process.
*/
final class FsckWorker extends Thread {
/** Optional value of the first metric this worker should start on, should
* be >0 */
final long start_id;
/** Value of the metric this worker should end on */
final long end_id;
/** Id of the thread this worker belongs to */
final int thread_id;
/** Optional query to execute instead of a full table scan */
final Query query;
/** Set of TSUIDs this worker has seen. Used to avoid UID resolution for
* previously processed row keys */
final Set<String> tsuids = new HashSet<String>();
/** Shared flags and values for compiling a compacted column */
byte[] compact_qualifier = null;
int qualifier_index = 0;
byte[] compact_value = null;
int value_index = 0;
boolean compact_row = false;
int qualifier_bytes = 0;
int value_bytes = 0;
/**
* Ctor for running a worker on a chunk of the data table
* @param start_id The first metric this worker should start on
* @param quotient How many metrics the worker should cover
* @param thread_id Id of the thread this worker is assigned for logging
*/
FsckWorker(final long start_id, final double quotient, final int thread_id) {
this.start_id = start_id;
this.end_id = start_id + (long) quotient + 1; // teensy bit of overlap
this.thread_id = thread_id;
query = null;
}
/**
* Ctor for running an FSCK over a specific query, scanning only rows that
* match the filter.
* @param query The query to execute
* @param thread_id Id of the thread this worker is assigned for logging
*/
FsckWorker(final Query query, final int thread_id) {
start_id = 0;
end_id = 0;
this.thread_id = thread_id;
this.query = query;
}
/**
* Determines the type of scanner to use, i.e. a specific query scanner or
* for a portion of the whole table. It then performs the actual scan,
* compiling a list of data points and fixing/compacting them when
* appropriate.
*/
public void run() {
final Scanner scanner = query != null ? Internal.getScanner(query) :
CliUtils.getDataTableScanner(tsdb, start_id, end_id);
// store every data point for the row in here
final TreeMap<Long, ArrayList<DP>> datapoints =
new TreeMap<Long, ArrayList<DP>>();
byte[] last_key = null;
ArrayList<ArrayList<KeyValue>> rows;
try {
while ((rows = scanner.nextRows().joinUninterruptibly()) != null) {
// keep in mind that with annotations and millisecond values, a row
// can now have more than 4096 key values, the default for a scanner.
// Since we don't know how many values may actually be in a row, we
// don't want to set the KV limit too high. Instead we'll just keep
// working through the sets until we hit a different row key, then
// process all of the data points. It puts more of a burden on fsck
// memory but we should be able to keep ~3M data points in memory
// without a problem.
for (final ArrayList<KeyValue> row : rows) {
if (last_key != null && Bytes.memcmp(row.get(0).key(), last_key) != 0) {
// new row so flush the old one
rows_processed.getAndIncrement();
if (!datapoints.isEmpty()) {
compact_qualifier = new byte[qualifier_bytes];
compact_value = new byte[value_bytes+1];
fsckDataPoints(datapoints);
resetCompaction();
datapoints.clear();
}
}
last_key = row.get(0).key();
fsckRow(row, datapoints);
}
}
// handle the last row
if (!datapoints.isEmpty()) {
rows_processed.getAndIncrement();
compact_qualifier = new byte[qualifier_bytes];
compact_value = new byte[value_bytes+1];
fsckDataPoints(datapoints);
}
} catch (Exception e) {
LOG.error("Shouldn't be here", e);
}
}
/**
* Parses the row of KeyValues. First it validates the row key, then parses
* each KeyValue to determine what kind of object it is. Data points are
* stored in the tree map and non-data point columns are handled per the
* option flags
* @param row The row of data to parse
* @param datapoints The map of datapoints to append to.
* @throws Exception If something goes pear shaped.
*/
private void fsckRow(final ArrayList<KeyValue> row,
final TreeMap<Long, ArrayList<DP>> datapoints) throws Exception {
// The data table should contain only rows with a metric, timestamp and
// one or more tag pairs. Future version may use different prefixes or
// key formats but for now, we can safely delete any rows with invalid
// keys. This may check the same row key multiple times but that's good
// as it will keep the data points from being pushed to the dp map
if (!fsckKey(row.get(0).key())) {
return;
}
final long base_time = Bytes.getUnsignedInt(row.get(0).key(),
TSDB.metrics_width());
for (final KeyValue kv : row) {
kvs_processed.getAndIncrement();
// these are not final as they may be modified when fixing is enabled
byte[] value = kv.value();
byte[] qual = kv.qualifier();
// all qualifiers must be at least 2 bytes long, i.e. a single data point
if (qual.length < 2) {
unknown.getAndIncrement();
LOG.error("Invalid qualifier, must be on 2 bytes or more.\n\t" + kv);
if (options.fix() && options.deleteUnknownColumns()) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), kv);
tsdb.getClient().delete(delete);
unknown_fixed.getAndIncrement();
}
continue;
}
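// For reference, the qualifier lengths this loop expects are:
//   2 bytes - a single second-resolution data point
//   4 bytes - a single millisecond-resolution point (or two compacted
//             second points; the flag bits checked below tell them apart)
//   3 or 5 bytes - a TSDB object such as an annotation
//   any other even length - a compacted column of multiple data points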
// All data point columns have an even number of bytes, so if we find
// one that has an odd length, it could be an OpenTSDB object or it
// could be junk that made it into the table.
if (qual.length % 2 != 0) {
// If this test fails, the column is not a TSDB object such as an
// annotation or blob. Future versions may be able to compact TSDB
// objects so that their qualifier would be of a different length, but
// for now we'll consider it an error.
if (qual.length != 3 && qual.length != 5) {
unknown.getAndIncrement();
LOG.error("Unknown qualifier, must be 2, 3, 5 or an even number " +
"of bytes.\n\t" + kv);
if (options.fix() && options.deleteUnknownColumns()) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), kv);
tsdb.getClient().delete(delete);
unknown_fixed.getAndIncrement();
}
continue;
}
// TODO - create a list of TSDB objects and fsck them. Maybe a plugin
// or interface.
// TODO - perform validation of the annotation
if (qual[0] == Annotation.PREFIX()) {
annotations.getAndIncrement();
continue;
}
LOG.warn("Found an object possibly from a future version of OpenTSDB\n\t"
+ kv);
future.getAndIncrement();
continue;
}
// This is (hopefully) a compacted column with multiple data points. It
// could have two points with second qualifiers or multiple points with
// a mix of second and millisecond qualifiers
if (qual.length == 4 && !Internal.inMilliseconds(qual[0])
|| qual.length > 4) {
if (value[value.length - 1] > Const.MS_MIXED_COMPACT) {
// TODO - figure out a way to fix these. Maybe lookup a row before
// or after and try parsing this for values. If the values are
// somewhat close to the others, then we could just set the last
// byte. Otherwise it could be a bad compaction and we'd need to
// toss it.
bad_compacted_columns.getAndIncrement();
LOG.error("The last byte of a compacted should be 0 or 1. Either"
+ " this value is corrupted or it was written by a"
+ " future version of OpenTSDB.\n\t" + kv);
continue;
}
// add every cell in the compacted column to the data point tree so
// that we can scan for duplicate timestamps
try {
final ArrayList<Cell> cells = Internal.extractDataPoints(kv);
// the extractDataPoints() method will automatically fix up some
// issues such as setting proper lengths on floats and sorting the
// cells to be in order. Rather than reproduce the extraction code or
// add another method, we can just recompile the compacted qualifier
// as we run through it. If the new one is different (indicating a fix)
// then we'll replace it later on.
final byte[] recompacted_qualifier = new byte[kv.qualifier().length];
int qualifier_index = 0;
for (final Cell cell : cells) {
final long ts = cell.timestamp(base_time);
ArrayList<DP> dps = datapoints.get(ts);
if (dps == null) {
dps = new ArrayList<DP>(1);
datapoints.put(ts, dps);
}
dps.add(new DP(kv, cell));
qualifier_bytes += cell.qualifier().length;
value_bytes += cell.value().length;
System.arraycopy(cell.qualifier(), 0, recompacted_qualifier,
qualifier_index, cell.qualifier().length);
qualifier_index += cell.qualifier().length;
}
if (Bytes.memcmp(recompacted_qualifier, kv.qualifier()) != 0) {
LOG.error("Compacted column was out of order or requires a "
+ "fixup: " + kv);
fixable_compacted_columns.getAndIncrement();
}
compact_row = true;
} catch (IllegalDataException e) {
bad_compacted_columns.getAndIncrement();
LOG.error(e.getMessage());
if (options.fix() && options.deleteBadCompacts()) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), kv);
tsdb.getClient().delete(delete);
bad_compacted_columns_deleted.getAndIncrement();
}
}
continue;
}
// at this point we *should* be dealing with a single data point encoded
// in seconds or milliseconds.
final long timestamp =
Internal.getTimestampFromQualifier(qual, base_time);
ArrayList<DP> dps = datapoints.get(timestamp);
if (dps == null) {
dps = new ArrayList<DP>(1);
datapoints.put(timestamp, dps);
}
dps.add(new DP(kv));
qualifier_bytes += kv.qualifier().length;
value_bytes += kv.value().length;
}
}
/**
* Validates the row key. It must match the format
* {@code <metric><timestamp><tagpair>[...<tagpair>]}. If it doesn't, then
* the row is considered an error. If the UIDs in a row key do not resolve
* to a name, then the row is considered an orphan and the values contained
* therein are NOT fsck'd. Also, if the TSUID in the row key has been seen
* before, then we don't re-resolve the UIDs. Saves a bit of CPU time.
* NOTE: We do not currently validate the timestamp in the row key. This
* would be a good TODO.
* NOTE: Global annotations are of the format {@code <metric=0><timestamp>}
* but fsck will not scan over those rows. Full table scans start at metric
* 1 and queries must match a valid name.
* @param key The row key to validate
* @return True if the row key is valid, false if it is not
* @throws Exception If something goes pear shaped.
*/
private boolean fsckKey(final byte[] key) throws Exception {
if (key.length < key_prefix_length ||
(key.length - key_prefix_length) % key_tags_length != 0) {
LOG.error("Invalid row key.\n\tKey: " + UniqueId.uidToString(key));
bad_key.getAndIncrement();
if (options.fix() && options.deleteBadRows()) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), key);
tsdb.getClient().delete(delete);
bad_key_fixed.getAndIncrement();
}
return false;
}
// Process the time series ID by resolving the UIDs to names if we haven't
// already seen this particular TSUID
final byte[] tsuid = UniqueId.getTSUIDFromKey(key, TSDB.metrics_width(),
Const.TIMESTAMP_BYTES);
if (!tsuids.contains(tsuid)) {
try {
RowKey.metricNameAsync(tsdb, key).joinUninterruptibly();
} catch (NoSuchUniqueId nsui) {
LOG.error("Unable to resolve the metric from the row key.\n\tKey: "
+ UniqueId.uidToString(key) + "\n\t" + nsui.getMessage());
orphans.getAndIncrement();
if (options.fix() && options.deleteOrphans()) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), key);
tsdb.getClient().delete(delete);
orphans_fixed.getAndIncrement();
}
return false;
}
try {
Tags.resolveIds(tsdb, (ArrayList<byte[]>)
UniqueId.getTagPairsFromTSUID(tsuid));
} catch (NoSuchUniqueId nsui) {
LOG.error("Unable to resolve the a tagk or tagv from the row key.\n\tKey: "
+ UniqueId.uidToString(key) + "\n\t" + nsui.getMessage());
orphans.getAndIncrement();
if (options.fix() && options.deleteOrphans()) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), key);
tsdb.getClient().delete(delete);
orphans_fixed.getAndIncrement();
}
return false;
}
}
return true;
}
/**
* Processes each data point parsed from the row. Validates the qualifiers
* and values, fixing what it can and deleting those it can't. At the end
* it may write a new compacted column and remove the others. Also handles
* duplicate data point resolution.
* @param datapoints The list of data points parsed from the row
* @throws Exception If something goes pear shaped.
*/
private void fsckDataPoints(final Map<Long, ArrayList<DP>> datapoints)
throws Exception {
// store a unique set of qualifier/value columns to help us later when
// we need to delete or update the row
final ByteMap<byte[]> unique_columns = new ByteMap<byte[]>();
byte[] key = null;
boolean has_seconds = false;
boolean has_milliseconds = false;
boolean has_duplicates = false;
boolean has_uncorrected_value_error = false;
for (final Map.Entry<Long, ArrayList<DP>> time_map : datapoints.entrySet()) {
if (key == null) {
key = time_map.getValue().get(0).kv.key();
}
if (time_map.getValue().size() < 2) {
// there was only one data point for this timestamp, no conflicts
final DP dp = time_map.getValue().get(0);
valid_datapoints.getAndIncrement();
has_uncorrected_value_error |= Internal.isFloat(dp.qualifier()) ?
fsckFloat(dp) : fsckInteger(dp);
if (Internal.inMilliseconds(dp.qualifier())) {
has_milliseconds = true;
} else {
has_seconds = true;
}
unique_columns.put(dp.kv.qualifier(), dp.kv.value());
continue;
}
// sort so we can figure out which one we're going to keep, i.e. oldest
// or newest
Collections.sort(time_map.getValue());
has_duplicates = true;
// We want to keep either the first or the last incoming data point
// and delete the others.
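// e.g. if three columns carry a value for the same second, the sort above
// orders the duplicates so we can keep one from either end of the list:
// the newest when options.lastWriteWins() is set, otherwise the oldest.
// The rest are logged and, when fixing with duplicate resolution enabled,
// scheduled for deletion below.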
final StringBuilder buf = new StringBuilder();
buf.append("More than one column had a value for the same timestamp: ")
.append("(")
.append(time_map.getKey())
.append(" - ")
.append(new Date(time_map.getKey()))
.append(")\n row key: (")
.append(UniqueId.uidToString(key))
.append(")\n");
int num_dupes = time_map.getValue().size();
final int delete_range_start;
final int delete_range_stop;
final DP dp_to_keep;
if (options.lastWriteWins()) {
// Save the latest datapoint from extinction.
delete_range_start = 0;
delete_range_stop = num_dupes - 1;
dp_to_keep = time_map.getValue().get(num_dupes - 1);
} else {
// Save the oldest datapoint from extinction.
delete_range_start = 1;
delete_range_stop = num_dupes;
dp_to_keep = time_map.getValue().get(0);
appendDatapointInfo(buf, dp_to_keep, " <--- keep oldest").append("\n");
}
unique_columns.put(dp_to_keep.kv.qualifier(), dp_to_keep.kv.value());
valid_datapoints.getAndIncrement();
has_uncorrected_value_error |= Internal.isFloat(dp_to_keep.qualifier()) ?
fsckFloat(dp_to_keep) : fsckInteger(dp_to_keep);
if (Internal.inMilliseconds(dp_to_keep.qualifier())) {
has_milliseconds = true;
} else {
has_seconds = true;
}
for (int dp_index = delete_range_start; dp_index < delete_range_stop;
dp_index++) {
duplicates.getAndIncrement();
DP dp = time_map.getValue().get(dp_index);
final byte flags = (byte)Internal.getFlagsFromQualifier(dp.kv.qualifier());
buf.append(" ")
.append("write time: (")
.append(dp.kv.timestamp())
.append(" - ")
.append(new Date(dp.kv.timestamp()))
.append(") ")
.append(" compacted: (")
.append(dp.compacted)
.append(") qualifier: ")
.append(Arrays.toString(dp.kv.qualifier()))
.append(" value: ")
.append(Internal.isFloat(dp.kv.qualifier()) ?
Internal.extractFloatingPointValue(dp.value(), 0, flags) :
Internal.extractIntegerValue(dp.value(), 0, flags))
.append("\n");
unique_columns.put(dp.kv.qualifier(), dp.kv.value());
if (options.fix() && options.resolveDupes()) {
if (compact_row) {
// Scheduled for deletion by compaction.
duplicates_fixed_comp.getAndIncrement();
} else if (!dp.compacted) {
LOG.debug("Removing duplicate data point: " + dp.kv);
tsdb.getClient().delete(
new DeleteRequest(
tsdb.dataTable(), dp.kv.key(), dp.kv.family(), dp.qualifier()
)
);
duplicates_fixed.getAndIncrement();
}
}
}
if (options.lastWriteWins()) {
appendDatapointInfo(buf, dp_to_keep, " <--- keep latest").append("\n");
}
LOG.info(buf.toString());
}
// if an error was found in this row that was not marked for repair, then
// we should bail at this point and not write a new compacted column.
if ((has_duplicates && !options.resolveDupes()) ||
(has_uncorrected_value_error && !options.deleteBadValues())) {
LOG.warn("One or more errors found in row that were not marked for repair");
return;
}
if ((options.compact() || compact_row) && options.fix()
&& qualifier_index > 0) {
if (qualifier_index == 2 || (qualifier_index == 4 &&
Internal.inMilliseconds(compact_qualifier))) {
// we may have deleted all but one value from the row and that one
// value may have a different qualifier than it originally had. We
// can't write a compacted column with a single data point as the length
// will be off due to the flag at the end. Therefore we just rollback
// the length of the value array.
value_index--;
} else if (has_seconds && has_milliseconds) {
// set mixed compact flag at end of the values array
compact_value[value_index] = 1;
}
value_index++;
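// Compacted cell layout reminder: the values are concatenated in qualifier
// order and followed by a single trailing flag byte (1 when second and
// millisecond points are mixed in the row), which is why compact_value was
// allocated one byte larger than the sum of the value lengths.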
final byte[] new_qualifier = Arrays.copyOfRange(compact_qualifier, 0,
qualifier_index);
final byte[] new_value = Arrays.copyOfRange(compact_value, 0,
value_index);
final PutRequest put = new PutRequest(tsdb.dataTable(), key,
TSDB.FAMILY(), new_qualifier, new_value);
// it's *possible* that the hash of our new compacted qualifier is in
// the delete list so double check before we delete everything
if (unique_columns.containsKey(new_qualifier)) {
if (Bytes.memcmp(unique_columns.get(new_qualifier), new_value) != 0) {
final StringBuilder buf = new StringBuilder();
buf.append("Overwriting compacted column with new value: ")
.append("\n row key: (")
.append(UniqueId.uidToString(key))
.append(")\n qualifier: ")
.append(Bytes.pretty(new_qualifier))
.append("\n value: ")
.append(Bytes.pretty(new_value));
LOG.info(buf.toString());
// Important: Make sure to wait for the write to complete before
// proceeding with the deletes.
tsdb.getClient().put(put).joinUninterruptibly();
} else if (has_duplicates) {
if (LOG.isDebugEnabled()) {
final StringBuilder buf = new StringBuilder();
buf.append("Re-compacted column is the same as the existing column: ")
.append("\n row key: (")
.append(UniqueId.uidToString(key))
.append(")\n qualifier: ")
.append(Bytes.pretty(new_qualifier))
.append("\n value: ")
.append(Bytes.pretty(new_value));
LOG.debug(buf.toString());
}
}
unique_columns.remove(new_qualifier);
} else {
// Important: Make sure to wait for the write to complete before
// proceeding with the deletes.
tsdb.getClient().put(put).joinUninterruptibly();
}
final List<Deferred<Object>> deletes =
new ArrayList<Deferred<Object>>(unique_columns.size());
for (byte[] qualifier : unique_columns.keySet()) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), key,
TSDB.FAMILY(), qualifier);
if (LOG.isDebugEnabled()) {
final StringBuilder buf = new StringBuilder();
buf.append("Deleting column: ")
.append("\n row key: (")
.append(UniqueId.uidToString(key))
.append(")\n qualifier: ")
.append(Bytes.pretty(qualifier));
LOG.debug(buf.toString());
}
deletes.add(tsdb.getClient().delete(delete));
}
Deferred.group(deletes).joinUninterruptibly();
duplicates_fixed.getAndAdd(duplicates_fixed_comp.longValue());
duplicates_fixed_comp.set(0);
}
}
/**
* Handles validating a floating point value. Floats must be encoded on 4
* bytes for a Float and 8 bytes for a Double. The qualifier is compared to
* the actual length in the case of single data points. In previous versions
* of OpenTSDB, the qualifier flag may have been on 4 bytes but the actual
* value on 8. This method will fix those issues as well as an old bug
* where the first 4 bytes of the 8 byte value were sign-extended.
* @param dp The data point to process
* @return True if value was NOT fixed so the caller can avoid compacting.
* If false, then the value was good or it was repaired.
* @throws Exception If something goes pear shaped
*/
private boolean fsckFloat(final DP dp) throws Exception {
byte[] qual = dp.qualifier();
byte[] value = dp.value();
final byte length = Internal.getValueLengthFromQualifier(qual);
// The qualifier says the value is on 4 bytes, and the value is
// on 8 bytes, then the 4 MSBs must be 0s. Old versions of the
// code were doing this. It's kinda sad. Some versions had a
// bug whereby the value would be sign-extended, so we can
// detect these values and fix them here.
if (length == 4 && value.length == 8) {
if (value[0] == -1 && value[1] == -1
&& value[2] == -1 && value[3] == -1 && qual.length == 2) {
value_encoding.getAndIncrement();
LOG.error("Floating point value with 0xFF most significant"
+ " bytes, probably caused by sign extension bug"
+ " present in revisions [96908436..607256fc].\n"
+ "\t" + dp.kv);
if (options.fix()) {
final float value_as_float =
Float.intBitsToFloat(Bytes.getInt(value, 4));
value = Bytes.fromInt(
Float.floatToRawIntBits((float)value_as_float));
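// e.g. the corrupted 8 byte value FF FF FF FF 41 28 00 00 is rewritten
// as the 4 byte value 41 28 00 00 (10.5f); only the low word ever held
// the real IEEE 754 bits.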
if (compact_row || options.compact()) {
appendDP(qual, value, 4);
} else if (!dp.compacted) {
final PutRequest put = new PutRequest(tsdb.dataTable(),
dp.kv.key(), dp.kv.family(), qual, value);
tsdb.getClient().put(put);
} else {
LOG.error("SHOULDN'T be here as we didn't compact or fix a "
+ "single value");
}
value_encoding_fixed.getAndIncrement();
} else {
return true;
}
} else if (value[0] != 0 || value[1] != 0
|| value[2] != 0 || value[3] != 0) {
// can't happen if it was compacted
LOG.error("Floating point value was marked as 4 bytes long but"
+ " was actually 8 bytes long and the first four bytes were"
+ " not zeroed\n\t" + dp);
bad_values.getAndIncrement();
if (options.fix() && options.deleteBadValues() && !dp.compacted) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(),
dp.kv);
tsdb.getClient().delete(delete);
bad_values_deleted.getAndIncrement();
} else if (dp.compacted) {
LOG.error("The value was in a compacted column. This should "
+ "not be possible\n\t" + dp);
bad_compacted_columns.getAndIncrement();
return true;
} else {
return true;
}
} else {
// can't happen if it was compacted
LOG.warn("Floating point value was marked as 4 bytes long but"
+ " was actually 8 bytes long\n\t" + dp.kv);
value_encoding.getAndIncrement();
if (options.fix() && !dp.compacted) {
final float value_as_float =
Float.intBitsToFloat(Bytes.getInt(value, 4));
value = Bytes.fromInt(
Float.floatToRawIntBits((float)value_as_float));
if (compact_row || options.compact()) {
appendDP(qual, value, 4);
} else if (!dp.compacted) {
final PutRequest put = new PutRequest(tsdb.dataTable(),
dp.kv.key(), dp.kv.family(), qual, value);
tsdb.getClient().put(put);
} else {
LOG.error("SHOULDN'T be here as we didn't compact or fix a single value");
}
value_encoding_fixed.getAndIncrement();
} else {
return true;
}
}
} else if (length == 8 && value.length == 4) {
// could be marked as a Double but actually encoded as a Float. BUT we
// don't know that and can't parse it accurately so tank it
bad_values.getAndIncrement();
LOG.error("This floating point value was marked as 8 bytes long but"
+ " was only " + value.length + " bytes.\n\t" + dp.kv);
if (options.fix() && options.deleteBadValues() && !dp.compacted) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), dp.kv);
tsdb.getClient().delete(delete);
bad_values_deleted.getAndIncrement();
} else if (dp.compacted) {
LOG.error("The previous value was in a compacted column. This should "
+ "not be possible.");
bad_compacted_columns.getAndIncrement();
} else {
return true;
}
} else if (value.length != 4 && value.length != 8) {
bad_values.getAndIncrement();
LOG.error("This floating point value must be encoded either on"
+ " 4 or 8 bytes, but it's on " + value.length
+ " bytes.\n\t" + dp.kv);
if (options.fix() && options.deleteBadValues() && !dp.compacted) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), dp.kv);
tsdb.getClient().delete(delete);
bad_values_deleted.getAndIncrement();
} else if (dp.compacted) {
LOG.error("The previous value was in a compacted column. This should "
+ "not be possible.");
bad_compacted_columns.getAndIncrement();
return true;
} else {
return true;
}
} else {
if (compact_row || options.compact()) {
appendDP(qual, value, value.length);
}
}
return false;
}
/**
* Handles validating an integer value. Integers must be encoded on 1, 2, 4
* or 8 bytes. Older versions of OpenTSDB wrote all integers on 8 bytes
* regardless of value. If the --fix flag is specified, this method will
* attempt to re-encode small values to save space (up to 7 bytes!!). It also
* makes sure the value length matches the length specified in the qualifier
* @param dp The data point to process
* @return True if value was NOT fixed so the caller can avoid compacting.
* If false, then the value was good or it was repaired.
* @throws Exception If something goes pear shaped
*/
private boolean fsckInteger(final DP dp) throws Exception {
byte[] qual = dp.qualifier();
byte[] value = dp.value();
// this should be a single integer value. Check the encoding to make
// sure it's the proper length, and if the flag is set to fix encoding
// we can save space with VLE.
final byte length = Internal.getValueLengthFromQualifier(qual);
if (value.length != length) {
// can't happen in a compacted column
bad_values.getAndIncrement();
LOG.error("The integer value is " + value.length + " bytes long but "
+ "should be " + length + " bytes.\n\t" + dp.kv);
if (options.fix() && options.deleteBadValues()) {
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(), dp.kv);
tsdb.getClient().delete(delete);
bad_values_deleted.getAndIncrement();
} else if (dp.compacted) {
LOG.error("The previous value was in a compacted column. This should "
+ "not be possible.");
bad_compacted_columns.getAndIncrement();
} else {
return true;
}
return false;
}
// OpenTSDB had support for VLE decoding of integers but only wrote
// on 8 bytes originally. Lets see how much space we could save.
// We'll assume that a length other than 8 bytes is already VLE'd
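// e.g. a counter written as the 8 byte value 0x000000000000002A (42) can
// be re-encoded on a single byte, reclaiming 7 bytes per data point once
// the column is rewritten or compacted.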
if (length == 8) {
final long decoded = Bytes.getLong(value);
if (Byte.MIN_VALUE <= decoded && decoded <= Byte.MAX_VALUE) {
vle.getAndIncrement();
vle_bytes.addAndGet(7);
value = new byte[] { (byte) decoded };
} else if (Short.MIN_VALUE <= decoded && decoded <= Short.MAX_VALUE) {
vle.getAndIncrement();
vle_bytes.addAndGet(6);
value = Bytes.fromShort((short) decoded);
} else if (Integer.MIN_VALUE <= decoded &&
decoded <= Integer.MAX_VALUE) {
vle.getAndIncrement();
vle_bytes.addAndGet(4);
value = Bytes.fromInt((int) decoded);
} // else it needs 8 bytes, it's on 8 bytes, yipee
if (length != value.length && options.fix()) {
final byte[] new_qualifier = Arrays.copyOf(qual, qual.length);
new_qualifier[new_qualifier.length - 1] &= 0xF0 | (value.length - 1);
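// The low bits of the final qualifier byte encode (value length - 1).
// Because the old flag was 0x7 (8 bytes, all length bits set), AND-ing
// with 0xF0 | (new length - 1) leaves exactly the new length; the float
// bit is already clear for integer points.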
if (compact_row || options.compact()) {
appendDP(new_qualifier, value, value.length);
} else {
// put the new value, THEN delete the old
final PutRequest put = new PutRequest(tsdb.dataTable(),
dp.kv.key(), dp.kv.family(), new_qualifier, value);
tsdb.getClient().put(put).joinUninterruptibly();
final DeleteRequest delete = new DeleteRequest(tsdb.dataTable(),
dp.kv.key(), dp.kv.family(), qual);
tsdb.getClient().delete(delete);
}
vle_fixed.getAndIncrement();
} // don't return true here as we don't consider a VLE an error.
} else {
if (compact_row || options.compact()) {
appendDP(qual, value, value.length);
}
}
return false;
}
/**
* Appends the given value to the running qualifier and value compaction
* byte arrays. It doesn't take a {@code DP} as we may be changing the
* arrays before they're re-written.
* @param new_qual The qualifier to append
* @param new_value The value to append
* @param value_length How much of the value to append
*/
private void appendDP(final byte[] new_qual, final byte[] new_value,
final int value_length) {
System.arraycopy(new_qual, 0, compact_qualifier, qualifier_index, new_qual.length);
qualifier_index += new_qual.length;
System.arraycopy(new_value, 0, compact_value, value_index, value_length);