Skip to content

[flink-cdc-composer] Base interfaces and models for composer and pipeline definition#2656

Merged
GOODBOY008 merged 2 commits intoapache:masterfrom
PatrickRen:composer-interfaces
Nov 8, 2023
Merged

[flink-cdc-composer] Base interfaces and models for composer and pipeline definition#2656
GOODBOY008 merged 2 commits intoapache:masterfrom
PatrickRen:composer-interfaces

Conversation

@PatrickRen
Copy link
Copy Markdown
Contributor

@PatrickRen PatrickRen commented Nov 7, 2023

This pull request implements #2608, which introduces base interfaces exposed to upper-level CLI, including:

  • PipelineComposer: interface for composer
  • PipelineDef: Java object abstracting a pipeline definition
  • PipelineExecution: Abstraction for an execution of the pipeline

And a Flink implementation (just a framework without solid composing logic) is made as a proof of concept.

Note that Flink composer doesn't have any solid implementation now.
Comment on lines +51 to +52
public PipelineExecution compose(PipelineDef pipelineDef) {
return new FlinkPipelineExecution(env, "CDC Job", isBlocking);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipelineDef seems useless here.
Do we need a hook passing configurations to StreamExecutionenvironment?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently FlinkPipelineComposer doesn't implement any solid functionality because it depends on common and runtime module, so there's no usage for pipelineDef now. The implementation of compose would be like:

@Override
public PipelineExecution compose(PipelineDef pipelineDef) {
    // Translate routers and other operators ...
    Source flinkSource = translateToSource(pipelineDef.getSource());
    Sink flinkSink = translateToSink(pipelineDef.getSink())
    
    // Build Flink DataStream job
     env.fromSource(flinkSource)
        .addTransformation(router)
        .sinkTo(flinkSink);

    // Construct execution
    return new FlinkPipelineExecution(env, jobName, isBlocking);
}

Then the pipelineDef will be used.

I don't quite get the "passing configuration by hook" part. Could you elaborate more about it?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it , @PatrickRen Can u add todo to remind the uncomplete logic.

private final SinkDef sink;
private final List<RouteDef> routes;
private final List<TransformDef> transforms;
private final Configuration config;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configuration is optional is ok , some fixed common configuration key should add as a constant (default value,desc,type etc.) for easy to access.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it , com.ververica.cdc.composer.flink.FlinkPipelineComposer#compose is not complete.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can't tell what configurations are needed right now, but will do as we implement the composer later. Thanks!

Copy link
Copy Markdown
Member

@GOODBOY008 GOODBOY008 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@GOODBOY008 GOODBOY008 merged commit 99d1ad5 into apache:master Nov 8, 2023
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this pull request Jan 13, 2025
…line definition (apache#2656)

* [flink-cdc-composer] Base interfaces and models for Flink CDC composer and pipeline definition

* [flink-cdc-composer] Flink implementation of composer and execution.

Note that Flink composer doesn't have any solid implementation now.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants