Merge branch 'dev' into develop2
Hisoka-X committed Oct 18, 2022
2 parents 9265e26 + d6f8268 commit fff1287
Showing 23 changed files with 185 additions and 57 deletions.
24 changes: 22 additions & 2 deletions docs/en/connector-v2/sink/Kafka.md
@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly once
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| partition_key | string | no | - |
| partition | int | no | - |
| assign_partitions | list | no | - |
| transaction_prefix | string | no | - |
@@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka buffer

NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

### partition_key [string]

Configure which field is used as the key of the kafka message.

For example, if you want to use the value of a field from the upstream data as the key, you can set this parameter to that field's name.

Suppose the upstream data is the following:

| name | age | data |
| ---- | ---- | ------------- |
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If `name` is set as the key, then the hash value of the `name` column will determine which partition the message is sent to.

If the configured field name does not exist in the upstream data, the configured value itself will be used as the key.
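
A minimal sink configuration using this option might look like the following sketch; the plugin block name, topic, and `bootstrap.servers` values are placeholders that should follow the full example later in this document:

```bash
sink {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "test_topic"
    # use the value of the upstream "name" field as the Kafka message key
    partition_key = "name"
  }
}
```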

### partition [int]

We can specify the partition; all messages will be sent to this partition.
@@ -93,7 +111,9 @@ sink {

### change log
#### next version

- Add kafka sink doc
- New feature : Kafka specified partition to send
- New feature : Determine the partition that kafka send message based on the message content
- New feature : Configure which field is used as the key of the kafka message

10 changes: 10 additions & 0 deletions docs/en/connector-v2/sink/common-options.md
@@ -5,18 +5,27 @@
| name | type | required | default value |
| ----------------- | ------ | -------- | ------------- |
| source_table_name | string | no | - |
| parallelism | int | no | - |


### source_table_name [string]

When `source_table_name` is not specified, the current plugin processes the data set `dataset` output by the previous plugin in the configuration file;

When `source_table_name` is specified, the current plugin processes the data set corresponding to this parameter.

### parallelism [int]

When `parallelism` is not specified, the `parallelism` in env is used by default.

When `parallelism` is specified, it will override the `parallelism` in env.

## Examples

```bash
source {
FakeSourceStream {
parallelism = 2
result_table_name = "fake"
field_name = "name,age"
}
@@ -37,6 +46,7 @@ transform {

sink {
console {
parallelism = 3
source_table_name = "fake_name"
}
}
7 changes: 7 additions & 0 deletions docs/en/connector-v2/source/common-options.md
@@ -5,13 +5,20 @@
| name | type | required | default value |
| ----------------- | ------ | -------- | ------------- |
| result_table_name | string | no | - |
| parallelism | int | no | - |

### result_table_name [string]

When `result_table_name` is not specified, the data processed by this plugin will not be registered as a data set `(dataStream/dataset)`, also called a temporary table `(table)`, that other plugins can directly access;

When `result_table_name` is specified, the data processed by this plugin will be registered as a data set `(dataStream/dataset)`, also called a temporary table `(table)`, that other plugins can directly access. The data set `(dataStream/dataset)` registered here can be accessed by other plugins by specifying `source_table_name`.

### parallelism [int]

When `parallelism` is not specified, the `parallelism` in env is used by default.

When `parallelism` is specified, it will override the `parallelism` in env.

## Example

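A minimal sketch of a source block that uses these options; the `FakeSourceStream` plugin and its fields are assumed here, mirroring the sink common-options example earlier on this page:

```bash
source {
  FakeSourceStream {
    # overrides the parallelism configured in env for this source only
    parallelism = 2
    # register the output so a downstream plugin can reference it via source_table_name
    result_table_name = "fake"
    field_name = "name,age"
  }
}
```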
2 changes: 1 addition & 1 deletion docs/en/intro/about.md
@@ -64,7 +64,7 @@ SeaTunnel have lots of users which you can find more information in [users](http
<br/><br/>
<img src="https://landscape.cncf.io/images/left-logo.svg" width="150" alt=""/>&nbsp;&nbsp;<img src="https://landscape.cncf.io/images/right-logo.svg" width="200" alt=""/>
<br/><br/>
SeaTunnel enriches the <a href="https://landscape.cncf.io/landscape=observability-and-analysis&license=apache-license-2-0">CNCF CLOUD NATIVE Landscape.</a >
SeaTunnel enriches the <a href="https://landscape.cncf.io/card-mode?category=streaming-messaging&license=apache-license-2-0&grouping=category&selected=sea-tunnal">CNCF CLOUD NATIVE Landscape.</a >
</p >

## What's More
@@ -31,8 +31,6 @@ public final class Constants {

public static final String SOURCE_SERIALIZATION = "source.serialization";

public static final String SOURCE_PARALLELISM = "parallelism";

public static final String HDFS_ROOT = "hdfs.root";

public static final String HDFS_USER = "hdfs.user";
@@ -25,9 +25,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

@@ -39,7 +39,7 @@ public class FileSystemUtils {
public static Configuration CONF;

public static FileSystem getFileSystem(@NonNull String path) throws IOException {
FileSystem fileSystem = FileSystem.get(new File(path).toURI(), CONF);
FileSystem fileSystem = FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), CONF);
fileSystem.setWriteChecksum(false);
return fileSystem;
}
@@ -39,6 +39,9 @@
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
@@ -47,6 +50,18 @@

@Slf4j
public abstract class AbstractReadStrategy implements ReadStrategy {
protected static final String[] TYPE_ARRAY_STRING = new String[0];
protected static final Boolean[] TYPE_ARRAY_BOOLEAN = new Boolean[0];
protected static final Byte[] TYPE_ARRAY_BYTE = new Byte[0];
protected static final Short[] TYPE_ARRAY_SHORT = new Short[0];
protected static final Integer[] TYPE_ARRAY_INTEGER = new Integer[0];
protected static final Long[] TYPE_ARRAY_LONG = new Long[0];
protected static final Float[] TYPE_ARRAY_FLOAT = new Float[0];
protected static final Double[] TYPE_ARRAY_DOUBLE = new Double[0];
protected static final BigDecimal[] TYPE_ARRAY_BIG_DECIMAL = new BigDecimal[0];
protected static final LocalDate[] TYPE_ARRAY_LOCAL_DATE = new LocalDate[0];
protected static final LocalDateTime[] TYPE_ARRAY_LOCAL_DATETIME = new LocalDateTime[0];

protected HadoopConf hadoopConf;
protected SeaTunnelRowType seaTunnelRowType;
protected SeaTunnelRowType seaTunnelRowTypeWithPartition;
@@ -142,7 +157,7 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea
return seaTunnelRowType;
}
// get all names of partitions fields
String[] partitionNames = partitionsMap.keySet().toArray(new String[0]);
String[] partitionNames = partitionsMap.keySet().toArray(TYPE_ARRAY_STRING);
// initialize data type for partition fields
SeaTunnelDataType<?>[] partitionTypes = new SeaTunnelDataType<?>[partitionNames.length];
Arrays.fill(partitionTypes, BasicType.STRING_TYPE);
@@ -58,7 +58,6 @@
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -236,7 +235,7 @@ private SeaTunnelDataType<?> orcDataType2SeaTunnelDataType(TypeDescription typeD
return new MapType<>(orcDataType2SeaTunnelDataType(keyType), orcDataType2SeaTunnelDataType(valueType));
case STRUCT:
List<TypeDescription> children = typeDescription.getChildren();
String[] fieldNames = typeDescription.getFieldNames().toArray(new String[0]);
String[] fieldNames = typeDescription.getFieldNames().toArray(TYPE_ARRAY_STRING);
SeaTunnelDataType<?>[] fieldTypes = children.stream().map(this::orcDataType2SeaTunnelDataType).toArray(SeaTunnelDataType<?>[]::new);
return new SeaTunnelRowType(fieldNames, fieldTypes);
default:
@@ -533,15 +532,15 @@ private Object[] readLongListVector(LongColumnVector longVector, TypeDescription
}
}
if (childType.getCategory() == TypeDescription.Category.BOOLEAN) {
return longList.toArray(new Boolean[0]);
return longList.toArray(TYPE_ARRAY_BOOLEAN);
} else if (childType.getCategory() == TypeDescription.Category.INT) {
return longList.toArray(new Integer[0]);
return longList.toArray(TYPE_ARRAY_INTEGER);
} else if (childType.getCategory() == TypeDescription.Category.BYTE) {
return longList.toArray(new Byte[0]);
return longList.toArray(TYPE_ARRAY_BYTE);
} else if (childType.getCategory() == TypeDescription.Category.SHORT) {
return longList.toArray(new Short[0]);
return longList.toArray(TYPE_ARRAY_SHORT);
} else {
return longList.toArray(new Long[0]);
return longList.toArray(TYPE_ARRAY_LONG);
}
}

@@ -567,9 +566,9 @@ private Object[] readDoubleListVector(DoubleColumnVector doubleVec, TypeDescript
}
}
if (colType.getCategory() == TypeDescription.Category.FLOAT) {
return doubleList.toArray(new Float[0]);
return doubleList.toArray(TYPE_ARRAY_FLOAT);
} else {
return doubleList.toArray(new Double[0]);
return doubleList.toArray(TYPE_ARRAY_DOUBLE);
}
}

@@ -599,7 +598,7 @@ private Object[] readBytesListVector(BytesColumnVector bytesVec, TypeDescription
}
}
if (childType.getCategory() == TypeDescription.Category.STRING) {
return bytesValList.toArray(new String[0]);
return bytesValList.toArray(TYPE_ARRAY_STRING);
} else {
return bytesValList.toArray();
}
@@ -622,7 +621,7 @@ private Object[] readDecimalListVector(DecimalColumnVector decimalVector, int of
decimalList.add(null);
}
}
return decimalList.toArray(new BigDecimal[0]);
return decimalList.toArray(TYPE_ARRAY_BIG_DECIMAL);
}

private Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
@@ -651,9 +650,9 @@ private Object[] readTimestampListVector(TimestampColumnVector timestampVector,
}
}
if (childType.getCategory() == TypeDescription.Category.DATE) {
return timestampList.toArray(new LocalDate[0]);
return timestampList.toArray(TYPE_ARRAY_LOCAL_DATE);
} else {
return timestampList.toArray(new LocalDateTime[0]);
return timestampList.toArray(TYPE_ARRAY_LOCAL_DATETIME);
}
}
}
@@ -114,28 +114,31 @@ public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
}

private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
if (field == null) {
return null;
}
switch (fieldType.getSqlType()) {
case ARRAY:
ArrayList<Object> origArray = new ArrayList<>();
((GenericData.Array<?>) field).iterator().forEachRemaining(origArray::add);
SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
switch (elementType.getSqlType()) {
case STRING:
return origArray.toArray(new String[0]);
return origArray.toArray(TYPE_ARRAY_STRING);
case BOOLEAN:
return origArray.toArray(new Boolean[0]);
return origArray.toArray(TYPE_ARRAY_BOOLEAN);
case TINYINT:
return origArray.toArray(new Byte[0]);
return origArray.toArray(TYPE_ARRAY_BYTE);
case SMALLINT:
return origArray.toArray(new Short[0]);
return origArray.toArray(TYPE_ARRAY_SHORT);
case INT:
return origArray.toArray(new Integer[0]);
return origArray.toArray(TYPE_ARRAY_INTEGER);
case BIGINT:
return origArray.toArray(new Long[0]);
return origArray.toArray(TYPE_ARRAY_LONG);
case FLOAT:
return origArray.toArray(new Float[0]);
return origArray.toArray(TYPE_ARRAY_FLOAT);
case DOUBLE:
return origArray.toArray(new Double[0]);
return origArray.toArray(TYPE_ARRAY_DOUBLE);
default:
String errorMsg = String.format("SeaTunnel array type not support this type [%s] now", fieldType.getSqlType());
throw new UnsupportedOperationException(errorMsg);
@@ -85,4 +85,9 @@ public class Config {
*/
public static final String ASSIGN_PARTITIONS = "assign_partitions";

/**
* Determine which field is used as the key of the Kafka message, which in turn determines the partition the message is sent to
*/
public static final String PARTITION_KEY = "partition_key";

}
@@ -48,4 +48,11 @@ public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
}
}

@Override
public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
// if the key is null, kafka will send the message to a random partition
return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
}

}
@@ -30,4 +30,13 @@ public interface SeaTunnelRowSerializer<K, V> {
* @return kafka record.
*/
ProducerRecord<K, V> serializeRow(SeaTunnelRow row);

/**
* Serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord} using the given key.
*
* @param key the key of the kafka record
* @param row seatunnel row
* @return kafka record.
*/
ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
}
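
For context, a minimal standalone sketch (not code from this commit; topic, key, and value bytes are placeholders) of the keyed vs. key-less `ProducerRecord` shapes that `serializeRow` and `serializeRowByKey` produce:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class KeyedRecordSketch {
    public static void main(String[] args) {
        byte[] value = "data-example1".getBytes(StandardCharsets.UTF_8);
        // keyed record: Kafka's default partitioner hashes the key, so rows sharing
        // the same key always land in the same partition
        ProducerRecord<byte[], byte[]> keyed =
                new ProducerRecord<>("test_topic", "Jack".getBytes(StandardCharsets.UTF_8), value);
        // key-less record (key == null): Kafka picks the partition itself
        ProducerRecord<byte[], byte[]> unkeyed = new ProducerRecord<>("test_topic", null, value);
        System.out.println("keyed has key: " + (keyed.key() != null));
        System.out.println("unkeyed has key: " + (unkeyed.key() != null));
    }
}
```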