Add Limit to Cassandra Source connector #223

Closed
amoussoubaruch opened this issue Jul 10, 2017 · 6 comments

Comments

@amoussoubaruch

Hello,

We have set up your Cassandra Source connector (i.e. the one that writes from Cassandra to Kafka).
We are using Cassandra 3.10 with Kafka 0.10.x.
Earlier, everything was working fine and we were getting successful writes from Cassandra to the Kafka topic.
However, it is now blocked and we cannot work out why.

Can you please help us trace the source of our problem?
Below is the stdout output from the connector.

Thanks in advance for all your help and advice,
Baruch

`[root@cassandra ~]# tail -f /mnt/cassandra/cassandra-sink-connector.log
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:05:28,746] ERROR Failed to put SourceRecord{sourcePartition={connect.cassandra.assigned.tables=mytopic}, sourceOffset={=1900-01-01 00:00:00.000Z}} ConnectRecord{topic='mytopic', kafkaPartition=null, key=null, value=Struct{....} on to the queue for pop.mytopic. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:277)
[2017-07-10 13:06:27,023] ERROR Failed to flush WorkerSourceTask{id=cassandra-source-orders-0}, timed out while waiting for producer to flush outstanding 84515 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:304)

[root@cassandra confluent-3.2.1]# /opt/confluent-3.2.1/bin/connect-standalone /opt/confluent-3.2.1/etc/kafka-connect-cassandra/connect-avro-standalone.properties /opt/confluent-3.2.1/etc/kafka-connect-cassandra/cassandra-source.properties >> /mnt/cassandra/cassandra-sink-connector.log
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-azure-documentdb-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-blockchain-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-bloomberg-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-cassandra-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-coap-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-druid-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-elastic-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-ftp-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-hazelcast-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-hbase-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-influxdb-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-jms-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-kudu-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-mongodb-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-mqtt-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-redis-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-rethink-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-voltdb-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-yahoo-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-connect-elasticsearch/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-connect-s3/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-connect-storage-common/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Jul 10, 2017 1:09:51 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

Exception in thread "CLASSPATH traversal thread." java.lang.OutOfMemoryError: Java heap space
at java.io.BufferedInputStream.<init>(BufferedInputStream.java:203)
at java.io.BufferedInputStream.<init>(BufferedInputStream.java:183)
at org.reflections.adapters.JavassistAdapter.getOfCreateClassObject(JavassistAdapter.java:99)
at org.reflections.adapters.JavassistAdapter.getOfCreateClassObject(JavassistAdapter.java:24)
at org.reflections.scanners.AbstractScanner.scan(AbstractScanner.java:30)
at org.reflections.Reflections.scan(Reflections.java:250)
at org.reflections.Reflections.scan(Reflections.java:204)
at org.reflections.Reflections.<init>(Reflections.java:129)
at org.apache.kafka.connect.runtime.PluginDiscovery.scanClasspathForPlugins(PluginDiscovery.java:69)
at org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:390)
at java.lang.Thread.run(Thread.java:745)
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Connection has been closed))
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:308)
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:265)
at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:136)
at com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader$$anonfun$process$1.applyOrElse(CassandraTableReader.scala:216)
at com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader$$anonfun$process$1.applyOrElse(CassandraTableReader.scala:209)
at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Connection has been closed))
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:218)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:284)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.run(RequestHandler.java:406)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "org.eclipse.jetty.server.session.HashSessionManager@648c94daTimer"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "pool-1-thread-1"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "cluster1-nio-worker-0"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "cluster1-scheduled-task-worker-0"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "cluster1-connection-reaper-0"
Exception in thread "threadDeathWatcher-2-1" java.lang.OutOfMemoryError: Java heap space
Exception in thread "qtp1803669141-20" java.lang.OutOfMemoryError: Java heap space
Exception in thread "qtp1803669141-22" java.lang.OutOfMemoryError: Java heap space
^C^CKilled
[root@cassandra confluent-3.2.1]# /opt/confluent-3.2.1/bin/connect-standalone /opt/confluent-3.2.1/etc/kafka-connect-cassandra/connect-avro-standalone.properties /opt/confluent-3.2.1/etc/kafka-connect-cassandra/cassandra-source.properties > /mnt/cassandra/cassandra-sink-connector.log
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-azure-documentdb-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-blockchain-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-bloomberg-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-cassandra-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-coap-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-druid-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-elastic-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-ftp-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-hazelcast-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-hbase-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-influxdb-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-jms-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-kudu-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-mongodb-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-mqtt-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-redis-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-rethink-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-voltdb-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/lib/kafka-connector/stream-reactor/libs/kafka-connect-yahoo-0.2.5-3.2.0-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-connect-elasticsearch/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-connect-s3/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka-connect-storage-common/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/confluent-3.2.1/share/java/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Jul 10, 2017 1:39:48 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Connection has been closed))
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:308)
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:265)
at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:136)
at com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader$$anonfun$process$1.applyOrElse(CassandraTableReader.scala:216)
at com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader$$anonfun$process$1.applyOrElse(CassandraTableReader.scala:209)
at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Connection has been closed))
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:218)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:284)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.run(RequestHandler.java:406)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
`

@andrewstevenson
Collaborator

You are out of heap space. More than likely you are pulling too much data in one iteration.

```
java.lang.OutOfMemoryError: Java heap space
at java.io.BufferedInputStream.<init>(BufferedInputStream.java:203)
at java.io.BufferedInputStream.<init>(BufferedInputStream.java:183)
at org.reflections.adapters.JavassistAdapter.getOfCreateClassObject(JavassistAdapter.java:99)
at org.reflections.adapters.JavassistAdapter.getOfCreateClassObject(JavassistAdapter.java:24)
at org.reflections.scanners.AbstractScanner.scan(AbstractScanner.java:30)
at org.reflections.Reflections.scan(Reflections.java:250)
at org.reflections.Reflections.scan(Reflections.java:204)
at org.reflections.Reflections.<init>(Reflections.java:129)
```

@andrewstevenson
Collaborator

Cassandra also seems to be down:

```
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Connection has been closed))
```

@amoussoubaruch
Author

Hi Andrew,

Thank you so much for your prompt reply.

How can I handle the heap space?
Is it possible to configure it somewhere, e.g. in the .properties files?

Or is it possible to use some sort of backpressure to slow down how fast messages are consumed?
What about recreating the topic with more Kafka partitions?

We checked with nodetool and Cassandra was working well, with no issues.

Any advice would be highly appreciated.

Thanks in advance,
Baruch

@andrewstevenson
Collaborator

You can increase the heap space by setting the `KAFKA_HEAP_OPTS` environment variable before starting Connect.
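`KAFKA_HEAP_OPTS` should be picked up by Kafka's launcher script (`kafka-run-class.sh`), which `connect-standalone` delegates to. It therefore has to be exported in the same shell that starts the worker, e.g. `export KAFKA_HEAP_OPTS="-Xms512m -Xmx2g"`. Those sizes are placeholders, not a recommendation. You can check what ceiling a given set of flags actually produces with a tiny class that prints `Runtime.getRuntime().maxMemory()`. Launch it with the same flags (`java $KAFKA_HEAP_OPTS HeapCheck`). `HeapCheck` is a hypothetical helper, not part of Kafka or the connector.

```java
// Hypothetical helper (not part of Kafka or stream-reactor): prints the maximum
// heap this JVM may use. Launch it with the same flags as the Connect worker:
//   java $KAFKA_HEAP_OPTS HeapCheck
class HeapCheck {

    /** Maximum heap in MiB as reported by the JVM (approximately the -Xmx value). */
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("Max heap: " + maxHeapMiB() + " MiB");
    }
}
```

The reported value is usually close to, and sometimes slightly below, the `-Xmx` setting, because the JVM may exclude some internal space from the figure.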

The last logs indicate that the Cassandra driver can't connect to Cassandra.

As for limiting the rows returned by Cassandra, that isn't possible with the current release. Adding a LIMIT statement and paging is something we can look at; however, the priority for Cassandra is the CDC source, which is a better way to get events out. We can always change these priorities with a commercial agreement.
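The following is a minimal in-memory sketch of what a per-iteration LIMIT would change. Without it, one iteration materialises every new row. With it, each poll holds at most `limit` rows and resumes after the last key it saw. This is a hypothetical illustration, not the connector's code. `LimitedReader` and `readBatch` are invented names. The sketch also assumes rows come back in ascending order of a unique key. In Cassandra that ordering only holds within a single partition, by clustering key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of "LIMIT + resume" reading. A list of unique keys,
// sorted ascending, stands in for the Cassandra table.
class LimitedReader {

    /** Returns at most `limit` keys strictly greater than `lastSeen`, in order. */
    static List<Long> readBatch(List<Long> table, long lastSeen, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        List<Long> batch = new ArrayList<>();
        for (long key : table) {
            if (key > lastSeen) {
                batch.add(key);
                if (batch.size() == limit) {
                    break; // the LIMIT caps how many rows one iteration holds in memory
                }
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Long> table = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
        long lastSeen = 0L;
        List<Long> batch;
        // Each loop iteration plays the role of one connector poll.
        while (!(batch = readBatch(table, lastSeen, 3)).isEmpty()) {
            System.out.println("poll returned " + batch);
            lastSeen = batch.get(batch.size() - 1); // resume after the last key seen
        }
    }
}
```

Resuming from the last key seen, rather than from a row count, means that rows already emitted are not read again on the next poll.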

@amoussoubaruch
Copy link
Author

OK! Thanks for your answer.
I am currently testing several options. If my boss chooses the connector solution, we will think about a commercial agreement.

Thanks

@andrewstevenson changed the title from "Problem with Cassandra Source connector" to "Add Limit to Cassandra Source connector" on Jul 14, 2017
@andrewstevenson
Collaborator

@amoussoubaruch Paging is already implemented in the connector; the page size can be set with the `connect.cassandra.fetch.size` option. Additionally, we have made some changes to how frequently the connector polls. You can try changing the `connect.cassandra.import.poll.interval` setting, which controls how often the internal buffer is drained; the default is 1000 milliseconds. Reducing both the fetch size and the poll interval should help reduce the heap required, but a shorter poll interval will cost more CPU.
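The toy model below (not the connector's code) shows why these two knobs bound the heap. A reader thread pages through rows `fetchSize` at a time into a bounded buffer, and a poller drains that buffer every `pollIntervalMs`. These stand in for `connect.cassandra.fetch.size` and `connect.cassandra.import.poll.interval`. `PagedBufferModel`, `run`, and the use of an `ArrayBlockingQueue` are assumptions. The connector's real buffer may behave differently: the repeated "Failed to put SourceRecord ... on to the queue" errors in the logs suggest it rejects records rather than blocking. Here `put()` blocks when the buffer is full, which is one form of the backpressure asked about earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical toy model: paged reads into a bounded buffer, drained on an interval.
class PagedBufferModel {

    /** Runs the model and returns how many rows the poller drained in total. */
    static int run(int totalRows, int fetchSize, int bufferCapacity, long pollIntervalMs) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferCapacity);

        // Reader: fetches one "page" of at most fetchSize rows at a time.
        Thread reader = new Thread(() -> {
            try {
                for (int start = 0; start < totalRows; start += fetchSize) {
                    int end = Math.min(start + fetchSize, totalRows);
                    for (int row = start; row < end; row++) {
                        buffer.put(row); // blocks while the buffer is full (backpressure)
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        // Poller: wakes up every pollIntervalMs and drains whatever is buffered.
        int drained = 0;
        List<Integer> batch = new ArrayList<>();
        try {
            while (reader.isAlive() || !buffer.isEmpty()) {
                Thread.sleep(pollIntervalMs);
                batch.clear();
                buffer.drainTo(batch);
                drained += batch.size(); // a real source task would hand these records to Kafka
            }
            reader.join();
        } catch (InterruptedException e) {
            reader.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
        return drained;
    }

    public static void main(String[] args) {
        // 50 rows, pages of 10, a buffer of 8 and a 10 ms poll interval (arbitrary numbers).
        System.out.println("drained " + run(50, 10, 8, 10L) + " rows");
    }
}
```

In this model the fetch size bounds how many rows one page brings in, and the buffer capacity bounds how many rows wait on the heap. A shorter poll interval empties the buffer more often, so the reader blocks less. The cost is more wake-ups, which is the CPU trade-off described above.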

FYI, we have released our CDC Source for Cassandra, which is a better option for capturing mutations to the data in Cassandra. It is still in Beta and hasn't been tested thoroughly yet.

http://docs.datamountaineer.com/en/latest/cassandra-cdc.html

Reopen if you still have issues; closing for now.
