[INLONG-6617][Sort] Add common process for dirty data sink and supports log sink #6618

Merged
merged 6 commits into from
Nov 28, 2022

yunqingmoswu commented Nov 24, 2022

Prepare a Pull Request

Title: [INLONG-6617][Sort] Add common process for dirty data sink and supports log sink

Motivation

Add a common process for dirty data sinks, with support for a log sink.
The design is as follows:

  1. Add the option 'dirty.ignore' to determine whether dirty data is ignored; the default is 'false'.
  2. If 'dirty.ignore' is 'true', the option 'dirty.side-output.enable' determines whether dirty data is written to a side-output sink.
  3. If 'dirty.side-output.enable' is 'true', dirty data is written to the target system selected by the option 'dirty.side-output.connector'.
  4. Only the 'log' side-output is supported for now.
  5. Add labels to dirty data with the option 'dirty.side-output.labels'. The label format is 'key1=value1&key2=value2', and it supports variable replacement such as '${variable}'. Two system variables, SYSTEM_TIME and DIRTY_TYPE, are currently supported; support for other variables is determined by the connector.
  6. Add the option 'dirty.side-output.format' to format the dirty data and labels. Only 'csv' and 'json' are supported for now; the default is 'csv'.
  7. Add the option 'dirty.side-output.log-tag' for logged dirty data; it appears only in logs, before the labels.
  8. Add the option 'dirty.side-output.field-delimiter' to delimit labels and dirty data; the default is ','.
  9. If 'dirty.side-output.format' is 'csv', the whole log line has the format '[${logTag}] ${labels}${fieldDelimiter}${dirtyData}'.
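The label and csv rules in items 5 to 9 can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class name `DirtyLogFormatSketch` and its methods are hypothetical, and it assumes that `${labels}` is rendered as the label values joined by the field delimiter and that unknown variables are left untouched.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of label parsing and csv log formatting; not the PR's actual classes. */
final class DirtyLogFormatSketch {

    // Matches variables written as ${name}.
    private static final Pattern VARIABLE = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${name} with its value; unknown variables are left as-is (an assumption). */
    static String replaceVariables(String template, Map<String, String> variables) {
        Matcher matcher = VARIABLE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = variables.getOrDefault(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    /** Parses 'key1=value1&key2=value2', replacing variables inside each value. */
    static Map<String, String> parseLabels(String labels, Map<String, String> variables) {
        Map<String, String> parsed = new LinkedHashMap<>();
        for (String pair : labels.split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip malformed pairs in this sketch
            }
            parsed.put(pair.substring(0, eq), replaceVariables(pair.substring(eq + 1), variables));
        }
        return parsed;
    }

    /** Builds '[${logTag}] ${labels}${fieldDelimiter}${dirtyData}'. */
    static String formatCsv(String logTag, Map<String, String> labels,
                            String fieldDelimiter, String dirtyData) {
        // Assumption: labels are rendered as their values joined by the delimiter.
        String labelPart = String.join(fieldDelimiter, labels.values());
        return "[" + logTag + "] " + labelPart + fieldDelimiter + dirtyData;
    }
}
```

A caller would fill the variable map with SYSTEM_TIME and DIRTY_TYPE (plus any connector-specific variables) before parsing the labels, then pass the result to `formatCsv` together with the configured log tag and delimiter.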

Modifications

1. Add a dirty data processing model.
2. Add abstractions for DirtySinkFactory, DirtySink, DirtyData, and DirtyType.
3. Add DirtyOptions as the common dirty options.
4. Add utility classes for dirty sinks, such as LabelUtils and PatternReplaceUtils.
5. Add common option definitions, such as 'dirty.ignore', 'dirty.side-output.enable', 'dirty.side-output.connector', 'dirty.side-output.labels', 'dirty.side-output.format', 'dirty.side-output.log-tag', and 'dirty.side-output.field-delimiter'.
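To make the option set concrete, here is a rough sketch of how the common options could be read from a plain option map. The class `DirtyOptionsSketch` is hypothetical and is not the DirtyOptions class added by this PR. The defaults for 'dirty.ignore', the format, and the field delimiter come from the description above; the 'false' default for 'dirty.side-output.enable' is an assumption, since the description does not state one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical holder for the common dirty-data options; not the PR's DirtyOptions. */
final class DirtyOptionsSketch {

    private static final List<String> SUPPORTED_FORMATS = Arrays.asList("csv", "json");

    final boolean ignoreDirty;
    final boolean sideOutputEnabled;
    final String connector;
    final String labels;
    final String format;
    final String logTag;
    final String fieldDelimiter;

    private DirtyOptionsSketch(Map<String, String> o) {
        this.ignoreDirty = Boolean.parseBoolean(o.getOrDefault("dirty.ignore", "false"));
        // Assumed default: the description gives no default for this option.
        this.sideOutputEnabled =
                Boolean.parseBoolean(o.getOrDefault("dirty.side-output.enable", "false"));
        this.connector = o.get("dirty.side-output.connector");
        this.labels = o.get("dirty.side-output.labels");
        this.format = o.getOrDefault("dirty.side-output.format", "csv");
        this.logTag = o.get("dirty.side-output.log-tag");
        this.fieldDelimiter = o.getOrDefault("dirty.side-output.field-delimiter", ",");
    }

    /** Reads the options and rejects values the description says are unsupported. */
    static DirtyOptionsSketch fromMap(Map<String, String> options) {
        DirtyOptionsSketch parsed = new DirtyOptionsSketch(options);
        if (!SUPPORTED_FORMATS.contains(parsed.format)) {
            throw new IllegalArgumentException("Unsupported format: " + parsed.format);
        }
        // The side-output connector only matters when dirty data is ignored and side output is on.
        if (parsed.ignoreDirty && parsed.sideOutputEnabled && !"log".equals(parsed.connector)) {
            throw new IllegalArgumentException("Only the 'log' side-output is supported for now");
        }
        return parsed;
    }
}
```

Validating up front keeps a misconfigured job from silently dropping dirty data later; whether the real implementation validates at this point is not stated in the PR description.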

Verifying this change

(Please pick either of the following options)

  • This change is a trivial rework/code cleanup without any test coverage.

  • This change is already covered by existing tests, such as:
    (please describe tests)

  • This change added tests and can be verified as follows:

    (example:)

    • Added integration tests for end-to-end deployment with large payloads (10MB)
    • Extended integration test for recovery after broker failure

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation

yunqingmoswu added this to the 1.5.0 milestone Nov 24, 2022
gong removed this from the 1.5.0 milestone Nov 25, 2022

Yizhou-Yang commented Nov 25, 2022

Is this dirty sink added to integration tasks as one of the sinks in the sink list, so that whenever data turns out to be dirty during processing, it is written to the given sink? I feel there is a lot of work left to adapt this dirty sink to the individual connectors and to identify exactly where errors can occur.

What happens if errors occur while reading from or writing to the dirty sink? Will adding this dirty sink (or "side output", if I understood correctly) incur a high performance cost?

EMsnap left a comment

How can this be applied to existing connectors? Can you describe it in the description?

gong commented Nov 25, 2022

Maybe add some description to the issue on how to use it?

yunqingmoswu commented

> Maybe add some description to the issue on how to use it?

It is already covered in the PR description.

yunqingmoswu commented

> How can this be applied to existing connectors? Can you describe it in the description?

You need to create a DirtySink through DirtyFactory in the connector, and then call the DirtySink's invoke method wherever dirty data occurs.
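As a rough illustration of that integration pattern, the sketch below shows a toy writer that creates a dirty sink through a factory and calls `invoke` at the point where a record fails to parse. Everything here is hypothetical: the interfaces are simplified stand-ins for the PR's DirtySink and factory abstractions (whose real signatures are not shown in this thread), `IntWriter` is an invented example connector, and failing on dirty data when 'dirty.ignore' is 'false' is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of wiring a dirty sink into a connector; not the PR's actual API. */
final class ConnectorDirtyHandlingSketch {

    /** Simplified stand-in for the DirtySink abstraction. */
    interface DirtySink<T> {
        void invoke(T dirtyData);
    }

    /** Simplified stand-in for the factory that creates the dirty sink. */
    interface DirtySinkFactory<T> {
        DirtySink<T> createDirtySink();
    }

    /** Toy connector writer that parses integers and routes unparsable input. */
    static final class IntWriter {
        private final boolean ignoreDirty;
        private final DirtySink<String> dirtySink; // null when side output is disabled
        final List<Integer> written = new ArrayList<>();

        IntWriter(boolean ignoreDirty, boolean sideOutputEnabled,
                  DirtySinkFactory<String> factory) {
            this.ignoreDirty = ignoreDirty;
            // Create the dirty sink only when dirty data is ignored and side output is on.
            this.dirtySink = (ignoreDirty && sideOutputEnabled) ? factory.createDirtySink() : null;
        }

        void write(String raw) {
            try {
                written.add(Integer.parseInt(raw.trim()));
            } catch (NumberFormatException e) {
                if (!ignoreDirty) {
                    throw e; // assumed behavior when dirty.ignore is 'false'
                }
                if (dirtySink != null) {
                    dirtySink.invoke(raw); // hand the dirty record to the side output
                }
                // Otherwise the dirty record is dropped silently.
            }
        }
    }
}
```

The key point is that the connector owns the decision of where data counts as dirty; the dirty sink only receives what the connector hands to it.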

gong commented Nov 25, 2022

> > How can this be applied to existing connectors? Can you describe it in the description?
>
> You need to create a DirtySink through DirtyFactory in the connector, and then call the DirtySink's invoke method wherever dirty data occurs.

@yunqingmoswu Maybe add a utility class like org.apache.flink.table.factories.FactoryUtil. It could find the subclasses of DirtyFactory and create the DirtySink object.
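One way such a utility could look, sketched under assumptions: Flink's FactoryUtil discovers factories through Java's ServiceLoader and matches them by `factoryIdentifier()`, and the hypothetical class below follows the same idea. The interface and class names are illustrative only and are not taken from this PR; a real setup would also need a provider file under META-INF/services for ServiceLoader to find any implementations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Hypothetical discovery helper in the spirit of Flink's FactoryUtil; not part of this PR. */
final class DirtySinkFactoryUtilSketch {

    /** Simplified stand-in for a dirty sink factory identified by a connector name. */
    interface DirtySinkFactory {
        String factoryIdentifier();
    }

    /** Picks the single factory whose identifier matches, failing on none or several. */
    static DirtySinkFactory discover(Iterable<DirtySinkFactory> candidates, String identifier) {
        List<DirtySinkFactory> matches = new ArrayList<>();
        for (DirtySinkFactory factory : candidates) {
            if (identifier.equals(factory.factoryIdentifier())) {
                matches.add(factory);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one factory for '" + identifier + "' but found " + matches.size());
        }
        return matches.get(0);
    }

    /** Looks up factories registered through Java SPI (META-INF/services). */
    static DirtySinkFactory discover(String identifier) {
        return discover(ServiceLoader.load(DirtySinkFactory.class), identifier);
    }
}
```

With this shape, a connector would pass the value of 'dirty.side-output.connector' (for example 'log') as the identifier instead of instantiating a specific factory class itself.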

yunqingmoswu commented

> > > How can this be applied to existing connectors? Can you describe it in the description?
> >
> > You need to create a DirtySink through DirtyFactory in the connector, and then call the DirtySink's invoke method wherever dirty data occurs.
>
> @yunqingmoswu Maybe add a utility class like org.apache.flink.table.factories.FactoryUtil. It could find the subclasses of DirtyFactory and create the DirtySink object.

Just use it directly.

gong commented Nov 28, 2022

@yunqingmoswu please git rebase master

yunqingmoswu commented

> @yunqingmoswu please git rebase master

done

EMsnap merged commit ff17385 into apache:master Nov 28, 2022

thesumery commented

LGTM

featzhang pushed a commit to featzhang/inlong that referenced this pull request Nov 28, 2022

Successfully merging this pull request may close these issues.

[Feature][Sort] Add common process for dirty data sink and supports log sink