Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection #5188

Closed
wants to merge 3 commits into from

Conversation

tzulitai
Copy link
Contributor

What is the purpose of the change

Prior to this PR, reflection was mainly used to inject mocks into private fields of the FlinkKafkaConsumerBase, without the need to fully execute all operator life cycle methods.
This was done using the FlinkKafkaConsumerBaseTest::getConsumer(...) method (have been removed by this PR). This, however, caused the unit tests to be too implementation-specific and hard to extend.

This PR reworks the FlinkKafkaConsumerBaseTest to remove the reflection-based FlinkKafkaConsumerBaseTest::getConsumer(...) method.
All tests now instantiate the DummyFlinkKafkaConsumer normally, and let all tests properly execute all operator life cycle methods regardless of the tested logic.

Brief change log

  • Remove reflection-relying FlinkKafkaConsumerBaseTest::getConsumer(...) method.
  • Generalize the DummyFlinkKafkaConsumer class
  • Introduce the FlinkKafkaConsumerBaseTest::setupConsumer(...) method that iterates through all normal operator life cycle methods.
  • The test pattern for all unit tests in the FlinkKafkaConsumerBaseTest is now: 1) instantiate a DummyFlinkKafkaConsumer, and 2) call setupConsumer(dummyConsumer) to make sure the consumer goes through all life cycle methods, and instance fields are properly instantiated (instead of relying on reflection like before).

Verifying this change

Test coverage of the reworked FlinkKafkaConsumerBaseTest has not been touched.
That unit test verifies this change.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? n/a

… Java reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.
int subtaskIndex,
int totalNumSubtasks) throws Exception {

final StreamingRuntimeContext runtimeContext = Mockito.mock(StreamingRuntimeContext.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use here proper mock or throw new UnsupportedOperationException() for non supported (instead of returning nulls...). Probably you could extract/re-use one of the already existing logic for instantiating StreamingRuntimeContext (for example MockRuntimeContext or from SourceFunctionUtil)

when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(totalNumSubtasks);
when(runtimeContext.getMetricGroup()).thenReturn(mock(MetricGroup.class));

final OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, proper mock, or one with throwing UnsupportedOperationException

// also mock the legacy 1.2 Kafka consumer state (return empty state)
when(operatorStateStore.getSerializableListState(Matchers.anyString())).thenReturn(new TestingListState<>());

final FunctionInitializationContext initializationContext = Mockito.mock(FunctionInitializationContext.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And same here.

public void testEitherWatermarkExtractor() {
try {
new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
new DummyFlinkKafkaConsumer<String>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add default constructor with mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class) parameters?

@tzulitai
Copy link
Contributor Author

Thank you for the review @pnowojski! Have addressed your comments.

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes! LGTM

@tzulitai
Copy link
Contributor Author

Thanks! Merging this ...

tzulitai added a commit to tzulitai/flink that referenced this pull request Jan 12, 2018
… Java reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.

This closes apache#5188.
@asfgit asfgit closed this in 37cdaf9 Jan 12, 2018
tzulitai added a commit to tzulitai/flink that referenced this pull request Jan 13, 2018
… Java reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.

This closes apache#5188.
glaksh100 pushed a commit to lyft/flink that referenced this pull request Jun 6, 2018
… Java reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.

This closes apache#5188.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants