-
Notifications
You must be signed in to change notification settings - Fork 2.6k
/
test_record_reader_xml.groovy
86 lines (73 loc) · 3.85 KB
/
test_record_reader_xml.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
/**
 * RecordReader that parses a whole XML document in its constructor and then hands out
 * one MapRecord per child element whose name matches the supplied record tag.
 *
 * Every field listed in the schema is looked up by name on the record element and stored
 * as the String form of that node, regardless of the data type declared in the schema.
 */
class GroovyXmlRecordReader implements RecordReader {

    def recordIterator
    def recordSchema

    // Kept only so close() can release it; all parsing has finished once the constructor returns.
    private InputStream sourceStream

    /**
     * @param recordTag   name of the child elements (under the document root) that represent records
     * @param schema      schema whose field names are used as keys into each record element
     * @param inputStream stream containing the XML document; released by close()
     * @throws MalformedRecordException if the stream content cannot be parsed as XML
     */
    GroovyXmlRecordReader(final String recordTag, final RecordSchema schema, final InputStream inputStream) {
        recordSchema = schema
        sourceStream = inputStream

        def xml
        try {
            xml = new XmlSlurper().parse(inputStream)
        } catch (org.xml.sax.SAXException e) {
            // Report unparseable input through the exception type the reader API declares,
            // keeping the parser's exception as the cause.
            throw new MalformedRecordException('Unable to parse incoming data as XML', e)
        }

        // Change the XML fields to a MapRecord for each incoming record
        recordIterator = xml[recordTag].collect { r ->
            // Create a map of field names to values, using the field names from the schema as keys into the XML object
            def fields = recordSchema.fieldNames.inject([:]) { result, fieldName ->
                result[fieldName] = r[fieldName].toString()
                result
            }
            new MapRecord(recordSchema, fields)
        }.iterator()
    }

    /**
     * @return the next parsed record, or null once every record has been returned
     */
    Record nextRecord() throws IOException, MalformedRecordException {
        return recordIterator?.hasNext() ? recordIterator.next() : null
    }

    /**
     * @return the schema this reader was constructed with
     */
    RecordSchema getSchema() throws MalformedRecordException {
        return recordSchema
    }

    /**
     * Closes the stream this reader was constructed with, if one was supplied.
     */
    void close() throws IOException {
        sourceStream?.close()
    }
}
/**
 * Controller service that builds GroovyXmlRecordReader instances.
 *
 * The record schema is read from a dynamic property named 'schema.text', and the name of the
 * XML element that represents a record is read from the flow file attribute 'record.tag'.
 */
class GroovyXmlRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {

    // Will be set by the ScriptedRecordReaderFactory
    ConfigurationContext configurationContext

    /**
     * Builds a reader over the given stream.
     *
     * @param flowFile    flow file supplying the 'record.tag' attribute
     * @param inputStream stream containing the XML to read
     * @param logger      component logger (not used by this implementation)
     * @return a new GroovyXmlRecordReader, or null when no 'schema.text' value is configured
     */
    RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
        // 'schema.text' is expected to hold a JSON array of objects, each mapping a field name
        // to the name of a RecordFieldType
        def schemaProperty = configurationContext.properties.find { candidate ->
            candidate.key.dynamic && candidate.key.name == 'schema.text'
        }
        def schemaText = schemaProperty?.getValue()
        if (!schemaText) {
            return null
        }

        def fieldDefinitions = new JsonSlurper().parseText(schemaText)
        // Only the first key/value pair of each JSON object is used to define a field
        def recordFields = fieldDefinitions.collect { definition ->
            def firstEntry = definition.entrySet()[0]
            new RecordField(firstEntry.key, RecordFieldType.of(firstEntry.value).dataType)
        } as List<RecordField>
        def recordSchema = new SimpleRecordSchema(recordFields)

        def recordTag = flowFile.getAttribute('record.tag')
        return new GroovyXmlRecordReader(recordTag, recordSchema, inputStream)
    }
}
// Create an instance of RecordReaderFactory called "reader"; this is the entry point for ScriptedReader
reader = new GroovyXmlRecordReaderFactory()