From 223d052a6901289bcc6d9f1147568eca841b967d Mon Sep 17 00:00:00 2001 From: atharvai Date: Tue, 24 Jan 2017 17:16:56 +0000 Subject: [PATCH] [BAHIR-85] move getCommandDescription to invoke method --- .../apache/flink/streaming/connectors/redis/RedisSink.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java index 688f94ae..5d00aa93 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -111,9 +111,6 @@ public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper redi this.flinkJedisConfigBase = flinkJedisConfigBase; this.redisSinkMapper = redisSinkMapper; - RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription(); - this.redisCommand = redisCommandDescription.getCommand(); - this.additionalKey = redisCommandDescription.getAdditionalKey(); } /** @@ -126,6 +123,10 @@ public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper redi */ @Override public void invoke(IN input) throws Exception { + RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription(); + this.redisCommand = redisCommandDescription.getCommand(); + this.additionalKey = redisCommandDescription.getAdditionalKey(); + String key = redisSinkMapper.getKeyFromData(input); String value = redisSinkMapper.getValueFromData(input);