-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Filter topics by committer #13
Filter topics by committer #13
Conversation
Initial commit without tests @denisgrenader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly name changes, and one logical comment
@@ -44,6 +44,11 @@ class Castle private (val castleConfig: CastleConfig, val metricsLogger: Metrics | |||
} | |||
}).toMap | |||
|
|||
val filterMap: Map[String, Filter] = committerFactoryMap map { | |||
idCommitterFactory => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idCommitterFactoryPair => {
@@ -44,6 +44,11 @@ class Castle private (val castleConfig: CastleConfig, val metricsLogger: Metrics | |||
} | |||
}).toMap | |||
|
|||
val filterMap: Map[String, Filter] = committerFactoryMap map { | |||
idCommitterFactory => { | |||
idCommitterFactory._1 -> idCommitterFactory._2.createFilter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idCommitterFactoryPair._1 -> idCommitterFactoryPair._2.createTopicFilter()
@@ -44,6 +44,11 @@ class Castle private (val castleConfig: CastleConfig, val metricsLogger: Metrics | |||
} | |||
}).toMap | |||
|
|||
val filterMap: Map[String, Filter] = committerFactoryMap map { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topicFilterMap: Map[String, TopicFilter] = committerFactoryMap map {
@@ -84,7 +89,7 @@ class Castle private (val castleConfig: CastleConfig, val metricsLogger: Metrics | |||
|
|||
val workerActorFactory = new WorkerActorFactory(fetcherActorFactory, workerFactory, castleConfig.committerConfigs, metricsLogger) | |||
|
|||
val taskManager = TaskManager(castleConfig.leaderConfig, castleConfig.committerConfigs) | |||
val taskManager = TaskManager(castleConfig.leaderConfig, castleConfig.committerConfigs, filterMap) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
, topicFilterMap)
def apply( | ||
leaderConfig: LeaderConfig, | ||
committerConfigs: Iterable[CommitterConfig], | ||
committerFilterMap: Map[String, Filter]) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topicFilterMap: Map[String, TopicFilter]) =
class TaskManager( | ||
val leaderConfig: LeaderConfig, | ||
val committerConfigs: Iterable[CommitterConfig], | ||
committerFilterMap: Map[String, Filter]) extends Logging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topicFilterMap: Map[String, TopicFilter]) extends Logging {
private def matchCommitterFilter(kafkaTopic: KafkaTopic, committerId: String): Boolean = { | ||
val filter = committerFilterMap(committerId) | ||
filter.containsTopic(kafkaTopic.name) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val topicFilter = topicFilterMap(committerId)
topicFilter.match(kafkaTopic.name)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match is a keyword in scala and wont let me use it
committerIds + committerId | ||
else | ||
committerIds | ||
} | ||
|
||
private def matchCommitterFilter(kafkaTopic: KafkaTopic, committerId: String): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private def matchCommitterTopicFilter
} | ||
} | ||
else { | ||
committerIds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
regex match is more stable and well known than whatever the committer filter does, let's invert their checking:
private def matchTopicsRegex(topicsRegex: Regex, kafkaTopic: KafkaTopic, committerId: String, committerIds: Set[String]): Set[String] = {
kafkaTopic.name match {
case topicsRegex(_*) => if (matchCommitterTopicFilter(kafkaTopic, committerId)) committerIds + committerId else committerIds
case _ => committerIds
}
}
Added comments in the commit. Merging this for now and open to more comments
taskManager.matchTopicsSet(Set("performance", "login"),performanceTopic,"falseCommitter", Set("kafkaCommitter")) shouldEqual Set("kafkaCommitter") | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add one more test
where the there is a "split" acceptance committer
on some topics
where it accepts some but not others
i.e. if the topics to subscribe are Foo, Bar, Baz
the topic regex is “.*”
so all match
but then the committer returns “true” for Foo,
but “false” for Bar, Baz
right now this case does not cover the split it doesn’t cover the split case
the same thing goes for TopicSet
def apply( | ||
leaderConfig: LeaderConfig, | ||
committerConfigs: Iterable[CommitterConfig], | ||
topicFilterMap: Map[String, TopicFilter]) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this always be required? If not, would it make sense to provide a default parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every commiterFactory has a TopicFilter associated with it. It should be mandatory
} | ||
protected[leader] def matchTopicsRegex(topicsRegex: Regex, kafkaTopic: KafkaTopic, committerId: String, committerIds: Set[String]) = { | ||
kafkaTopic.name match { | ||
case topicsRegex(_*) => if(matchCommitterTopicFilter(kafkaTopic,committerId)) committerIds + committerId else committerIds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Space between if & open paren ("(")
- Since we have 2 conditions, would it make sense to change this to a simple if-statement, rather than using case and then a nested If? (See matchTopicsSet(...) -- It seems the logic is quite similar, except the topicsSet(...) versus topicsRegex(...))
case topicsRegex(_*) => committerIds + committerId | ||
case _ => committerIds | ||
} | ||
protected[leader] def matchTopicsRegex(topicsRegex: Regex, kafkaTopic: KafkaTopic, committerId: String, committerIds: Set[String]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Need a function signature
- What's the difference between committerId & committerIds?
committerIds + committerId | ||
else | ||
committerIds | ||
} | ||
|
||
protected[leader] def matchCommitterTopicFilter(kafkaTopic: KafkaTopic, committerId: String): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- It might be good to make function behavior consistent -- matchTopicsSet(...) returns a Set[String], while matchCommitterTopicFilter returns a Boolean.
- Function signature 😄
import com.box.castle.committer.api.TopicFilter | ||
|
||
/** | ||
* Created by bravishanker on 7/10/17. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update class description
} | ||
|
||
private def matchTopicsSet(topicsSet: Set[String], kafkaTopic: KafkaTopic, committerId: String, committerIds: Set[String]) = { | ||
if (topicsSet.contains(kafkaTopic.name)) | ||
protected[leader] def matchTopicsSet(topicsSet: Set[String], kafkaTopic: KafkaTopic, committerId: String, committerIds: Set[String]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add function description.
} | ||
|
||
|
||
|
||
class TaskManager(val leaderConfig: LeaderConfig, val committerConfigs: Iterable[CommitterConfig]) extends Logging { | ||
class TaskManager( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need class description
No description provided.