
[FLINK-3311/FLINK-3332] Add Cassandra connector #1771

Closed
wants to merge 23 commits

Conversation

zentol
Contributor

@zentol zentol commented Mar 7, 2016

This PR is a combination of #1640 and #1660, essentially providing a mid-merge view of both PRs. As they reside in the same module and expose similar functionality, several things were changed in both commits to make the code more similar.

Such changes include formatting, field ordering, code inlining, package declarations, file names, test unification, and the addition of a central method for creating sinks (CassandraSink.addSink(DataStream input)) to reduce the complexity users would otherwise face from the different sink implementations.
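The single-entry-point idea can be illustrated with a plain-Java sketch. All names below are hypothetical stand-ins (no Flink dependencies): one static `addSink` inspects the element type and hands back the matching builder, so users never pick a sink implementation themselves.

```java
import java.io.Serializable;

public class SinkEntryPointDemo {

    /** Marker for tuple-like elements (stand-in for Flink's Tuple types). */
    public interface Tuple extends Serializable {}

    /** Common builder supertype; concrete builders differ per element kind. */
    public static abstract class SinkBuilder<T> implements Serializable {
        public abstract String kind();
    }

    public static class TupleSinkBuilder<T> extends SinkBuilder<T> {
        public String kind() { return "tuple"; }
    }

    public static class PojoSinkBuilder<T> extends SinkBuilder<T> {
        public String kind() { return "pojo"; }
    }

    /** Single user-facing entry point: dispatches on the element type. */
    public static <T> SinkBuilder<T> addSink(Class<T> elementType) {
        return Tuple.class.isAssignableFrom(elementType)
                ? new TupleSinkBuilder<T>()   // tuple-like elements
                : new PojoSinkBuilder<T>();   // everything else: treat as POJO
    }

    public static void main(String[] args) {
        System.out.println(addSink(String.class).kind()); // pojo
        System.out.println(addSink(Tuple.class).kind());  // tuple
    }
}
```

The design trade-off: the dispatch logic lives in one place, at the cost of the entry point needing to know every builder variant.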

@alkagin

alkagin commented Mar 9, 2016

Great work 👍

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
Copy link
Contributor


Please use the guava.version variable here to use the same guava version across Flink
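The suggested change would look like the following, assuming the parent pom defines a `guava.version` property as it does elsewhere in Flink:

```xml
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <!-- use the project-wide property instead of a hard-coded version -->
  <version>${guava.version}</version>
</dependency>
```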

@alkagin

alkagin commented Mar 21, 2016

Hi,
I am testing the Cassandra connector and may have found a bug in the CassandraSink DSL. During execution it throws java.io.NotSerializableException: org.apache.flink.streaming.connectors.cassandra.CassandraSink$CassandraTupleSinkBuilder, since CassandraSinkBuilder doesn't implement java.io.Serializable. I forked and made CassandraSinkBuilder implement java.io.Serializable; after that it throws java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.
Without the DSL it works as expected.
Job source code: https://gist.github.com/alkagin/6620ebdd10ef6bb47eee

Hope it helps.
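The two-stage failure described above is the classic pattern of a builder that is serializable itself but holds a reference to a non-serializable object. A minimal, self-contained reproduction with hypothetical names (plain JDK, no Flink):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationTrapDemo {

    /** Stand-in for a non-serializable object such as a stream operator. */
    static class NotSerializableOperator {}

    /** Broken variant: Serializable itself, but captures a non-serializable field. */
    public static class CapturingBuilder implements Serializable {
        private static final long serialVersionUID = 1L;
        final NotSerializableOperator operator = new NotSerializableOperator(); // poisons serialization
    }

    /** Fixed variant: the non-serializable reference is marked transient. */
    public static class FixedBuilder implements Serializable {
        private static final long serialVersionUID = 1L;
        transient NotSerializableOperator operator = new NotSerializableOperator();
    }

    /** Returns true if the object survives serialization. */
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // e.g. java.io.NotSerializableException
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new CapturingBuilder())); // false
        System.out.println(isSerializable(new FixedBuilder()));     // true
    }
}
```

Making the builder Serializable only moves the failure to the next non-serializable field in the object graph, which matches the SingleOutputStreamOperator exception reported above.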

@zentol
Contributor Author

zentol commented Mar 21, 2016

@alkagin Thank you for reporting this, I will look into it. I think I may already know how to fix it...

}

public enum ConsistencyLevel {
At_LEAST_ONCE,


typo: AT_LEAST_ONCE

*
* @param <IN> Type of the elements emitted by this sink
*/
public abstract class GenericAtLeastOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {


GenericExactlyOnceSink

Contributor Author

I've struggled a bit with the name for this class, but came to the conclusion that the class itself does not provide exactly-once guarantees for all use cases. As such, the GenericAtLeastOnceSink name is intentional.


Ok, I misunderstood CassandraIdempotentExactlyOnceSink's inheritance.

@zentol
Contributor Author

zentol commented Mar 21, 2016

Just pushed the following changes:

  • Fixed AT_LEAST_ONCE typo
  • CassandraInputFormat now properly propagates exceptions in close()
  • the Cassandra pom now uses the guava.version property
  • fixed NotSerializableException for CassandraAtLeastOnceTuple-/PojoSink

try {
session.close();
} catch (Exception e) {
LOG.info("Inputformat couldn't be closed - " + e.getMessage());
Contributor

I would make this a WARN and forward the cause to the log entry

* @return this builder
*/
public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
if (builder != null) {

this.builder instead of builder, correct?

Contributor Author

Yes. I distinctly recall having fixed this...

@alkagin

alkagin commented May 5, 2016

Hi guys :) any update on this?

@rmetzger
Contributor

rmetzger commented May 5, 2016

I'll test the PR again

@rmetzger
Contributor

rmetzger commented May 5, 2016

I just tried the PR, but the recovery after a failure doesn't seem to work:

java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Failed to fetch state handle size
    at org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:234)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:511)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
    ... 8 more
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://nameservice1/user/robert/cassandra-fs/e70d0b78b7875877f42a8ebfba463f14/chk-0/9f892bc0-b5e2-484f-a981-6e666e7ad897
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
    at org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
    at org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:428)
    at org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink$ExactlyOnceState.getStateSize(GenericAtLeastOnceSink.java:190)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:81)
    at org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:231)
    ... 10 more

* This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were
* used since the last completed checkpoint.
**/
public class ExactlyOnceState implements StateHandle<Serializable> {
Contributor

Serial version uid missing
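Adding the uid is a one-liner. The sketch below (hypothetical field names, plain JDK) shows the convention: pinning serialVersionUID keeps previously serialized state readable even if the class later gains fields or methods, which matters for a class that is checkpointed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerialVersionUidDemo {

    /** Stand-in for a checkpoint state handle. */
    public static class ExactlyOnceState implements Serializable {
        // Without this, the JVM derives a uid from the class shape,
        // so any later change to the class breaks old serialized state.
        private static final long serialVersionUID = 1L;

        public final long checkpointId;

        public ExactlyOnceState(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    /** Serializes and deserializes the given state, returning the copy. */
    public static ExactlyOnceState roundTrip(ExactlyOnceState s) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(s);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
                return (ExactlyOnceState) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new ExactlyOnceState(42L)).checkpointId); // 42
    }
}
```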

@zentol
Contributor Author

zentol commented May 23, 2016

I was finally able to fix the restart issue; there were two massive bugs in the CassandraCommitter:

  • within open() the checkpoint entry was always overridden
  • within close() the checkpoint entry was always deleted

In addition, I have made the following changes:

  • renamed GenericAtLeastOnceSink to GenericWriteAheadSink
  • implemented a caching of the last committed checkpointID in the CassandraCommitter

It's rather obvious that more tests are required.

-reverted final flag removal on ResultPartitionWriter
-fixed typo in CassandraCommitter
-cached cpID is properly reset on close()
-added test for CassandraCommitter
-modified StreamTaskTestHarness to properly notice exceptions in the task thread
-modified writeAheadSink test to close/open operator upon "failure"
@rmetzger
Contributor

rmetzger commented Jun 3, 2016

I've reviewed the connector again.
The issues I've seen previously (failure on restart) are resolved.
However, I found new issues:

  • The Cassandra sink doesn't fail (at least not within 15 minutes) if Cassandra is no longer available. It's probably just a configuration setting of the Cassandra driver to fail after a certain amount of time.
  • We should probably introduce a (configurable) limit (number of records / gigabytes) for the write-ahead log. It seemed to me that, due to the failed other instance, no checkpoints were able to complete anymore (because some of the Cassandra sinks were stuck in notifyCheckpointComplete()), while others were accepting data into the WAL. This led to a lot of data being written into the state backend. I think the Cassandra sink should stop at some point in such a situation.

Also, I would like to test the exactly once behavior on a cluster more thoroughly. Currently, I've only tested whether the connector is properly failing and restoring, but I didn't test if the written data is actually correct.

However, since the code seems to work under normal operation, I would suggest merging the connector now and filing follow-up JIRAs for the remaining issues.
This makes collaboration and reviews easier, and allows our users to help test the Cassandra connector.

Some log:

2016-06-03 12:28:36,478 ERROR org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink  - Error while sending value.
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
    at com.datastax.driver.core.exceptions.UnavailableException.copy(UnavailableException.java:128)
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:114)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50)
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    ... 11 more
2016-06-03 12:28:57,473 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
2016-06-03 12:28:57,487 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
2016-06-03 12:29:02,939 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
2016-06-03 12:29:02,970 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
2016-06-03 12:29:12,945 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
2016-06-03 12:29:12,974 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
2016-06-03 12:29:17,947 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 8000 milliseconds
2016-06-03 12:29:17,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 8000 milliseconds
2016-06-03 12:29:28,481 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 16000 milliseconds
2016-06-03 12:29:28,974 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 16000 milliseconds
2016-06-03 12:29:44,482 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 32000 milliseconds
2016-06-03 12:29:44,975 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 32000 milliseconds
2016-06-03 12:30:16,482 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 64000 milliseconds
2016-06-03 12:30:16,975 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 64000 milliseconds
2016-06-03 12:31:20,483 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 128000 milliseconds
2016-06-03 12:31:20,976 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 128000 milliseconds
2016-06-03 12:33:28,484 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 256000 milliseconds
2016-06-03 12:33:28,976 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 256000 milliseconds
2016-06-03 12:37:44,484 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 512000 milliseconds
2016-06-03 12:37:44,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 512000 milliseconds
2016-06-03 12:46:16,485 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 600000 milliseconds
2016-06-03 12:46:16,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 600000 milliseconds
2016-06-03 12:46:54,906 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: Custom Source (1/2)
2016-06-03 12:46:54,907 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/2) switched to CANCELING
2016-06-03 12:46:54,907 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source (1/2) (dec8d24e486ca9937739b7c6e07fbb05).
2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Cassandra Sink (1/2)
2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Cassandra Sink (1/2) switched to CANCELING
2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Cassandra Sink (1/2) (96511ef6293a893b0ef35dd211aea2b9).
2016-06-03 12:46:55,389 INFO  com.dataartisans.Job                                          - Received cancel in EventGenerator
2016-06-03 12:46:55,392 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/2) switched to CANCELED
2016-06-03 12:46:55,392 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/2)
2016-06-03 12:46:55,394 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (dec8d24e486ca9937739b7c6e07fbb05)
2016-06-03 12:47:24,911 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)

2016-06-03 12:47:54,912 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)

2016-06-03 12:48:24,913 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)

2016-06-03 12:48:54,915 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)

2016-06-03 12:49:24,916 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)

2016-06-03 12:49:54,918 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)

2016-06-03 12:50:24,919 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)

@theomega
Contributor

I tried out this branch and it works as it should in the scenario I set up:
I wrote (so only using the sink) to a quite complex column family in an 8-node Cassandra cluster. I was using a complex setup of windowed streams, and all the data appeared and was perfectly readable as expected. I also had multiple sinks running at the same time, which also worked perfectly. I could not test the scenario @rmetzger is mentioning.

Overall, I agree with @rmetzger that merging this should be considered, to get more users to test it and report issues.

@rmetzger
Contributor

Thank you for testing the cassandra connector.

I'll merge the pull request now.

rmetzger pushed a commit to rmetzger/flink that referenced this pull request Jun 13, 2016
@rmetzger
Contributor

The merging will probably be a bit delayed because there are some memory issues:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest
06/13/2016 11:12:22 Job execution switched to status RUNNING.
06/13/2016 11:12:22 Source: Collection Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
06/13/2016 11:12:22 Source: Collection Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
06/13/2016 11:12:23 Source: Collection Source -> Sink: Unnamed(1/1) switched to RUNNING 
06/13/2016 11:12:27 Source: Collection Source -> Sink: Unnamed(1/1) switched to FINISHED 
06/13/2016 11:12:27 Job execution switched to status FINISHED.
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CompactionExecutor:2"
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 150.02 sec - in org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CompactionExecutor:5"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CompactionExecutor:4"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "HintedHandoffManager:1"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "CompactionExecutor:7"
==============================================================================
Maven produced no output for 300 seconds.
==============================================================================
==============================================================================
The following Java processes are running (JPS)
==============================================================================
2956 Launcher
98454 Jps
97148 surefirebooter7601064672285446339.jar
==============================================================================
Printing stack trace of Java process 2956
==============================================================================
2016-06-13 11:28:23
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):

I will take care of them!

rmetzger pushed a commit to rmetzger/flink that referenced this pull request Jun 16, 2016
@asfgit asfgit closed this in b63e19b Jun 16, 2016
@f1yegor

f1yegor commented Jun 20, 2016

Looking at the code of CassandraSinkBase, I assume that every new sink will create its own session to the cluster.
I'm creating around 1000 streams and sinks for them, so the Cassandra cluster would be saturated with opening connections.
A Cassandra session can manage several connections through one session.

Could you confirm that this isn't an issue with the implementation?

@zentol
Contributor Author

zentol commented Jun 20, 2016

You are correct that every CassandraSink instance opens a separate connection to the cluster. I don't see how we could avoid this without using Singletons, which should be avoided.
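If per-JVM sharing were ever desired, one common middle ground between one-session-per-sink and a global singleton is a reference-counted holder: the session opens on the first acquire and closes on the last release. This is only a sketch with hypothetical names (plain JDK, no Cassandra driver), not part of this PR:

```java
import java.util.function.Supplier;

public class SharedSessionDemo {

    /** Reference-counted wrapper: opens lazily on first acquire, closes on last release. */
    public static class SharedResource<T extends AutoCloseable> {
        private final Supplier<T> factory;
        private T resource;
        private int refCount;

        public SharedResource(Supplier<T> factory) {
            this.factory = factory;
        }

        public synchronized T acquire() {
            if (refCount++ == 0) {
                resource = factory.get(); // first user opens the real connection
            }
            return resource;
        }

        public synchronized void release() {
            if (--refCount == 0) {
                try {
                    resource.close();     // last user closes it
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                resource = null;
            }
        }
    }

    /** Stand-in for a Cassandra Session. */
    public static class FakeSession implements AutoCloseable {
        public boolean closed;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        SharedResource<FakeSession> shared = new SharedResource<>(FakeSession::new);
        FakeSession a = shared.acquire(); // opens the session
        FakeSession b = shared.acquire(); // reuses the same session
        shared.release();
        System.out.println(a == b && !a.closed); // true: still open, same instance
        shared.release();
        System.out.println(a.closed);            // true: closed on last release
    }
}
```

The holder still has lifecycle complexity (who owns it, what happens on task failure), which is presumably why the straightforward one-connection-per-sink approach was chosen here.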

@f1yegor

f1yegor commented Jun 20, 2016

Could somebody point me to documentation on how to manage state (a pool of connections) when sinks run inside a single JVM?
