
[Umbrella][Feature][Core] Decoupling connectors from compute engines #1608

Closed
7 tasks done
ashulin opened this issue Mar 29, 2022 · 20 comments
Labels: core (SeaTunnel core module), discuss

Comments

@ashulin
Member

ashulin commented Mar 29, 2022

Search before asking

  • I have searched the existing feature requests and found no similar requirement.

Description

In the current implementation of SeaTunnel, connectors are coupled to the compute engines,
so implementing a connector requires implementing the interface of each compute engine separately.

The detailed design doc: https://docs.google.com/document/d/1h-BeGKK98VarSpYUfYkNtpYLVbYCh8PM_09N1pdq_aw/edit?usp=sharing

Motivation

  1. A connector only needs to be implemented once and can then be used on all engines;
  2. Support multiple versions of the Spark/Flink engines;
  3. The Source interface makes partition/shard/split/parallelism logic explicit;
  4. Multiplex JDBC/log connections.

Why not use Apache Beam?
Apache Beam sources are split into two categories, Unbounded and Bounded, so a connector still cannot be written just once;

Overall Design

(Figure: SeaTunnel Framework architecture diagram)

  • Catalog: metadata management, which can automatically discover the schema and other information of structured databases;

  • Catalog Storage: used to store metadata for unstructured storage engines (e.g. Kafka);

  • SQL

  • DataType: table column data types;

  • Table API: used for context passing and SeaTunnel Source/Sink instantiation;

  • Source API (a rough interface sketch follows this list)

    • Explicit partition/shard/split/parallelism logic;
    • Batch & streaming unification;
    • Multiplexing of source connections;
  • Sink API

    • Distributed transactions;
    • Aggregated commits;
  • Translation

    • Makes each engine support the SeaTunnel connector.
    • Converts data to Row inside the engine.
    • Distributes data after multiplexing.
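
As a rough illustration of what such an engine-agnostic connector API could look like in Java (all interface and method names below, e.g. Source, SourceSplit, SplitEnumerator, SinkWriter, SinkAggregatedCommitter, are illustrative assumptions rather than the committed SeaTunnel API):

```java
// Sketch only: an engine-agnostic connector API along the lines described above.
// All names and signatures here are assumptions for illustration, not the final API.
import java.io.Serializable;
import java.util.List;

enum Boundedness { BOUNDED, UNBOUNDED }

interface Collector<T> { void collect(T record); }

// A unit of work produced by split discovery (partition/shard/split).
interface SourceSplit extends Serializable {
    String splitId();
}

// One source implementation serves both batch and streaming.
interface Source<T, SplitT extends SourceSplit, StateT> extends Serializable {
    Boundedness getBoundedness();
    SplitEnumerator<SplitT, StateT> createEnumerator();
    SourceReader<T, SplitT> createReader(int subtaskIndex);
}

// Explicit partition/shard/split/parallelism logic lives here.
interface SplitEnumerator<SplitT extends SourceSplit, StateT> {
    List<SplitT> discoverSplits();
    StateT snapshotState(long checkpointId);
}

interface SourceReader<T, SplitT extends SourceSplit> {
    void addSplits(List<SplitT> splits);
    void pollNext(Collector<T> output) throws Exception;
}

// Sink with two-phase commit: writers prepare, a single committer aggregates.
interface Sink<T, CommitInfoT> extends Serializable {
    SinkWriter<T, CommitInfoT> createWriter();
    SinkAggregatedCommitter<CommitInfoT> createAggregatedCommitter();
}

interface SinkWriter<T, CommitInfoT> {
    void write(T element) throws Exception;
    CommitInfoT prepareCommit() throws Exception;   // pre-commit on checkpoint
}

interface SinkAggregatedCommitter<CommitInfoT> {
    void commit(List<CommitInfoT> commitInfos) throws Exception;  // aggregated commit
}
```

The translation layer's job would then be to wrap these interfaces once per engine, so that any engine can run any connector built against them.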

Simple Flow

(Figure: SeaTunnel Flow diagram)

Why do we need multiplexed connections?
Streaming scenarios:

  • An RDB (e.g. MySQL) may hit "too many connections" errors or come under heavy database load;
  • Logs are parsed repeatedly in change data capture (CDC) scenarios (e.g. MySQL binlog, Oracle redo log);

Simple Source & Sink Flow

(Figure: SeaTunnel Engine Flow diagram)

The subtasks:

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

ashulin added a commit to ashulin/incubator-seatunnel that referenced this issue Mar 29, 2022
@CalvinKirs added the discuss and core (SeaTunnel core module) labels Mar 30, 2022
@ashulin changed the title from "[Feature][Core] Decoupling connectors from compute engines" to "[Umbrella][Feature][Core] Decoupling connectors from compute engines" Apr 15, 2022
@yx91490
Member

yx91490 commented Apr 17, 2022

What does "multiplexing connections" mean? Does it mean that one job can have multiple sources of different types, or that one job can have multiple connections to the same source instance, working on the same or different splits of a table?

@ashulin
Member Author

ashulin commented Apr 18, 2022

What does "multiplexing connections" mean? Does it mean that one job can have multiple sources of different types, or that one job can have multiple connections to the same source instance, working on the same or different splits of a table?

@yx91490 Multiplexing the source connection means a single source instance can read data from multiple tables.
At present, the Spark and Flink source connectors create a connection for each table; that is, a source instance only reads one table.
This is fine for offline jobs, but not acceptable for real-time sync jobs with hundreds (or more) of tables.
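
As a rough, hypothetical illustration of that idea (class and method names below are made up for this sketch; it is not the actual connector code): a single reader keeps one shared JDBC connection and serves splits for many tables, instead of opening one connection per table.

```java
// Hypothetical sketch of a "multiplexed" source: one reader instance, one shared
// JDBC connection, many tables. Names are illustrative only.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;

class MultiTableJdbcReader implements AutoCloseable {
    private final Connection sharedConnection;   // one connection for all assigned tables
    private final List<String> tables;           // e.g. hundreds of tables in a real-time sync job

    MultiTableJdbcReader(String url, String user, String password, List<String> tables) throws Exception {
        this.sharedConnection = DriverManager.getConnection(url, user, password);
        this.tables = tables;
    }

    // Read every assigned table over the same connection; downstream, rows are
    // routed to the right sink by their table identifier.
    void pollOnce(RowCollector out) throws Exception {
        try (Statement stmt = sharedConnection.createStatement()) {
            for (String table : tables) {
                try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + table)) {
                    while (rs.next()) {
                        out.collect(table, rs);
                    }
                }
            }
        }
    }

    @Override
    public void close() throws Exception {
        sharedConnection.close();
    }

    interface RowCollector {
        void collect(String table, ResultSet currentRow);
    }
}
```

In a CDC scenario the same idea applies to the binlog/redo-log reader: one parser instance serves all subscribed tables instead of each table parsing the log separately.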

@dijiekstra
Contributor

Is one Coordinator better than many? If an exception occurs at the Sink and the Source does not know about it and keeps sending data downstream, is that data discarded? And if the Sink end has to notify the Coordinator on the Source end, why not use the same one?

@dijiekstra
Contributor

We do translation work for different engines, so how do we handle inconsistent semantics or capabilities? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a checkpoint capability externally, how can we unify checkpointing across the two engines?

@dijiekstra
Contributor

Anyway, looking forward to the new API. 🎉

@ashulin
Member Author

ashulin commented May 10, 2022

Is one Coordinator better than many? If an exception occurs at the Sink and the Source does not know about it and keeps sending data downstream, is that data discarded? And if the Sink end has to notify the Coordinator on the Source end, why not use the same one?

@lonelyGhostisdog Fault tolerance between source and sink is provided by the Chandy-Lamport algorithm (i.e., checkpointing).
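
For readers unfamiliar with the idea, a minimal sketch of what "participating in the checkpoint" could mean for a connector (method names here are assumptions, not the actual SeaTunnel interfaces): the coordinator injects barriers, each task snapshots its state when a barrier arrives, and commits are only finalized once the whole checkpoint is acknowledged.

```java
// Minimal sketch, assuming hypothetical method names: a source reader that can
// snapshot/restore its read position, so a Chandy-Lamport style checkpoint gives
// end-to-end fault tolerance between source and sink.
import java.util.Collections;
import java.util.List;

class CheckpointedReader {
    private long currentOffset;   // position of the last record emitted downstream

    // Called when a checkpoint barrier reaches this reader.
    List<Long> snapshotState(long checkpointId) {
        return Collections.singletonList(currentOffset);
    }

    // Called on recovery: resume from the last successful checkpoint.
    void restoreState(List<Long> state) {
        this.currentOffset = state.isEmpty() ? 0L : state.get(0);
    }

    // On the sink side, a transactional writer would only commit its prepared
    // transaction once the coordinator confirms the checkpoint completed everywhere.
    void notifyCheckpointComplete(long checkpointId) {
        // commit the transaction prepared for this checkpointId
    }
}
```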

@ashulin
Member Author

ashulin commented May 11, 2022

We do translation work for different engines, so how do we handle inconsistent semantics or capabilities? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a checkpoint capability externally, how can we unify checkpointing across the two engines?

@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.

@dijiekstra
Contributor

We do translation work for different engines, so how do we handle inconsistent semantics or capabilities? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a checkpoint capability externally, how can we unify checkpointing across the two engines?

@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.

Why don't we design a new engine that doesn't depend on Spark or Flink?

@dijiekstra
Contributor

We do translation work for different engines, so how do we handle inconsistent semantics or capabilities? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a checkpoint capability externally, how can we unify checkpointing across the two engines?

@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.

Why don't we design a new engine that doesn't depend on Spark or Flink?

Should we first complete the design and construction of Source, Sink, and Transform, even if it is only a stand-alone mode like DataX, and then refine fault tolerance, tables, and SQL through gradual iteration?

@EricJoy2048
Member

We do translation work for different engines, so how do we handle inconsistent semantics or capabilities? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a checkpoint capability externally, how can we unify checkpointing across the two engines?

@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.

Why don't we design a new engine that doesn't depend on Spark or Flink?

The new engine is being designed. However, because a large number of SeaTunnel users are on Flink and Spark, we hope to stay compatible with the Flink and Spark engines as much as possible. In the future, if our own engine is good enough, we will discuss again whether to continue to rely on Flink and Spark.

@dijiekstra
Contributor

I have another question. If a Connector is not implemented in Spark or Flink, how will we implement this new Connector if we are based on a new engine? Do we still need to implement the corresponding Connector in Spark or Flink first?

@EricJoy2048
Member

I have another question. If a Connector is not implemented in Spark or Flink, how will we implement this new Connector if we are based on a new engine? Do we still need to implement the corresponding Connector in Spark or Flink first?

I suggest the new engine adapt to the unified API too. Then, if a user develops a connector based on the unified API, the connector will be able to run on Spark, Flink, and SeaTunnel's own engine.
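
A rough sketch of what that looks like in practice: the translation layer wraps the one engine-agnostic reader into each engine's native source abstraction, e.g. a Flink SourceFunction (the SeaTunnelReader type below is a hypothetical stand-in for the unified reader; only the Flink interface is real):

```java
// Illustrative only: a translation-layer adapter exposing one engine-agnostic
// reader to Flink. A similar wrapper would exist for Spark and for SeaTunnel's
// own engine. SeaTunnelReader and pollNext are hypothetical names.
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkSourceAdapter<T> implements SourceFunction<T> {

    // Hypothetical unified reader interface, inlined to keep the sketch self-contained.
    public interface SeaTunnelReader<E> {
        void pollNext(java.util.function.Consumer<E> output) throws Exception;
    }

    private final SeaTunnelReader<T> reader;
    private volatile boolean running = true;

    public FlinkSourceAdapter(SeaTunnelReader<T> reader) {
        this.reader = reader;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        // Drive the unified reader inside Flink's source lifecycle and forward
        // every record into the Flink runtime.
        while (running) {
            reader.pollNext(ctx::collect);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

The same adapter pattern, repeated per engine, is what would let a connector written once run unchanged everywhere.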

@dijiekstra
Contributor

Thanks, everyone, for answering my questions; maybe I need to read the code to understand the design better.

@dijiekstra
Contributor

From the code, I only see the abstract upper-level design of catalog, table, source, sink, etc., and some logic for converting SeaTunnel types to Flink types. There is no distributed execution, network transmission, conversion of SeaTunnel connectors to Flink/Spark connectors, etc.
Is it not committed yet? Where can I find the corresponding designs?

@dijiekstra
Contributor

Or are we going to do a DataX-like model for now?

@EricJoy2048
Member

From the code, I only see the abstract upper-level design of catalog, table, source, sink, etc., and some logic for converting SeaTunnel types to Flink types. There is no distributed execution, network transmission, conversion of SeaTunnel connectors to Flink/Spark connectors, etc. Is it not committed yet? Where can I find the corresponding designs?

The api-draft branch only contains the connector API; it does not contain the st-engine.
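
For what it's worth, the "converting SeaTunnel types to Flink types" logic mentioned above boils down to mappings of this shape (sketch only; the SeaTunnelDataType enum below is a hypothetical stand-in for the real type system, while the Flink Types constants are real):

```java
// Sketch of type translation from an engine-agnostic column type to Flink's
// TypeInformation. SeaTunnelDataType here is a hypothetical placeholder.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public final class FlinkTypeConverter {

    // Hypothetical engine-agnostic column types.
    public enum SeaTunnelDataType { STRING, INT, BIGINT, BOOLEAN, DOUBLE }

    public static TypeInformation<?> toFlinkType(SeaTunnelDataType type) {
        switch (type) {
            case STRING:  return Types.STRING;
            case INT:     return Types.INT;
            case BIGINT:  return Types.LONG;
            case BOOLEAN: return Types.BOOLEAN;
            case DOUBLE:  return Types.DOUBLE;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
}
```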

@EricJoy2048
Member

EricJoy2048 commented May 16, 2022

Or are we going to do a DataX-like model for now?

A seatunnel-engine is being designed. It is meant to handle data-synchronization scenarios that Flink and Spark cannot support well, such as link resource sharing, a JDBC connection pool, and DDL support.

In offline synchronization scenarios it will look more like DataX, but it supports distributed deployment and execution. It also needs to support real-time synchronization.

We plan to discuss this on the mailing list after the preliminary design is completed.

@ashulin
Member Author

ashulin commented Jul 28, 2022

Completed!

@ashulin closed this as completed Jul 28, 2022
@ashulin
Member Author

ashulin commented Aug 12, 2022

Enable the SPI factory classes to improve the entire process.
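
For context, "SPI factory classes" refers to discovery via Java's standard java.util.ServiceLoader mechanism; a minimal sketch, assuming a hypothetical TableSourceFactory interface (each connector jar would register its implementation under META-INF/services):

```java
// Minimal sketch of SPI-based factory discovery with java.util.ServiceLoader.
// TableSourceFactory is a hypothetical interface name used for illustration.
import java.util.ServiceLoader;

public final class FactoryDiscovery {

    // Hypothetical factory interface each connector would implement and register
    // in META-INF/services/<fully-qualified-interface-name>.
    public interface TableSourceFactory {
        String factoryIdentifier();   // e.g. "jdbc", "kafka"
        Object createSource();        // would return the unified Source
    }

    // Look up a connector factory on the classpath by its identifier.
    public static TableSourceFactory discover(String identifier) {
        for (TableSourceFactory factory : ServiceLoader.load(TableSourceFactory.class)) {
            if (factory.factoryIdentifier().equalsIgnoreCase(identifier)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No connector factory found for: " + identifier);
    }
}
```

With a mechanism like this, the framework can pick up any connector on the classpath without hard-coding engine- or connector-specific wiring.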

@ashulin reopened this Aug 12, 2022
@dinggege1024

LGTM
