Skip to content

Commit

Permalink
[FLINK-15571] Add RedisSink
Browse files Browse the repository at this point in the history
  • Loading branch information
eskabetxe committed Jul 29, 2022
1 parent 06f6fca commit ef42ced
Show file tree
Hide file tree
Showing 10 changed files with 597 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.redis.sink2;

import java.io.Serializable;

public class RedisAction implements Serializable {
public final RedisCommand command;
public final String key;
public final String value;

private RedisAction(RedisCommand command, String key, String value) {
this.command = command;
this.key = key;
this.value = value;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private RedisCommand command;
private String key;
private String value;

public Builder withCommand(RedisCommand command) {
this.command = command;
return this;
}

public Builder withKey(String key) {
this.key = key;
return this;
}

public Builder withValue(String value) {
this.value = value;
return this;
}

public RedisAction build() {
return new RedisAction(command, key, value);
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.redis.sink2;

public enum RedisCommand {

SET
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.redis.sink2;

import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;
import java.util.Optional;

/**
* Function that creates the description how the input data should be mapped to redis type.
*
* @param <T> The type of the element handled by this {@code RedisSerializer}
*/
public interface RedisSerializer<T> extends Function, Serializable {

/**
* Returns a redis command.
*
* @return RedisCommand
*/
RedisCommand getCommand(T data);

/**
* Extracts key from data.
*
* @param data source data
* @return key
*/
String getKeyFromData(T data);

/**
* Extracts value from data.
*
* @param data source data
* @return value
*/
String getValueFromData(T data);

/**
* Extracts the additional key from data as an {@link Optional<String>}.
* The default implementation returns an empty Optional.
*
* @param data
* @return Optional
*/
default Optional<String> getAdditionalKey(T data) {
return Optional.empty();
}

/**
* Extracts the additional time to live (TTL) for data as an {@link Optional<Integer>}.
* The default implementation returns an empty Optional.
*
* @param data
* @return Optional
*/
default Optional<Integer> getAdditionalTTL(T data) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.redis.sink2;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.redis.shared.JedisConnectorBuilder;
import org.apache.flink.connector.redis.shared.config.JedisConfig;
import org.apache.flink.connector.redis.sink2.async.RedisAsyncWriter;
import org.apache.flink.connector.redis.sink2.async.RedisConverter;

import java.io.IOException;

public class RedisSink<T> implements Sink<T> {

private final JedisConfig jedisConfig;
private final RedisSerializer<T> serializer;

private final RedisSinkConfig sinkConfig;

public RedisSink(JedisConfig jedisConfig, RedisSinkConfig sinkConfig, RedisSerializer<T> serializer) {
this.jedisConfig = jedisConfig;
this.sinkConfig = sinkConfig;
this.serializer = serializer;
}

@Override
public SinkWriter<T> createWriter(InitContext initContext) throws IOException {
return new RedisAsyncWriter<>(
JedisConnectorBuilder.build(jedisConfig),
new RedisConverter<>(serializer),
sinkConfig,
initContext
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.redis.sink2;

import java.io.Serializable;

public class RedisSinkConfig implements Serializable {

public final int maxBatchSize;
public final int maxInFlightRequests;
public final int maxBufferedRequests;
public final long maxBatchSizeInBytes;
public final long maxTimeInBufferMS;
public final long maxRecordSizeInBytes;

public RedisSinkConfig(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests,
long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxTimeInBufferMS = maxTimeInBufferMS;
this.maxRecordSizeInBytes = maxRecordSizeInBytes;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private int maxBatchSize = 10;
private int maxInFlightRequests = 1;
private int maxBufferedRequests = 100;
private long maxBatchSizeInBytes = 110;
private long maxTimeInBufferMS = 1_000;
private long maxRecordSizeInBytes = maxBatchSizeInBytes;

public Builder withMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}

public Builder withMaxInFlightRequests(int maxInFlightRequests) {
this.maxInFlightRequests = maxInFlightRequests;
return this;
}

public Builder withMaxBufferedRequests(int maxBufferedRequests) {
this.maxBufferedRequests = maxBufferedRequests;
return this;
}

public Builder withMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
return this;
}

public Builder withMaxTimeInBufferMS(long maxTimeInBufferMS) {
this.maxTimeInBufferMS = maxTimeInBufferMS;
return this;
}

public Builder withMaxRecordSizeInBytes(long maxRecordSizeInBytes) {
this.maxRecordSizeInBytes = maxRecordSizeInBytes;
return this;
}

public RedisSinkConfig build() {
return new RedisSinkConfig(
maxBatchSize, maxInFlightRequests, maxBufferedRequests,
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.redis.sink2.async;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.redis.shared.JedisConnector;
import org.apache.flink.connector.redis.sink2.RedisAction;
import org.apache.flink.connector.redis.sink2.RedisSinkConfig;
import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.commands.JedisCommands;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static redis.clients.jedis.Protocol.Command.SET;

public class RedisAsyncWriter<T> extends AsyncSinkWriter<T, RedisAction> {

private final JedisConnector jedisConnector;

public RedisAsyncWriter(JedisConnector jedisConnector,
RedisConverter<T> converter,
RedisSinkConfig sinkConfig,
Sink.InitContext context) {
super(converter, context,
sinkConfig.maxBatchSize, sinkConfig.maxInFlightRequests, sinkConfig.maxBufferedRequests,
sinkConfig.maxBatchSizeInBytes, sinkConfig.maxTimeInBufferMS, sinkConfig.maxRecordSizeInBytes);
this.jedisConnector = jedisConnector;
}

@Override
protected void submitRequestEntries(List<RedisAction> actions, Consumer<List<RedisAction>> toRetry) {
List<RedisAction> errors = new ArrayList<>();
for(RedisAction action : actions) {
try {
this.jedisConnector.execute(action);
} catch (Exception e) {
errors.add(action);
}
}
toRetry.accept(errors);
}

@Override
protected long getSizeInBytes(RedisAction redisAction) {
return redisAction.value.length();
}

public void close() {
jedisConnector.close();
}
}
Loading

0 comments on commit ef42ced

Please sign in to comment.