Skip to content

Commit

Permalink
[FLINK-32014][doc][hotfix] Add Cassandra source documentation to webs…
Browse files Browse the repository at this point in the history
…ite (#15)
  • Loading branch information
echauchot committed May 11, 2023
1 parent 6088c45 commit 26112ba
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
33 changes: 33 additions & 0 deletions docs/content.zh/docs/connectors/datastream/cassandra.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,39 @@ There are multiple ways to bring up a Cassandra instance on local machine:
1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)

## Cassandra Source
Flink provides a [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
bounded source to read from Cassandra and return a collection of entities as `DataStream<Entity>`.
An entity is built by Cassandra mapper ([MappingManager](https://javadoc.io/static/com.datastax.cassandra/cassandra-driver-mapping/3.11.2/com/datastax/driver/mapping/MappingManager.html))
based on a POJO containing annotations (as described in Cassandra object mapper).

To use the source, do the following:

```java
ClusterBuilder clusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT))
.withQueryOptions(new QueryOptions().setConsistencyLevel(CL))
.withSocketOptions(new SocketOptions()
.setConnectTimeoutMillis(CONNECT_TIMEOUT)
.setReadTimeoutMillis(READ_TIMEOUT))
.build();
}
};
long maxSplitMemorySize = ... //optional max split size in bytes minimum is 10MB. If not set, maxSplitMemorySize = 64 MB
Source cassandraSource = new CassandraSource(clusterBuilder,
maxSplitMemorySize,
Pojo.class,
"select ... from KEYSPACE.TABLE ...;",
() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), "CassandraSource");
```
Regarding performances, the source splits table data like this:
`numSplits = tableSize/maxSplitMemorySize`.

If tableSize cannot be determined or previous numSplits computation makes too few splits, it falls back to `numSplits = parallelism`

## Cassandra Sinks

### Configurations
Expand Down
33 changes: 33 additions & 0 deletions docs/content/docs/connectors/datastream/cassandra.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,39 @@ There are multiple ways to bring up a Cassandra instance on local machine:
1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)

## Cassandra Source
Flink provides a [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
bounded source to read from Cassandra and return a collection of entities as `DataStream<Entity>`.
An entity is built by Cassandra mapper ([MappingManager](https://javadoc.io/static/com.datastax.cassandra/cassandra-driver-mapping/3.11.2/com/datastax/driver/mapping/MappingManager.html))
based on a POJO containing annotations (as described in Cassandra object mapper).

To use the source, do the following:

```java
ClusterBuilder clusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT))
.withQueryOptions(new QueryOptions().setConsistencyLevel(CL))
.withSocketOptions(new SocketOptions()
.setConnectTimeoutMillis(CONNECT_TIMEOUT)
.setReadTimeoutMillis(READ_TIMEOUT))
.build();
}
};
long maxSplitMemorySize = ... //optional max split size in bytes minimum is 10MB. If not set, maxSplitMemorySize = 64 MB
Source cassandraSource = new CassandraSource(clusterBuilder,
maxSplitMemorySize,
Pojo.class,
"select ... from KEYSPACE.TABLE ...;",
() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), "CassandraSource");
```
Regarding performances, the source splits table data like this:
`numSplits = tableSize/maxSplitMemorySize`.

If tableSize cannot be determined or previous numSplits computation makes too few splits, it falls back to `numSplits = parallelism`

## Cassandra Sinks

### Configurations
Expand Down

0 comments on commit 26112ba

Please sign in to comment.