Skip to content

Commit

Permalink
[INLONG-6274][Sort] Support multiple sink for IcebergLoadNode (#6215)
Browse files Browse the repository at this point in the history
  • Loading branch information
thesumery committed Oct 25, 2022
1 parent d8cdae1 commit f8b1efa
Show file tree
Hide file tree
Showing 22 changed files with 2,229 additions and 620 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma

protected JsonDynamicSchemaFormat() {
this.rowDataConverters =
new JsonToRowDataConverters(true, false, TimestampFormat.SQL);
new JsonToRowDataConverters(true, false, TimestampFormat.ISO_8601);
}

/**
Expand Down
8 changes: 8 additions & 0 deletions inlong-sort/sort-connectors/iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@
</includes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.inlong.sort.base</pattern>
<shadedPattern>
org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.base
</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.IcebergTableSource;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand All @@ -49,6 +50,12 @@

import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;

/**
Expand Down Expand Up @@ -109,6 +116,17 @@ private static CatalogTable loadCatalogTable(Context context) {
return GET_CATALOG_TABLE.invoke(context);
}

private static CatalogLoader createCatalogLoader(Map<String, String> tableProps) {
Configuration flinkConf = new Configuration();
tableProps.forEach(flinkConf::setString);

String catalogName = flinkConf.getString(CATALOG_NAME);
Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());

org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
return FlinkCatalogFactory.createCatalogLoader(catalogName, tableProps, hadoopConf);
}

private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable,
Map<String, String> tableProps,
String databaseName,
Expand Down Expand Up @@ -185,15 +203,21 @@ public DynamicTableSink createDynamicTableSink(Context context) {
Map<String, String> tableProps = catalogTable.getOptions();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());

TableLoader tableLoader;
if (catalog != null) {
tableLoader = createTableLoader(catalog, objectPath);
boolean multipleSink = Boolean.parseBoolean(
tableProps.getOrDefault(SINK_MULTIPLE_ENABLE.key(), SINK_MULTIPLE_ENABLE.defaultValue().toString()));
if (multipleSink) {
CatalogLoader catalogLoader = createCatalogLoader(tableProps);
return new IcebergTableSink(null, tableSchema, catalogTable, catalogLoader);
} else {
tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
objectPath.getObjectName());
TableLoader tableLoader;
if (catalog != null) {
tableLoader = createTableLoader(catalog, objectPath);
} else {
tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
objectPath.getObjectName());
}
return new IcebergTableSink(tableLoader, tableSchema, catalogTable, null);
}

return new IcebergTableSink(tableLoader, tableSchema, catalogTable);
}

@Override
Expand All @@ -212,6 +236,13 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);

options.add(SINK_MULTIPLE_ENABLE);
options.add(SINK_MULTIPLE_FORMAT);
options.add(SINK_MULTIPLE_DATABASE_PATTERN);
options.add(SINK_MULTIPLE_TABLE_PATTERN);
options.add(SINK_MULTIPLE_ADD_COLUMN_POLICY);
options.add(SINK_MULTIPLE_DEL_COLUMN_POLICY);
return options;
}

Expand All @@ -220,3 +251,4 @@ public String factoryIdentifier() {
return FACTORY_IDENTIFIER;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.inlong.sort.iceberg;

import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.flink.FlinkTypeVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;

import java.util.List;
import java.util.stream.Collectors;

public class FlinkTypeToType extends FlinkTypeVisitor<Type> {

private final RowType root;
private int nextId;

public FlinkTypeToType(RowType root) {
this.root = root;
// the root struct's fields use the first ids
this.nextId = root.getFieldCount();
}

private int getNextId() {
int next = nextId;
nextId += 1;
return next;
}

@Override
public Type visit(CharType charType) {
return Types.StringType.get();
}

@Override
public Type visit(VarCharType varCharType) {
return Types.StringType.get();
}

@Override
public Type visit(BooleanType booleanType) {
return Types.BooleanType.get();
}

@Override
public Type visit(BinaryType binaryType) {
return Types.FixedType.ofLength(binaryType.getLength());
}

@Override
public Type visit(VarBinaryType varBinaryType) {
return Types.BinaryType.get();
}

@Override
public Type visit(DecimalType decimalType) {
return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
}

@Override
public Type visit(TinyIntType tinyIntType) {
return Types.IntegerType.get();
}

@Override
public Type visit(SmallIntType smallIntType) {
return Types.IntegerType.get();
}

@Override
public Type visit(IntType intType) {
return Types.IntegerType.get();
}

@Override
public Type visit(BigIntType bigIntType) {
return Types.LongType.get();
}

@Override
public Type visit(FloatType floatType) {
return Types.FloatType.get();
}

@Override
public Type visit(DoubleType doubleType) {
return Types.DoubleType.get();
}

@Override
public Type visit(DateType dateType) {
return Types.DateType.get();
}

@Override
public Type visit(TimeType timeType) {
return Types.TimeType.get();
}

@Override
public Type visit(TimestampType timestampType) {
return Types.TimestampType.withoutZone();
}

@Override
public Type visit(LocalZonedTimestampType localZonedTimestampType) {
return Types.TimestampType.withZone();
}

@Override
public Type visit(ArrayType arrayType) {
Type elementType = arrayType.getElementType().accept(this);
if (arrayType.getElementType().isNullable()) {
return Types.ListType.ofOptional(getNextId(), elementType);
} else {
return Types.ListType.ofRequired(getNextId(), elementType);
}
}

@Override
public Type visit(MultisetType multisetType) {
Type elementType = multisetType.getElementType().accept(this);
return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
}

@Override
public Type visit(MapType mapType) {
// keys in map are not allowed to be null.
Type keyType = mapType.getKeyType().accept(this);
Type valueType = mapType.getValueType().accept(this);
if (mapType.getValueType().isNullable()) {
return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
} else {
return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
}
}

@Override
@SuppressWarnings("ReferenceEquality")
public Type visit(RowType rowType) {
List<NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
boolean isRoot = root == rowType;

List<Type> types = rowType.getFields().stream()
.map(f -> f.getType().accept(this))
.collect(Collectors.toList());

for (int i = 0; i < rowType.getFieldCount(); i++) {
int id = isRoot ? i : getNextId();

RowType.RowField field = rowType.getFields().get(i);
String name = field.getName();
String comment = field.getDescription().orElse(null);

if (field.getType().isNullable()) {
newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
} else {
newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
}
}

return Types.StructType.of(newFields);
}
}
Loading

0 comments on commit f8b1efa

Please sign in to comment.