Skip to content

Commit

Permalink
[INLONG-6274][Sort] Rename multiple sink operator to better fit the scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
thexiay committed Oct 25, 2022
1 parent 6936c68 commit 0319dce
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSingleStreamWriter;
import org.apache.inlong.sort.iceberg.sink.multiple.MultipleWriteResult;
import org.apache.inlong.sort.iceberg.sink.multiple.RecordWithSchema;
import org.apache.inlong.sort.iceberg.sink.multiple.WholeDatabaseMigrationOperator;
import org.apache.inlong.sort.iceberg.sink.multiple.DynamicSchemaHandleOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -94,7 +94,7 @@ public class FlinkSink {
private static final String ICEBERG_MULTIPLE_FILES_COMMITTER_NAME =
IcebergMultipleFilesCommiter.class.getSimpleName();
private static final String ICEBERG_WHOLE_DATABASE_MIGRATION_NAME =
WholeDatabaseMigrationOperator.class.getSimpleName();
DynamicSchemaHandleOperator.class.getSimpleName();

private FlinkSink() {
}
Expand Down Expand Up @@ -218,7 +218,7 @@ public Builder tableSchema(TableSchema newTableSchema) {

/**
* The catalog loader is used for loading tables in {@link IcebergMultipleStreamWriter} and
* {@link WholeDatabaseMigrationOperator} lazily, we need this loader because in multiple sink scene which table
* {@link DynamicSchemaHandleOperator} lazily, we need this loader because in multiple sink scene which table
* to load is determined in runtime, so we should hold a {@link org.apache.iceberg.catalog.Catalog} at runtime.
*
* @param catalogLoader to load iceberg catalog inside tasks.
Expand Down Expand Up @@ -518,7 +518,7 @@ private SingleOutputStreamOperator<MultipleWriteResult> appendMultipleWriter(Dat
// upsert mode will be initialized at runtime

int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
WholeDatabaseMigrationOperator routeOperator = new WholeDatabaseMigrationOperator(
DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
catalogLoader,
multipleSinkOption);
SingleOutputStreamOperator<RecordWithSchema> routeStream = input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@

import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;

public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<RecordWithSchema>
public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWithSchema>
implements OneInputStreamOperator<RowData, RecordWithSchema>, ProcessingTimeCallback {

private static final Logger LOG = LoggerFactory.getLogger(WholeDatabaseMigrationOperator.class);
private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaHandleOperator.class);
private static final long HELPER_DEBUG_INTERVEL = 10 * 60 * 1000;

private final ObjectMapper objectMapper = new ObjectMapper();
Expand All @@ -85,7 +85,7 @@ public class WholeDatabaseMigrationOperator extends AbstractStreamOperator<Recor
// blacklist to filter schema update failed table
private transient Set<TableIdentifier> blacklist;

public WholeDatabaseMigrationOperator(CatalogLoader catalogLoader,
public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
MultipleSinkOption multipleSinkOption) {
this.catalogLoader = catalogLoader;
this.multipleSinkOption = multipleSinkOption;
Expand Down Expand Up @@ -117,8 +117,7 @@ public void close() throws Exception {

@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
String wholeData = element.getValue().getString(0).toString();
JsonNode jsonNode = objectMapper.readTree(wholeData);
JsonNode jsonNode = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));

TableIdentifier tableId = parseId(jsonNode);
if (blacklist.contains(tableId)) {
Expand Down

0 comments on commit 0319dce

Please sign in to comment.