Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
configClass = RedisSinkConfig.class
)
@Slf4j
public class RedisSink<T> implements Sink<T> {
public class RedisSink implements Sink<byte[]> {

private RedisSinkConfig redisSinkConfig;

Expand All @@ -61,7 +61,7 @@ public class RedisSink<T> implements Sink<T> {

private int batchSize;

private List<Record<T>> incomingList;
private List<Record<byte[]>> incomingList;

private ScheduledExecutorService flushExecutor;

Expand All @@ -84,7 +84,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
}

@Override
public void write(Record<T> record) throws Exception {
public void write(Record<byte[]> record) throws Exception {
int currentSize;
synchronized (this) {
incomingList.add(record);
Expand All @@ -108,7 +108,7 @@ public void close() throws Exception {

private void flush() {
final Map<byte[], byte[]> recordsToSet = new ConcurrentHashMap<>();
final List<Record<T>> recordsToFlush;
final List<Record<byte[]>> recordsToFlush;

synchronized (this) {
if (incomingList.isEmpty()) {
Expand All @@ -119,11 +119,11 @@ private void flush() {
}

if (CollectionUtils.isNotEmpty(recordsToFlush)) {
for (Record<T> record: recordsToFlush) {
for (Record<byte[]> record: recordsToFlush) {
try {
// records with null keys or values will be ignored
byte[] key = toBytes("key", record.getKey().orElse(null));
byte[] value = toBytes("value", record.getValue());
byte[] key = record.getKey().isPresent() ? record.getKey().get().getBytes(StandardCharsets.UTF_8) : null;
byte[] value = record.getValue();
recordsToSet.put(key, value);
} catch (Exception e) {
record.fail();
Expand Down Expand Up @@ -155,19 +155,4 @@ private void flush() {
log.error("Redis mset data interrupted exception ", e);
}
}

private byte[] toBytes(String src, Object obj) {
final byte[] result;
if (obj instanceof String) {
String s = (String) obj;
result = s.getBytes(StandardCharsets.UTF_8);
} else if (obj instanceof byte[]) {
result = (byte[]) obj;
} else if (null == obj) {
result = null;
} else {
throw new IllegalArgumentException(String.format("The %s for the record must be String or Bytes.", src));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,18 @@
*/
package org.apache.pulsar.io.redis.sink;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* redis Sink test
Expand All @@ -48,17 +39,6 @@ public class RedisSinkTest {

private EmbeddedRedisUtils embeddedRedisUtils;

/**
* A Simple class to test redis class
*/
@Data
@ToString
@EqualsAndHashCode
public static class Foo {
private String field1;
private String field2;
}

@BeforeMethod
public void setUp() throws Exception {
embeddedRedisUtils = new EmbeddedRedisUtils(getClass().getSimpleName());
Expand All @@ -83,26 +63,7 @@ public void TestOpenAndWriteSink() throws Exception {
RedisSink sink = new RedisSink();

// prepare a foo Record
Foo obj = new Foo();
obj.setField1("FakeFiled1");
obj.setField2("FakeFiled1");
AvroSchema<Foo> schema = AvroSchema.of(Foo.class);

byte[] bytes = schema.encode(obj);
ByteBuf payload = Unpooled.copiedBuffer(bytes);
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));

Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
.build();

log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
obj.toString(),
message.getValue().toString(),
record.getValue().toString());
Record<byte[]> record = build("fakeTopic", "fakeKey", "fakeValue");

// open should success
sink.open(configs, null);
Expand All @@ -115,4 +76,29 @@ public void TestOpenAndWriteSink() throws Exception {
Thread.sleep(1000);

}

private Record<byte[]> build(String topic, String key, String value) {
// prepare a SinkRecord
SinkRecord<byte[]> record = new SinkRecord<>(new Record<byte[]>() {
@Override
public Optional<String> getKey() {
return Optional.empty();
}

@Override
public byte[] getValue() {
return value.getBytes(StandardCharsets.UTF_8);
}

@Override
public Optional<String> getDestinationTopic() {
if (topic != null) {
return Optional.of(topic);
} else {
return Optional.empty();
}
}
}, value.getBytes(StandardCharsets.UTF_8));
return record;
}
}