NIFI-4756: Updated PublishKafkaRecord processors to include attributes generated from schema write strategy into the message headers when appropriate

This closes apache#2396.

Signed-off-by: Bryan Bende <bbende@apache.org>
markap14 authored and mcgilman committed Jan 16, 2018
1 parent d309819 commit 6caf956
Showing 15 changed files with 74 additions and 611 deletions.
@@ -21,23 +21,45 @@
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
+    private final Map<RecordSchema, String> avroSchemaTextCache = new LinkedHashMap<RecordSchema, String>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<RecordSchema, String> eldest) {
+            return size() > 10;
+        }
+    };
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
     }
 
     @Override
     public Map<String, String> getAttributes(final RecordSchema schema) {
-        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
-        final String schemaText = avroSchema.toString();
+        // First, check if the schema already has its Avro schema text available.
+        final Optional<String> schemaFormat = schema.getSchemaFormat();
+        if (schemaFormat.isPresent() && AvroTypeUtil.AVRO_SCHEMA_FORMAT.equals(schemaFormat.get())) {
+            final Optional<String> schemaText = schema.getSchemaText();
+            if (schemaText.isPresent()) {
+                return Collections.singletonMap("avro.schema", schemaText.get());
+            }
+        }
+
+        String schemaText = avroSchemaTextCache.get(schema);
+        if (schemaText == null) {
+            final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
+            schemaText = avroSchema.toString();
+            avroSchemaTextCache.put(schema, schemaText);
+        }
+
         return Collections.singletonMap("avro.schema", schemaText);
     }
 
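A note on the cache introduced above: LinkedHashMap's removeEldestEntry hook yields a simple bounded, insertion-order (FIFO) cache, so at most ten schema-to-text conversions are retained and repeated publishes of the same RecordSchema skip the comparatively expensive AvroTypeUtil.extractAvroSchema call. A minimal, self-contained sketch of the same idiom (class and key names are illustrative, not from the commit):

import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedCacheSketch {

    // Same idiom as avroSchemaTextCache above: once the map grows past
    // 10 entries, each put evicts the eldest (first-inserted) entry.
    private static final Map<String, String> CACHE = new LinkedHashMap<String, String>() {
        @Override
        protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
            return size() > 10;
        }
    };

    public static void main(final String[] args) {
        for (int i = 0; i < 15; i++) {
            CACHE.put("schema-" + i, "avro-text-" + i);
        }
        // Prints [schema-5, ..., schema-14]: only the 10 newest entries survive.
        System.out.println(CACHE.keySet());
    }
}

Because eviction is by insertion order rather than access order, a hot schema can still be pushed out by ten newer ones — an acceptable trade-off for the small, mostly stable set of schemas a flow typically writes.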
@@ -23,6 +23,7 @@
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -39,6 +40,7 @@
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
@@ -164,16 +166,18 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
             recordCount++;
             baos.reset();
 
+            Map<String, String> additionalAttributes = Collections.emptyMap();
             try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
-                writer.write(record);
+                final WriteResult writeResult = writer.write(record);
+                additionalAttributes = writeResult.getAttributes();
                 writer.flush();
             }
 
             final byte[] messageContent = baos.toByteArray();
             final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
             final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
 
-            publish(flowFile, messageKey, messageContent, topic, tracker);
+            publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
 
             if (tracker.isFailed(flowFile)) {
                 // If we have a failure, don't try to send anything else.
@@ -195,7 +199,7 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
         }
     }
 
-    private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
+    private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
         if (attributeNameRegex == null) {
             return;
         }
@@ -206,11 +210,23 @@ private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
                 headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
             }
         }
+
+        for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
+            if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+                headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
+            }
+        }
     }
 
     protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+        publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
+    }
+
+    protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
+                           final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+
         final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
-        addHeaders(flowFile, record);
+        addHeaders(flowFile, additionalAttributes, record);
 
         producer.send(record, new Callback() {
             @Override
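To make the new data flow concrete: any attributes a RecordSetWriter reports through its WriteResult (for example, the avro.schema attribute produced by the strategy shown earlier) are run through the same attribute-name regex as the FlowFile's own attributes and attached to the outgoing record as Kafka headers. A rough, self-contained sketch against the kafka-clients (0.11+) Headers API; the topic, regex, and attribute values below are made up for illustration:

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;

public class HeaderSketch {
    public static void main(final String[] args) {
        // Hypothetical attributes as a record writer might report them.
        final Map<String, String> additionalAttributes = new LinkedHashMap<>();
        additionalAttributes.put("avro.schema", "{\"type\":\"record\"}");
        additionalAttributes.put("record.count", "1");

        // Stands in for the processor's configurable attribute-name regex.
        final Pattern attributeNameRegex = Pattern.compile("avro\\..*");

        final ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("my-topic", null, null, new byte[0]);
        final Headers headers = record.headers();
        for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
            if (attributeNameRegex.matcher(entry.getKey()).matches()) {
                headers.add(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }

        // Only avro.schema matches the regex and becomes a header;
        // record.count is filtered out.
        System.out.println(record.headers());
    }
}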
@@ -177,7 +177,7 @@ public void testMultipleMessagesPerFlowFile() throws IOException {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_11.REL_SUCCESS, 2);
 
         verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -38,12 +38,12 @@
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
-import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -270,13 +270,12 @@ public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException,
         final RecordSet recordSet = reader.createRecordSet();
         final RecordSchema schema = reader.getSchema();
 
-        final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");
-
         final String topic = "unit-test";
         final String keyField = "person_id";
 
         final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
         final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+        Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
 
         Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
 

This file was deleted.

@@ -108,7 +108,7 @@ public String getMimeType() {
 
     @Override
     public WriteResult write(Record record) throws IOException {
-        return null;
+        return WriteResult.of(1, Collections.emptyMap());
    }
 
     @Override
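The WriteResult returned here is exactly what the PublisherLease change above consumes: returning a real result instead of null keeps callers that read the attributes from hitting a NullPointerException. A quick sketch of the contract, assuming NiFi's nifi-record API is on the classpath (the attribute value is a placeholder):

import java.util.Collections;

import org.apache.nifi.serialization.WriteResult;

public class WriteResultSketch {
    public static void main(final String[] args) {
        // WriteResult.of pairs a record count with attributes the writer
        // wants surfaced to its caller.
        final WriteResult result = WriteResult.of(1, Collections.singletonMap("avro.schema", "{...}"));
        System.out.println(result.getRecordCount()); // 1
        System.out.println(result.getAttributes());  // {avro.schema={...}}
    }
}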
