Skip to content

Commit

Permalink
[BEAM-4610] Add SSL support for RedisIO
Browse files Browse the repository at this point in the history
  • Loading branch information
EdgarLGB committed Mar 28, 2019
1 parent cea9c32 commit a36af54
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public abstract class RedisConnectionConfiguration implements Serializable {

abstract int timeout();

abstract boolean ssl();

abstract Builder builder();

@AutoValue.Builder
Expand All @@ -54,6 +56,8 @@ abstract static class Builder {

abstract Builder setTimeout(int timeout);

abstract Builder setSsl(boolean ssl);

abstract RedisConnectionConfiguration build();
}

Expand All @@ -62,6 +66,7 @@ public static RedisConnectionConfiguration create() {
.setHost(Protocol.DEFAULT_HOST)
.setPort(Protocol.DEFAULT_PORT)
.setTimeout(Protocol.DEFAULT_TIMEOUT)
.setSsl(false)
.build();
}

Expand All @@ -70,6 +75,7 @@ public static RedisConnectionConfiguration create(String host, int port) {
.setHost(host)
.setPort(port)
.setTimeout(Protocol.DEFAULT_TIMEOUT)
.setSsl(false)
.build();
}

Expand Down Expand Up @@ -99,9 +105,18 @@ public RedisConnectionConfiguration withTimeout(int timeout) {
return builder().setTimeout(timeout).build();
}

/**
* Enable SSL connection to Redis server.
*
* @return a {@link RedisConnectionConfiguration} object
*/
public RedisConnectionConfiguration enableSSL() {
return builder().setSsl(true).build();
}

/** Connect to the Redis instance. */
public Jedis connect() {
Jedis jedis = new Jedis(host(), port(), timeout());
Jedis jedis = new Jedis(host(), port(), timeout(), ssl());
if (auth() != null) {
jedis.auth(auth());
}
Expand All @@ -113,5 +128,6 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("host", host()));
builder.add(DisplayData.item("port", port()));
builder.addIfNotNull(DisplayData.item("timeout", timeout()));
builder.add(DisplayData.item("ssl", ssl()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@
*
* }</pre>
*
* <p>It's also possible to enable Redis SSL connection with the corresponding methods:
*
* <pre>{@code
* pipeline.apply(RedisIO.read()
* .withEndpoint("::1", 6379)
* .withKeyPattern("foo*"))
* .enableSSL()
*
* }</pre>
*
* <p>{@link #readAll()} can be used to request Redis server using input PCollection elements as key
* pattern (as String).
*
Expand Down Expand Up @@ -199,6 +209,10 @@ public Read withBatchSize(int batchSize) {
return builder().setBatchSize(batchSize).build();
}

public Read enableSSL() {
return builder().setConnectionConfiguration(connectionConfiguration().enableSSL()).build();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
connectionConfiguration().populateDisplayData(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,18 @@ public void testReadBuildsCorrectly() {
Assert.assertEquals(111, read.connectionConfiguration().port());
Assert.assertEquals("pass", read.connectionConfiguration().auth());
Assert.assertEquals(5, read.connectionConfiguration().timeout());
Assert.assertEquals(false, read.connectionConfiguration().ssl());
}

@Test
public void testReadBuildsWithSslCorrectly() {
RedisIO.Read read =
RedisIO.read().withEndpoint("test", 111).withAuth("pass").withTimeout(5).enableSSL();
Assert.assertEquals("test", read.connectionConfiguration().host());
Assert.assertEquals(111, read.connectionConfiguration().port());
Assert.assertEquals("pass", read.connectionConfiguration().auth());
Assert.assertEquals(5, read.connectionConfiguration().timeout());
Assert.assertEquals(true, read.connectionConfiguration().ssl());
}

@Test
Expand All @@ -258,6 +270,7 @@ public void testWriteBuildsCorrectly() {
Assert.assertEquals(111, write.connectionConfiguration().port());
Assert.assertEquals("pass", write.connectionConfiguration().auth());
Assert.assertEquals(5, write.connectionConfiguration().timeout());
Assert.assertEquals(false, write.connectionConfiguration().ssl());
Assert.assertEquals(Method.APPEND, write.method());
}

Expand Down

0 comments on commit a36af54

Please sign in to comment.