/
TransformProcess.java
1495 lines (1359 loc) · 68.3 KB
/
TransformProcess.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*******************************************************************************
* Copyright (c) 2015-2018 Skymind, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/
package org.datavec.api.transform;
import lombok.Data;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.transform.analysis.DataAnalysis;
import org.datavec.api.transform.analysis.columns.ColumnAnalysis;
import org.datavec.api.transform.analysis.columns.NumericalColumnAnalysis;
import org.datavec.api.transform.condition.Condition;
import org.datavec.api.transform.filter.ConditionFilter;
import org.datavec.api.transform.filter.Filter;
import org.datavec.api.transform.ndarray.NDArrayColumnsMathOpTransform;
import org.datavec.api.transform.ndarray.NDArrayDistanceTransform;
import org.datavec.api.transform.ndarray.NDArrayMathFunctionTransform;
import org.datavec.api.transform.ndarray.NDArrayScalarOpTransform;
import org.datavec.api.transform.rank.CalculateSortedRank;
import org.datavec.api.transform.reduce.IAssociativeReducer;
import org.datavec.api.transform.schema.Schema;
import org.datavec.api.transform.schema.SequenceSchema;
import org.datavec.api.transform.sequence.*;
import org.datavec.api.transform.sequence.trim.SequenceTrimToLengthTransform;
import org.datavec.api.transform.sequence.trim.SequenceTrimTransform;
import org.datavec.api.transform.sequence.window.ReduceSequenceByWindowTransform;
import org.datavec.api.transform.sequence.window.WindowFunction;
import org.datavec.api.transform.serde.JsonMappers;
import org.datavec.api.transform.transform.categorical.*;
import org.datavec.api.transform.transform.column.*;
import org.datavec.api.transform.transform.condition.ConditionalCopyValueTransform;
import org.datavec.api.transform.transform.condition.ConditionalReplaceValueTransform;
import org.datavec.api.transform.transform.condition.ConditionalReplaceValueTransformWithDefault;
import org.datavec.api.transform.transform.doubletransform.*;
import org.datavec.api.transform.transform.floattransform.FloatColumnsMathOpTransform;
import org.datavec.api.transform.transform.floattransform.FloatMathFunctionTransform;
import org.datavec.api.transform.transform.floattransform.FloatMathOpTransform;
import org.datavec.api.transform.transform.integer.ConvertToInteger;
import org.datavec.api.transform.transform.integer.IntegerColumnsMathOpTransform;
import org.datavec.api.transform.transform.integer.IntegerMathOpTransform;
import org.datavec.api.transform.transform.integer.IntegerToOneHotTransform;
import org.datavec.api.transform.transform.longtransform.LongColumnsMathOpTransform;
import org.datavec.api.transform.transform.longtransform.LongMathOpTransform;
import org.datavec.api.transform.transform.normalize.Normalize;
import org.datavec.api.transform.transform.sequence.SequenceMovingWindowReduceTransform;
import org.datavec.api.transform.transform.sequence.SequenceOffsetTransform;
import org.datavec.api.transform.transform.string.*;
import org.datavec.api.transform.transform.time.StringToTimeTransform;
import org.datavec.api.transform.transform.time.TimeMathOpTransform;
import org.datavec.api.writable.*;
import org.datavec.api.writable.comparator.WritableComparator;
import org.joda.time.DateTimeZone;
import org.nd4j.linalg.primitives.Pair;
import org.nd4j.shade.jackson.annotation.JsonProperty;
import org.nd4j.shade.jackson.core.JsonProcessingException;
import org.nd4j.shade.jackson.databind.exc.InvalidTypeIdException;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* A TransformProcess defines
* an ordered list of transformations
* to be executed on some data
*
* @author Alex Black
*/
@Data
@Slf4j
public class TransformProcess implements Serializable {
private final Schema initialSchema;     //Schema of the raw input data, before any action is applied
private List<DataAction> actionList;    //Ordered list of actions (transforms, filters, reducers, etc) to execute
/**
 * Create a TransformProcess from an initial schema and an ordered list of actions to execute.
 * <p>
 * Side effect: the input schema of every action is computed and set here, by propagating the
 * initial schema through each action in turn.
 *
 * @param initialSchema Schema of the raw input data, before any action is applied
 * @param actionList    Ordered list of actions (transforms, filters, sequence ops, reducers, rank)
 * @throws RuntimeException if an action is of an unknown type, or a sequence conversion is applied
 *                          to a schema of the wrong kind (sequence vs. non-sequence)
 */
public TransformProcess(@JsonProperty("initialSchema") Schema initialSchema,
                @JsonProperty("actionList") List<DataAction> actionList) {
    this.initialSchema = initialSchema;
    this.actionList = actionList;

    //Calculate and set the schemas for each transformation:
    Schema currInputSchema = initialSchema;
    for (DataAction d : actionList) {
        if (d.getTransform() != null) {
            Transform t = d.getTransform();
            t.setInputSchema(currInputSchema);
            currInputSchema = t.transform(currInputSchema);
        } else if (d.getFilter() != null) {
            //Filter -> doesn't change schema. But we DO need to set the schema in the filter...
            d.getFilter().setInputSchema(currInputSchema);
        } else if (d.getConvertToSequence() != null) {
            if (currInputSchema instanceof SequenceSchema) {
                throw new RuntimeException("Cannot convert to sequence: schema is already a sequence schema: "
                        + currInputSchema);
            }
            ConvertToSequence cts = d.getConvertToSequence();
            cts.setInputSchema(currInputSchema);
            currInputSchema = cts.transform(currInputSchema);
        } else if (d.getConvertFromSequence() != null) {
            ConvertFromSequence cfs = d.getConvertFromSequence();
            if (!(currInputSchema instanceof SequenceSchema)) {
                throw new RuntimeException("Cannot convert from sequence: schema is not a sequence schema: "
                        + currInputSchema);
            }
            cfs.setInputSchema((SequenceSchema) currInputSchema);
            currInputSchema = cfs.transform((SequenceSchema) currInputSchema);
        } else if (d.getSequenceSplit() != null) {
            d.getSequenceSplit().setInputSchema(currInputSchema);
            continue; //Sequence split -> no change to the schema
        } else if (d.getReducer() != null) {
            IAssociativeReducer reducer = d.getReducer();
            reducer.setInputSchema(currInputSchema);
            currInputSchema = reducer.transform(currInputSchema);
        } else if (d.getCalculateSortedRank() != null) {
            CalculateSortedRank csr = d.getCalculateSortedRank();
            csr.setInputSchema(currInputSchema);
            currInputSchema = csr.transform(currInputSchema);
        } else {
            throw new RuntimeException("Unknown action: " + d);
        }
    }
}
/**
 * Create a TransformProcess from a {@link Builder}'s accumulated initial schema and action list.
 */
private TransformProcess(Builder builder) {
    this(builder.initialSchema, builder.actionList);
}

/**
 * Get the action list that this transform process
 * will execute
 *
 * @return the ordered list of actions (transforms, filters, etc)
 */
public List<DataAction> getActionList() {
    return actionList;
}

/**
 * Get the Schema of the output data, after executing the process
 *
 * @return Schema of the output data
 */
public Schema getFinalSchema() {
    //Schema after the last step == the final output schema
    return getSchemaAfterStep(actionList.size());
}
/**
 * Return the schema after executing all steps up to and including the specified step.
 * Steps are indexed from 0: so getSchemaAfterStep(0) is after one action has been executed.
 * Passing {@code step >= actionList.size()} returns the final schema.
 *
 * @param step Index of the step
 * @return Schema of the data, after that (and all prior) steps have been executed
 * @throws RuntimeException if an action is of an unknown type, or a sequence conversion is applied
 *                          to a schema of the wrong kind
 */
public Schema getSchemaAfterStep(int step) {
    Schema currInputSchema = initialSchema;
    int i = 0;
    for (DataAction d : actionList) {
        if (d.getTransform() != null) {
            Transform t = d.getTransform();
            currInputSchema = t.transform(currInputSchema);
        } else if (d.getFilter() != null) {
            //Filter -> doesn't change schema
        } else if (d.getConvertToSequence() != null) {
            if (currInputSchema instanceof SequenceSchema) {
                throw new RuntimeException("Cannot convert to sequence: schema is already a sequence schema: "
                        + currInputSchema);
            }
            ConvertToSequence cts = d.getConvertToSequence();
            currInputSchema = cts.transform(currInputSchema);
        } else if (d.getConvertFromSequence() != null) {
            ConvertFromSequence cfs = d.getConvertFromSequence();
            if (!(currInputSchema instanceof SequenceSchema)) {
                throw new RuntimeException("Cannot convert from sequence: schema is not a sequence schema: "
                        + currInputSchema);
            }
            currInputSchema = cfs.transform((SequenceSchema) currInputSchema);
        } else if (d.getSequenceSplit() != null) {
            //Sequence split -> no change to schema
        } else if (d.getReducer() != null) {
            IAssociativeReducer reducer = d.getReducer();
            currInputSchema = reducer.transform(currInputSchema);
        } else if (d.getCalculateSortedRank() != null) {
            CalculateSortedRank csr = d.getCalculateSortedRank();
            currInputSchema = csr.transform(currInputSchema);
        } else {
            throw new RuntimeException("Unknown action: " + d);
        }
        //Every action counts as exactly one step. The previous implementation used 'continue'
        //for filter and sequence-split actions, bypassing (or double-advancing past) this step
        //check - so requesting the schema at such a step wrongly returned the FINAL schema.
        if (i++ == step)
            return currInputSchema;
    }
    return currInputSchema;
}
/**
 * Execute the full sequence of transformations for a single example. May return null if example is filtered
 * <b>NOTE:</b> Some TransformProcess operations cannot be done on examples individually. Most notably, ConvertToSequence
 * and ConvertFromSequence operations require the full data set to be processed at once; the same applies to
 * reduce and CalculateSortedRank operations.
 *
 * @param input Single example to process
 * @return The transformed example, or null if it was removed by a filter
 * @throws RuntimeException if the process contains an operation that cannot be applied to one example at a time
 */
public List<Writable> execute(List<Writable> input) {
    List<Writable> currValues = input;
    for (DataAction d : actionList) {
        if (d.getTransform() != null) {
            Transform t = d.getTransform();
            currValues = t.map(currValues);
        } else if (d.getFilter() != null) {
            Filter f = d.getFilter();
            if (f.removeExample(currValues))
                return null;    //Example was filtered out
        } else if (d.getConvertToSequence() != null) {
            throw new RuntimeException(
                    "Cannot execute examples individually: TransformProcess contains a ConvertToSequence operation");
        } else if (d.getConvertFromSequence() != null) {
            throw new RuntimeException(
                    "Unexpected operation: TransformProcess contains a ConvertFromSequence operation");
        } else if (d.getSequenceSplit() != null) {
            throw new RuntimeException(
                    "Cannot execute examples individually: TransformProcess contains a SequenceSplit operation");
        } else if (d.getReducer() != null) {
            //Previously fell through to the misleading "Unknown action" branch, despite reducers
            //being valid actions (accepted by the constructor) - give an accurate message instead
            throw new RuntimeException(
                    "Cannot execute examples individually: TransformProcess contains a reduce operation");
        } else if (d.getCalculateSortedRank() != null) {
            throw new RuntimeException(
                    "Cannot execute examples individually: TransformProcess contains a CalculateSortedRank operation");
        } else {
            throw new RuntimeException("Unknown action: " + d);
        }
    }
    return currValues;
}
/**
 * Execute the full sequence of transformations for a single sequence. May return null if the
 * sequence is removed by a filter. Only per-sequence operations (transforms and filters) are
 * supported; sequence conversion/split operations cannot be applied one sequence at a time.
 *
 * @param input Input sequence (one {@code List<Writable>} per time step)
 * @return The transformed sequence, or null if it was filtered out
 */
public List<List<Writable>> executeSequenceToSequence(List<List<Writable>> input) {
    List<List<Writable>> current = input;
    for (DataAction action : actionList) {
        if (action.getTransform() != null) {
            current = action.getTransform().mapSequence(current);
        } else if (action.getFilter() != null) {
            //Filtered out -> no output sequence
            if (action.getFilter().removeSequence(current)) {
                return null;
            }
        } else if (action.getConvertToSequence() != null) {
            throw new RuntimeException(
                    "Cannot execute examples individually: TransformProcess contains a ConvertToSequence operation");
        } else if (action.getConvertFromSequence() != null) {
            throw new RuntimeException(
                    "Unexpected operation: TransformProcess contains a ConvertFromSequence operation");
        } else if (action.getSequenceSplit() != null) {
            throw new RuntimeException(
                    "Cannot execute examples individually: TransformProcess contains a SequenceSplit operation");
        } else {
            throw new RuntimeException("Unknown or not supported action: " + action);
        }
    }
    return current;
}
/**
 * Execute the full sequence of transformations for a single time series (sequence). May return null if example is filtered
 */
public List<List<Writable>> executeSequence(List<List<Writable>> inputSequence) {
    //Sequence in, sequence out: delegates directly
    return executeSequenceToSequence(inputSequence);
}

/**
 * Execute a TransformProcess on a batch of single (non-sequence) records, converting each one
 * independently to a sequence record.
 * <b>NOTE</b>: This method has the following significant limitation:
 * if the process contains a ConvertToSequence op,
 * it MUST be using singleStepSequencesMode - see {@link ConvertToSequence} for details.<br>
 * This restriction is necessary: when ConvertToSequence.singleStepSequencesMode is false, a group-by
 * operation is required - i.e., multiple independent records must be grouped together by key(s) -
 * which is not possible when each record is processed on its own.
 *
 * @param inputExample Batch of input examples (one {@code List<Writable>} per example)
 * @return One sequence per input example, in order; entries may be null where the example was filtered out
 */
public List<List<List<Writable>>> executeToSequenceBatch(List<List<Writable>> inputExample){
    List<List<List<Writable>>> ret = new ArrayList<>();
    for(List<Writable> record : inputExample)
        ret.add(execute(record, null).getRight());
    return ret;
}

/**
 * Execute a TransformProcess that starts with a single (non-sequence) record,
 * and converts it to a sequence record.
 * <b>NOTE</b>: This method has the following significant limitation:
 * if it contains a ConvertToSequence op,
 * it MUST be using singleStepSequencesMode - see {@link ConvertToSequence} for details.<br>
 * This restriction is necessary: when ConvertToSequence.singleStepSequencesMode is false, a group-by
 * operation is required - i.e., multiple independent records must be grouped together by key(s) -
 * which isn't possible here, when providing a single example as input
 *
 * @param inputExample Input example
 * @return Sequence, after processing (or null, if it was filtered out)
 */
public List<List<Writable>> executeToSequence(List<Writable> inputExample){
    return execute(inputExample, null).getRight();
}

/**
 * Execute a TransformProcess that starts with a sequence
 * record, and converts it to a single (non-sequence) record
 *
 * @param inputSequence Input sequence
 * @return Record after processing (or null if filtered out)
 */
public List<Writable> executeSequenceToSingle(List<List<Writable>> inputSequence){
    return execute(null, inputSequence).getLeft();
}
/**
 * Internal shared implementation for executing the process on a single record OR a single sequence.
 * Exactly one of {@code currEx} (single example) or {@code currSeq} (sequence) should be non-null
 * on entry; a ConvertToSequence op (singleStepSequencesMode only) switches from example to sequence.
 *
 * @param currEx  Single (non-sequence) example, or null
 * @param currSeq Sequence, or null
 * @return Pair of (example, sequence) after processing; (null, null) if removed by a filter
 */
private Pair<List<Writable>, List<List<Writable>>> execute(List<Writable> currEx, List<List<Writable>> currSeq){
    for (DataAction d : actionList) {
        if (d.getTransform() != null) {
            Transform t = d.getTransform();
            if(currEx != null){
                currEx = t.map(currEx);
                currSeq = null;
            } else {
                currEx = null;
                currSeq = t.mapSequence(currSeq);
            }
        } else if (d.getFilter() != null) {
            //Bug fix: the sequence check previously passed currEx (the single example) to
            //removeSequence instead of currSeq, and did so even when no sequence was present
            if ((currEx != null && d.getFilter().removeExample(currEx))
                    || (currSeq != null && d.getFilter().removeSequence(currSeq))) {
                return new Pair<>(null, null);
            }
        } else if (d.getConvertToSequence() != null) {
            if(d.getConvertToSequence().isSingleStepSequencesMode()){
                if(currSeq != null){
                    throw new RuntimeException("Cannot execute ConvertToSequence op: current records are already a sequence");
                } else {
                    //Each example becomes a length-1 sequence
                    currSeq = Collections.singletonList(currEx);
                    currEx = null;
                }
            } else {
                //Can't execute this - would require a group-by operation, and we only have 1 example!
                throw new RuntimeException( "Cannot execute examples individually: TransformProcess contains a" +
                        " ConvertToSequence operation, with singleStepSequnceeMode == false. Only " +
                        " ConvertToSequence operations with singleStepSequnceeMode == true can be executed individually " +
                        "as other types require a groupBy operation (which cannot be executed when only a sinlge record) " +
                        "is provided as input");
            }
        } else if (d.getConvertFromSequence() != null) {
            throw new RuntimeException("Unexpected operation: TransformProcess contains a ConvertFromSequence" +
                    " operation. This would produce multiple output records, which cannot be executed using this method");
        } else if (d.getSequenceSplit() != null) {
            throw new RuntimeException( "Cannot execute examples individually: TransformProcess contains a" +
                    " SequenceSplit operation. This would produce multiple output records, which cannot be executed" +
                    " using this method");
        } else {
            throw new RuntimeException("Unknown or not supported action: " + d);
        }
    }
    return new Pair<>(currEx, currSeq);
}
/**
 * Convert the TransformProcess to a JSON string
 *
 * @return TransformProcess, as JSON
 * @throws RuntimeException if serialization fails (should not occur for standard components)
 */
public String toJson() {
    try {
        return JsonMappers.getMapper().writeValueAsString(this);
    } catch (JsonProcessingException e) {
        //All standard components are JSON-serializable; wrap with context rather than a bare rethrow
        throw new RuntimeException("Error serializing TransformProcess to JSON", e);
    }
}
/**
 * Convert the TransformProcess to a YAML string
 *
 * @return TransformProcess, as YAML
 * @throws RuntimeException if serialization fails (should not occur for standard components)
 */
public String toYaml() {
    try {
        //NOTE(review): this uses the same JSON ObjectMapper as toJson(), so the output appears to
        //be JSON rather than YAML - confirm whether JsonMappers provides a dedicated YAML mapper
        return JsonMappers.getMapper().writeValueAsString(this);
    } catch (JsonProcessingException e) {
        throw new RuntimeException("Error serializing TransformProcess to YAML", e);
    }
}
/**
 * Deserialize a JSON String (created by {@link #toJson()}) to a TransformProcess
 *
 * @param json JSON representation of a TransformProcess
 * @return TransformProcess, from JSON
 * @throws RuntimeException if the JSON cannot be deserialized (in either current or legacy format)
 */
public static TransformProcess fromJson(String json) {
    try {
        return JsonMappers.getMapper().readValue(json, TransformProcess.class);
    } catch (InvalidTypeIdException e){
        if(e.getMessage().contains("@class")){
            //JSON may be legacy (1.0.0-alpha or earlier), attempt to load it using old format
            try{
                return JsonMappers.getLegacyMapper().readValue(json, TransformProcess.class);
            } catch (IOException e2){
                throw new RuntimeException("Error deserializing legacy-format TransformProcess JSON", e2);
            }
        }
        throw new RuntimeException("Error deserializing TransformProcess JSON: unresolvable type id", e);
    } catch (IOException e) {
        throw new RuntimeException("Error deserializing TransformProcess from JSON", e);
    }
}
/**
 * Deserialize a YAML String (created by {@link #toYaml()}) to a TransformProcess
 *
 * @param yaml YAML representation of a TransformProcess
 * @return TransformProcess, from YAML
 * @throws RuntimeException if the content cannot be deserialized
 */
public static TransformProcess fromYaml(String yaml) {
    try {
        //NOTE(review): uses the JSON mapper, mirroring toYaml() - confirm whether a YAML mapper was intended
        return JsonMappers.getMapper().readValue(yaml, TransformProcess.class);
    } catch (IOException e) {
        throw new RuntimeException("Error deserializing TransformProcess from YAML", e);
    }
}
/**
 * Infer the categories for the given record reader for a particular column.
 * Note that each "column index" is a column in the context of:
 * {@code List<Writable> record = ...; record.get(columnIndex);}
 *
 * Anything present in the column is converted to a string (via toString()) for categorical
 * purposes; the expected input is strings or numbers with sensible toString() representations.
 *
 * The returned categories are sorted alphabetically.
 *
 * @param recordReader the record reader to iterate through
 * @param columnIndex  the column index to get categories for
 * @return the distinct categories, sorted alphabetically
 */
public static List<String> inferCategories(RecordReader recordReader,int columnIndex) {
    //TreeSet keeps the distinct values in natural (alphabetical) String order as they are added;
    //this matters because RecordReader iteration order is not deterministic in general
    Set<String> distinct = new TreeSet<>();
    while (recordReader.hasNext()) {
        distinct.add(recordReader.next().get(columnIndex).toString());
    }
    return new ArrayList<>(distinct);
}
/**
 * Infer the categories for the given record reader for
 * a particular set of columns (this is more efficient than
 * {@link #inferCategories(RecordReader, int)}
 * if you have more than one column you plan on inferring categories for).
 *
 * Note that each "column index" is a column in the context of:
 * {@code List<Writable> record = ...; record.get(columnIndex);}
 *
 * Anything present in a column is converted to a string (via toString()) for categorical
 * purposes; the expected input is strings or numbers with sensible toString() representations.
 *
 * The returned categories are sorted alphabetically, for each column.
 *
 * @param recordReader  the record reader to scan
 * @param columnIndices the column indices to get categories for
 * @return the inferred categories, keyed by column index
 */
public static Map<Integer,List<String>> inferCategories(RecordReader recordReader,int[] columnIndices) {
    if (columnIndices == null || columnIndices.length < 1) {
        return Collections.emptyMap();
    }

    //TreeSet per column: collects distinct values already in alphabetical order
    //(RecordReader iteration order is not deterministic in general)
    Map<Integer,Set<String>> distinctPerColumn = new HashMap<>();
    for (int idx : columnIndices) {
        distinctPerColumn.put(idx, new TreeSet<String>());
    }

    while (recordReader.hasNext()) {
        List<Writable> record = recordReader.next();
        for (int idx : columnIndices) {
            if (idx >= record.size()) {
                log.warn("Filtering out example: Invalid length of columns");
                continue;
            }
            distinctPerColumn.get(idx).add(record.get(idx).toString());
        }
    }

    Map<Integer,List<String>> categoryMap = new HashMap<>();
    for (int idx : columnIndices) {
        categoryMap.put(idx, new ArrayList<>(distinctPerColumn.get(idx)));
    }
    return categoryMap;
}
/**
 * Transforms a sequence of strings into a sequence of writables, converting each time step
 * according to the initial schema (the sequence analogue of
 * {@link #transformRawStringsToInput(String...)}).
 *
 * @param sequence the sequence to transform (one {@code List<String>} per time step)
 * @return the transformed input
 */
public List<List<Writable>> transformRawStringsToInputSequence(List<List<String>> sequence) {
    List<List<Writable>> converted = new ArrayList<>(sequence.size());
    for (List<String> step : sequence) {
        converted.add(transformRawStringsToInputList(step));
    }
    return converted;
}
/**
 * Based on the input schema, map raw string values to the appropriate writable, one per column.
 *
 * @param values the raw string values to convert, in schema column order
 * @return the converted writables, one per input value
 * @throws IllegalArgumentException if the number of values does not match the schema's column
 *                                  count, or a column type cannot be converted from a raw string
 * @throws NumberFormatException    if a numeric column's value cannot be parsed
 */
public List<Writable> transformRawStringsToInputList(List<String> values) {
    if (values.size() != initialSchema.numColumns())
        throw new IllegalArgumentException(
                String.format("Number of values %d does not match the number of input columns %d for schema",
                        values.size(), initialSchema.numColumns()));
    List<Writable> ret = new ArrayList<>(values.size());
    for (int i = 0; i < values.size(); i++) {
        switch (initialSchema.getType(i)) {
            case String:
            case Categorical:
                //Categorical values are carried as plain text
                ret.add(new Text(values.get(i)));
                break;
            case Integer:
                ret.add(new IntWritable(Integer.parseInt(values.get(i))));
                break;
            case Double:
                ret.add(new DoubleWritable(Double.parseDouble(values.get(i))));
                break;
            case Float:
                ret.add(new FloatWritable(Float.parseFloat(values.get(i))));
                break;
            case Boolean:
                ret.add(new BooleanWritable(Boolean.parseBoolean(values.get(i))));
                break;
            case Time:
                //Bug fix: Time values were previously skipped silently, leaving the output list
                //shorter than the schema and misaligning all subsequent columns.
                //Assumes the raw value is an epoch-millisecond long - TODO confirm against TimeMetaData
                ret.add(new LongWritable(Long.parseLong(values.get(i))));
                break;
            case Long:
                ret.add(new LongWritable(Long.parseLong(values.get(i))));
                break;
            default:
                //Previously unknown types fell through silently, also misaligning the output
                throw new IllegalArgumentException(
                        "Cannot convert raw string to column type: " + initialSchema.getType(i));
        }
    }
    return ret;
}
/**
 * Based on the input schema, map raw string values to the appropriate writable
 * (varargs convenience overload of {@link #transformRawStringsToInputList(List)}).
 *
 * @param values the values to convert, in schema column order
 * @return the transformed values based on the schema
 */
public List<Writable> transformRawStringsToInput(String... values) {
    return transformRawStringsToInputList(Arrays.asList(values));
}
/**
* Builder class for constructing a TransformProcess
*/
public static class Builder {
private List<DataAction> actionList = new ArrayList<>();    //Actions accumulated so far, in execution order
private Schema initialSchema;                               //Schema of the raw input data

/**
 * @param initialSchema Schema of the input data, before any actions are applied
 */
public Builder(Schema initialSchema) {
    this.initialSchema = initialSchema;
}
/**
 * Add a transformation to be executed after the previously-added operations have been executed
 *
 * @param transform Transform to execute
 * @return this Builder, for method chaining
 */
public Builder transform(Transform transform) {
    actionList.add(new DataAction(transform));
    return this;
}

/**
 * Add a filter operation to be executed after the previously-added operations have been executed
 *
 * @param filter Filter operation to execute
 * @return this Builder, for method chaining
 */
public Builder filter(Filter filter) {
    actionList.add(new DataAction(filter));
    return this;
}

/**
 * Add a filter operation, based on the specified condition.
 *
 * If condition is satisfied (returns true): remove the example or sequence<br>
 * If condition is not satisfied (returns false): keep the example or sequence
 *
 * @param condition Condition to filter on
 * @return this Builder, for method chaining
 */
public Builder filter(Condition condition) {
    return filter(new ConditionFilter(condition));
}
/**
 * Remove all of the specified columns, by name
 *
 * @param columnNames Names of the columns to remove
 * @return this Builder, for method chaining
 */
public Builder removeColumns(String... columnNames) {
    return transform(new RemoveColumnsTransform(columnNames));
}

/**
 * Remove all of the specified columns, by name
 *
 * @param columnNames Names of the columns to remove
 * @return this Builder, for method chaining
 */
public Builder removeColumns(Collection<String> columnNames) {
    //toArray(new String[0]) allocates a correctly-sized, correctly-typed array
    return transform(new RemoveColumnsTransform(columnNames.toArray(new String[0])));
}

/**
 * Remove all columns, except for those that are specified here
 *
 * @param columnNames Names of the columns to keep
 * @return this Builder, for method chaining
 */
public Builder removeAllColumnsExceptFor(String... columnNames) {
    return transform(new RemoveAllColumnsExceptForTransform(columnNames));
}

/**
 * Remove all columns, except for those that are specified here
 *
 * @param columnNames Names of the columns to keep
 * @return this Builder, for method chaining
 */
public Builder removeAllColumnsExceptFor(Collection<String> columnNames) {
    return removeAllColumnsExceptFor(columnNames.toArray(new String[0]));
}
/**
 * Rename a single column
 *
 * @param oldName Original column name
 * @param newName New column name
 * @return this Builder, for method chaining
 */
public Builder renameColumn(String oldName, String newName) {
    return transform(new RenameColumnsTransform(oldName, newName));
}

/**
 * Rename multiple columns
 *
 * @param oldNames List of original column names
 * @param newNames List of new column names
 * @return this Builder, for method chaining
 */
public Builder renameColumns(List<String> oldNames, List<String> newNames) {
    return transform(new RenameColumnsTransform(oldNames, newNames));
}

/**
 * Reorder the columns using a partial or complete new ordering.
 * If only some of the column names are specified for the new order, the remaining columns will be placed at
 * the end, according to their current relative ordering
 *
 * @param newOrder Names of the columns, in the order they will appear in the output
 * @return this Builder, for method chaining
 */
public Builder reorderColumns(String... newOrder) {
    return transform(new ReorderColumnsTransform(newOrder));
}

/**
 * Duplicate a single column
 *
 * @param column  Name of the column to duplicate
 * @param newName Name of the new (duplicate) column
 * @return this Builder, for method chaining
 */
public Builder duplicateColumn(String column, String newName) {
    return transform(new DuplicateColumnsTransform(Collections.singletonList(column),
            Collections.singletonList(newName)));
}

/**
 * Duplicate a set of columns
 *
 * @param columnNames Names of the columns to duplicate
 * @param newNames    Names of the new (duplicated) columns
 * @return this Builder, for method chaining
 */
public Builder duplicateColumns(List<String> columnNames, List<String> newNames) {
    return transform(new DuplicateColumnsTransform(columnNames, newNames));
}
/**
 * Perform a mathematical operation (add, subtract, scalar max etc) on the specified integer column, with a scalar
 *
 * @param column The integer column to perform the operation on
 * @param mathOp The mathematical operation
 * @param scalar The scalar value to use in the mathematical operation
 * @return this Builder, for method chaining
 */
public Builder integerMathOp(String column, MathOp mathOp, int scalar) {
    return transform(new IntegerMathOpTransform(column, mathOp, scalar));
}

/**
 * Calculate and add a new integer column by performing a mathematical operation on a number of existing columns.
 * New column is added to the end.
 *
 * @param newColumnName Name of the new/derived column
 * @param mathOp        Mathematical operation to execute on the columns
 * @param columnNames   Names of the columns to use in the mathematical operation
 * @return this Builder, for method chaining
 */
public Builder integerColumnsMathOp(String newColumnName, MathOp mathOp, String... columnNames) {
    return transform(new IntegerColumnsMathOpTransform(newColumnName, mathOp, columnNames));
}

/**
 * Perform a mathematical operation (add, subtract, scalar max etc) on the specified long column, with a scalar
 *
 * @param columnName The long column to perform the operation on
 * @param mathOp     The mathematical operation
 * @param scalar     The scalar value to use in the mathematical operation
 * @return this Builder, for method chaining
 */
public Builder longMathOp(String columnName, MathOp mathOp, long scalar) {
    return transform(new LongMathOpTransform(columnName, mathOp, scalar));
}

/**
 * Calculate and add a new long column by performing a mathematical operation on a number of existing columns.
 * New column is added to the end.
 *
 * @param newColumnName Name of the new/derived column
 * @param mathOp        Mathematical operation to execute on the columns
 * @param columnNames   Names of the columns to use in the mathematical operation
 * @return this Builder, for method chaining
 */
public Builder longColumnsMathOp(String newColumnName, MathOp mathOp, String... columnNames) {
    return transform(new LongColumnsMathOpTransform(newColumnName, mathOp, columnNames));
}
/**
 * Perform a mathematical operation (add, subtract, scalar max etc) on the specified float column, with a scalar.
 * (Fixed javadoc: summary previously said "double column" for this float operation.)
 *
 * @param columnName The float column to perform the operation on
 * @param mathOp     The mathematical operation
 * @param scalar     The scalar value to use in the mathematical operation
 */
public Builder floatMathOp(String columnName, MathOp mathOp, float scalar) {
    return transform(new FloatMathOpTransform(columnName, mathOp, scalar));
}
/**
 * Derive a new float column from a mathematical operation applied across several existing columns.
 * The derived column is appended at the end of the record.
 *
 * @param newColumnName Name of the new/derived column
 * @param mathOp        Mathematical operation to execute on the columns
 * @param columnNames   Names of the columns to use in the mathematical operation
 */
public Builder floatColumnsMathOp(String newColumnName, MathOp mathOp, String... columnNames) {
    FloatColumnsMathOpTransform op =
                    new FloatColumnsMathOpTransform(newColumnName, mathOp, columnNames);
    return transform(op);
}
/**
 * Apply a mathematical function (such as sin(x), ceil(x), exp(x) etc) to a float column, in place.
 *
 * @param columnName   Column name to operate on
 * @param mathFunction MathFunction to apply to the column
 */
public Builder floatMathFunction(String columnName, MathFunction mathFunction) {
    FloatMathFunctionTransform fn = new FloatMathFunctionTransform(columnName, mathFunction);
    return transform(fn);
}
/**
 * Apply a mathematical operation (add, subtract, scalar max etc) with a scalar to the specified double column.
 *
 * @param columnName The double column to perform the operation on
 * @param mathOp     The mathematical operation
 * @param scalar     The scalar value to use in the mathematical operation
 */
public Builder doubleMathOp(String columnName, MathOp mathOp, double scalar) {
    DoubleMathOpTransform op = new DoubleMathOpTransform(columnName, mathOp, scalar);
    return transform(op);
}
/**
 * Derive a new double column from a mathematical operation applied across several existing columns.
 * The derived column is appended at the end of the record.
 *
 * @param newColumnName Name of the new/derived column
 * @param mathOp        Mathematical operation to execute on the columns
 * @param columnNames   Names of the columns to use in the mathematical operation
 */
public Builder doubleColumnsMathOp(String newColumnName, MathOp mathOp, String... columnNames) {
    DoubleColumnsMathOpTransform op =
                    new DoubleColumnsMathOpTransform(newColumnName, mathOp, columnNames);
    return transform(op);
}
/**
 * Apply a mathematical function (such as sin(x), ceil(x), exp(x) etc) to a double column, in place.
 *
 * @param columnName   Column name to operate on
 * @param mathFunction MathFunction to apply to the column
 */
public Builder doubleMathFunction(String columnName, MathFunction mathFunction) {
    DoubleMathFunctionTransform fn = new DoubleMathFunctionTransform(columnName, mathFunction);
    return transform(fn);
}
/**
 * Perform a mathematical operation (add, subtract, scalar min/max only) on the specified time column.
 * (Fixed javadoc: the columnName param previously said "integer column" for this time operation.)
 *
 * @param columnName   The time column to perform the operation on
 * @param mathOp       The mathematical operation
 * @param timeQuantity The quantity used in the mathematical op
 * @param timeUnit     The unit that timeQuantity is specified in
 */
public Builder timeMathOp(String columnName, MathOp mathOp, long timeQuantity, TimeUnit timeUnit) {
    return transform(new TimeMathOpTransform(columnName, mathOp, timeQuantity, timeUnit));
}
/**
 * Convert the specified categorical column(s) to a one-hot representation.
 * Each converted column is replaced by multiple new columns (one per category).
 *
 * @param columnNames Names of the categorical column(s) to convert to a one-hot representation
 */
public Builder categoricalToOneHot(String... columnNames) {
    for (int i = 0; i < columnNames.length; i++) {
        transform(new CategoricalToOneHotTransform(columnNames[i]));
    }
    return this;
}
/**
 * Convert the specified column(s) from a categorical representation to an integer representation.
 * This will replace the specified categorical column(s) with an integer representation, where
 * each integer has the value 0 to numCategories-1.
 * (Fixed javadoc typo: "repreesentation".)
 *
 * @param columnNames Name of the categorical column(s) to convert to an integer representation
 */
public Builder categoricalToInteger(String... columnNames) {
    for (String s : columnNames) {
        transform(new CategoricalToIntegerTransform(s));
    }
    return this;
}
/**
 * Convert an integer column (assumed values: 0 to numCategories-1) to a categorical column,
 * using the given list of state names (index i maps to the i-th name).
 *
 * @param columnName         Name of the column to convert
 * @param categoryStateNames Names of the states for the categorical column
 */
public Builder integerToCategorical(String columnName, List<String> categoryStateNames) {
    IntegerToCategoricalTransform t = new IntegerToCategoricalTransform(columnName, categoryStateNames);
    return transform(t);
}
/**
 * Convert an integer column to a categorical column, using an explicit mapping from
 * integer indexes to category state names.
 *
 * @param columnName           Name of the column to convert
 * @param categoryIndexNameMap Mapping from integer index to categorical state name
 */
public Builder integerToCategorical(String columnName, Map<Integer, String> categoryIndexNameMap) {
    IntegerToCategoricalTransform t = new IntegerToCategoricalTransform(columnName, categoryIndexNameMap);
    return transform(t);
}
/**
 * Replace an integer column with a set of one-hot columns, one per possible value
 * in the inclusive range [minValue, maxValue].
 *
 * @param columnName Name of the integer column
 * @param minValue   Minimum value possible for the integer column (inclusive)
 * @param maxValue   Maximum value possible for the integer column (inclusive)
 */
public Builder integerToOneHot(String columnName, int minValue, int maxValue) {
    IntegerToOneHotTransform t = new IntegerToOneHotTransform(columnName, minValue, maxValue);
    return transform(t);
}
/**
 * Append a new column in which every record holds the same fixed value.
 *
 * @param newColumnName Name of the new column
 * @param newColumnType Type of the new column
 * @param fixedValue    Value in the new column for all records
 */
public Builder addConstantColumn(String newColumnName, ColumnType newColumnType, Writable fixedValue) {
    AddConstantColumnTransform t = new AddConstantColumnTransform(newColumnName, newColumnType, fixedValue);
    return transform(t);
}
/**
 * Append a new double column in which every record holds the same fixed value.
 *
 * @param newColumnName Name of the new column
 * @param value         Value in the new column for all records
 */
public Builder addConstantDoubleColumn(String newColumnName, double value) {
    DoubleWritable constant = new DoubleWritable(value);
    return addConstantColumn(newColumnName, ColumnType.Double, constant);
}
/**
 * Add a new integer column, where the value for that column (for all records) is identical.
 * (Fixed javadoc: summary line was garbled mid-word — "where th / e value".)
 *
 * @param newColumnName Name of the new column
 * @param value         Value of the new column for all records
 */
public Builder addConstantIntegerColumn(String newColumnName, int value) {
    return addConstantColumn(newColumnName, ColumnType.Integer, new IntWritable(value));
}
/**
 * Append a new long column in which every record holds the same fixed value.
 *
 * @param newColumnName Name of the new column
 * @param value         Value in the new column for all records
 */
public Builder addConstantLongColumn(String newColumnName, long value) {
    LongWritable constant = new LongWritable(value);
    return addConstantColumn(newColumnName, ColumnType.Long, constant);
}
/**
 * Convert the values of the specified column to their string representation.
 *
 * @param inputColumn the input column to convert
 * @return builder pattern
 */
public Builder convertToString(String inputColumn) {
    ConvertToString conversion = new ConvertToString(inputColumn);
    return transform(conversion);
}
/**
 * Convert the values of the specified column to doubles.
 *
 * @param inputColumn the input column to convert
 * @return builder pattern
 */
public Builder convertToDouble(String inputColumn) {
    ConvertToDouble conversion = new ConvertToDouble(inputColumn);
    return transform(conversion);
}
/**
 * Convert the values of the specified column to integers.
 *
 * @param inputColumn the input column to convert
 * @return builder pattern
 */
public Builder convertToInteger(String inputColumn) {
    ConvertToInteger conversion = new ConvertToInteger(inputColumn);
    return transform(conversion);
}
/**
* Normalize the specified column with a given type of normalization
*
* @param column Column to normalize
* @param type Type of normalization to apply