SAMZA-1516: Another round of issues found by BEAM tests#370
SAMZA-1516: Another round of issues found by BEAM tests#370xinyuiscool wants to merge 3 commits intoapache:masterfrom
Conversation
| K key = keyFunction.apply(message); | ||
| V value = valueFunction.apply(message); | ||
| collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value)); | ||
| Long partitionKey = key == null ? 0L : null; |
There was a problem hiding this comment.
after checking with Xinyu, Kafka has a tricky behavior to do round robin when both the partition key and key are null. It might be better to add a line of comment in code as well for future maintenance reference.
There was a problem hiding this comment.
This behavior in Kafka is pretty reasonable. A null key actually means that a key is absent (as opposed to a key being null) and the message is non-keyed. Imagine all non-keyed messages going to the same partition 0? That would be very sub-optimal. For stream-processing use-cases, we want to use null for grouping by key. Hence, we need this to get around this.
| return operatorImpls.get(operatorSpec.getOpId()); | ||
| // and registered. We still need to traverse the DAG further to register the input streams. | ||
| OperatorImpl operatorImpl = operatorImpls.get(operatorSpec.getOpId()); | ||
| operatorImpl.registerInputStream(inputStream); |
There was a problem hiding this comment.
Are we doing a dual registration here? First in the if clause, and then in the else clause? Can we consider consolidating the recursion logic then?
There was a problem hiding this comment.
Sure, I can extract the registerInputStream outside the if .. else. SHould be straightforward to do.
| Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs(); | ||
| registeredSpecs.forEach(registeredSpec -> { | ||
| createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context); | ||
| }); |
There was a problem hiding this comment.
wondering if this logic is needed here? Is the following equivalent?
I'd flip the statement that adds it to the map, and add it to the operatorImpls after the recursive call returns?
OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
SystemStream inputStream, Config config, TaskContext context) {
if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) {
OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
operatorImpl.init(config, context);
operatorImpl.registerInputStream(inputStream);
Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
registeredSpecs.forEach(registeredSpec -> {
OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
operatorImpl.registerNextOperator(nextImpl);
});
operatorImpls.put(operatorImpl.getOpImplId(), operatorImpl);
return operatorImpl;
} else {
return operatorImpls.get(operatorSpec.getOpId());
}
}
There was a problem hiding this comment.
This logic is needed since I need to further traverse the graph even if the operator is already registered. That way the input stream can be registered to the downstream operators.
A couple of more fixes: 1. fix a bug of identifying input streams for an operator. 2. for partitionBy, set the partitionKey to 0L when key is null. Author: xiliu <xiliu@xiliu-ld1.linkedin.biz> Reviewers: Jagadish V <vjagadish1989@gmail.com> Closes apache#370 from xinyuiscool/SAMZA-1516
A couple of more fixes: 1. fix a bug of identifying input streams for an operator. 2. for partitionBy, set the partitionKey to 0L when key is null.