Skip to content

Commit

Permalink
KSTREAMS-4026: Fix type parameter bounds of ProcessorContext in Scala
Browse files Browse the repository at this point in the history
The AK PR apache/kafka#8414 introduced type parameters
for the ProcessorContext. Since Scala requires generic arguments for any
type that takes generic parameters, the Scala code in the examples that uses
the ProcessorContext did not compile anymore.
  • Loading branch information
cadonna committed Apr 23, 2020
1 parent 5dcc34d commit ba5a118
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
Expand Up @@ -185,7 +185,7 @@ class CMSStore[T: CMSHasher](override val name: String,
/**
* Initializes this store, including restoring the store's state from its changelog.
*/
override def init(context: ProcessorContext, root: StateStore) {
override def init(context: ProcessorContext[_, _], root: StateStore) {
val serdes = new StateSerdes[Integer, TopCMS[T]](
name,
Serdes.Integer(),
Expand Down
Expand Up @@ -29,14 +29,14 @@ import org.apache.kafka.streams.state.StateSerdes
* with the [[org.apache.kafka.common.utils.Bytes]] class.
*/
class CMSStoreChangeLogger[K, V](val storeName: String,
val context: ProcessorContext,
val context: ProcessorContext[_, _],
val partition: Int,
val serialization: StateSerdes[K, V]) {

private val topic = ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName)
private val collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector

def this(storeName: String, context: ProcessorContext, serialization: StateSerdes[K, V]) {
def this(storeName: String, context: ProcessorContext[_, _], serialization: StateSerdes[K, V]) {
this(storeName, context, context.taskId.partition, serialization)
}

Expand Down
Expand Up @@ -11,9 +11,9 @@ class ProbabilisticCounter(val cmsStoreName: String)
extends Transformer[String, String, KeyValue[String, Long]] {

private var cmsState: CMSStore[String] = _
private var processorContext: ProcessorContext = _
private var processorContext: ProcessorContext[Object, Object] = _

override def init(processorContext: ProcessorContext): Unit = {
override def init(processorContext: ProcessorContext[Object, Object]): Unit = {
this.processorContext = processorContext
cmsState = this.processorContext.getStateStore(cmsStoreName).asInstanceOf[CMSStore[String]]
}
Expand Down

0 comments on commit ba5a118

Please sign in to comment.