Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector] New SeaTunnel API Connectors #1946 Add Druid Source&Sink #2651

Closed
wants to merge 3 commits into from
Closed

[Feature][Connector] New SeaTunnel API Connectors #1946 Add Druid Source&Sink #2651

wants to merge 3 commits into from

Conversation

guanboo
Copy link

@guanboo guanboo commented Sep 5, 2022

… Source&Sink

Purpose of this pull request

Check list

Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add connector-v2 e2e-testcase(flink、spark)?

reference:#2499

@guanboo
Copy link
Author

guanboo commented Sep 5, 2022

THX,I deal with connector-v2 e2e-testcase

@guanboo
Copy link
Author

guanboo commented Sep 5, 2022

@hailin0
hi bro.
Have add connector-v2 e2e-testcase create another IS? There will be some workload。


```hocon
DruidSource {
url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. jdbc的协议有http吗?
  2. 请使用说明对url进行标注, http://{IP or domain}:[port]/
  3. 字段作用与说明请写上

statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
return statement.getMetaData();
} catch (SQLException se) {
throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

与openInputFormat的异常不一样,不应该使用一样的异常吗?

quarySQL = getSQL();
connection = DriverManager.getConnection(druidSourceOptions.getURL());
statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
return statement.getMetaData();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是否可以把 60,62独立一个方法。因为与69,70一样。或则与openInputFormat方法进行合并下

}

public void closeInputFormat() {
try {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是否可以使用try-with解决方案,可以提前释放statement,connent

columns = String.join(",", druidSourceOptions.getColumns());
}

String sql = String.format(QUERY_TEMPLATE, columns, dataSource);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请使用stringbuffer,代码可维护性好些

import java.io.Serializable;

public class DruidSourceConfig {
public static final String URL = "url";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是否可以收敛成为常量类

@Data
@AllArgsConstructor
public class DruidSourceOptions implements Serializable {
private String URL;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是否可以变量小写,github编辑器都提示变量名是类

import java.util.HashMap;
import java.util.Map;

public class DruidTypeMapper {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是否可以收敛,同时在api里面定义

@AutoService(SeaTunnelSink.class)
public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private Config config;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config变量,是否可以放到AbstratSimpleSink里面

this.datasource = pluginConfig.getString(DruidSinkConfig.DATASOURCE);
this.columns = pluginConfig.getStringList(DruidSinkConfig.COLUMNS);
this.timestampColumn = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_COLUMN) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_COLUMN) : null;
this.timestampFormat = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_FORMAT) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_FORMAT) : null;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是否可以加强Config对象, pluginConfig.getString(DruidSinkConfig.TIMESTAMP_COLUMN,null);
这样谁用都简单

@hailin0
Copy link
Member

hailin0 commented Sep 6, 2022

@hailin0 hi bro. Have add connector-v2 e2e-testcase create another IS? There will be some workload。

e2e should be submitted with the feature, use e2e validation feature running result instead of code review

reference
https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e
https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-spark-connector-v2-e2e

@guanboo
Copy link
Author

guanboo commented Sep 6, 2022

@hailin0 ok thx,What is your wechat account in the group and ask you some questions

… Source&Sink #2651 add Spark Example demo and Checkstyle code
@EricJoy2048
Copy link
Member

Hi, @guanboo Please resolve conflicts.

@guanboo guanboo closed this by deleting the head repository Sep 28, 2022
@guanboo
Copy link
Author

guanboo commented Sep 28, 2022

done #2937

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants