This repository has been archived by the owner on May 27, 2020. It is now read-only.

Cassandra Sink: Failed to commit transaction when the channel is full #152

Closed

thun8392 opened this issue Dec 28, 2015 · 5 comments

@thun8392

If the memory or file channel is full, the Cassandra sink fails when it tries to commit the transaction. Error logs:

2015-12-28 11:06:59 DEBUG CassandraSink:181 - Executing CassandraSink.process()...
2015-12-28 11:06:59 ERROR CassandraSink:231 - Failed to commit transaction. Transaction rolled back.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 40 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at com.stratio.ingestion.sink.cassandra.CassandraSink.process(CassandraSink.java:190)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
2015-12-28 11:06:59 ERROR SinkRunner:160 - Unable to deliver event. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 40 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at com.stratio.ingestion.sink.cassandra.CassandraSink.process(CassandraSink.java:190)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)

Flume configuration:

flume-conf.properties.txt

Complete flume-ng logs:

flume-ng-logs.txt

@aaitor (Contributor) commented Jan 7, 2016

Hi @thun8392,
Can you try increasing the postal.channels.memch.capacity and transactionCapacity channel values?
postal.channels.memch.capacity = 1000
postal.channels.memch.transactionCapacity = 100
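For context on why these two values matter: in Flume's memory channel, the take list that overflowed in the logs above is sized by transactionCapacity, so the sink's batch size must not exceed transactionCapacity, and transactionCapacity must not exceed capacity. A sketch of the full channel block with the suggested values (the agent name postal and channel name memch come from this thread; the sink name and batchSize line are illustrative, not from the reporter's config):

```properties
# Memory channel for the "postal" agent (names taken from this thread)
postal.channels.memch.type = memory
# Maximum events the channel can hold; must be >= transactionCapacity
postal.channels.memch.capacity = 1000
# Maximum events per transaction (sizes the take/put lists);
# must be >= the sink's batch size
postal.channels.memch.transactionCapacity = 100
# Illustrative: the sink's batch size must stay <= transactionCapacity
postal.sinks.cassandrasink.batchSize = 100
```

With the original transactionCapacity of 40, any sink batch larger than 40 events would fill the take list and trigger the "Take list for MemoryTransaction, capacity 40 full" error seen above.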

@thun8392 (Author) commented Jan 7, 2016

Hi @aaitor,
Thank you for your response. I changed the values to 1000 and 100 respectively. When I tested it again, ingestion returned the following error:

2016-01-07 11:14:34 ERROR SpoolDirectorySource:256 - FATAL: Spool Directory source srcdir: { spoolDir: /var/log/postal/source/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

This happens because the source log file has non-UTF-8 characters; it is solved with 'postal.sources.srcdir.inputCharset = ISO8859-1'.
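For anyone hitting the same MalformedInputException: the relevant source block would look roughly like this (property names taken from this thread; inputCharset is a standard Spooling Directory Source option, and the spoolDir path is the one shown in the log above):

```properties
# Spooling directory source for the "postal" agent (names from this thread)
postal.sources.srcdir.type = spooldir
postal.sources.srcdir.spoolDir = /var/log/postal/source/
# Decode input as ISO-8859-1 instead of the default UTF-8, so lines
# containing non-UTF-8 bytes do not kill the source thread
postal.sources.srcdir.inputCharset = ISO8859-1
```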

With the values set to 1000 and 100, the 'CassandraSink - Failed to commit transaction' error does not appear again.

Flume-ng logs (truncated at line 9656 because the file has more than 100000 lines):

flume-ng-logs.txt

Is it normal to get the warning line "WARN CassandraTable:74 - Event [Event headers = {}, body.length = 83 ] could not be mapped. Suggestion: Cassandra is case sensitive, so maybe you can check field names" for every log line that does not match the interceptor's regular expression?

Regards.

@aaitor (Contributor) commented Jan 7, 2016

Hi @thun8392, another recommendation for tracking down Cassandra sink configuration issues is to add a test logger sink:
postal.sinks.logsink.type = logger

When you run your Ingestion agent, if you can see the event data in the log files, the problem is probably in the Cassandra sink configuration. I would take a look at your /opt/sds/ingestion/conf/init_cassandra.cql . The transformed data must have the same structure in your CQL schema and in the event headers in order to map the data properly to the Cassandra table.
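A minimal sketch of such a debug logger sink, assuming the channel name memch used earlier in this thread (logsink is the name @aaitor suggests; the name in the sinks declaration comment is illustrative):

```properties
# Temporary debug sink: logs each event to the Flume log at INFO level
postal.sinks.logsink.type = logger
# Read from the same channel the Cassandra sink uses, so you see
# exactly what the sink would receive
postal.sinks.logsink.channel = memch
# Remember to declare it on the agent, e.g.:
# postal.sinks = cassandrasink logsink
```

Once the configuration is verified, the logger sink can be removed; it is only meant as a diagnostic tap.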

If you can't see the data using the logger sink, the problem may be in the interceptor: perhaps it is not applying the data transformation properly.

Hope it helps

@thun8392 (Author) commented Jan 8, 2016

Hi @aaitor,
The warning happened because the regex_extractor interceptor does not drop non-matching events. Adding a regex_filter interceptor with excludeEvents solved it. Thanks.
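For anyone with the same warning: a regex_filter interceptor chained before the extractor would look roughly like this (the interceptor names and the regex are placeholders, not the reporter's actual config):

```properties
# Chain the filter before regex_extractor (interceptor names illustrative)
postal.sources.srcdir.interceptors = filter extractor
postal.sources.srcdir.interceptors.filter.type = regex_filter
# With excludeEvents = true, events whose body matches the regex are
# dropped; with false (the default), only matching events are kept
postal.sources.srcdir.interceptors.filter.regex = <pattern for unwanted lines>
postal.sources.srcdir.interceptors.filter.excludeEvents = true
```

Either polarity works: match the unwanted lines and exclude them, or match the wanted lines and leave excludeEvents at its default.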

About the channel transactionCapacity and capacity: why does it not work with 100 and 40?

@aaitor (Contributor) commented Jan 13, 2016

Hi @thun8392 ,
Judging by your first log messages, the channel was full. Now that everything is working, you could experiment with lowering transactionCapacity and capacity back toward the initial values.
