Skip to content
Permalink
Browse files
[INLONG-4141][Sort] Sort lightwieght support load data from Pulsar (#…
  • Loading branch information
thexiay authored and healchow committed May 18, 2022
1 parent 0a08141 commit cc508f5c77516e3c3420475ee6c451602d9fe052
Showing 13 changed files with 1,703 additions and 1 deletion.
@@ -26,6 +26,7 @@
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
@@ -50,6 +51,7 @@
@JsonSubTypes.Type(value = KafkaExtractNode.class, name = "kafkaExtract"),
@JsonSubTypes.Type(value = PostgresExtractNode.class, name = "postgresExtract"),
@JsonSubTypes.Type(value = FileSystemExtractNode.class, name = "fileSystemExtract"),
@JsonSubTypes.Type(value = PulsarExtractNode.class, name = "pulsarExtract"),
@JsonSubTypes.Type(value = TransformNode.class, name = "baseTransform"),
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
@JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.node.extract;

import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;

@EqualsAndHashCode(callSuper = true)
@JsonTypeName("pulsarExtract")
@Data
public class PulsarExtractNode extends ExtractNode {
private static final long serialVersionUID = 1L;

@Nonnull
@JsonProperty("topic")
private String topic;
@Nonnull
@JsonProperty("adminUrl")
private String adminUrl;
@Nonnull
@JsonProperty("serviceUrl")
private String serviceUrl;
@Nonnull
@JsonProperty("format")
private Format format;

@JsonProperty("scanStartupMode")
private String scanStartupMode;

@JsonProperty("primaryKey")
private String primaryKey;

@JsonCreator
public PulsarExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
@Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
@JsonProperty("properties") Map<String, String> properties,
@Nonnull @JsonProperty("topic") String topic,
@Nonnull @JsonProperty("adminUrl") String adminUrl,
@Nonnull @JsonProperty("serviceUrl") String serviceUrl,
@Nonnull @JsonProperty("format") Format format,
@Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
@JsonProperty("primaryKey") String primaryKey) {
super(id, name, fields, watermarkField, properties);
this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null.");
this.adminUrl = Preconditions.checkNotNull(adminUrl, "pulsar adminUrl is null.");
this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null.");
this.format = Preconditions.checkNotNull(format, "pulsar format is null.");
this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode,
"pulsar scanStartupMode is null.");
this.primaryKey = primaryKey;
}

/**
* generate table options
*
* @return options
*/
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
if (StringUtils.isEmpty(this.primaryKey)) {
options.put("connector", "pulsar-inlong");
options.putAll(format.generateOptions(false));
} else {
options.put("connector", "upsert-pulsar-inlong");
options.putAll(format.generateOptions(true));
}
options.put("generic", "true");
options.put("service-url", serviceUrl);
options.put("admin-url", adminUrl);
options.put("topic", topic);
options.put("scan.startup.mode", scanStartupMode);

return options;
}

@Override
public String genTableName() {
return String.format("table_%s", super.getId());
}

@Override
public String getPrimaryKey() {
return primaryKey;
}

@Override
public List<FieldInfo> getPartitionFields() {
return super.getPartitionFields();
}
}
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.node.extract;

import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.Format;

import java.util.Arrays;
import java.util.List;

/**
* Test for {@link PulsarExtractNode}
*/
public class PulsarExtractNodeTest extends SerializeBaseTest<Node> {

@Override
public Node getTestObject() {
List<FieldInfo> fields = Arrays.asList(
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("age", new IntFormatInfo()));
Format format = new CsvFormat();
return new PulsarExtractNode("2",
"pulsar_input",
fields,
null,
null,
"persistent://public/default/test_stream",
"http://localhost:8080",
"pulsar://localhost:6650",
format,
"earliest",
null);
}
}
@@ -69,6 +69,11 @@
<artifactId>flink-table-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${flink.scala.binary.version}</artifactId>
@@ -139,6 +144,12 @@
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
@@ -156,6 +167,7 @@
</exclusions>
</dependency>


</dependencies>

</project>

0 comments on commit cc508f5

Please sign in to comment.