Skip to content
Permalink
Browse files
Fix Flink schema and Doris schema column mapping (#30)
Fix Flink schema and Doris schema column mapping
  • Loading branch information
JNSimba committed Apr 25, 2022
1 parent 706baa0 commit 73299bf4aa40a98e6e5b64662abb1b587a6ee4bd
Showing 3 changed files with 16 additions and 3 deletions.
@@ -104,6 +104,14 @@ public boolean getUseOldApi() {
return useOldApi;
}

/**
 * Sets the column projection pushed down to Doris, e.g. {@code "`id`, `name`"}.
 *
 * @param value comma-separated list of column names to read
 */
public void setReadFields(String value) {
    this.readFields = value;
}

/**
 * Sets the filter predicate pushed down to Doris when scanning.
 *
 * @param value filter expression applied on the Doris side
 */
public void setFilterQuery(String value) {
    this.filterQuery = value;
}

/**
 * Creates a fresh {@link Builder} for assembling read options.
 *
 * @return a new, empty builder instance
 */
public static Builder builder() {
    Builder fresh = new Builder();
    return fresh;
}
@@ -50,8 +50,8 @@ public DorisRowConverter(RowType rowType) {
* @param record from rowBatch
*/
public GenericRowData convert(List record){
GenericRowData rowData = new GenericRowData(record.size());
for (int i = 0; i < record.size(); i++) {
GenericRowData rowData = new GenericRowData(deserializationConverters.length);
for (int i = 0; i < deserializationConverters.length ; i++) {
rowData.setField(i, deserializationConverters[i].deserialize(record.get(i)));
}
return rowData;
@@ -38,11 +38,12 @@
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* The {@link DorisDynamicTableSource} is used during planning.
@@ -76,6 +77,10 @@ public ChangelogMode getChangelogMode() {

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
readOptions.setReadFields(Arrays.stream(physicalSchema.getFieldNames())
.map(item->String.format("`%s`", item.trim().replace("`", "")))
.collect(Collectors.joining(", ")));

if (readOptions.getUseOldApi()) {
List<PartitionDefinition> dorisPartitions;
try {

0 comments on commit 73299bf

Please sign in to comment.