Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4175: Can't have StandbyTasks in KafkaStreams where NUM_STREAM_THREADS_CONFIG > 1 #1862

Closed
wants to merge 2 commits into from

Conversation

dguy
Copy link
Contributor

@dguy dguy commented Sep 15, 2016

standby tasks should be assigned per consumer not per process

@dguy
Copy link
Contributor Author

dguy commented Sep 15, 2016

@guozhangwang @enothereska @mjsax @hjafarpour - this fixes the lock error worked around in: #1861
Also re-instantes NUM_THREADS_CONFIG to 3 in the SmokeTestClient

@guozhangwang
Copy link
Contributor

Thanks @dguy the patch looks good to me.

But one question I have just thought about is, with num.threads = 1 and num.standby >0, the TaskAssignor will assign both the active and the standby of the same task to the same thread as well, which can also trigger locking issues, right? In that case why changing num.threads from 2 to 1 can temporarily fix the system test?

@dguy
Copy link
Contributor Author

dguy commented Sep 16, 2016

@guozhangwang - The TaskAssignor doesn't assign the standby tasks to the same process. So if num.threads = 0 and num.standby > 0 you only get the active task assigned. Which seems correct to me as there isn't anywhere for the standby to run.
The change of threads to 1 fixed the system test as all the threads in the same KafkaStreams instance were being assigned the same set of standby tasks, hence they were competing for the same locks.

@enothereska
Copy link
Contributor

LGTM

@guozhangwang
Copy link
Contributor

@dguy In that case, won't a TaskAssignmentException("failed to find an assignable client"); gets thrown since the state == null.

@dguy
Copy link
Contributor Author

dguy commented Sep 19, 2016

@guozhangwang In TaskAssignor

public void assignStandbyTasks(int numStandbyReplicas) {
        int numReplicas = Math.min(numStandbyReplicas, states.size() - 1);
        for (int i = 0; i < numReplicas; i++) {
            assignTasks(false);
        }
    }

So, that exception will not be thrown for the standby tasks.

@asfgit asfgit closed this in 70afd5f Sep 19, 2016
@guozhangwang
Copy link
Contributor

You are right. Thanks for the patience!

Merged to trunk.

@dguy dguy deleted the kafka-4175 branch October 4, 2016 09:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants