Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connector doris #31

Merged
merged 7 commits into from Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,14 +35,14 @@ public interface Writer<InputT, CommT, WriterStateT> extends Serializable, Close
*
* @param element Input data from upstream.
*/
void write(InputT element);
void write(InputT element) throws IOException;

/**
* Flush buffered input data to target source.
*
* @param endOfInput Flag indicates if all input data are delivered.
*/
void flush(boolean endOfInput);
void flush(boolean endOfInput) throws IOException;

/**
* Prepare commit information before snapshotting when checkpoint is triggered.
Expand Down
Expand Up @@ -44,7 +44,7 @@ public interface WriterGenerator<InputT, CommitT, WriterStateT> extends Serializ
* @param commonConfiguration Common options.
* @param writerConfiguration Options for writer.
*/
void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration);
void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws Exception;

/**
* Create a writer for processing elements.
Expand Down
Expand Up @@ -21,7 +21,13 @@
import lombok.Setter;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Setter
Expand All @@ -35,6 +41,11 @@ public Row(Object[] fields) {
this.fields = fields;
}

/**
 * Creates a row with {@code arity} fields, all initially {@code null}, whose kind is
 * resolved from the byte value of {@link RowKind#INSERT}.
 *
 * @param arity Number of fields to allocate.
 */
public Row(int arity) {
  this.fields = new Object[arity];
  this.kind = RowKind.fromByteValue(RowKind.INSERT.toByteValue());
}

public Row(byte kind, Object[] fields) {
this.kind = RowKind.fromByteValue(kind);
this.fields = fields;
Expand All @@ -52,6 +63,10 @@ public Object[] getFields() {
return fields;
}

/**
 * Returns the number of fields held by this row.
 *
 * @return Length of the underlying field array.
 */
public int getArity() {
  return this.fields.length;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -70,4 +85,69 @@ public int hashCode() {
result = 31 * result + Arrays.hashCode(fields);
return result;
}

/**
 * Reads the field at {@code pos} as a {@code boolean}.
 *
 * @param pos Index into the field array.
 * @return The stored {@link Boolean}, unboxed.
 * @throws ClassCastException If the stored value is not a {@link Boolean}.
 * @throws NullPointerException If the stored value is {@code null} (unboxing fails).
 */
public boolean getBoolean(int pos) {
  final Object value = fields[pos];
  return (Boolean) value;
}

/**
 * Reads the field at {@code pos} as a {@code byte}.
 *
 * @param pos Index into the field array.
 * @return The stored {@link Byte}, unboxed.
 * @throws ClassCastException If the stored value is not a {@link Byte}.
 * @throws NullPointerException If the stored value is {@code null} (unboxing fails).
 */
public byte getByte(int pos) {
  final Object value = fields[pos];
  return (Byte) value;
}

/**
 * Reads the field at {@code pos} as a {@code short}.
 *
 * @param pos Index into the field array.
 * @return The stored {@link Short}, unboxed.
 * @throws ClassCastException If the stored value is not a {@link Short}.
 * @throws NullPointerException If the stored value is {@code null} (unboxing fails).
 */
public short getShort(int pos) {
  final Object value = fields[pos];
  return (Short) value;
}

/**
 * Reads the field at {@code pos} as an {@code int}.
 *
 * @param pos Index into the field array.
 * @return The stored {@link Integer}, unboxed.
 * @throws ClassCastException If the stored value is not an {@link Integer}.
 * @throws NullPointerException If the stored value is {@code null} (unboxing fails).
 */
public int getInt(int pos) {
  final Object value = fields[pos];
  return (Integer) value;
}

/**
 * Reads the field at {@code pos} as a {@link Date}.
 *
 * @param pos Index into the field array.
 * @return The stored value cast to {@link Date}; {@code null} if the field is {@code null}.
 * @throws ClassCastException If the stored value is not a {@link Date}.
 */
public Date getDate(int pos) {
  final Object value = fields[pos];
  return (Date) value;
}

/**
 * Reads the field at {@code pos} as a {@code long}.
 *
 * @param pos Index into the field array.
 * @return The stored {@link Long}, unboxed.
 * @throws ClassCastException If the stored value is not a {@link Long}.
 * @throws NullPointerException If the stored value is {@code null} (unboxing fails).
 */
public long getLong(int pos) {
  final Object value = fields[pos];
  return (Long) value;
}

/**
 * Reads the field at {@code pos} as a {@link BigInteger}.
 *
 * @param pos Index into the field array.
 * @return The stored value cast to {@link BigInteger}; {@code null} if the field is {@code null}.
 * @throws ClassCastException If the stored value is not a {@link BigInteger}.
 */
public BigInteger getBigInteger(int pos) {
  final Object value = fields[pos];
  return (BigInteger) value;
}

/**
 * Reads the field at {@code pos} as a {@code float}.
 *
 * @param pos Index into the field array.
 * @return The stored {@link Float}, unboxed.
 * @throws ClassCastException If the stored value is not a {@link Float}.
 * @throws NullPointerException If the stored value is {@code null} (unboxing fails).
 */
public float getFloat(int pos) {
  final Object value = fields[pos];
  return (Float) value;
}

/**
 * Reads the field at {@code pos} as a {@code double}.
 *
 * @param pos Index into the field array.
 * @return The stored {@link Double}, unboxed.
 * @throws ClassCastException If the stored value is not a {@link Double}.
 * @throws NullPointerException If the stored value is {@code null} (unboxing fails).
 */
public double getDouble(int pos) {
  final Object value = fields[pos];
  return (Double) value;
}

/**
 * Reads the field at {@code pos} as a {@link String}.
 *
 * @param pos Index into the field array.
 * @return The stored value cast to {@link String}; {@code null} if the field is {@code null}.
 * @throws ClassCastException If the stored value is not a {@link String}.
 */
public String getString(int pos) {
  final Object value = fields[pos];
  return (String) value;
}

/**
 * Reads the field at {@code pos} as a {@link BigDecimal}.
 *
 * <p>The stored value is returned as-is: {@code precision} and {@code scale} are accepted
 * but not used by this implementation, so no rounding or rescaling happens here.
 *
 * @param pos Index into the field array.
 * @param precision Unused.
 * @param scale Unused.
 * @return The stored value cast to {@link BigDecimal}; {@code null} if the field is {@code null}.
 * @throws ClassCastException If the stored value is not a {@link BigDecimal}.
 */
public BigDecimal getDecimal(int pos, int precision, int scale) {
  final Object value = fields[pos];
  return (BigDecimal) value;
}

/**
 * Reads the field at {@code pos} as a {@link Timestamp}.
 *
 * <p>The stored value is returned as-is: {@code precision} is accepted but not used by
 * this implementation.
 *
 * @param pos Index into the field array.
 * @param precision Unused.
 * @return The stored value cast to {@link Timestamp}; {@code null} if the field is {@code null}.
 * @throws ClassCastException If the stored value is not a {@link Timestamp}.
 */
public Timestamp getTimestamp(int pos, int precision) {
  final Object value = fields[pos];
  return (Timestamp) value;
}

/**
 * Reads the field at {@code pos} as a byte array.
 *
 * <p>The stored array itself is returned, not a copy.
 *
 * @param pos Index into the field array.
 * @return The stored value cast to {@code byte[]}; {@code null} if the field is {@code null}.
 * @throws ClassCastException If the stored value is not a {@code byte[]}.
 */
public byte[] getBinary(int pos) {
  final Object value = fields[pos];
  return (byte[]) value;
}

/**
 * Reads the field at {@code pos} as a {@link List}.
 *
 * <p>The return type is intentionally left raw: the element type is not known here.
 *
 * @param pos Index into the field array.
 * @return The stored value cast to {@link List}; {@code null} if the field is {@code null}.
 * @throws ClassCastException If the stored value is not a {@link List}.
 */
public List getArray(int pos) {
  final Object value = fields[pos];
  return (List) value;
}

/**
 * Reads the field at {@code pos} as a {@link Map}.
 *
 * <p>The return type is intentionally left raw: key and value types are not known here.
 *
 * @param pos Index into the field array.
 * @return The stored value cast to {@link Map}; {@code null} if the field is {@code null}.
 * @throws ClassCastException If the stored value is not a {@link Map}.
 */
public Map getMap(int pos) {
  final Object value = fields[pos];
  return (Map) value;
}

/**
 * Tells whether the field at {@code pos} currently holds {@code null}.
 *
 * @param pos Index into the field array.
 * @return {@code true} if the field is {@code null}, {@code false} otherwise.
 */
public boolean isNullAt(int pos) {
  final Object value = fields[pos];
  return value == null;
}

}
Expand Up @@ -20,6 +20,11 @@
import com.bytedance.bitsail.common.option.ConfigOption;
import com.bytedance.bitsail.common.option.ReaderOptions;

import com.alibaba.fastjson.TypeReference;

import java.util.List;
import java.util.Map;

import static com.bytedance.bitsail.common.option.ConfigOptions.key;
import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;

Expand Down Expand Up @@ -47,4 +52,8 @@ public interface FakeReaderOptions extends ReaderOptions.BaseReaderOptions {
// Boolean reader option registered under the key READER_PREFIX + "use_bitsail_type";
// it defaults to true when the job configuration does not set it.
// NOTE(review): presumably selects BitSail column wrappers over plain Java values
// in the fake reader's output — confirm against the reader that consumes it.
ConfigOption<Boolean> USE_BITSAIL_TYPE =
key(READER_PREFIX + "use_bitsail_type")
.defaultValue(true);

ConfigOption<List<Map<String, String>>> FIXED_COLUMNS =
key(READER_PREFIX + "fixed_columns")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the meaning of fixed_columns?

Copy link
Collaborator Author

@BlockLiu BlockLiu Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means that the column(s) defined here have a fixed value.
Format is:
[ { "name": "column_name", "fixed_value": "a value" } ]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this option because we sometimes want columns with fixed values. For example, in the Doris connector test, the FakeSource needs to produce records with the same partition value(s).

A comment has already been added to this option.

.onlyReference(new TypeReference<List<Map<String, String>>>(){});
}
Expand Up @@ -32,6 +32,7 @@

import com.github.javafaker.Faker;
import com.google.common.annotations.VisibleForTesting;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
Expand All @@ -46,6 +47,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -74,6 +76,8 @@ public class FakeSource extends InputFormatPlugin<Row, InputSplit> implements Re

private boolean useBitSailType;

private Map<String, Object> fixedObjects;

@Override
public Row buildRow(Row reuse, String mandatoryEncoding) throws BitSailException {
rateLimiter.acquire();
Expand All @@ -84,11 +88,61 @@ public Row buildRow(Row reuse, String mandatoryEncoding) throws BitSailException
/**
 * Fills {@code reuse} with one value per configured column and returns it.
 *
 * <p>A column whose name is present in {@code fixedObjects} receives its pre-built fixed
 * value; every other column receives a freshly generated value from {@code createColumn}.
 * Each field is written exactly once, so the generator is not invoked for fixed columns
 * and is invoked only a single time for the remaining ones.
 *
 * @param reuse Row instance to populate; it is mutated in place.
 * @return The same {@code reuse} instance, populated.
 */
private Row createRow(Row reuse) {
  for (int index = 0; index < columnInfos.size(); index++) {
    String fieldName = columnInfos.get(index).getName();
    if (fixedObjects.containsKey(fieldName)) {
      // Fixed columns bypass random generation entirely.
      reuse.setField(index, fixedObjects.get(fieldName));
    } else {
      reuse.setField(index, createColumn(rowTypeInfo.getTypeAt(index), uniqueFields.get(fieldName), useBitSailType));
    }
  }
  return reuse;
}

/**
 * Builds the lookup of fixed-value columns from the {@code fixed_columns} option.
 *
 * <p>Each entry of {@code fixedColumns} is a map read through the keys {@code "name"} and
 * {@code "fixed_value"}. The name is resolved to its position in {@code columnInfos} so that
 * the textual value can be converted according to the column's type in {@code rowTypeInfo}.
 *
 * @param fixedColumns Fixed column definitions; may be {@code null}, meaning none.
 * @param columnInfos Columns produced by the reader, in output order.
 * @param rowTypeInfo Type information aligned by index with {@code columnInfos}.
 * @param useBitSailType Passed through to {@code createFixedColumn} to select the value representation.
 * @return Mutable map from column name to its fixed value; empty when nothing is configured.
 * @throws IllegalArgumentException If a fixed column's name is not one of the reader columns.
 */
private Map<String, Object> initFixedObject(List<Map<String, String>> fixedColumns,
                                            List<ColumnInfo> columnInfos,
                                            RowTypeInfo rowTypeInfo,
                                            boolean useBitSailType) {
  Map<String, Object> fixedColumnMap = new HashMap<>();
  if (Objects.isNull(fixedColumns)) {
    return fixedColumnMap;
  }

  Map<String, Integer> name2IndexMap = new HashMap<>();
  for (int i = 0; i < columnInfos.size(); ++i) {
    name2IndexMap.put(columnInfos.get(i).getName(), i);
  }

  for (Map<String, String> column : fixedColumns) {
    String columnName = column.get("name");
    String fixedValue = column.get("fixed_value");

    // Keep the boxed Integer: unboxing a missing entry would surface as a bare
    // NullPointerException that does not say which column is misconfigured.
    Integer index = name2IndexMap.get(columnName);
    if (Objects.isNull(index)) {
      throw new IllegalArgumentException(
          "Fixed column '" + columnName + "' is not defined in reader columns " + name2IndexMap.keySet());
    }

    Object fixedColumn = createFixedColumn(rowTypeInfo.getTypeAt(index), fixedValue, useBitSailType);
    fixedColumnMap.put(columnName, fixedColumn);
  }
  return fixedColumnMap;
}

/**
 * Converts the textual {@code fixedValue} into the constant object used for a fixed column.
 *
 * <p>Supported column types are long, string, double, bytes and date. Depending on
 * {@code useBitSailType} the result is either the matching BitSail column wrapper or the
 * plain Java value. Byte columns are always encoded as UTF-8 so the produced bytes do not
 * depend on the platform default charset. Dates are parsed with the pattern
 * {@code yyyy-MM-dd}; a parse failure propagates unchanged via {@code @SneakyThrows}.
 *
 * @param typeInformation Type of the target column.
 * @param fixedValue Configured value in its string form.
 * @param useBitSailType {@code true} to wrap the value in a BitSail column type.
 * @return The converted fixed value.
 * @throws RuntimeException If {@code typeInformation} is not one of the supported types.
 */
@SneakyThrows
private Object createFixedColumn(TypeInformation<?> typeInformation, String fixedValue, Boolean useBitSailType) {
  if (PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO.equals(typeInformation)) {
    return useBitSailType ? new LongColumn(fixedValue) : Long.parseLong(fixedValue);
  }
  if (PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO.equals(typeInformation)) {
    return useBitSailType ? new StringColumn(fixedValue) : fixedValue;
  }
  if (PrimitiveColumnTypeInfo.DOUBLE_COLUMN_TYPE_INFO.equals(typeInformation)) {
    return useBitSailType ? new DoubleColumn(fixedValue) : Double.parseDouble(fixedValue);
  }
  if (PrimitiveColumnTypeInfo.BYTES_COLUMN_TYPE_INFO.equals(typeInformation)) {
    // Explicit charset: String#getBytes() without one uses the JVM default, which varies by host.
    byte[] fixedBytes = fixedValue.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    return useBitSailType ? new BytesColumn(fixedBytes) : fixedBytes;
  }
  if (PrimitiveColumnTypeInfo.DATE_COLUMN_TYPE_INFO.equals(typeInformation)) {
    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
    Date fixedDate = new SimpleDateFormat("yyyy-MM-dd").parse(fixedValue);
    return useBitSailType ? new DateColumn(fixedDate) : fixedDate;
  }
  throw new RuntimeException("Unsupported type " + typeInformation);
}

@SuppressWarnings("checkstyle:MagicNumber")
private Object createColumn(TypeInformation<?> typeInformation, Set<String> existValues, Boolean useBitSailType) {
boolean isNull = randomNullInt > random.nextInt(10);
Expand Down Expand Up @@ -158,6 +212,9 @@ public void initPlugin() throws Exception {
if (!uniqueFields.isEmpty() && totalCount > 1000) {
LOG.warn("Unique fields is set and total count is larger than 1000, which may cause OOM problem.");
}

this.fixedObjects = initFixedObject(inputSliceConfig.get(FakeReaderOptions.FIXED_COLUMNS), columnInfos,
rowTypeInfo, useBitSailType);
}

@Override
Expand Down
59 changes: 59 additions & 0 deletions bitsail-connectors/connector-doris/pom.xml
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bitsail-connectors</artifactId>
<groupId>com.bytedance.bitsail</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-doris</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<mysql.version>5.1.49</mysql.version>
</properties>

<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!-- Doris requires a MySQL client version greater than 5.1 -->
<version>${mysql.version}</version>
</dependency>

<!-- test -->
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-connector-test</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-connector-fake</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.doris;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Objects;

/**
 * Pairs a JDBC {@link Connection} with a {@link Statement} so both can be passed around
 * and released together.
 *
 * <p>Lombok generates the accessors and the no-argument constructor.
 */
@Data
@NoArgsConstructor
public class DorisConnectionHolder {
  private Connection dorisConnection;
  private Statement statement;

  /**
   * Creates a holder around an already opened connection and statement.
   *
   * @param dorisConnection Connection to hold; may be {@code null}.
   * @param statement Statement to hold; may be {@code null}.
   */
  public DorisConnectionHolder(Connection dorisConnection, Statement statement) {
    this.dorisConnection = dorisConnection;
    this.statement = statement;
  }

  /**
   * Closes the statement and then the connection, and drops both references.
   *
   * <p>The two resources are closed independently: a failure while closing the statement
   * does not prevent the connection from being closed. If both fail, the statement's
   * failure is reported as the cause and the connection's failure is attached to it as
   * a suppressed exception. The references are cleared whether or not closing succeeded.
   *
   * @throws IOException If closing the statement or the connection failed.
   */
  public void closeDorisConnection() throws IOException {
    Exception failure = null;

    try {
      if (Objects.nonNull(statement)) {
        statement.close();
      }
    } catch (Exception e) {
      failure = e;
    }

    try {
      if (Objects.nonNull(dorisConnection)) {
        dorisConnection.close();
      }
    } catch (Exception e) {
      if (Objects.isNull(failure)) {
        failure = e;
      } else {
        failure.addSuppressed(e);
      }
    }

    statement = null;
    dorisConnection = null;

    if (Objects.nonNull(failure)) {
      throw new IOException("failed to close statement or doris connection", failure);
    }
  }
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.doris.committer;

import com.bytedance.bitsail.connector.doris.config.DorisOptions;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

/**
 * Serializable committable that carries a {@link DorisOptions} instance.
 *
 * <p>Lombok generates the accessors and the single all-arguments constructor.
 */
@AllArgsConstructor
@Data
public class DorisCommittable implements Serializable {
  // TODO support 2PC commit

  /** Options carried by this committable; the field is public in addition to the generated accessors. */
  public DorisOptions dorisOptions;
}