Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2][Maxcompute] Add Maxcompute source & sink connector #3640

Merged
merged 10 commits into from
Dec 11, 2022

Conversation

stdnt-xiao
Copy link
Contributor

@stdnt-xiao stdnt-xiao commented Dec 4, 2022

Purpose of this pull request

Maxcompute source & sink connector #3018

Check list

stdnt-xiao and others added 4 commits November 22, 2022 15:36
# Conflicts:
#	plugin-mapping.properties
#	seatunnel-connectors-v2/pom.xml
#	seatunnel-dist/pom.xml
@stdnt-xiao stdnt-xiao changed the title [Feature][Connector-V2][Maxcompute] Add Maxcompute file source & sink connector [Feature][Connector-V2][Maxcompute] Add Maxcompute source & sink connector Dec 4, 2022
Copy link
Contributor

@TaoZex TaoZex left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add e2e tests or picture proof that this connector feature is available.


## Changelog

### 2.1.3-beta 2022-12-05
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use next version will be better.


## Changelog

### 2.1.3-beta 2022-12-05
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Comment on lines 27 to 42
public static final Option<String> ACCESS_ID = Options.key("accessId").stringType().noDefaultValue()
.withDescription("Your Maxcompute accessId which cloud be access from Alibaba Cloud");
public static final Option<String> ACCESS_KEY = Options.key("accesskey").stringType().noDefaultValue()
.withDescription("Your Maxcompute accessKey which cloud be access from Alibaba Cloud");
public static final Option<String> ENDPOINT = Options.key("endpoint").stringType().noDefaultValue()
.withDescription("Your Maxcompute endpoint start with http");
public static final Option<String> PROJECT = Options.key("project").stringType().noDefaultValue()
.withDescription("Your Maxcompute project which is created in Alibaba Cloud");
public static final Option<String> RESULT_TABLE_NAME = Options.key("result_table_name").stringType().noDefaultValue()
.withDescription("Target Maxcompute table name eg: fake");
public static final Option<String> PARTITION_SPEC = Options.key("partition_spec").stringType().noDefaultValue()
.withDescription("This spec of Maxcompute partition table.");
public static final Option<Integer> SPLIT_ROW = Options.key("split_row").intType().defaultValue(SPLIT_ROW_DEFAULT)
.withDescription("Number of rows per split. default: 10000");
public static final Option<Boolean> OVERWRITE = Options.key("overwrite").booleanType().defaultValue(false)
.withDescription("Whether to overwrite the table or partition");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix code style.You can refer this pr : e68ecf7

}

@Override
public OptionRule optionRule() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add UT for optionRule() please.

Comment on lines 47 to 49
@Slf4j
public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private static final Logger LOG = LoggerFactory.getLogger(MaxcomputeSource.class);
Copy link
Contributor

@TaoZex TaoZex Dec 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already using @Slf4j, LOG may not be useful.

try {
this.session.commit(new Long[]{Thread.currentThread().getId()});
} catch (TunnelException e) {
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unify exception for connector.

}

@Override
public OptionRule optionRule() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add optionRule test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please, give a refer pr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refer this:226dc6a

//Doesn't support yet
case MAXCOMPUTE_UNKNOWN:
default:
throw new UnsupportedOperationException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unify exception.

@stdnt-xiao
Copy link
Contributor Author

Please add e2e tests or picture proof that this connector feature is available.

connector-v2-maxcompute-conf
connector-v2-maxcompute


@Override
public String getPluginName() {
return "MaxcomputeSink";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "MaxcomputeSink";
return "Maxcompute";


@Override
public String getPluginName() {
return "MaxcomputeSource";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "MaxcomputeSource";
return "Maxcompute";

@Slf4j
public class MaxcomputeSourceReader implements SourceReader<SeaTunnelRow, MaxcomputeSourceSplit> {
private final SourceReader.Context context;
private Set<MaxcomputeSourceSplit> sourceSplits;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private Set<MaxcomputeSourceSplit> sourceSplits;
private final Set<MaxcomputeSourceSplit> sourceSplits;

SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
if (null == rs.get(i)) {
seatunnelField = null;
} else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TyrantLucifer
Copy link
Member

Overall LGTM except few nits, thank you for your contribution of SeaTunnel.

| accesskey | string | yes | - |
| endpoint | string | yes | - |
| project | string | yes | - |
| result_table_name | string | yes | - |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result_table_name is in common-options now.

Here is an example

image


`project` Your Maxcompute project which is created in Alibaba Cloud.

### result_table_name [string]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this option.


## Key features

- [x] [batch](../../concept/connector-v2-features.md)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sink Connector have no Key features named batch and it only have two Key features

image

| accesskey | string | yes | - |
| endpoint | string | yes | - |
| project | string | yes | - |
| result_table_name | string | yes | - |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete result_table_name and add common-options .

@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, RESULT_TABLE_NAME)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RESULT_TABLE_NAME is a common options . It will used by engine to generate logical DAG.

@stdnt-xiao stdnt-xiao requested review from EricJoy2048, TyrantLucifer and TaoZex and removed request for TaoZex, EricJoy2048 and TyrantLucifer December 6, 2022 07:21
@stdnt-xiao
Copy link
Contributor Author

WX20221206-152347@2x

TaoZex
TaoZex previously approved these changes Dec 6, 2022
Copy link
Contributor

@TaoZex TaoZex left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for your contribution.

EricJoy2048
EricJoy2048 previously approved these changes Dec 7, 2022
@EricJoy2048 EricJoy2048 dismissed stale reviews from TaoZex and themself via 4dc42d9 December 7, 2022 14:12
@github-actions github-actions bot removed the reviewed label Dec 7, 2022
TaoZex
TaoZex previously approved these changes Dec 7, 2022
@TyrantLucifer TyrantLucifer added this to the 2.3.0 milestone Dec 7, 2022
@EricJoy2048 EricJoy2048 closed this Dec 8, 2022
@EricJoy2048 EricJoy2048 reopened this Dec 8, 2022
EricJoy2048
EricJoy2048 previously approved these changes Dec 8, 2022
Copy link
Member

@EricJoy2048 EricJoy2048 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, @TyrantLucifer PTAL

docs/en/connector-v2/sink/Maxcompute.md Outdated Show resolved Hide resolved
docs/en/connector-v2/source/Maxcompute.md Outdated Show resolved Hide resolved
Co-authored-by: hailin0 <hailin088@gmail.com>
Co-authored-by: hailin0 <hailin088@gmail.com>
@github-actions github-actions bot removed the approved label Dec 10, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants