diff --git a/docs/en/connector-v2/sink/Maxcompute.md b/docs/en/connector-v2/sink/Maxcompute.md new file mode 100644 index 00000000000..302dca7aefd --- /dev/null +++ b/docs/en/connector-v2/sink/Maxcompute.md @@ -0,0 +1,79 @@ +# Maxcompute + +> Maxcompute sink connector + +## Description + +Used to write data to Maxcompute. + +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-------------------|----------|-----------|---------------| +| accessId | string | yes | - | +| accesskey | string | yes | - | +| endpoint | string | yes | - | +| project | string | yes | - | +| table_name | string | yes | - | +| partition_spec | string | no | - | +| overwrite | boolean | no | false | +| common-options | string | no | | + +### accessId [string] + +`accessId` Your Maxcompute accessId, which can be obtained from Alibaba Cloud. + +### accesskey [string] + +`accesskey` Your Maxcompute accessKey, which can be obtained from Alibaba Cloud. + +### endpoint [string] + +`endpoint` Your Maxcompute endpoint, which starts with http. + +### project [string] + +`project` Your Maxcompute project, which is created in Alibaba Cloud. + +### table_name [string] + +`table_name` Target Maxcompute table name, e.g. fake. + +### partition_spec [string] + +`partition_spec` The partition spec of the Maxcompute partitioned table, e.g. ds='20220101'. + +### overwrite [boolean] + +`overwrite` Whether to overwrite the table or partition, default: false. + +### common options + +Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
+ +## Examples + +```hocon +sink { + Maxcompute { + accessId="" + accesskey="" + endpoint="" + project="" + table_name="" + #partition_spec="" + #overwrite = false + } +} +``` + +## Changelog + +### next version + +- [Feature] Add Maxcompute Sink Connector([3640](https://github.com/apache/incubator-seatunnel/pull/3640)) diff --git a/docs/en/connector-v2/source/Maxcompute.md b/docs/en/connector-v2/source/Maxcompute.md new file mode 100644 index 00000000000..133d659490e --- /dev/null +++ b/docs/en/connector-v2/source/Maxcompute.md @@ -0,0 +1,82 @@ +# Maxcompute + +> Maxcompute source connector + +## Description + +Used to read data from Maxcompute. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-----------------|--------|-----------|---------------| +| accessId | string | yes | - | +| accesskey | string | yes | - | +| endpoint | string | yes | - | +| project | string | yes | - | +| table_name | string | yes | - | +| partition_spec | string | no | - | +| split_row | int | no | 10000 | +| common-options | string | no | | + +### accessId [string] + +`accessId` Your Maxcompute accessId, which can be obtained from Alibaba Cloud. + +### accesskey [string] + +`accesskey` Your Maxcompute accessKey, which can be obtained from Alibaba Cloud. + +### endpoint [string] + +`endpoint` Your Maxcompute endpoint, which starts with http. + +### project [string] + +`project` Your Maxcompute project, which is created in Alibaba Cloud. + +### table_name [string] + +`table_name` Target Maxcompute table name, e.g. fake. + +### partition_spec [string] + +`partition_spec` The partition spec of the Maxcompute partitioned table, e.g. ds='20220101'. 
+ +### split_row [int] + +`split_row` Number of rows per split, default: 10000. + +### common options + +Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. + +## Examples + +```hocon +source { + Maxcompute { + accessId="" + accesskey="" + endpoint="" + project="" + table_name="" + #partition_spec="" + #split_row = 10000 + } +} +``` + +## Changelog + +### next version + +- [Feature] Add Maxcompute Source Connector([3640](https://github.com/apache/incubator-seatunnel/pull/3640)) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 35f5c201435..f3bec6c143a 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -160,4 +160,6 @@ seatunnel.source.Notion = connector-http-notion seatunnel.sink.RabbitMQ = connector-rabbitmq seatunnel.source.RabbitMQ = connector-rabbitmq seatunnel.source.OpenMldb = connector-openmldb +seatunnel.source.Maxcompute = connector-maxcompute +seatunnel.sink.Maxcompute = connector-maxcompute seatunnel.source.MySQL-CDC = connector-cdc-mysql \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-maxcompute/pom.xml b/seatunnel-connectors-v2/connector-maxcompute/pom.xml new file mode 100644 index 00000000000..66c6503f553 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/pom.xml @@ -0,0 +1,61 @@ + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-maxcompute + + + 0.31.3 + 3.4 + + + + + org.apache.seatunnel + seatunnel-common + ${project.version} + + + com.aliyun.odps + odps-sdk-core + ${maxcompute.version}-public + + + + org.apache.commons + commons-lang3 + ${commons.lang3.version} + + + org.apache.seatunnel + connector-common + ${project.version} + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java new file mode 100644 index 00000000000..29420520beb --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import java.io.Serializable; + +public class MaxcomputeConfig implements Serializable { + private static final int SPLIT_ROW_DEFAULT = 10000; + public static final Option ACCESS_ID = Options.key("accessId") + .stringType() + .noDefaultValue() + .withDescription("Your Maxcompute accessId which cloud be access from Alibaba Cloud"); + public static final Option ACCESS_KEY = Options.key("accesskey") + .stringType() + .noDefaultValue() + .withDescription("Your Maxcompute accessKey which cloud be access from Alibaba Cloud"); + public static final Option ENDPOINT = Options.key("endpoint") + .stringType() + .noDefaultValue() + .withDescription("Your Maxcompute endpoint start with http"); + public static final Option PROJECT = Options.key("project") + .stringType() + .noDefaultValue() + .withDescription("Your Maxcompute project which is created in Alibaba Cloud"); + public static final Option TABLE_NAME = Options.key("table_name") + .stringType() + .noDefaultValue() + .withDescription("Target Maxcompute table name eg: fake"); + public static final Option PARTITION_SPEC = Options.key("partition_spec") + .stringType() + .noDefaultValue() + .withDescription("This spec of Maxcompute partition table."); + public static final Option SPLIT_ROW = Options.key("split_row") + .intType() + .defaultValue(SPLIT_ROW_DEFAULT) + .withDescription("Number of rows per split. 
default: 10000"); + public static final Option OVERWRITE = Options.key("overwrite") + .booleanType() + .defaultValue(false) + .withDescription("Whether to overwrite the table or partition"); +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java new file mode 100644 index 00000000000..83cdaaa3e83 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class MaxcomputeConnectorException extends SeaTunnelRuntimeException { + + public MaxcomputeConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public MaxcomputeConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public MaxcomputeConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java new file mode 100644 index 00000000000..cae4005b7e4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@AutoService(SeaTunnelSink.class) +public class MaxcomputeSink extends AbstractSimpleSink { + private static final Logger LOG = LoggerFactory.getLogger(MaxcomputeSink.class); + private Config pluginConfig; + private SeaTunnelRowType typeInfo; + + @Override + public String getPluginName() { + return "Maxcompute"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + this.pluginConfig = pluginConfig; + MaxcomputeUtil.initTableOrPartition(pluginConfig); + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.typeInfo = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.typeInfo; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) { + return new MaxcomputeWriter(this.typeInfo, this.pluginConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java new file mode 100644 index 00000000000..92ef0e664cf --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink; + +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class MaxcomputeSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Maxcompute"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME) + .optional(PARTITION_SPEC, OVERWRITE) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java new file mode 100644 index 00000000000..d2dcb0d232f --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink; + +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.aliyun.odps.PartitionSpec; +import com.aliyun.odps.Table; +import com.aliyun.odps.TableSchema; +import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.RecordWriter; +import com.aliyun.odps.tunnel.TableTunnel; +import 
lombok.extern.slf4j.Slf4j; + +import java.io.IOException; + +@Slf4j +public class MaxcomputeWriter extends AbstractSinkWriter { + private final SeaTunnelRowType seaTunnelRowType; + private final RecordWriter recordWriter; + private final TableTunnel.UploadSession session; + private final TableSchema tableSchema; + + private Config pluginConfig; + + public MaxcomputeWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) { + this.seaTunnelRowType = seaTunnelRowType; + this.pluginConfig = pluginConfig; + try { + Table table = MaxcomputeUtil.getTable(pluginConfig); + this.tableSchema = table.getSchema(); + TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig); + if (this.pluginConfig.hasPath(PARTITION_SPEC.key())) { + PartitionSpec partitionSpec = new PartitionSpec(this.pluginConfig.getString(PARTITION_SPEC.key())); + session = tunnel.createUploadSession(pluginConfig.getString(PROJECT.key()), pluginConfig.getString(TABLE_NAME.key()), partitionSpec); + } else { + session = tunnel.createUploadSession(pluginConfig.getString(PROJECT.key()), pluginConfig.getString(TABLE_NAME.key())); + } + this.recordWriter = session.openRecordWriter(Thread.currentThread().getId()); + log.info("open record writer success"); + } catch (Exception e) { + throw new MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e); + } + } + + @Override + public void write(SeaTunnelRow seaTunnelRow) throws IOException { + Record record = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.seaTunnelRowType); + recordWriter.write(record); + } + + @Override + public void close() throws IOException { + this.recordWriter.close(); + try { + this.session.commit(new Long[]{Thread.currentThread().getId()}); + } catch (Exception e) { + throw new MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e); + } + } +} diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java new file mode 100644 index 00000000000..b7381739acc --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AutoService(SeaTunnelSource.class) +public class MaxcomputeSource implements SeaTunnelSource { + private SeaTunnelRowType typeInfo; + private Config pluginConfig; + + @Override + public String getPluginName() { + return "Maxcompute"; + } + + @Override + public void prepare(Config pluginConfig) { + this.typeInfo = MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig); + this.pluginConfig = pluginConfig; + } + + @Override + public SeaTunnelRowType getProducedType() { + return this.typeInfo; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) throws Exception { + return new MaxcomputeSourceReader(this.pluginConfig, readerContext, this.typeInfo); + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { + return new MaxcomputeSourceSplitEnumerator(enumeratorContext, this.pluginConfig); + } + + @Override + public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, MaxcomputeSourceState checkpointState) throws Exception { + return new MaxcomputeSourceSplitEnumerator(enumeratorContext, this.pluginConfig, checkpointState); + } +} diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java new file mode 100644 index 00000000000..152ca8e547a --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.source; + +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class MaxcomputeSourceFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return "Maxcompute"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME) + .optional(PARTITION_SPEC, SPLIT_ROW) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java new file mode 100644 index 00000000000..c3e978b0f45 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.aliyun.odps.data.Record; +import com.aliyun.odps.tunnel.TableTunnel; +import com.aliyun.odps.tunnel.io.TunnelRecordReader; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Slf4j +public class MaxcomputeSourceReader implements SourceReader { + private final SourceReader.Context context; + private final Set sourceSplits; + private Config pluginConfig; + 
boolean noMoreSplit; + private SeaTunnelRowType seaTunnelRowType; + + public MaxcomputeSourceReader(Config pluginConfig, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) { + this.pluginConfig = pluginConfig; + this.context = context; + this.sourceSplits = new HashSet<>(); + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + @Override + public void pollNext(Collector output) throws Exception { + sourceSplits.forEach(source -> { + try { + TableTunnel.DownloadSession session = MaxcomputeUtil.getDownloadSession(pluginConfig); + TunnelRecordReader recordReader = session.openRecordReader(source.getSplitId(), source.getRowNum()); + log.info("open record reader success"); + Record record; + while ((record = recordReader.read()) != null) { + SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(record, seaTunnelRowType); + output.collect(seaTunnelRow); + } + recordReader.close(); + } catch (Exception e) { + throw new MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e); + } + }); + if (this.noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) { + // signal to the source that we have reached the end of the data. 
+ log.info("Closed the bounded Maxcompute source"); + context.signalNoMoreElement(); + } + } + + @Override + public List snapshotState(long checkpointId) throws Exception { + return new ArrayList<>(sourceSplits); + } + + @Override + public void addSplits(List splits) { + sourceSplits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + this.noMoreSplit = true; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java new file mode 100644 index 00000000000..4ab73c335a3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.source; + +import org.apache.seatunnel.api.source.SourceSplit; + +import lombok.Getter; + +public class MaxcomputeSourceSplit implements SourceSplit { + @Getter + private int splitId; + @Getter + private long rowNum; + + public MaxcomputeSourceSplit(int splitId, long rowNum) { + this.splitId = splitId; + this.rowNum = rowNum; + } + + @Override + public String splitId() { + return String.valueOf(this.splitId); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java new file mode 100644 index 00000000000..ad2363f3d00 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.source; + +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.aliyun.odps.tunnel.TableTunnel; +import com.aliyun.odps.tunnel.TunnelException; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Slf4j +public class MaxcomputeSourceSplitEnumerator implements SourceSplitEnumerator { + private final Context enumeratorContext; + private final Map> pendingSplits; + private Set assignedSplits; + private Config pluginConfig; + + public MaxcomputeSourceSplitEnumerator(SourceSplitEnumerator.Context enumeratorContext, Config pluginConfig) { + this.enumeratorContext = enumeratorContext; + this.pluginConfig = pluginConfig; + this.pendingSplits = new HashMap<>(); + this.assignedSplits = new HashSet<>(); + } + + public MaxcomputeSourceSplitEnumerator(SourceSplitEnumerator.Context enumeratorContext, Config pluginConfig, + MaxcomputeSourceState sourceState) { + this(enumeratorContext, pluginConfig); + this.assignedSplits = sourceState.getAssignedSplit(); + } + + @Override + public void open() { + } + + @Override + public void run() throws Exception { + discoverySplits(); + assignPendingSplits(); + } + + @Override + public void close() throws IOException { + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + addSplitChangeToPendingAssignments(splits); + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplits.size(); + } + + @Override + public void registerReader(int subtaskId) { + } + + 
@Override + public MaxcomputeSourceState snapshotState(long checkpointId) { + return new MaxcomputeSourceState(assignedSplits); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + } + + @Override + public void handleSplitRequest(int subtaskId) { + } + + private void discoverySplits() throws TunnelException { + TableTunnel.DownloadSession session = MaxcomputeUtil.getDownloadSession(this.pluginConfig); + long recordCount = session.getRecordCount(); + int numReaders = enumeratorContext.currentParallelism(); + int splitRowNum = (int) Math.ceil((double) recordCount / numReaders); + int splitRow = SPLIT_ROW.defaultValue(); + if (this.pluginConfig.hasPath(SPLIT_ROW.key())) { + splitRow = this.pluginConfig.getInt(SPLIT_ROW.key()); + } + Set allSplit = new HashSet<>(); + for (int i = 0; i < numReaders; i++) { + int readerStart = i * splitRowNum; + int readerEnd = (int) Math.min((i + 1) * splitRowNum, recordCount); + for (int num = readerStart; num < readerEnd; num += splitRow) { + allSplit.add(new MaxcomputeSourceSplit(num, Math.min(splitRow, readerEnd - num))); + } + } + assignedSplits.forEach(allSplit::remove); + addSplitChangeToPendingAssignments(allSplit); + log.debug("Assigned {} to {} readers.", allSplit, numReaders); + log.info("Calculated splits successfully, the size of splits is {}.", allSplit.size()); + } + + private void addSplitChangeToPendingAssignments(Collection newSplits) { + for (MaxcomputeSourceSplit split : newSplits) { + int ownerReader = split.getSplitId() % enumeratorContext.currentParallelism(); + pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>()) + .add(split); + } + } + + private void assignPendingSplits() { + // Check if there's any pending splits for given readers + for (int pendingReader : enumeratorContext.registeredReaders()) { + // Remove pending assignment for the reader + final Set pendingAssignmentForReader = + pendingSplits.remove(pendingReader); + + if (pendingAssignmentForReader != null && 
!pendingAssignmentForReader.isEmpty()) { + // Mark pending splits as already assigned + assignedSplits.addAll(pendingAssignmentForReader); + // Assign pending splits to reader + log.info("Assigning splits to readers {} {}", pendingReader, pendingAssignmentForReader); + enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader)); + } + enumeratorContext.signalNoMoreSplits(pendingReader); + } + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java new file mode 100644 index 00000000000..1abcb000ca9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.source; + +import java.io.Serializable; +import java.util.Set; + +public class MaxcomputeSourceState implements Serializable { + private Set assignedSplit; + + public MaxcomputeSourceState(Set assignedSplit) { + this.assignedSplit = assignedSplit; + } + + public Set getAssignedSplit() { + return assignedSplit; + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java new file mode 100644 index 00000000000..fa6382c4f66 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.util; + +import static com.aliyun.odps.OdpsType.ARRAY; +import static com.aliyun.odps.OdpsType.BIGINT; +import static com.aliyun.odps.OdpsType.BINARY; +import static com.aliyun.odps.OdpsType.BOOLEAN; +import static com.aliyun.odps.OdpsType.DATE; +import static com.aliyun.odps.OdpsType.DECIMAL; +import static com.aliyun.odps.OdpsType.DOUBLE; +import static com.aliyun.odps.OdpsType.FLOAT; +import static com.aliyun.odps.OdpsType.INT; +import static com.aliyun.odps.OdpsType.MAP; +import static com.aliyun.odps.OdpsType.SMALLINT; +import static com.aliyun.odps.OdpsType.STRING; +import static com.aliyun.odps.OdpsType.TIMESTAMP; +import static com.aliyun.odps.OdpsType.TINYINT; +import static com.aliyun.odps.OdpsType.VOID; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.aliyun.odps.Column; +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.Table; +import com.aliyun.odps.TableSchema; +import com.aliyun.odps.data.ArrayRecord; +import com.aliyun.odps.data.Record; +import com.aliyun.odps.type.ArrayTypeInfo; +import com.aliyun.odps.type.DecimalTypeInfo; +import com.aliyun.odps.type.MapTypeInfo; +import com.aliyun.odps.type.StructTypeInfo; +import com.aliyun.odps.type.TypeInfo; +import 
lombok.extern.slf4j.Slf4j; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.sql.SQLException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +@Slf4j +public class MaxcomputeTypeMapper implements Serializable { + + private static SeaTunnelDataType maxcomputeType2SeaTunnelType(TypeInfo typeInfo) { + switch (typeInfo.getOdpsType()) { + case BIGINT: + return BasicType.LONG_TYPE; + case DOUBLE: + return BasicType.DOUBLE_TYPE; + case BOOLEAN: + return BasicType.BOOLEAN_TYPE; + case DECIMAL: + return mappingDecimalType((DecimalTypeInfo) typeInfo); + case MAP: + return mappingMapType((MapTypeInfo) typeInfo); + case ARRAY: + return mappingListType((ArrayTypeInfo) typeInfo); + case VOID: + return BasicType.VOID_TYPE; + case TINYINT: + case SMALLINT: + case INT: + return BasicType.INT_TYPE; + case FLOAT: + return BasicType.FLOAT_TYPE; + case CHAR: + case VARCHAR: + case STRING: + return BasicType.STRING_TYPE; + case DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case TIMESTAMP: + case DATETIME: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case BINARY: + return PrimitiveByteArrayType.INSTANCE; + case STRUCT: + return mappingStructType((StructTypeInfo) typeInfo); + case INTERVAL_DAY_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case INTERVAL_YEAR_MONTH: + default: + throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format( + "Doesn't support Maxcompute type '%s' .", + typeInfo.getTypeName())); + } + } + + private static DecimalType mappingDecimalType(DecimalTypeInfo decimalTypeInfo) { + return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale()); + } + + private static MapType mappingMapType(MapTypeInfo mapTypeInfo) { + return new MapType(maxcomputeType2SeaTunnelType(mapTypeInfo.getKeyTypeInfo()), maxcomputeType2SeaTunnelType(mapTypeInfo.getValueTypeInfo())); + } + + 
private static ArrayType mappingListType(ArrayTypeInfo arrayTypeInfo) { + switch (arrayTypeInfo.getOdpsType()) { + case BOOLEAN: + return ArrayType.BOOLEAN_ARRAY_TYPE; + case INT: + return ArrayType.INT_ARRAY_TYPE; + case BIGINT: + return ArrayType.LONG_ARRAY_TYPE; + case FLOAT: + return ArrayType.FLOAT_ARRAY_TYPE; + case DOUBLE: + return ArrayType.DOUBLE_ARRAY_TYPE; + case STRING: + return ArrayType.STRING_ARRAY_TYPE; + default: + throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format( + "Doesn't support Maxcompute type '%s' .", + arrayTypeInfo.getTypeName())); + } + } + + private static SeaTunnelRowType mappingStructType(StructTypeInfo structType) { + List fields = structType.getFieldTypeInfos(); + List fieldNames = new ArrayList<>(fields.size()); + List> fieldTypes = new ArrayList<>(fields.size()); + for (TypeInfo field : fields) { + fieldNames.add(field.getTypeName()); + fieldTypes.add(maxcomputeType2SeaTunnelType(field)); + } + return new SeaTunnelRowType(fieldNames.toArray(new String[0]), + fieldTypes.toArray(new SeaTunnelDataType[0])); + } + + private static OdpsType seaTunnelType2MaxcomputeType(SeaTunnelDataType seaTunnelDataType) { + switch (seaTunnelDataType.getSqlType()) { + case ARRAY: + return ARRAY; + case MAP: + return MAP; + case STRING: + return STRING; + case BOOLEAN: + return BOOLEAN; + case TINYINT: + return TINYINT; + case SMALLINT: + return SMALLINT; + case INT: + return INT; + case BIGINT: + return BIGINT; + case FLOAT: + return FLOAT; + case DOUBLE: + return DOUBLE; + case DECIMAL: + return DECIMAL; + case BYTES: + return BINARY; + case DATE: + return DATE; + case TIMESTAMP: + return TIMESTAMP; + case NULL: + return VOID; + case TIME: + default: + throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format( + "Doesn't support SeaTunnelDataType type '%s' .", + seaTunnelDataType.getSqlType())); + } + } + + public static SeaTunnelRowType getSeaTunnelRowType(Config 
pluginConfig) { + Table table = MaxcomputeUtil.getTable(pluginConfig); + TableSchema tableSchema = table.getSchema(); + ArrayList> seaTunnelDataTypes = new ArrayList<>(); + ArrayList fieldNames = new ArrayList<>(); + try { + for (int i = 0; i < tableSchema.getColumns().size(); i++) { + fieldNames.add(tableSchema.getColumns().get(i).getName()); + TypeInfo maxcomputeTypeInfo = tableSchema.getColumns().get(i).getTypeInfo(); + SeaTunnelDataType seaTunnelDataType = maxcomputeType2SeaTunnelType(maxcomputeTypeInfo); + seaTunnelDataTypes.add(seaTunnelDataType); + } + } catch (Exception e) { + throw new MaxcomputeConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, e); + } + return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType[seaTunnelDataTypes.size()])); + } + + public static TableSchema seaTunnelRowType2TableSchema(SeaTunnelRowType seaTunnelRowType) { + TableSchema tableSchema = new TableSchema(); + for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) { + OdpsType odpsType = seaTunnelType2MaxcomputeType(seaTunnelRowType.getFieldType(i)); + Column column = new Column(seaTunnelRowType.getFieldName(i), odpsType); + tableSchema.addColumn(column); + } + return tableSchema; + } + + private static Object resolveObject(Object field, SeaTunnelDataType fieldType) { + if (field == null) { + return null; + } + switch (fieldType.getSqlType()) { + case ARRAY: + ArrayList origArray = new ArrayList<>(); + java.util.Arrays.stream(((Record) field).getColumns()).iterator().forEachRemaining(origArray::add); + SeaTunnelDataType elementType = ((ArrayType) fieldType).getElementType(); + switch (elementType.getSqlType()) { + case STRING: + return origArray.toArray(new String[0]); + case BOOLEAN: + return origArray.toArray(new Boolean[0]); + case INT: + return origArray.toArray(new Integer[0]); + case BIGINT: + return origArray.toArray(new Long[0]); + case FLOAT: + return origArray.toArray(new 
Float[0]); + case DOUBLE: + return origArray.toArray(new Double[0]); + default: + String errorMsg = String.format("SeaTunnel array type not support this type [%s] now", fieldType.getSqlType()); + throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel not support this data type now"); + } + case MAP: + HashMap dataMap = new HashMap<>(); + SeaTunnelDataType keyType = ((MapType) fieldType).getKeyType(); + SeaTunnelDataType valueType = ((MapType) fieldType).getValueType(); + HashMap origDataMap = (HashMap) field; + origDataMap.forEach((key, value) -> dataMap.put(resolveObject(key, keyType), resolveObject(value, valueType))); + return dataMap; + case BOOLEAN: + case INT: + case BIGINT: + case FLOAT: + case DOUBLE: + case DECIMAL: + case DATE: + return field; + case STRING: + return field.toString(); + case TINYINT: + return Byte.parseByte(field.toString()); + case SMALLINT: + return Short.parseShort(field.toString()); + case NULL: + return null; + case BYTES: + ByteBuffer buffer = (ByteBuffer) field; + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes, 0, bytes.length); + return bytes; + case TIMESTAMP: + Instant instant = Instant.ofEpochMilli((long) field); + return LocalDateTime.ofInstant(instant, ZoneId.of("+8")); + default: + // do nothing + // never got in there + throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel not support this data type now"); + } + } + + public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType typeInfo) throws SQLException { + List fields = new ArrayList<>(); + SeaTunnelDataType[] seaTunnelDataTypes = typeInfo.getFieldTypes(); + for (int i = 0; i < rs.getColumns().length; i++) { + fields.add(resolveObject(rs.get(i), seaTunnelDataTypes[i])); + } + return new SeaTunnelRow(fields.toArray()); + } + + public static Record getMaxcomputeRowData(SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) { + TableSchema tableSchema = 
seaTunnelRowType2TableSchema(seaTunnelRowType); + ArrayRecord arrayRecord = new ArrayRecord(tableSchema); + for (int i = 0; i < seaTunnelRow.getFields().length; i++) { + arrayRecord.set(i, resolveObject(seaTunnelRow.getField(i), seaTunnelRowType.getFieldType(i))); + } + return arrayRecord; + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java new file mode 100644 index 00000000000..4bf37b95ba1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.util; + +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.aliyun.odps.Odps; +import com.aliyun.odps.PartitionSpec; +import com.aliyun.odps.Table; +import com.aliyun.odps.account.Account; +import com.aliyun.odps.account.AliyunAccount; +import com.aliyun.odps.tunnel.TableTunnel; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MaxcomputeUtil { + public static Table getTable(Config pluginConfig) { + Odps odps = getOdps(pluginConfig); + Table table = odps.tables().get(pluginConfig.getString(TABLE_NAME.key())); + return table; + } + + public static TableTunnel getTableTunnel(Config pluginConfig) { + Odps odps = getOdps(pluginConfig); + TableTunnel tunnel = new TableTunnel(odps); + return tunnel; + } + + public static Odps getOdps(Config pluginConfig) { + Account account = new AliyunAccount(pluginConfig.getString(ACCESS_ID.key()), pluginConfig.getString(ACCESS_KEY.key())); + Odps odps = new Odps(account); + odps.setEndpoint(pluginConfig.getString(ENDPOINT.key())); + 
odps.setDefaultProject(pluginConfig.getString(PROJECT.key())); + return odps; + } + + public static TableTunnel.DownloadSession getDownloadSession(Config pluginConfig) { + TableTunnel tunnel = getTableTunnel(pluginConfig); + TableTunnel.DownloadSession session; + try { + if (pluginConfig.hasPath(PARTITION_SPEC.key())) { + PartitionSpec partitionSpec = new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key())); + session = tunnel.createDownloadSession(pluginConfig.getString(PROJECT.key()), pluginConfig.getString(TABLE_NAME.key()), partitionSpec); + } else { + session = tunnel.createDownloadSession(pluginConfig.getString(PROJECT.key()), pluginConfig.getString(TABLE_NAME.key())); + } + } catch (Exception e) { + throw new MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e); + } + return session; + } + + public static void initTableOrPartition(Config pluginConfig) { + Boolean overwrite = OVERWRITE.defaultValue(); + if (pluginConfig.hasPath(OVERWRITE.key())) { + overwrite = pluginConfig.getBoolean(OVERWRITE.key()); + } + try { + Table table = MaxcomputeUtil.getTable(pluginConfig); + if (pluginConfig.hasPath(PARTITION_SPEC.key())) { + PartitionSpec partitionSpec = new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key())); + if (overwrite) { + try { + table.deletePartition(partitionSpec, true); + } catch (NullPointerException e) { + log.debug("NullPointerException when delete table partition"); + } + } + table.createPartition(partitionSpec, true); + } else { + if (overwrite) { + try { + table.truncate(); + } catch (NullPointerException e) { + log.debug("NullPointerException when truncate table"); + } + } + } + } catch (Exception e) { + throw new MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e); + } + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf b/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf new file mode 
100644 index 00000000000..4038766194a --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 2 + job.mode = "STREAMING" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + MaxcomputeSource { + accessId="" + accesskey="" + endpoint="" + project="" + table_name="" + #partition_spec="" + #split_row = 10000 + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { + sql { + source_table_name = "fake" + sql = "select * from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform 
+} + +sink { + MaxcomputeSink { + accessId="" + accesskey="" + endpoint="" + project="" + result_table_name="" + #partition_spec="" + #overwrite = false + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java new file mode 100644 index 00000000000..da14d0680c7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper; + +import com.aliyun.odps.Column; +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.data.ArrayRecord; +import com.aliyun.odps.data.Record; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.sql.SQLException; + +public class BasicTypeToOdpsTypeTest { + + private static void testType(String fieldName, SeaTunnelDataType seaTunnelDataType, OdpsType odpsType, Object object) throws SQLException { + SeaTunnelRowType typeInfo = new SeaTunnelRowType(new String[]{ + fieldName + }, new SeaTunnelDataType[]{ + seaTunnelDataType + }); + + ArrayRecord record = new ArrayRecord(new Column[]{ + new Column(fieldName, odpsType) + }); + record.set(fieldName, object); + + SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo); + Record tRecord = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, typeInfo); + + for (int i = 0; i < tRecord.getColumns().length; i++) { + Assertions.assertEquals(record.get(i), tRecord.get(i)); + } + } + + @SneakyThrows + @Test + void testSTRING_TYPE_2_STRING() { + testType("STRING_TYPE_2_STRING", BasicType.STRING_TYPE, OdpsType.STRING, "hello"); + } + + @SneakyThrows + @Test + void testBOOLEAN_TYPE_2_BOOLEAN() { + testType("BOOLEAN_TYPE_2_BOOLEAN", BasicType.BOOLEAN_TYPE, OdpsType.BOOLEAN, Boolean.TRUE); + } + + @SneakyThrows + @Test + void testSHORT_TYPE_2_SMALLINT() { + testType("SHORT_TYPE_2_SMALLINT", BasicType.SHORT_TYPE, OdpsType.SMALLINT, Short.MAX_VALUE); + } + + @SneakyThrows + @Test + void testLONG_TYPE_2_BIGINT() { + testType("LONG_TYPE_2_BIGINT", BasicType.LONG_TYPE, OdpsType.BIGINT, Long.MAX_VALUE); + } + 
+ @SneakyThrows + @Test + void testFLOAT_TYPE_2_FLOAT_TYPE() { + testType("FLOAT_TYPE_2_FLOAT_TYPE", BasicType.FLOAT_TYPE, OdpsType.FLOAT, Float.MAX_VALUE); + } + + @SneakyThrows + @Test + void testDOUBLE_TYPE_2_DOUBLE() { + testType("DOUBLE_TYPE_2_DOUBLE", BasicType.DOUBLE_TYPE, OdpsType.DOUBLE, Double.MAX_VALUE); + } + + @SneakyThrows + @Test + void testVOID_TYPE_2_VOID() { + testType("VOID_TYPE_2_VOID", BasicType.VOID_TYPE, OdpsType.VOID, null); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java new file mode 100644 index 00000000000..53627c2f602 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.source.MaxcomputeSourceFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Smoke test for the Maxcompute connector factories: both the source factory and the sink factory must return a non-null option rule. */ +public class MaxcomputeSourceFactoryTest { + @Test + void optionRule() { + Assertions.assertNotNull((new MaxcomputeSourceFactory()).optionRule()); + Assertions.assertNotNull((new MaxcomputeSinkFactory()).optionRule()); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 7a7a7d3af32..3a0ba4cd4ac 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -65,6 +65,7 @@ connector-slack connector-rabbitmq connector-openmldb + connector-maxcompute diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 5e8134f9d2e..c588db0210f 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -407,6 +407,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-maxcompute + ${project.version} + provided + org.apache.seatunnel connector-cdc-mysql @@ -575,4 +581,4 @@ - + \ No newline at end of file