NIFI-4756: Updated PublishKafkaRecord processors to include attribute… #2396

Closed · wants to merge 1 commit
@@ -21,23 +21,45 @@
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.RecordSchema;

public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
    private final Map<RecordSchema, String> avroSchemaTextCache = new LinkedHashMap<RecordSchema, String>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<RecordSchema, String> eldest) {
            return size() > 10;
        }
    };

    @Override
    public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
    }

    @Override
    public Map<String, String> getAttributes(final RecordSchema schema) {
        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
        final String schemaText = avroSchema.toString();
        // First, check if schema has the Avro Text available already.
        final Optional<String> schemaFormat = schema.getSchemaFormat();
        if (schemaFormat.isPresent() && AvroTypeUtil.AVRO_SCHEMA_FORMAT.equals(schemaFormat.get())) {
            final Optional<String> schemaText = schema.getSchemaText();
            if (schemaText.isPresent()) {
                return Collections.singletonMap("avro.schema", schemaText.get());
            }
        }

        String schemaText = avroSchemaTextCache.get(schema);
        if (schemaText == null) {
            final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
            schemaText = avroSchema.toString();
            avroSchemaTextCache.put(schema, schemaText);
        }

        return Collections.singletonMap("avro.schema", schemaText);
    }

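For context, the cache added above relies on LinkedHashMap's removeEldestEntry hook: with the default constructor the map is insertion-ordered, so once more than 10 schemas have been cached, the oldest-inserted entry is evicted on the next put. A minimal standalone sketch of the same idiom (the class and method names below are illustrative, not taken from the PR):

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: a small insertion-ordered cache that evicts the eldest
// entry once a size limit is exceeded, the same idiom as avroSchemaTextCache above.
public class BoundedSchemaTextCacheSketch {
    private static final int MAX_ENTRIES = 10;

    private final Map<String, String> cache = new LinkedHashMap<String, String>() {
        @Override
        protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
            // Returning true tells LinkedHashMap to drop the eldest entry after an insert.
            return size() > MAX_ENTRIES;
        }
    };

    public String getOrCompute(final String key) {
        String value = cache.get(key);
        if (value == null) {
            value = expensiveDerivation(key); // stands in for AvroTypeUtil.extractAvroSchema(schema).toString()
            cache.put(key, value);
        }
        return value;
    }

    private String expensiveDerivation(final String key) {
        return key.toUpperCase(); // placeholder for the costly schema-to-text conversion
    }
}

Because the schema itself is the map key, cache hits depend on RecordSchema providing consistent equals/hashCode; on a miss the strategy simply re-derives the Avro schema text and stores it.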
@@ -23,6 +23,7 @@
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -39,6 +40,7 @@
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
@@ -164,16 +166,18 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
recordCount++;
baos.reset();

Map<String, String> additionalAttributes = Collections.emptyMap();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
writer.write(record);
final WriteResult writeResult = writer.write(record);
additionalAttributes = writeResult.getAttributes();
writer.flush();
}

final byte[] messageContent = baos.toByteArray();
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);

publish(flowFile, messageKey, messageContent, topic, tracker);
publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);

if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.
@@ -195,7 +199,7 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
}
}

private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
if (attributeNameRegex == null) {
return;
}
@@ -206,11 +210,23 @@ private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> reco
headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
}
}

for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
if (attributeNameRegex.matcher(entry.getKey()).matches()) {
headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
}
}
}

protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
}

protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {

final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
addHeaders(flowFile, record);
addHeaders(flowFile, additionalAttributes, record);

producer.send(record, new Callback() {
@Override
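Taken together, the PublisherLease changes thread the attributes returned by the writer's WriteResult (for example the avro.schema attribute produced by the strategy above) through the new publish(...) overload into addHeaders(...), where any attribute whose name matches the configured attribute-name regex is added as a Kafka header next to the matching FlowFile attributes. A hedged sketch of that header-copy step against Kafka's standard Headers API (the helper class and method names are illustrative, not part of the PR):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;

// Illustrative helper only: copies attributes whose names match the configured pattern
// onto the outgoing Kafka record as headers, mirroring the loop added to addHeaders().
public final class HeaderCopySketch {

    private HeaderCopySketch() {
    }

    public static void copyMatchingAttributes(final Map<String, String> attributes, final Pattern attributeNameRegex,
            final ProducerRecord<?, ?> record) {
        if (attributeNameRegex == null || attributes == null || attributes.isEmpty()) {
            return;
        }

        final Headers headers = record.headers();
        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
            if (attributeNameRegex.matcher(entry.getKey()).matches()) {
                // The processor uses its configured header character set; UTF-8 is assumed here for the sketch.
                headers.add(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}

With an attribute regex such as avro\..*, for example, the avro.schema attribute written for each message would end up as a header on every record published to the topic.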
@@ -177,7 +177,7 @@ public void testMultipleMessagesPerFlowFile() throws IOException {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_11.REL_SUCCESS, 2);

verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -44,6 +44,7 @@
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -270,13 +271,12 @@ public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException,
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();

final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");

final String topic = "unit-test";
final String keyField = "person_id";

final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));

Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);

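The test updates stub RecordSetWriter.write(Record) to return a real WriteResult rather than null, since the lease now calls writeResult.getAttributes() on it. A small illustrative helper (assumed names, not from the PR) showing how a test could hand back a WriteResult that carries an attribute and so exercise the new header path:

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.mockito.Mockito;

// Illustrative test helper, not part of the PR: a mocked writer whose WriteResult carries
// an "avro.schema" attribute, so a test can assert the attribute reaches the publish path.
public class MockWriterSketch {

    public static RecordSetWriter writerWithSchemaAttribute(final String schemaText) throws IOException {
        final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
        final Map<String, String> attributes = Collections.singletonMap("avro.schema", schemaText);
        Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, attributes));
        return writer;
    }
}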

This file was deleted.

@@ -108,7 +108,7 @@ public String getMimeType() {

@Override
public WriteResult write(Record record) throws IOException {
return null;
return WriteResult.of(1, Collections.emptyMap());
}

@Override