
Fix handling of topics with periods #70

Merged: ewencp merged 1 commit into confluentinc:master from jingw:fix on Jun 6, 2016

Conversation

@jingw (Contributor) commented Jun 2, 2016

Currently the filters / offset extraction rely on splitting on dot. If there's a topic called `namespace.topic`, it'll extract `namespace` and compare that with the topic name. This breaks recovery, where it looks for the max offset.

Also fixed a typo in `HdfsSinkConnecorConstants` and deleted the unused `CommittedFileWithEndOffsetFilter`.

Closes #45
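
To make the failure mode concrete, here is a minimal, self-contained Java sketch. The filename layout (topic+partition+startOffset+endOffset.extension), the "+" separators, and the regex below are illustrative assumptions; the connector's real COMMITTED_FILENAME_PATTERN may differ in detail:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CommittedFilenameDemo {
    // Hypothetical stand-in for COMMITTED_FILENAME_PATTERN.
    static final Pattern COMMITTED =
        Pattern.compile("^(.+)\\+(\\d+)\\+(\\d+)\\+(\\d+)(\\..+)?$");

    public static void main(String[] args) {
        String filename = "namespace.topic+0+100+200.avro";

        // Broken approach: split on '.' and assume the first piece is the topic.
        String topicGuess = filename.split("\\.")[0];
        System.out.println(topicGuess); // "namespace" -- wrong

        // Matching the whole name against an anchored pattern keeps
        // multi-dot topics intact: the numeric groups after the '+'
        // separators mark where the topic name ends.
        Matcher m = COMMITTED.matcher(filename);
        if (m.matches()) {
            System.out.println(m.group(1));                 // "namespace.topic"
            System.out.println(Long.parseLong(m.group(4))); // 200 (end offset)
        }
    }
}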

@ConfluentJenkins (Contributor) commented:

Can one of the admins verify this patch?

@ghost commented Jun 2, 2016

Hey @jingw,
thank you for your Pull Request.

It looks like you haven't signed our Contributor License Agreement yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership of, or grants of rights over, all contributions, allowing them to be distributed under the chosen licence. (Wikipedia)

You can read and sign our full Contributor License Agreement here.

Once you've signed, reply with [clabot:check] to prove it.

Thanks for your efforts,

clabot

@jingw (Contributor, Author) commented Jun 2, 2016

Our legal folks filled out the CLA. Please let me know if I still need to do something for that.

if (!m.matches()) {
  throw new IllegalArgumentException(filename + " does not match COMMITTED_FILENAME_PATTERN");
}
return Long.parseLong(m.group(4));
@ewencp (Contributor) commented on this snippet:

Should we maybe change this to a named constant in HdfsSinkConnectorConstants, since it could change if, e.g., any grouping was adjusted in COMMITTED_FILENAME_PATTERN?
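
A minimal sketch of that suggestion; the constant name and placement are hypothetical:

public class HdfsSinkConnectorConstants {
    // Index of the end-offset capture group in COMMITTED_FILENAME_PATTERN.
    // Kept next to the pattern so a regrouping updates both together.
    public static final int PATTERN_END_OFFSET_GROUP = 4;
}

// The call site above would then read:
// return Long.parseLong(m.group(HdfsSinkConnectorConstants.PATTERN_END_OFFSET_GROUP));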

@ewencp (Contributor) commented Jun 4, 2016

@jingw Looks great. I left a couple of comments about cleanup and duplicated code in a test, but they should all be trivial to clean up and get committed.

@ewencp (Contributor) commented Jun 4, 2016

test this please

@jingw (Contributor, Author) commented Jun 6, 2016

I made the suggested cleanups. Let me know if you need anything else. Thanks!

@ewencp (Contributor) commented Jun 6, 2016

ok to test

@ewencp (Contributor) commented Jun 6, 2016

LGTM, thanks for the contribution!

@ewencp merged commit 6096733 into confluentinc:master on Jun 6, 2016
@jingw deleted the fix branch on June 6, 2016 at 18:56
@jingw mentioned this pull request on Jul 22, 2016
@blbradley commented:

Did this make it into 3.0.1? I have a topic named `okcoin.streaming.btcusd.trades` and I get this stack trace upon recovery:

[2016-09-09 14:36:25,225] ERROR Task cryptocoin-hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NumberFormatException: For input string: "streaming"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:580)
        at java.lang.Integer.parseInt(Integer.java:615)
        at io.confluent.connect.hdfs.filter.TopicPartitionCommittedFileFilter.accept(TopicPartitionCommittedFileFilter.java:37)
        at io.confluent.connect.hdfs.FileUtils.fileStatusWithMaxOffset(FileUtils.java:141)
        at io.confluent.connect.hdfs.FileUtils.fileStatusWithMaxOffset(FileUtils.java:130)
        at io.confluent.connect.hdfs.TopicPartitionWriter.readOffset(TopicPartitionWriter.java:390)
        at io.confluent.connect.hdfs.TopicPartitionWriter.resetOffsets(TopicPartitionWriter.java:453)
        at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:203)
        at io.confluent.connect.hdfs.DataWriter.recover(DataWriter.java:239)
        at io.confluent.connect.hdfs.DataWriter.open(DataWriter.java:281)
        at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:428)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1000(WorkerSinkTask.java:54)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:464)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:234)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:255)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:250)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:459)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:445)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:702)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:266)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:975)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
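
For context, here is a hedged reconstruction of how a dot-split over that committed filename reaches Integer.parseInt("streaming"); the exact pre-fix indexing in TopicPartitionCommittedFileFilter is an assumption for illustration:

public class ParseFailureDemo {
    public static void main(String[] args) {
        // Hypothetical reconstruction of the pre-fix parsing; not the
        // connector's exact code. The filename layout is assumed to be
        // topic+partition+startOffset+endOffset.extension.
        String filename = "okcoin.streaming.btcusd.trades+0+100+200.avro";
        String[] parts = filename.split("\\.");
        // parts = [okcoin, streaming, btcusd, trades+0+100+200, avro]
        // A filter that assumes the second dot-separated piece is numeric
        // fails exactly as in the log above:
        int partition = Integer.parseInt(parts[1]); // NumberFormatException: "streaming"
    }
}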

@blbradley commented:

It did not. See you at the next release!

@blbradley commented:

I pushed an automated build to Docker Hub: a custom image based on the 3.0.1 images with this patch applied, in case anyone needs the fix before the next release.

@Radeep commented Dec 7, 2018

Is there an option to specify a custom database name for Hive instead of using the Kafka topic name? If this is already covered, could you briefly describe the procedure?
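
For what it's worth, the connector's Hive integration exposes a hive.database setting that controls which database tables are created in (the tables themselves are still named after the topic). A minimal config sketch follows; property names should be verified against the documentation for your connector version:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=okcoin.streaming.btcusd.trades
hdfs.url=hdfs://namenode:8020
flush.size=1000
hive.integration=true
hive.metastore.uris=thrift://metastore:9083
# Tables are created in this database instead of the default one:
hive.database=my_custom_db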

serssp pushed a commit to serssp/kafka-connect-hdfs that referenced this pull request on Dec 29, 2018:
MINOR: Fix compilation error due to new method in SinkTaskContext
…failure
Successfully merging this pull request may close these issues:
Topics with periods should be supported. (#45)

5 participants