Skip to content

Commit

Permalink
[FLINK-15571] Change Redis Sink to AsyncSink implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
eskabetxe committed Nov 29, 2022
1 parent d272e01 commit fdc3d79
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.redis.streams.sink.command;

package org.apache.flink.connector.redis.streams.sink;

import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;

import redis.clients.jedis.StreamEntryID;

import java.io.Serializable;
import java.util.Map;

public class StreamRedisCommand implements RedisCommand {
/** A Redis Streams Command. */
public class RedisStreamsCommand implements Serializable {

private transient StreamEntryID streamId = null;
public final String key;
public final Map<String, String> value;

private StreamRedisCommand(String key, Map<String, String> value) {
private RedisStreamsCommand(String key, Map<String, String> value) {
this.key = key;
this.value = value;
}
Expand All @@ -35,11 +40,33 @@ public static Builder builder() {
return new Builder();
}

@Override
public void send(JedisConnector connector) {
connector.getJedisCommands().xadd(key, StreamEntryID.NEW_ENTRY, value);
this.streamId =
connector
.getJedisCommands()
.xadd(
key,
(this.streamId != null) ? this.streamId : StreamEntryID.NEW_ENTRY,
value);
}

public boolean sendCorrectly() {
return true;
}

public boolean sendIncorrectly() {
return !sendCorrectly();
}

public long getMessageSize() {
return this.key.length()
+ this.value.entrySet().stream()
.map(k -> k.getKey().length() + k.getValue().length())
.reduce(Integer::sum)
.orElse(0);
}

/** The builder for {@link RedisStreamsCommand}. */
public static class Builder {
private String key;
private Map<String, String> value;
Expand All @@ -54,8 +81,8 @@ public Builder withValue(Map<String, String> value) {
return this;
}

public StreamRedisCommand build() {
return new StreamRedisCommand(key, value);
public RedisStreamsCommand build() {
return new RedisStreamsCommand(key, value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.redis.streams.sink;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.connector.redis.streams.sink.command.RedisCommand;
package org.apache.flink.connector.redis.streams.sink;

import java.io.Serializable;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

/**
* Function that creates the description how the input data should be mapped to redis type.
*
* @param <T> The type of the element handled by this {@code RedisSerializer}
*/
public interface RedisSerializer<T> extends Function, Serializable {

RedisCommand getMessage(T input);
}
public interface RedisStreamsCommandSerializer<T>
extends ElementConverter<T, RedisStreamsCommand> {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,72 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.redis.streams.sink;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.redis.streams.sink.config.JedisConfig;
import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;
import org.apache.flink.connector.redis.streams.sink.connection.JedisConnectorBuilder;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

public class RedisStreamsSink<T> implements Sink<T> {
/**
* A sink for publishing data into Redis.
*
* @param <T>
*/
public class RedisStreamsSink<T> extends AsyncSinkBase<T, RedisStreamsCommand> {

private final JedisConfig jedisConfig;
private final RedisSerializer<T> serializer;

public RedisStreamsSink(JedisConfig jedisConfig, RedisSerializer<T> serializer) {
public RedisStreamsSink(
JedisConfig jedisConfig,
RedisStreamsCommandSerializer<T> converter,
AsyncSinkWriterConfiguration asyncConfig) {
super(
converter,
asyncConfig.getMaxBatchSize(),
asyncConfig.getMaxInFlightRequests(),
asyncConfig.getMaxBufferedRequests(),
asyncConfig.getMaxBatchSizeInBytes(),
asyncConfig.getMaxTimeInBufferMS(),
asyncConfig.getMaxRecordSizeInBytes());
this.jedisConfig = jedisConfig;
this.serializer = serializer;
}

@Override
public SinkWriter<T> createWriter(InitContext initContext) throws IOException {
public RedisStreamsWriter<T> createWriter(InitContext initContext) throws IOException {
return restoreWriter(initContext, Collections.emptyList());
}

@Override
public RedisStreamsWriter<T> restoreWriter(
InitContext initContext,
Collection<BufferedRequestState<RedisStreamsCommand>> recoveredState)
throws IOException {
AsyncSinkWriterConfiguration asyncConfig =
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(getMaxBatchSize())
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
.setMaxInFlightRequests(getMaxInFlightRequests())
.setMaxBufferedRequests(getMaxBufferedRequests())
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
.build();
JedisConnector connection = JedisConnectorBuilder.build(jedisConfig);
return new RedisStreamsWriter<>(connection, this.serializer);
return new RedisStreamsWriter<>(
connection, getElementConverter(), asyncConfig, initContext, recoveredState);
}

@Override
public SimpleVersionedSerializer<BufferedRequestState<RedisStreamsCommand>>
getWriterStateSerializer() {
return new RedisStreamsStateSerializer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.redis.streams.sink;

import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** The Redis implementation for {@link SimpleVersionedSerializer}. */
public class RedisStreamsStateSerializer
implements SimpleVersionedSerializer<BufferedRequestState<RedisStreamsCommand>> {

@Override
public int getVersion() {
return 1;
}

@Override
public byte[] serialize(BufferedRequestState<RedisStreamsCommand> obj) throws IOException {
Collection<RequestEntryWrapper<RedisStreamsCommand>> bufferState =
obj.getBufferedRequestEntries();

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {

out.writeInt(getVersion());
out.writeInt(bufferState.size());

for (RequestEntryWrapper<RedisStreamsCommand> wrapper : bufferState) {
RedisStreamsCommand command = wrapper.getRequestEntry();
writeString(out, command.key);
out.writeInt(command.value.size());
for (Map.Entry<String, String> entry : command.value.entrySet()) {
writeString(out, entry.getKey());
writeString(out, entry.getValue());
}
}

out.flush();
return baos.toByteArray();
}
}

@Override
public BufferedRequestState<RedisStreamsCommand> deserialize(int version, byte[] serialized)
throws IOException {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
final DataInputStream in = new DataInputStream(bais)) {

int byteVersion = in.readInt();

int bufferSize = in.readInt();
List<RequestEntryWrapper<RedisStreamsCommand>> state = new ArrayList<>();
for (int bs = 0; bs < bufferSize; bs++) {
String key = readString(in);

int valueSize = in.readInt();
Map<String, String> values = new HashMap<>();
for (int i = 0; i < valueSize; i++) {
String eKey = readString(in);
String eValue = readString(in);
values.put(eKey, eValue);
}

RedisStreamsCommand command =
RedisStreamsCommand.builder().withKey(key).withValue(values).build();

state.add(new RequestEntryWrapper<>(command, command.getMessageSize()));
}
return new BufferedRequestState<>(state);
}
}

private void writeString(final DataOutputStream out, String value) throws IOException {
out.writeInt(value.length());
out.writeBytes(value);
}

private String readString(final DataInputStream in) throws IOException {
int sizeToRead = in.readInt();
byte[] bytesRead = new byte[sizeToRead];
int sizeRead = in.read(bytesRead);

if (sizeToRead != sizeRead) {
throw new IOException(
String.format("Expected to read %s but read %s", sizeToRead, sizeRead));
}

return new String(bytesRead);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.redis.streams.sink;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.redis.streams.sink.command.RedisCommand;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class RedisStreamsWriter<T> implements SinkWriter<T> {
class RedisStreamsWriter<T> extends AsyncSinkWriter<T, RedisStreamsCommand> {

private final JedisConnector jedisConnector;
private final RedisSerializer<T> serializer;
private final Queue<RedisCommand> queue = new ArrayDeque<>();

public RedisStreamsWriter(JedisConnector jedisConnector, RedisSerializer<T> serializer) {
public RedisStreamsWriter(
JedisConnector jedisConnector,
ElementConverter<T, RedisStreamsCommand> elementConverter,
AsyncSinkWriterConfiguration asyncConfig,
Sink.InitContext initContext,
Collection<BufferedRequestState<RedisStreamsCommand>> recoveredState) {
super(elementConverter, initContext, asyncConfig, recoveredState);
this.jedisConnector = jedisConnector;
this.serializer = serializer;
}


@Override
public void write(T input, Context context) throws IOException, InterruptedException {
RedisCommand message = serializer.getMessage(input);
queue.add(message);
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
flush();
}

private void flush() {
while(!this.queue.isEmpty()) {
RedisCommand element = this.queue.remove();
element.send(this.jedisConnector);
}
protected void submitRequestEntries(
List<RedisStreamsCommand> requestEntries,
Consumer<List<RedisStreamsCommand>> requestResult) {
List<RedisStreamsCommand> errors =
requestEntries.stream()
.peek(command -> command.send(this.jedisConnector))
.filter(RedisStreamsCommand::sendIncorrectly)
.collect(Collectors.toList());

requestResult.accept(errors);
}

@Override
public void close() throws Exception {
jedisConnector.close();
protected long getSizeInBytes(RedisStreamsCommand command) {
return command.getMessageSize();
}
}
Loading

0 comments on commit fdc3d79

Please sign in to comment.