-
Notifications
You must be signed in to change notification settings - Fork 27
/
ALABasicTransform.java
126 lines (113 loc) · 5.63 KB
/
ALABasicTransform.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package au.org.ala.pipelines.transforms;
import static org.gbif.api.model.pipelines.InterpretationType.RecordType.BASIC;
import static org.gbif.pipelines.common.PipelinesVariables.Metrics.BASIC_RECORDS_COUNT;
import au.org.ala.pipelines.interpreters.ALABasicInterpreter;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import lombok.Builder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.gbif.api.vocabulary.OccurrenceStatus;
import org.gbif.kvs.KeyValueStore;
import org.gbif.pipelines.core.functions.SerializableConsumer;
import org.gbif.pipelines.core.functions.SerializableSupplier;
import org.gbif.pipelines.core.interpreters.Interpretation;
import org.gbif.pipelines.core.interpreters.core.BasicInterpreter;
import org.gbif.pipelines.core.interpreters.core.CoreInterpreter;
import org.gbif.pipelines.core.interpreters.core.VocabularyInterpreter;
import org.gbif.pipelines.core.parsers.vocabulary.VocabularyService;
import org.gbif.pipelines.io.avro.BasicRecord;
import org.gbif.pipelines.io.avro.ExtendedRecord;
import org.gbif.pipelines.transforms.Transform;
/**
 * Beam-level transformations for the DwC Occurrence: reads an Avro, writes an Avro, maps values to
 * key/value pairs and transforms from {@link ExtendedRecord} to {@link BasicRecord}.
 *
 * <p>In addition to the shared {@link BasicInterpreter}, {@link CoreInterpreter} and {@link
 * VocabularyInterpreter} steps, this ALA variant applies {@link ALABasicInterpreter} for license
 * and recordedBy interpretation.
 *
 * @see <a href="https://dwc.tdwg.org/terms/#occurrence">Darwin Core Occurrence terms</a>
 */
public class ALABasicTransform extends Transform<ExtendedRecord, BasicRecord> {

  // Serializable factories handed in through the builder; they are only invoked in setup().
  private final SerializableSupplier<KeyValueStore<String, OccurrenceStatus>>
      occStatusKvStoreSupplier;
  private final SerializableSupplier<KeyValueStore<String, List<String>>> recordedByKvStoreSupplier;
  private final SerializableSupplier<VocabularyService> vocabularyServiceSupplier;

  // Resources obtained from the suppliers above. Each stays null until setup() runs, and stays
  // null afterwards if its supplier was null.
  private KeyValueStore<String, OccurrenceStatus> occStatusKvStore;
  private KeyValueStore<String, List<String>> recordedByKvStore;
  private VocabularyService vocabularyService;

  /**
   * Creates the transform. Instances are obtained through the Lombok-generated builder, whose
   * terminal method is {@code create()}.
   *
   * @param occStatusKvStoreSupplier supplier of the occurrence-status lookup store; may be null,
   *     in which case no store is created in {@link #setup()}
   * @param recordedByKvStoreSupplier supplier of the recordedBy lookup store; may be null, in
   *     which case no store is created in {@link #setup()}
   * @param vocabularyServiceSupplier supplier of the vocabulary service; may be null, in which
   *     case no service is created in {@link #setup()}
   */
  @Builder(buildMethodName = "create")
  private ALABasicTransform(
      SerializableSupplier<KeyValueStore<String, OccurrenceStatus>> occStatusKvStoreSupplier,
      SerializableSupplier<KeyValueStore<String, List<String>>> recordedByKvStoreSupplier,
      SerializableSupplier<VocabularyService> vocabularyServiceSupplier) {
    super(BasicRecord.class, BASIC, ALABasicTransform.class.getName(), BASIC_RECORDS_COUNT);
    this.occStatusKvStoreSupplier = occStatusKvStoreSupplier;
    this.recordedByKvStoreSupplier = recordedByKvStoreSupplier;
    this.vocabularyServiceSupplier = vocabularyServiceSupplier;
  }

  /**
   * Maps each {@link BasicRecord} to a key/value pair whose key is {@link BasicRecord#getId}.
   *
   * @return a {@link MapElements} transform producing {@code KV<id, record>}
   */
  public MapElements<BasicRecord, KV<String, BasicRecord>> toKv() {
    // The explicit lambda parameter type lets Beam infer the input type of the mapping function.
    return MapElements.into(new TypeDescriptor<KV<String, BasicRecord>>() {})
        .via((BasicRecord br) -> KV.of(br.getId(), br));
  }

  /**
   * Registers the counter callback via {@code setCounterFn} and returns this transform so the
   * call can be chained.
   *
   * @param counterFn callback passed to {@code setCounterFn}
   * @return this transform
   */
  public ALABasicTransform counterFn(SerializableConsumer<String> counterFn) {
    setCounterFn(counterFn);
    return this;
  }

  /**
   * Beam {@code @Setup}: initializes resources from their suppliers.
   *
   * <p>Each resource is created at most once (only while its field is still null). A resource is
   * skipped if its supplier is null.
   */
  @Setup
  public void setup() {
    if (occStatusKvStore == null && occStatusKvStoreSupplier != null) {
      occStatusKvStore = occStatusKvStoreSupplier.get();
    }
    if (recordedByKvStore == null && recordedByKvStoreSupplier != null) {
      recordedByKvStore = recordedByKvStoreSupplier.get();
    }
    if (vocabularyService == null && vocabularyServiceSupplier != null) {
      vocabularyService = vocabularyServiceSupplier.get();
    }
  }

  /**
   * Beam {@code @Teardown}: intentionally does nothing.
   *
   * <p>NOTE(review): the stores and service obtained in {@link #setup()} are not closed here.
   * Presumably their lifecycle is owned by whatever the suppliers return (e.g. shared instances);
   * confirm before adding any close logic.
   */
  @Teardown
  public void tearDown() {
    // NOP
  }

  /**
   * Interprets an {@link ExtendedRecord} into a {@link BasicRecord}.
   *
   * <p>The target record is seeded with the source id and a creation timestamp
   * ({@link Instant#now()} in epoch milliseconds). The interpretation steps run only when the
   * source has at least one core term. The step order below is significant and must be preserved.
   *
   * <p>The lookup stores and vocabulary service are passed as-is and may be null if {@link
   * #setup()} has not run or a supplier was null. Presumably the interpreters tolerate null;
   * verify this in {@link BasicInterpreter}, {@link VocabularyInterpreter} and {@link
   * ALABasicInterpreter}.
   *
   * @param source the raw record to interpret
   * @return the interpreted record, as produced by {@code getOfNullable()}. Presumably this is
   *     empty when the core-terms condition is not met.
   */
  @Override
  public Optional<BasicRecord> convert(ExtendedRecord source) {
    BasicRecord br =
        BasicRecord.newBuilder()
            .setId(source.getId())
            .setCreated(Instant.now().toEpochMilli())
            .build();

    return Interpretation.from(source)
        .to(br)
        .when(er -> !er.getCoreTerms().isEmpty())
        .via(BasicInterpreter::interpretBasisOfRecord)
        .via(BasicInterpreter::interpretTypifiedName)
        .via(BasicInterpreter::interpretSex)
        .via(BasicInterpreter::interpretTypeStatus)
        .via(BasicInterpreter::interpretIndividualCount)
        .via((e, r) -> CoreInterpreter.interpretReferences(e, r, r::setReferences))
        .via(BasicInterpreter::interpretOrganismQuantity)
        .via(BasicInterpreter::interpretOrganismQuantityType)
        .via((e, r) -> CoreInterpreter.interpretSampleSizeUnit(e, r::setSampleSizeUnit))
        .via((e, r) -> CoreInterpreter.interpretSampleSizeValue(e, r::setSampleSizeValue))
        .via(BasicInterpreter::interpretRelativeOrganismQuantity)
        .via(BasicInterpreter::interpretIdentifiedByIds)
        .via(BasicInterpreter::interpretRecordedByIds)
        .via(BasicInterpreter.interpretOccurrenceStatus(occStatusKvStore))
        .via(VocabularyInterpreter.interpretEstablishmentMeans(vocabularyService))
        .via(VocabularyInterpreter.interpretDegreeOfEstablishment(vocabularyService))
        .via(VocabularyInterpreter.interpretLifeStage(vocabularyService))
        // ALA-specific steps.
        .via(ALABasicInterpreter::interpretLicense)
        .via(ALABasicInterpreter.interpretRecordedBy(recordedByKvStore))
        .via((e, r) -> CoreInterpreter.interpretDatasetID(e, r::setDatasetID))
        .via((e, r) -> CoreInterpreter.interpretDatasetName(e, r::setDatasetName))
        .via(BasicInterpreter::interpretOtherCatalogNumbers)
        .via(BasicInterpreter::interpretIdentifiedBy)
        .via(BasicInterpreter::interpretPreparations)
        .via((e, r) -> CoreInterpreter.interpretSamplingProtocol(e, r::setSamplingProtocol))
        .via(BasicInterpreter::setCoreId)
        .getOfNullable();
  }
}