
KAFKA-4217: Add KStream.flatTransform #5273

Merged
merged 16 commits on Jan 27, 2019

Conversation

cadonna
Contributor

@cadonna cadonna commented Jun 22, 2018

  • Adds flatTransform method in KStream
  • Adds processor supplier and processor for flatTransform

This contribution is my original work and I license the work to the project under the project's open source license.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

- Adds flatTransform method in KStream
- Adds processor supplier and processor for flatTransform
Contributor Author
@cadonna cadonna left a comment

Should I remove all references to context#forward in the API documentation of transform or flatTransform? What about the references to context#forward() in Transformer#init() and Transformer#close()? Are there use cases for those calls to forward()?

public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
                                          final String... stateStoreNames) {
    Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
private <K1, V1> KStream<K1, V1> doTransform(final ProcessorSupplier processorSupplierForGraph,
Contributor Author

Do we really need two distinct objects for the processor supplier? This is an intermediate solution, isn't it?

Member

\cc @bbejeck Can you help out here? I am not sure atm. This was changed recently.

Contributor

@cadonna @mjsax I cannot fully understand the comment. The code did not have two separate suppliers; this PR passed the same reference in as two parameters. Why?

Member

@guozhangwang This code and comment is outdated. The method doTransform() takes one supplier and is not modified by this PR any longer.

@mjsax mjsax added the streams label Jun 23, 2018
@mjsax mjsax requested review from mjsax and guozhangwang June 23, 2018 00:32
@mjsax
Member

mjsax commented Jun 23, 2018

\cc @bbejeck @vvcephei

Member
@mjsax mjsax left a comment

Thanks for the PR. A couple of comments and questions.

@@ -263,7 +263,7 @@
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
* @see #flatTransform(TransformerSupplier, String...)
Member

Why do you remove transform? We only add a new flatTransform; transform is not removed.

Contributor Author

I didn't want to make the list of referenced methods too long, so I thought to let map reference transform and flatMap reference flatTransform. But I do not feel strongly about it.

Member

Fair point. Not sure either :) So far, we list a lot of methods. Do you think we should do a general cleanup to reduce the listed methods? In the past, we only added but never removed.

Contributor

I'd be in favor of either just adding the new method to the list (without removing another one) or deleting this whole list.
Personally, I feel the list is a little redundant with this interface itself.

Contributor Author

I am fine with both options, but I am in favor of deleting the list. Out of curiosity, what is the advantage of just adding new methods to the list without removing others?

Contributor

Unless @mjsax objects, I vote to just delete these lists. At this point, it almost looks like it's directing you to all the other methods in this interface, which seems redundant.

I'm not sure I follow your last question. The list exists to direct readers to other relevant methods. I'm not sure why adding flatTransform renders transform irrelevant...

Member

I think it's overall useful to use @see to guide users -- however, I agree that it's becoming a very long list. This seems to be a general issue, though. Maybe we should do a follow-up PR and go over all JavaDocs to clean them up? We should define a strategy/guideline for how we cross-reference. For example, does it make sense to link to both flatMapValues overloads -- I guess not.

WDYT @vvcephei ?

Contributor

Yeah, I'm fine with defining and applying a coherent strategy.

In lieu of that, I guess the default thing to do here would be to just add the new method to the list without removing any other items.

Contributor Author

I re-added the links. @vvcephei My question was about just adding links vs. reorganizing links. I think my question was answered by the decision taken.

Member

* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
* can be observed and additional periodic actions can be performed.
* Although calling {@link ProcessorContext#forward(K, V) forward()} in {@link Transformer#transform}
* allows to emit multiple records for each input record, it is discouraged to do so since output records
Member

emit multiple records -- this seems to contradict the change from L491. Can we be more consistent?

Reviewing this, I am also realizing that context.forward() is still the only way to emit data within a punctuation callback. Thus, the question is: what should we describe in the JavaDocs and what not?

It might be better not to mention context.forward() at all, and only describe it in Punctuator instead?

* {@link #flatTransform(TransformerSupplier, String...) flatTransform()} is safer because it throws a compile error
* if the types of the output records are not compatible with the output stream.
* Calling {@link ProcessorContext#forward(K, V)} in {@link Transformer#transform} may be disallowed in future
* releases.
Member

I don't think we can do this. Also, I would only mention it when we start to deprecate an API.

* @see #transformValues(ValueTransformerWithKeySupplier, String...)
* @see #process(ProcessorSupplier, String...)
*/
<K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> transformerSupplier,
Member

Why ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>> and not just Iterable<KeyValue<K1, V1>> ?

transform also has return type KeyValue<K1, V1> but not ? extends KeyValue<? extends K1, ? extends V1>

Contributor Author

I have to admit that I copied this from a comment in the corresponding Jira issue. I am not (yet) a Java Generics expert, but I think I -- at least partly -- understand the rationale behind it.

With Iterable<KeyValue<K1,V1>> the API forces the Transformer to declare Iterable as the return type of its transform method. With ? extends Iterable<KeyValue<K1,V1>>, the Transformer may declare any type that implements Iterable (e.g. List) as the return type of its transform method. So it enlarges the set of valid Transformer declarations. Is it better to have a straight Iterable or to allow more return types? I do not know. Maybe some of the more expert Kafka Streams developers and users can answer this question.

Regarding <K1, V1> vs. <? extends K1, ? extends V1>, I guess <K1, V1> makes more sense, since K1 and V1 are inferred by the compiler from the actual arguments, which would always be the key type and the value type in the return types of the transform method of the given Transformer. Please correct me if I misunderstood Java Generics here.

For the case of KeyValue<...> vs. ? extends KeyValue<...>, I am too much of a newbie to know what benefits extending KeyValue may have. Please help me.

Btw, flatMap is declared as follows:
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
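
To make the difference concrete, here is a minimal sketch (types and names made up for illustration) of a Transformer that declares List, a subtype of Iterable, as its return type -- something the ? extends Iterable bound permits but a plain Iterable bound would reject:

    // Sketch: with `? extends Iterable<...>` the Transformer may declare List as its return type.
    final Transformer<String, String, List<KeyValue<String, Integer>>> wordLengths =
        new Transformer<String, String, List<KeyValue<String, Integer>>>() {
            @Override
            public void init(final ProcessorContext context) {}

            @Override
            public List<KeyValue<String, Integer>> transform(final String key, final String value) {
                // emit one record per word; List implements Iterable
                final List<KeyValue<String, Integer>> result = new ArrayList<>();
                for (final String word : value.split(" ")) {
                    result.add(KeyValue.pair(key, word.length()));
                }
                return result;
            }

            @Override
            public void close() {}
        };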

Member

Generics confuse me regularly, too... I was just comparing to transform -- it seems our API is not consistent. I guess your argument makes sense.

Contributor Author

Regarding <K1, V1> vs. <? extends K1, ? extends V1>, I changed my mind. I guess <? extends K1, ? extends V1> makes sense here as well, because a user could also specify K1 and V1 explicitly.

Contributor

I hope you don't mind if I jump in...

It's probably worth doing some experiments to get the bounds just right. Here are the basic properties we need:

Let's say we have a simple class hierarchy:

interface Animal
interface Cat extends Animal
interface Dog extends Animal

And let's say we subclass KeyValue: public class KV<K, V> extends KeyValue<K, V>.

Given a KStream<String, Cat>, we want to be able to call flatTransform with a TransformerSupplier<String, Animal, List<KeyValue<String, Animal>>> like this one:
(String s, Animal a) -> asList(new KV(s, new Cat()), new KeyValue(s, new Dog())).

The ? super K and ? super V ensure that any transformer that can handle a K and a V is permitted. You can handle an instance if you can assign it to your parameter types (so your parameter type is a supertype of the instance type). You need to specify this as a bound because the compiler doesn't generally let me assign a supertype to a subtype (I can't write Cat c = (Animal) a;).

I don't think you actually need the ? extends Whatever parts to get the right outcome. If anything, you might need it on the stuff inside the iterable to make sure that you can just pass in heterogeneous members, and they'll get "rounded" to their lowest common superclass.

I think this works without extends because you generally can assign a subtype to a supertype.

So maybe try an example like this without the extends stuff and see if the compiler chokes on it or not.

Contributor

IIRC, Java doesn't fully implement variance, but this is the basic concept (in scala) of what's going on with those extends return types: https://docs.scala-lang.org/tour/variances.html

Contributor

Ok, I stand corrected. I did the experiment I was suggesting, and I got the desired results only with the API you have submitted:

<K1, V1> KStreamImpl<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> transformerSupplier,
                                               String... storeNames) {
        return null;
    }

    public static class KV<K, V> extends KeyValue<K, V> {
        public KV(final K key, final V value) {
            super(key, value);
        }
    }

    public static void main(String[] args) {

        final KStreamImpl<Integer, Long> stream = new KStreamImpl<>(null, null, null, false, null);

        // exact transformer
        final KStreamImpl<Integer, Long> stream2 = stream.flatTransform(
            new TransformerSupplier<Integer, Long, Iterable<KeyValue<Integer, Long>>>() {
                @Override
                public Transformer<Integer, Long, Iterable<KeyValue<Integer, Long>>> get() {
                    return new Transformer<Integer, Long, Iterable<KeyValue<Integer, Long>>>() {
                        @Override
                        public void init(final ProcessorContext context) {}

                        @Override
                        public Iterable<KeyValue<Integer, Long>> transform(final Integer key, final Long value) {
                            return Arrays.asList(new KV<>(key, value), new KeyValue<>(key, value));
                        }

                        @Override
                        public void close() {}
                    };
                }
            }
        );

        // transformer that takes superclass k/v and returns exact results
        final KStreamImpl<Integer, Long> stream3 = stream.flatTransform(
            new TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Long>>>() {
                @Override
                public Transformer<Number, Number, Iterable<KeyValue<Integer, Long>>> get() {
                    return new Transformer<Number, Number, Iterable<KeyValue<Integer, Long>>>() {
                        @Override
                        public void init(final ProcessorContext context) {}

                        @Override
                        public Iterable<KeyValue<Integer, Long>> transform(final Number key, final Number value) {
                            return Arrays.asList(new KV<>(key.intValue(), value.longValue()), new KeyValue<>(1, 3L));
                        }

                        @Override
                        public void close() {}
                    };
                }
            }
        );


        // transformer that takes exact parameters and returns subclass results
        final KStreamImpl<Number, Number> stream4 = stream.flatTransform(
            new TransformerSupplier<Integer, Long, Iterable<KeyValue<Integer, Long>>>() {
                @Override
                public Transformer<Integer, Long, Iterable<KeyValue<Integer, Long>>> get() {
                    return new Transformer<Integer, Long, Iterable<KeyValue<Integer, Long>>>() {
                        @Override
                        public void init(final ProcessorContext context) {}

                        @Override
                        public Iterable<KeyValue<Integer, Long>> transform(final Integer key, final Long value) {
                            return Arrays.asList(new KV<>(key, value), new KeyValue<>(1, 3L));
                        }

                        @Override
                        public void close() {}
                    };
                }
            }
        );


        // transformer that takes superclass parameters and returns subclass results
        final KStreamImpl<Number, Number> stream5 = stream.flatTransform(
            new TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Long>>>() {
                @Override
                public Transformer<Number, Number, Iterable<KeyValue<Integer, Long>>> get() {
                    return new Transformer<Number, Number, Iterable<KeyValue<Integer, Long>>>() {
                        @Override
                        public void init(final ProcessorContext context) {}

                        @Override
                        public Iterable<KeyValue<Integer, Long>> transform(final Number key, final Number value) {
                            return Arrays.asList(new KV<>(key.intValue(), value.longValue()), new KeyValue<>(1, 3L));
                        }

                        @Override
                        public void close() {}
                    };
                }
            }
        );

    }

If you take out any of the ? extends, it won't compile.

Member

Thanks for verifying @vvcephei!


private final TransformerSupplier<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> transformerSupplier;

public KStreamFlatTransform(TransformerSupplier<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> transformerSupplier) {
Member

nit: add final

In Kafka Streams, we apply final to all variables/parameters etc. whenever possible. This applies to multiple places. Please apply it everywhere.


inputKey = 1;
inputValue = 10;
Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> tempTransformer = mock(Transformer.class);
transformer = tempTransformer;
Member

Can tempTransformer be removed and the mock assigned to transformer directly?

@Test
public void shouldInitialiseFlatTransformProcessor() {
transformer.init(context);
replayAll();
Member

I am always unsure: don't we need to call "expectLastCall()" or similar?

Contributor Author

No, it suffices to write it as I did in this case. You would use expectLastCall() when you want to check whether a method is called multiple times without repeating the same line, among other use cases. See this example in the EasyMock documentation.
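
For reference, a minimal sketch of the EasyMock idioms discussed here (assuming a mocked Transformer named transformer; not meant to run as a single test):

    // Recording an expected call on a void method by simply invoking it on the mock:
    transformer.init(context);
    // ...is equivalent to invoking it and then calling expectLastCall():
    transformer.init(context);
    EasyMock.expectLastCall();
    // expectLastCall() becomes useful when tweaking the expectation, e.g. a call count:
    transformer.init(context);
    EasyMock.expectLastCall().times(2);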

EasyMock.reset(transformer);

EasyMock.expect(transformer.transform(inputKey, inputValue))
.andReturn(Collections.<KeyValue<Integer, Integer>>emptyList());
Member

nit: fix indentation (should be +4 spaces only)

verifyAll();
}

@Test(expected = NullPointerException.class)
Member

Is this safe? The test would pass if the NPE is thrown from somewhere unexpected... I would prefer to keep the scope where the NPE is expected as tight as possible. We often use:

try {
    doSomethingAndExpectNPE();
    fail("Should have thrown");
} catch (final NullPointerException expected) { }

I am not sure if we can rewrite it this way. Doesn't EasyMock also allow expecting an exception (as another alternative)?
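
Applied to this test, the tight-scope pattern could look roughly like this sketch (method name hypothetical; testStream as used elsewhere in the test class):

    @Test
    public void shouldNotAllowNullTransformerSupplierOnFlatTransform() {
        try {
            testStream.flatTransform(null);
            fail("Should have thrown a NullPointerException");
        } catch (final NullPointerException expected) {
            // expected: flatTransform must reject a null TransformerSupplier
        }
    }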

@mjsax
Member

mjsax commented Sep 7, 2018

@cadonna What is the status of this PR? Can you address the open comments, and also rebase this PR to resolve the conflicts? Thx.

- added final to method parameter
- unified Java Generics in transform and flatTransform for
  TransformerSupplier
- removed references to context.forward in API documentation
- added integration tests for transform and flatTransform
- improved unit test to check NullPointerException for transform
  and flatTransform
Contributor
@vvcephei vvcephei left a comment

Hey @cadonna ,

Thanks for the PR! This looks good overall; I just had a few remarks.

Do you think it would be worthwhile to go ahead and replace all the other operations that can be expressed in terms of FlatTransform? For example, do we need the implementation of KStreamTransform at all, or can we just convert the supplied Transformer to one that returns a singleton of the value?

Not sure whether that would actually save some complexity or not.

@Override
public void close() {}
});
.transform(new TransformerSupplier<Windowed<String>, Long, KeyValue<Object, Object>>() {
Contributor

Do we need to drop the SAM-converted function and go back to Supplier here, or is this just an artifact of testing? (If the latter, can you revert it?)

(also applies elsewhere)

Contributor Author

Unfortunately, the code does not compile with the SAM-converted function. I have not yet understood the reason. The compiler error is:

method transform in interface KStream<K,V> cannot be applied to given types;
                    .transform(() ->  new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
                    ^
  required: TransformerSupplier<? super Windowed<String>,? super Long,? extends KeyValue<? extends K1,? extends V1>>,String[]
  found: ()->new Tr[...]{ } }
  reason: cannot infer type-variable(s) K1,V1
    (argument mismatch; bad return type in lambda expression
      <anonymous Transformer<Windowed<String>,Long,KeyValue<Object,Object>>> cannot be converted to Transformer<Windowed<String>,Long,KeyValue<? extends K1,? extends V1>>)
  where K1,V1,K,V are type-variables:
    K1 extends Object declared in method <K1,V1>transform(TransformerSupplier<? super K,? super V,? extends KeyValue<? extends K1,? extends V1>>,String...)
    V1 extends Object declared in method <K1,V1>transform(TransformerSupplier<? super K,? super V,? extends KeyValue<? extends K1,? extends V1>>,String...)
    K extends Object declared in interface KStream
    V extends Object declared in interface KStream

When I use the old generics signature for transform it compiles. I will take a deeper look at it.

Contributor

Interesting... is it because of the generics signature in KStream#transform, <? super K, ? super V, ...>? @vvcephei

Contributor

Specifically, the culprit is ? extends K1 and ? extends V1. The type system won't go so far as to assume that if I return, say, Integer-valued results, then V1 must be Integer. Because I didn't say I would return a V1, just that whatever I return will extend V1, and I never said what V1 would actually be.

Unfortunately, it gets even less sensible from here... To solve that problem, it should be sufficient to fix V1 and K1, like this: .<Integer, Integer>transform(...). But that fails with a message that Transformer<Windowed<String>, Long, KeyValue<Integer, Integer>> can't be converted to Transformer<Windowed<String>, Long, KeyValue<? extends Integer, ? extends Integer>>.

I thought maybe it doesn't consider Integer to extend Integer, but .<Number, Number>transform(...) also doesn't work. I think what we're dealing with is just the insufficiency of Java's type system. I have heard several times that they deliberately stopped short of fully implementing the type system in exchange for performance, so there are some true type statements that nevertheless don't compile. Maybe this is one of them.

I think that we'd rather keep the ability to use lambda substitution for the supplier and give up the "maximum flexibility" on the return values for K1 and V1. Maybe you can just replace ? extends K1 with K1 (and the same for V1) and re-attempt the experiments I went though before so we know what we're giving up?

Contributor

That sounds reasonable to me, especially given that the only caller of the constructor is always passing in a

TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier

anyways.

Contributor Author

I did the experiments and the most flexible declaration that worked with the lambda is the following:

<K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, ? extends KeyValue<K1, V1>> transformerSupplier,
                                   final String... stateStoreNames);

With this declaration, the Transformer may not return keys and values of subclasses of the key type and the value type of the result stream. Thus, the examples with stream4 and stream5 in @vvcephei 's experiments above do not compile with this declaration.

If nobody objects, I will proceed with the declaration above.

Removing only the ? extends in front of KeyValue, instead of removing ? extends from K1 and V1, also does not compile when using lambdas.

FYI, I created a minimal example of this rather strange compiler behavior and posted a question on stackoverflow [1].

[1] https://stackoverflow.com/questions/52445222/java-using-lambda-impedes-inference-of-type-variable

Contributor

That sounds good to me. Thanks for checking, @cadonna !

Contributor Author

Unfortunately, I had to use the following signatures in the end:

<K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
                                   final String... stateStoreNames);

<K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
                                       final String... stateStoreNames);

The reason is that I got similar compile errors as before with the following signature for flatTransform:

<K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, ? extends Iterable<? extends KeyValue<K1, V1>>> transformerSupplier,
                                       final String... stateStoreNames);

I tried a couple of alternatives, but the only one that allowed me to define a TransformerSupplier with an anonymous class and a lambda expression was the one without any wildcards in the generics for the output of the operator. To be consistent between transform and flatTransform, I also removed the wildcards in the generics for the output of transform. Now, transform has again the same signature as on the current trunk.

Lessons learned:

  • Lambdas and anonymous classes have different representations in Java byte code. See this answer to my stackoverflow question. I guess this is the reason for the differences in type variable inference.
  • There would be ways to define a lambda for the more flexible signatures for transform and flatTransform, but they consist of casts and obscure generics declarations in the lambda (see my stackoverflow question). Additionally, it is a trial-and-error process to get the lambda right. Thus, it is something you do not want to expose to your users.
  • In Java 8, there exists the java.util.function.Supplier functional interface, which is really similar to Kafka's TransformerSupplier. With java.util.function.Supplier, the more flexible signatures for transform and flatTransform seem to compile with lambdas (I have not thoroughly tested it, though). The drawback is that defining the transformer supplier by means of an anonymous class does not seem to work anymore with the more flexible signature (here too, I have not invested much time in trying it out).
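
To illustrate the signatures finally settled on, here is a minimal sketch (topic name and types made up) of calling flatTransform with a lambda-defined TransformerSupplier, which compiles because the output generics carry no wildcards:

    final KStream<String, String> words = builder.stream("words-topic");
    final KStream<String, Integer> lengths = words.flatTransform(
        () -> new Transformer<String, String, Iterable<KeyValue<String, Integer>>>() {
            @Override
            public void init(final ProcessorContext context) {}

            @Override
            public Iterable<KeyValue<String, Integer>> transform(final String key, final String value) {
                return Collections.singletonList(KeyValue.pair(key, value.length()));
            }

            @Override
            public void close() {}
        });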

Contributor

Ah, well, I buy your reasoning, so I'm +1 for the interface you've settled on.
I doubt anyone really needs to subclass KeyValue anyhow.

@@ -24,24 +24,24 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
public class KStreamTransform<Ki, Vi, Ko, Vo> implements ProcessorSupplier<Ki, Vi> {
Contributor

See my other comment about the actual name choice here. But also, I'm not sure if the increase of clarity here is worth the extra review time that including it in the diff incurs.

Contributor Author

I think the increase in clarity is worth the (small) extra review time, especially for newbies like me that are not used to the common patterns in the code base you described in another comment.

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class KStreamFlatTransform<Ki, Vi, Ko, Vo> implements ProcessorSupplier<Ki, Vi> {
Contributor

Ah, I guess this is "K-in" and "K-out"? I think a more common pattern in the code base is <K,V,KR,VR> (r=result) or <K,V,K1,V1> (not sure what the rationale there is ;) ).

That said, there's no reason generics should get to be alphabet soup when the project style prohibits alphabet soup variable names. Feel free to use (short) descriptive names for these types if you feel the "standard" ones are too opaque.

Even KIn and KOut would be better than replacing one arcane pattern with another.

Contributor Author

I am going to use KIn and KOut. Thank you for the proposal.


stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream
.flatTransform(new TransformerSupplier<Integer, Integer, List<KeyValue<Integer, Integer>>>() {
Contributor

Nit: We typically use SAM conversion whenever possible just to keep the testing code compact.

Contributor Author

See my other comment about SAM conversion.

  - declared Java Generics in transform and flatTransform for
    TransformerSupplier such that the TransformerSupplier can be
    defined as lambda and anonymous class
  - implemented transform as a special case of flatTransform
  - used more readable names for type variables of inputs and
    outputs in KStreamFlatTransform
  - defined TransformerSupplier as lambda expressions in the tests
- reverted use of anonymous class instead of lambda expression in test
Contributor
@vvcephei vvcephei left a comment

Oh! it looks like I wrote a few comments and never submitted them...

@@ -263,7 +263,7 @@
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
* @see #flatTransform(TransformerSupplier, String...)
Contributor

I'd be in favor of either just adding the new method to the list (without removing another one) or deleting this whole list.
Personally, I feel the list is a little redundant with this interface itself.

* returns zero or one output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
Contributor

nit: needs a comma after the {@link ...}

@mjsax
Member

mjsax commented Oct 3, 2018

@cadonna Can you rebase this PR to resolve the conflicts?

@cadonna
Contributor Author

cadonna commented Oct 7, 2018

Recently, I started work on flatTransformValues. Do you prefer a new PR for that or should I add it to this?

@vvcephei
Contributor

vvcephei commented Oct 8, 2018

Hi @cadonna , I think it would be better to have a new PR.

Anything over a few hundred lines of code becomes a significant burden on reviewers, and reviewing a PR over 1k LOC is a major undertaking. The more you can break up code into isolated features for review, the better.

- adds missing comma in javadoc
- re-adds links to transform in javadoc
@@ -491,14 +494,15 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
final Produced<K, V> produced);

/**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* Transform each record of the input stream into zero or one record in the output stream (both key and value type
Member

Technically, it's still possible to emit multiple key-value pairs via context.forward(), right? I guess this change is only to highlight the difference to the new flatTransformValue()?

I am wondering if we should still mention this (but point out that flatTransformValue is recommended for this case, because of type safety)? I think this might be worth adding, because context.forward may be used within punctuations.

Or is the intent to discourage the usage of context.forward completely and thus remove it from the JavaDocs? This seems to limit users in implementing custom operators... Note that process() does not return an output stream, and thus transform is quite critical for users to embed custom operators within the DSL.

Contributor Author

You probably mean flatTransform and not flatTransformValue in your comment. I mention that to avoid misunderstandings in the following. How does transform differ from transformValues with respect to punctuations? I ask because in transformValues the transformer is not allowed to call context.forward. With this code, the context in the transformer is wrapped with a context that throws an exception when context.forward is called. Does this mean a punctuation in transformValues is not allowed to emit any pair?

Member

You probably mean flatTransform and not flatTransformValue in your comment.

Yes.

In transformValues it's not allowed to use context.forward() (neither in transform nor in a punctuation), because it would allow changing the key, violating the transformValues contract not to change the key.

In transform, though, it's ok to call context.forward(), as there is no such restriction.

@@ -529,19 +533,16 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
* this.context = context;
* this.state = context.getStateStore("myTransformState");
* // punctuate each 1000ms; can access this.state
* // can emit as many new KeyValue pairs as required via this.context#forward()
Member

This is still true...

Contributor

bump... should this comment be restored?

Contributor Author

I added a note in the javadocs about the recommended way to emit multiple output records.

* Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
*
Member

nit: missing <p> for new paragraph

Contributor Author

Looking at the JavaDocs of transform, I guess there is one blank line too many rather than a missing <p>. I deleted the blank lines around this line.

* this.context = context;
* this.state = context.getStateStore("myTransformState");
* // punctuate each 1000ms; can access this.state
* context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
Member

We updated this API in 2.1 release to take Duration instead of long -- compare https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times

This raises the question of whether we forgot to update some JavaDocs... Would you mind double-checking? If yes, it would be cool if you could do a separate PR to update the JavaDocs (this allows us to cherry-pick it into the 2.1 branch).
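
For reference, the post-KIP-358 form of the schedule call shown above would look roughly like this (Punctuator body made up):

    // Since 2.1, schedule() takes a java.time.Duration instead of a long:
    context.schedule(Duration.ofMillis(1000), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // periodic action; can access this.state and forward records here
    });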

Contributor Author

Here is the pull request #5924

if (pair != null)
public void process(final KIn key, final VIn value) {
    final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(key, value);
    Objects.requireNonNull(pairs, "result of transform can't be null");
Member

I am wondering if we need to fail here. We could actually allow null, too, and just skip the for loop in this case?

Contributor Author

I personally do not like using null to represent an empty result. With KStream#transform(...), we currently use null to represent empty results, although with Java 8 we could probably use java.util.Optional. With KStream#flatTransform(...), we use an empty list to represent an empty result. However, for consistency with KStream#transform(...), I now also implemented your proposal and skip the loop if the transformer returns null.
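
Consolidated from the diff fragments above, the resulting process() logic is roughly:

    @Override
    public void process(final KIn key, final VIn value) {
        final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(key, value);
        // null is treated like an empty Iterable: no records are forwarded
        if (pairs != null) {
            for (final KeyValue<KOut, VOut> pair : pairs) {
                context().forward(pair.key, pair.value);
            }
        }
    }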

Contributor

FWIW, I share your dissatisfaction with all the nullability in our API. It would be nice to migrate away from it over time.

Not important for this PR, but just to share the thought: as opposed to an optional return type, we could have coalesced transform and flatTransform into one API by making the return type void and forcing implementers to use context.forward. Maybe this would have been better, since it would remove any ambiguity about how to use the API.

I think for that to work really well, we'd need tighter control over the forwarded types (so we can still tell whether or not the key is going to change, and so we can properly type the downstream). It's something I've been wanting to do anyway, to reduce casting. So it might be something to consider in the future.

Member

making the return type void and forcing implementers to use context.forward

I don't think that is a good idea. Note that context.forward is not type safe by design. Adding return types allows us to offer a type safe way to return output records to Kafka Streams runtime.

Contributor

Yep, that's what I was referring to in the last paragraph. I think we could offer a different context API that offers the type safety we need, but it's speculative. I haven't actually tried. My comment wasn't for this PR at all, just thinking about where we could go in the future within the context of this change.

Note that what you point out about the type-danger of context.forward is applicable to transform today. We encourage users to use forward within their transformers, but doing so completely circumvents the type system.


@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
Member

I am wondering if we could do this test using TopologyTestDriver instead? I.e., what is the exact purpose of the test?

Contributor Author

The purpose of this test is to test the integration of all components that are involved in the calls to flatTransform and transform.

Member

Sure, but why would TopologyTestDriver not provide this (i.e., what do you consider "all components" for verifying the correct behavior)? I don't think we need a broker for this, just a "Task", which the test driver provides. Thoughts?

Contributor Author

I agree that the test can also be written using TopologyTestDriver instead of a broker. I re-wrote the test accordingly.
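
A hedged sketch of what such a TopologyTestDriver-based test can look like (2.x-era test utils; topic names made up):

    final Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "flat-transform-test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
        final ConsumerRecordFactory<Integer, Integer> factory =
            new ConsumerRecordFactory<>("input-topic", new IntegerSerializer(), new IntegerSerializer());
        driver.pipeInput(factory.create("input-topic", 1, 10));
        final ProducerRecord<Integer, Integer> output =
            driver.readOutput("output-topic", new IntegerDeserializer(), new IntegerDeserializer());
        // assert on output.key() / output.value() here
    }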

private WindowStore<String, String> store;
private int numRecordsProcessed;

.transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
Member

Why this rewrite instead of using lambda expression?

Contributor Author

This change should not be in the PR. I thought I had reverted all of these changes after our discussion about generics and lambdas. Thank you!

final String name = builder.newProcessorName(TRANSFORM_NAME);
return flatTransform(() -> new Transformer<K, V, Iterable<KeyValue<KR, VR>>>() {

private Transformer<? super K, ? super V, KeyValue<KR, VR>> transformer = transformerSupplier.get();
Member

This is a critical piece -- can we add a test to make sure we create new instances correctly? We had a bug previously with similar wrapping that was done incorrectly. Compare #4435

Contributor Author

Test shouldAlwaysGetNewTransformerFromTransformerSupplier added in KStreamImplTest.
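
A hedged sketch of the essence of such a test (helper name hypothetical):

    @Test
    public void shouldAlwaysGetNewTransformerFromTransformerSupplier() {
        // each get() on the adapting supplier must yield a fresh Transformer,
        // otherwise state would be shared across tasks (cf. #4435)
        final TransformerSupplier<String, String, Iterable<KeyValue<String, String>>> adapted =
            adaptedSupplierUnderTest(); // hypothetical helper returning the wrapped supplier
        assertNotSame(adapted.get(), adapted.get());
    }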

}

@Test
public void shouldCallInitAndCloseOfSinglePairTransformer() {
Member

I am not sure what the purpose of this test is. Can you elaborate? It seems to test that the KStreamImplMock is calling transformer.init(context) and transformer.close()? But this would be testing test code -- I guess I am missing something.

Contributor Author

The test verifies that if init(context) and close() are called on the transformer instantiated in KStreamImplMock#flatTransform, then the wrapper around the transformer class in KStreamImpl#transform calls init(context) and close() on the transformer instantiated within the wrapper (i.e. final Transformer<String, String, KeyValue<Integer, Integer>> transformer = mock(Transformer.class) in the test). The code lines under test are this and this.

Member

Ack.

Same comment as above. (If we split the wrapper code out, it might be good to split this test into two -- one for init(), one for close().)

testStream.transform(null);
}

@Test
public void shouldReturnSingletonListOrEmptyListIfSinglePairTransformerReturnsNotNullOrNull() {
Member

Can you elaborate on this test? I don't understand what it is supposed to verify?

Contributor Author

This test verifies this method in KStreamImpl#transform that calls KStreamImpl#flatTransform. The code under test is part of the wrapper around the transformer class for KStreamImpl#transform that is needed to call KStreamImpl#flatTransform.

Member

Ack. Makes sense.

I am wondering if we should extract the part that builds the wrapper into its own method (package-private, so we can test it) and simplify the test accordingly?

(Or at least adding a comment might help -- not sure if we can encode the purpose in the test name -- it's too complex IMHO.)

Thoughts?

Contributor Author

I extracted the wrapper code into an adapter class (TransformSupplierAdapter). That seemed to me the cleaner solution, as opposed to loosening member access for unit tests.
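
A minimal sketch of what such an adapter amounts to (simplified generics; the actual class in the PR may differ in detail):

    // Adapts a transform-style supplier (single KeyValue, possibly null)
    // to a flatTransform-style supplier (Iterable of KeyValues).
    class TransformSupplierAdapter<KIn, VIn, KOut, VOut>
            implements TransformerSupplier<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> {

        private final TransformerSupplier<KIn, VIn, KeyValue<KOut, VOut>> inner;

        TransformSupplierAdapter(final TransformerSupplier<KIn, VIn, KeyValue<KOut, VOut>> inner) {
            this.inner = inner;
        }

        @Override
        public Transformer<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> get() {
            // a fresh inner Transformer per get() call, so no state is shared
            final Transformer<KIn, VIn, KeyValue<KOut, VOut>> transformer = inner.get();
            return new Transformer<KIn, VIn, Iterable<KeyValue<KOut, VOut>>>() {
                @Override
                public void init(final ProcessorContext context) {
                    transformer.init(context);
                }

                @Override
                public Iterable<KeyValue<KOut, VOut>> transform(final KIn key, final VIn value) {
                    final KeyValue<KOut, VOut> pair = transformer.transform(key, value);
                    // null from transform() maps to an empty result
                    return pair == null
                        ? Collections.<KeyValue<KOut, VOut>>emptyList()
                        : Collections.singletonList(pair);
                }

                @Override
                public void close() {
                    transformer.close();
                }
            };
        }
    }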

@guozhangwang
Contributor

@cadonna are you still working on this PR?

@cadonna
Contributor Author

cadonna commented Nov 16, 2018

@guozhangwang Yes, I am. I just have not had spare time to work on the PR for the last couple of weeks. I will try to reply to @mjsax's comments during the weekend.

- adds test to verify, whether the wrapper for transformer supplier
  in transform creates a new instance for each call to its get method
- allows null as return value for the transformer used in flatTransform
- minor refactorings in the Java docs
- removes unintended change
- extracts the code that adapts a TransformerSupplier for a transform
  operation to a TransformerSupplier for a flatTransform operation
  into an adapter class.
- re-writes integration tests for flatTransform and transform to use
  TopologyTestDriver instead of a broker.
Member
@mjsax mjsax left a comment

Just some nits. Overall LGTM.

Call for a second review: any of @guozhangwang @bbejeck @vvcephei

context().forward(pair.key, pair.value);
public void process(final KIn key, final VIn value) {
    final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(key, value);
    if (pairs != null) {
Member

The JavaDocs say the return value must not be null. With this check, that is actually not true, and null will be interpreted as empty/zero output records.

I am wondering if we should keep the code as-is and update the JavaDoc, or if we would want to be strict and remove this check and let the for-loop throw an NPE?

\cc @guozhangwang @bbejeck @vvcephei WDYT?

Contributor

You mean the javadoc of Transformer? It says:

@return new {@link KeyValue} pair&mdash;if {@code null} no key-value pair will
     * be forwarded to down stream

So I think this logic here is fine?

Contributor Author

Do you refer to the flatTransform JavaDocs? There it says

     * Method {@link Transformer#transform(Object, Object) transform()} must return an {@link java.util.Iterable} (e.g.,
     * any {@link java.util.Collection} type) and the return value must not be {@code null}.

Apparently, I copied this sentence from the flatMap JavaDocs. I would remove this sentence from the flatTransform JavaDocs because I guess that the only reason to not allow nulls in a map operation is to distinguish it from a filter operation. This is not needed for a transform operation. Is this correct?

Member

Ack.
Please update the JavaDocs accordingly.

For flatMap(), as it can return zero results (i.e., an empty collection), too, we could also allow returning null and interpret it as zero. I don't see a big difference here -- however, for a functional-style API it might be ok to be more restrictive and not allow null.

Contributor

No strong opinion, but it seems like we might want to stick close to the behavior of flatMap.

  1. If we're permissive with the return value of flatTransform, but not with flatMap, then it might be a confusing asymmetry with flatMap.
  2. Also, if we want to change the return behavior of flatMap, don't we need a KIP?

Together, it would suggest that we should not permit null returns from flatTransform.

Contributor Author

While I agree that the asymmetry between flatTransform and flatMap may be confusing, I would also like to note that we already have a similar asymmetry between transform and map. The transformer for transform has to return null to represent the empty result. Consequently, we would also have an asymmetry between transform and flatTransform if we forbade null as a representation of the empty output for flatTransform.

Do we want to have an asymmetry between flatTransform and transform or between flatMap and flatTransform?

One option could be to go for the former -- i.e., forbidding null output for flatTransform -- and to create a JIRA issue to change the transformer for transform to return java.util.Optional<KeyValue<KOut, VOut>> instead of KeyValue<KOut, VOut> to restore symmetry regarding null output values. This solution would break backward compatibility, though.

Member

One thing to consider: map() is a 1:1 operation while flatMap() is a 1:[0,...,n] operation. Also, transform() in the existing API is a 1:[0,...,n] operation. Thus, the grouping and comparison of map/transform and flatMap/flatTransform cannot really be applied.

Note that map() and flatMap() are DSL operators while transform() is PAPI integration. Also note that flatMap() has a different return type compared to transform(), which makes it possible to require non-null output for flatMap() but not for transform() in the old Java 7 API.

We introduce flatTransform() as a new 1:[0,...,n] transformation now and also have Java 8. However, as @cadonna mentioned, we cannot simply change the return type of transform() to Optional. Also, for PAPI integration I think it's not required to apply the same rules as for DSL operators and to disallow null necessarily.

Thus, the base question seems to be: should we change the "contract" of transform() from 1:[0,...,n] to a 1:1 operator, because flatTransform() is the preferred operation for 1:[0,...,n] now? I think the answer is no, because transform() still allows emitting records via context.forward().

Thus, I would keep map(), flatMap(), and transform() as-is, and I would allow null as a return value for flatTransform(), because it's "just" a type-safe alternative to transform() anyway.

Just my 2 cents.

Contributor

That sounds fine to me @mjsax . I think we can possibly find a better way to unify all this stuff, but it would require some structural changes in the processor context.

I'm happy with permitting null return in flatTransform.

Contributor Author
@cadonna cadonna Jan 22, 2019

I removed the above sentence from the javadocs of flatTransform.

@@ -441,27 +443,6 @@ public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningT
}
}

@Test
public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
Member

Why is this test removed?

Contributor Author

I have no idea. I re-added the test.

public void before() throws InterruptedException {
builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
Member

nit: extract "myTransformState" into variable that is use below to get the state from the context.

Contributor Author

Done!

}

@Test
public void shouldAllowNullAsResultOfTransform() {
Member

We should add a corresponding test for flatTransform in the test above, too. (Positive or negative test, depending on whether we want to allow null or not -- cf. my other comment.)

Contributor Author

I do not understand what you are referring to. This test tests the KStreamFlatTransformProcessor. Both calls to transform and calls to flatTransform run this code. The transformer returns a list of output records for both calls. The list can be null or it can be empty. The null list is tested in shouldAllowNullAsResultOfTransform and the empty list is tested in shouldAllowEmptyListAsResultOfTransform. At this point, there is no distinction between transform and flatTransform.

Member

Ack. My bad.

Contributor
@guozhangwang guozhangwang left a comment

@cadonna Thanks again for the PR. I made another pass on it and, besides the very minor ones, I have a meta comment about the adapter class solution (cc @vvcephei), concerning whether it may have perf penalties: for each single value returned, we are effectively calling singletonList(value) followed by a foreach -> forward, and it is on the critical path.

Plus, it makes the code less readable (this one is a subjective call, of course). Any thoughts about this?


private final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier;
private final TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier;
Contributor

This is out of scope, but the previous discussion makes me think: do we have a similar issue with KStreamFlatMap / FlatMapValues, which take ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>>, blocking lambda functions? @cadonna @vvcephei

Contributor

Possibly, it might be worth a Jira to create tests demonstrating the desired source code. Then, not only do we prove it works (or discover it doesn't), we also have a regression test in case we change the generic bounds in the future.

Contributor Author

I was able to compile the following code

stream
    .flatMap((Integer key, Integer value) -> Arrays.asList(
        KeyValue.pair(key, value),
        KeyValue.pair(key, value),
        KeyValue.pair(key, value)))
    .foreach(action);

I guess in the flatTransform case it is the TransformerSupplier that makes the code not compile.
I agree with @vvcephei that a Jira to create tests demonstrating the desired source code might be beneficial.
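For what it's worth, a call site that such tests could pin down might look like this (a sketch against the new flatTransform signature; the types and the action are illustrative):

stream
    .flatTransform(() -> new Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
        @Override
        public void init(final ProcessorContext context) { }

        @Override
        public Iterable<KeyValue<Integer, Integer>> transform(final Integer key, final Integer value) {
            // emit two records per input record
            return Arrays.asList(KeyValue.pair(key, value), KeyValue.pair(key, value));
        }

        @Override
        public void close() { }
    })
    .foreach(action);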

Contributor

👍 Thanks for checking, @cadonna

Member

@cadonna Can you create a JIRA for tracking?

Contributor Author
@cadonna cadonna Mar 4, 2019

Created ticket KAFKA-8035.

Should I label it as a newbie issue?

context().forward(pair.key, pair.value);
public void process(final KIn key, final VIn value) {
final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(key, value);
if (pairs != null) {
Contributor

You mean the javadoc of Transformer? It says:

@return new {@link KeyValue} pair&mdash;if {@code null} no key-value pair will
     * be forwarded to down stream

So I think this logic here is fine?

if (pair != null) {
return Arrays.asList(pair);
}
return Arrays.asList();
Contributor

nit: Collections.emptyList()?

Contributor Author

Done

public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
private <K1, V1> KStream<K1, V1> doTransform(final ProcessorSupplier processorSupplierForGraph,
Contributor

@cadonna @mjsax I cannot fully understand the comment. The code did not have two separate suppliers, but this PR passed the same reference in as two parameters; why?

public Iterable<KeyValue<KOut, VOut>> transform(final KIn key, final VIn value) {
final KeyValue<KOut, VOut> pair = transformer.transform(key, value);
if (pair != null) {
return Arrays.asList(pair);
Contributor

nit: Collections.singletonList.

Contributor

@guozhangwang, you had a performance concern about this class.

It definitely pads the execution, but my gut says that it won't be too bad, and the complexity savings of consolidating the code paths probably make up for it.

Collections.singletonList doesn't create a full-blown list, but just wraps the argument in an instance of SingletonList, so it's very light to allocate and very light to collect. Collections.emptyList just returns a reference to a static singleton instance of EmptyList, so there's no overhead to it.

Also, the iteration should be low/no cost as well in hot code, as the jit compiler should unroll the loop.

This is all speculative, but something that @cadonna could evaluate with a microbenchmark if desired.

Anyway, like I said, I'm not worried about it in this case.
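For reference, the adapter's hot path with the singletonList/emptyList suggestions applied would be roughly the following (a simplified sketch, not the exact code in this PR):

// Simplified sketch of the adapter's wrapped transform(): singletonList wraps
// the value in a lightweight SingletonList, and emptyList returns a shared
// static instance, so neither branch allocates anything heavy.
public Iterable<KeyValue<KOut, VOut>> transform(final KIn key, final VIn value) {
    final KeyValue<KOut, VOut> pair = transformer.transform(key, value);
    if (pair != null) {
        return Collections.singletonList(pair);
    }
    return Collections.emptyList();
}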

Contributor

Oh, I do recommend taking both your suggestions to use singletonList/emptyList in this class, and actually in general as well.

Contributor Author

I replaced Arrays.asList with Collections.singletonList.

Contributor

Thanks! (FYI, if you use IDEA, there's a code inspection you can enable to prompt you to make this change.)

Contributor

Cool, I think I'm convinced. If others like TransformerSupplierAdapter for the complexity savings as well, we can merge this as-is.

Contributor

LGTM. @mjsax please make a final pass and merge if you have no further comments.

Contributor
@vvcephei vvcephei left a comment

Overall, LGTM. I've left a few comments on running threads.

Thanks, @cadonna !

- re-added test that was accidentally removed
- added note about use of context.forward to emit multiple output
  records
- replaced Arrays.asList with Collections.singletonList and
  Collections.emptyList
- minor javadoc refactorings
- minor integration test refactoring
Contributor
@vvcephei vvcephei left a comment

LGTM. Thanks, @cadonna .

- removed requirement for not null return value from javadocs
  of flatTransform
Member
@mjsax mjsax left a comment

LGTM.

I left a couple of JavaDoc nits, but would like to get them addressed in a follow-up PR. Merging this.

Thanks a lot for the KIP and PR @cadonna! And congrats on your first contribution! 🎉 🎉 🎉

* This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)}, the processing progress
* can be observed and additional periodic actions can be performed.
*
Member

nit: remove empty line

Contributor Author

Already changed by somebody else.

* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, a schedule must be registered.
* Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object)
* transform()} and {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}.
Member

I just realized that this seems to be wrong: punctuate() does not return anything. This might be a problem throughout all JavaDocs -- could you verify, @cadonna? I would suggest a separate PR to address this, though, so we can merge this PR without additional delay.

Contributor Author

Created PR #6552
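For context, Punctuator#punctuate(long) indeed returns void; a punctuation can only emit records through the context, e.g. (a sketch; the schedule and the forwarded key/value are illustrative):

// Registered in Transformer#init(); the punctuate() callback itself returns nothing,
// so any output has to go through context.forward().
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
    timestamp -> context.forward("some-key", "some-value"));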

@@ -552,20 +553,109 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
* Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream}.
* (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
* </p>
Member

Nit: JavaDoc markup is not HTML... Thus, there is no closing tag. Remove the </p> and the next empty line.

Contributor Author

Already changed by somebody else.

*
* <p>
* Note that it is possible to emit multiple records for each input record by using
* {@link ProcessorContext#forward(Object, Object) context#forward()} in {@link Transformer#transform(K, V)}.
Member

nit: "in transform() and punctuate()"

Contributor Author

Done in PR #6365

* To ensure type-safety at compile-time, it is recommended to use
* {@link #flatTransform(TransformerSupplier, String...)} if multiple records need to be emitted for each input
* record.
* </p>
Member

nit: remove this line

Contributor Author

Already changed by somebody else.

* transformation).
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
* can be observed and additional periodic actions can be performed.
*
Member

nit: remove empty line

Member

In transform() we have a sentence:

The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object) transform()}

Might be good to add this here too and point out it might be an Iterable, plus an example. Compare flatMap():

     * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
     * and the return value must not be {@code null}.

Contributor Author

Empty line was already removed by somebody else.

Contributor Author

I added a similar sentence in PR #6365

* void init(ProcessorContext context) {
* this.context = context;
* this.state = context.getStateStore("myTransformState");
* // punctuate each 1000ms; can access this.state
Member

nit: "punctuate each second" (might be a c&p error and affect other places, too) -- maybe better to address in a separate PR (the void punctuate() issue and this can be one PR, IMHO).

Contributor Author

Also contained in PR #6552

* Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream}.
* (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
*
Member

Should we add a similar paragraph about using context.forward() as in transform()?

Contributor Author

I added a similar paragraph in PR #6365
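For reference, emitting multiple records via context.forward() inside transform() looks roughly like this (a sketch; the loop and the types are illustrative):

// Sketch: a Transformer that emits several records per input via context.forward()
// and returns null so that nothing extra is forwarded from the return value.
@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
    for (int i = 0; i < 3; i++) {
        context.forward(key, value + i); // each call emits one record downstream
    }
    return null; // nothing additional to emit
}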



@mjsax
Member

mjsax commented Jan 26, 2019

Retest this please.

1 similar comment
@mjsax
Member

mjsax commented Jan 27, 2019

Retest this please.

@mjsax mjsax merged commit aca027e into apache:trunk Jan 27, 2019
@mjsax
Member

mjsax commented Jan 27, 2019

Created a follow-up Jira to track the second part of KIP-313 (adding KStream#flatTransformValues), which did not make it into 2.2. Assigned the ticket to @cadonna.

@guozhangwang
Contributor

Thanks @cadonna for this PR!

cadonna added a commit to cadonna/kafka that referenced this pull request Mar 4, 2019
This commit is a leftover of pull request apache#5273.
bbejeck pushed a commit that referenced this pull request Mar 7, 2019
This commit is a follow-up of pull request #5273

Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
bbejeck pushed a commit that referenced this pull request Mar 7, 2019
This commit is a follow-up of pull request #5273

Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* ak/trunk:
  MINOR: Update usage of deprecated API (apache#6146)
  KAFKA-4217: Add KStream.flatTransform (apache#5273)
  MINOR: Update Gradle to 5.1.1 (apache#6160)
  KAFKA-3522: Generalize Segments (apache#6170)
  Added quotes around the class path (apache#4469)
  KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (apache#6202)
  MINOR: In the MetadataResponse schema, ignorable should be a boolean
  KAFKA-7838: Log leader and follower end offsets when shrinking ISR (apache#6168)
  KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (apache#3848)
  MINOR: clarify why suppress can sometimes drop tombstones (apache#6195)
  MINOR: Upgrade ducktape to 0.7.5 (apache#6197)
  MINOR: Improve IntegrationTestUtils documentation (apache#5664)
  MINOR: upgrade to jdk8 8u202
  KAFKA-7693; Fix SequenceNumber overflow in producer (apache#5989)
  KAFKA-7692; Fix ProducerStateManager SequenceNumber overflow (apache#5990)
  MINOR: update copyright year in the NOTICE file. (apache#6196)
  KAFKA-7793: Improve the Trogdor command line. (apache#6133)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
This commit is a follow-up of pull request apache#5273

Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
@cadonna cadonna deleted the kafka4217-flatTransform branch October 21, 2019 10:53
@mjsax mjsax added the kip Requires or implements a KIP label Jun 13, 2020