Skip to content

Conversation

@duli559
Copy link
Contributor

@duli559 duli559 commented Dec 30, 2019

Fix #5934

Motivation
Support read/write properties from/to Message in flink pulsar consumer/producer, and you can override it in your derived class

Modifications

  1. modify PulsarConsumerSource.deserialize access right from 'private' to 'protected'.
  2. add method protected Map<String, String> generateProperties(T value) in class FlinkPulsarProducer, and invoked in TypedMessageBuilder.properties() to add it in pulsar Message.

@sijie
Copy link
Member

sijie commented Jan 1, 2020

@yjshen can you please help review this pull request?

@yjshen
Copy link
Member

yjshen commented Jan 1, 2020

LGTM.

@sijie
Copy link
Member

sijie commented Jan 1, 2020

@yjshen how does this user provide the properties using this code?

@yjshen
Copy link
Member

yjshen commented Jan 1, 2020

As discussed in #5934, users need to subclass FlinkPulsarProducer, overwrite generateProperties proposed by this PR, and generate properties based on:

  • the current value to write
  • constant provided in subclass
  • RuntimeContext from the base RichSinkFunction.

@sijie
Copy link
Member

sijie commented Jan 1, 2020

@yjshen sorry that I missed the discussion. I think it is better to pass in a function PropertitiesExtractor like KeyExtractor rather than inheriting FlinkPulsarProducer, no?

@yjshen
Copy link
Member

yjshen commented Jan 1, 2020

@sijie I agree.
@duli559 could you please check sijie's advice above?

@duli559
Copy link
Contributor Author

duli559 commented Jan 3, 2020

@sijie @yjshen , it's a good advice, i have changed it.

  1. add a interface 'PulsarPropertiesExtractor' to set message properties by user
  2. add an attribute in 'FlinkPulsarProducer', and user can set it in constructors

@duli559 duli559 force-pushed the issue-5934 branch 3 times, most recently from 8c6a767 to 3ba821a Compare January 6, 2020 02:58
@duli559
Copy link
Contributor Author

duli559 commented Jan 6, 2020

why this ci build is failed? I feel a little strange, some other pull requests have the same problem, @yjshen

@tuteng
Copy link
Member

tuteng commented Jan 7, 2020

retest this please

@sijie
Copy link
Member

sijie commented Jan 7, 2020

why this ci build is failed? I feel a little strange, some other pull requests have the same problem,

@duli559 the current Jenkins is a bit flaky. We are working on migrating to Github Actions.

@duli559
Copy link
Contributor Author

duli559 commented Jan 8, 2020

why this ci build is failed? I feel a little strange, some other pull requests have the same problem,

@duli559 the current Jenkins is a bit flaky. We are working on migrating to Github Actions.

@sijie , it's great, and i have changed my commited code with your earlier review advice.

@sijie sijie added component/flink type/feature The PR added a new feature or issue requested a new feature labels Jan 26, 2020
@duli559 duli559 requested a review from sijie March 21, 2020 13:33
@duli559
Copy link
Contributor Author

duli559 commented Mar 21, 2020

@yjshen @tuteng, it seems some instable unit tests failed, can you try to retest them?

@yjshen
Copy link
Member

yjshen commented Mar 25, 2020

/pulsarbot run-failure-checks

1 similar comment
@yjshen
Copy link
Member

yjshen commented Mar 25, 2020

/pulsarbot run-failure-checks

@yjshen
Copy link
Member

yjshen commented Mar 25, 2020

@duli559 could you please merge the master? (git fetch apache master && git merge apache/master)

@yjshen
Copy link
Member

yjshen commented Mar 25, 2020

/pulsarbot run-failure-checks

1 similar comment
@yjshen
Copy link
Member

yjshen commented Mar 25, 2020

/pulsarbot run-failure-checks

@duli559
Copy link
Contributor Author

duli559 commented Mar 25, 2020

/pulsarbot run-failure-checks

7 similar comments
@yjshen
Copy link
Member

yjshen commented Mar 25, 2020

/pulsarbot run-failure-checks

@yjshen
Copy link
Member

yjshen commented Mar 25, 2020

/pulsarbot run-failure-checks

@duli559
Copy link
Contributor Author

duli559 commented Mar 26, 2020

/pulsarbot run-failure-checks

@duli559
Copy link
Contributor Author

duli559 commented Mar 26, 2020

/pulsarbot run-failure-checks

@yjshen
Copy link
Member

yjshen commented Mar 26, 2020

/pulsarbot run-failure-checks

@duli559
Copy link
Contributor Author

duli559 commented Mar 26, 2020

/pulsarbot run-failure-checks

@duli559
Copy link
Contributor Author

duli559 commented Mar 26, 2020

/pulsarbot run-failure-checks

@jiazhai jiazhai merged commit 5ad3e02 into apache:master Mar 26, 2020
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
…lsar consumer/producer (apache#5955)

Fix apache#5934

Motivation
Support read/write properties from/to Message in flink pulsar consumer/producer, and you can override it in your derived class

Modifications

1. modify `PulsarConsumerSource.deserialize` access right from 'private' to 'protected'.
2. add method `protected Map<String, String> generateProperties(T value)` in class `FlinkPulsarProducer`, and invoked in `TypedMessageBuilder.properties()` to add it in pulsar Message.

* fix unit test failure

Co-authored-by: herodu <herodu@tencent.com>
Co-authored-by: Sijie Guo <sijie@apache.org>
Co-authored-by: duli <554979476@163.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type/feature The PR added a new feature or issue requested a new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Can support read/write properties from/to Message in flink pulsar consumer/producer

5 participants