Commit
[HUDI-3953] Flink Hudi module should support low-level source and sink api
jerryyue authored and didi committed Apr 29, 2022
1 parent 6ec039b commit 1e9b3ac
Showing 5 changed files with 490 additions and 2 deletions.
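The snippet below is an illustrative sketch of how the low-level API introduced by this commit is meant to be driven from the DataStream API; it is not part of the diff. The class name HoodiePipelineSinkDemo, the 'path' option key, the table location, and the sample record are placeholder assumptions.

import java.util.Collections;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import org.apache.hudi.util.HoodiePipeline;

public class HoodiePipelineSinkDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Physical row type matching the declared columns (default field names are f0, f1, f2).
    RowType rowType = RowType.of(new IntType(), new VarCharType(10), new VarCharType(20));

    // A bounded demo input; any DataStream<RowData> with this layout would do.
    DataStream<RowData> input = env.fromCollection(
        Collections.singletonList(
            (RowData) GenericRowData.of(1, StringData.fromString("a"), StringData.fromString("par1"))),
        InternalTypeInfo.of(rowType));

    HoodiePipeline.builder("my_table")
        .column("f0 int")
        .column("f1 varchar(10)")
        .column("f2 varchar(20)")
        .pk("f0", "f1")
        .partition("f2")
        .option("path", "file:///tmp/hudi/my_table") // assumed: target table location
        .sink(input, true); // bounded input, so request batch-style execution

    env.execute("hoodie-pipeline-sink-demo");
  }
}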
@@ -41,23 +41,28 @@
import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.HoodiePipeline;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
@@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util;

import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* A tool class to construct a hoodie Flink pipeline.
*
* <p>How to use?</p>
* Method {@link #builder(String)} returns a pipeline builder. The builder
* can then define the hudi table columns, primary keys and partitions.
*
* <p>An example:</p>
* <pre>
* HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
* DataStreamSink<?> sinkStream = builder
* .column("f0 int")
* .column("f1 varchar(10)")
* .column("f2 varchar(20)")
* .pk("f0,f1")
* .partition("f2")
* .sink(input, false);
* </pre>
*/
public class HoodiePipeline {

private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class);

/**
* Returns the builder for hoodie pipeline construction.
*/
public static Builder builder(String tableName) {
return new Builder(tableName);
}

/**
* Builder for hudi source/sink pipeline construction.
*/
public static class Builder {
private final String tableName;
private final List<String> columns;
private final Map<String, String> options;

private String pk;
private List<String> partitions;

public Builder self() {
return this;
}

private Builder(String tableName) {
this.tableName = tableName;
this.columns = new ArrayList<>();
this.options = new HashMap<>();
this.partitions = new ArrayList<>();
}

/**
* Add a table column definition.
*
* @param column the column definition, which should be in a form like 'f0 int'
*/
public Builder column(String column) {
this.columns.add(column);
return self();
}

/**
* Add primary keys.
*/
public Builder pk(String... pks) {
this.pk = String.join(",", pks);
return self();
}

/**
* Add partition fields.
*/
public Builder partition(String... partitions) {
this.partitions = new ArrayList<>(Arrays.asList(partitions));
return self();
}

/**
* Add a config option.
*/
public Builder option(ConfigOption<?> option, Object val) {
this.options.put(option.key(), val.toString());
return self();
}

public Builder option(String key, Object val) {
this.options.put(key, val.toString());
return self();
}

public Builder options(Map<String, String> options) {
this.options.putAll(options);
return self();
}

public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
TableDescriptor tableDescriptor = getTableDescriptor();
return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
}

public TableDescriptor getTableDescriptor() {
EnvironmentSettings environmentSettings = EnvironmentSettings
.newInstance()
.build();
TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(environmentSettings);
String sql = getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions);
tableEnv.executeSql(sql);
String currentCatalog = tableEnv.getCurrentCatalog();
ResolvedCatalogTable catalogTable = null;
String defaultDatabase = null;
try {
Catalog catalog = tableEnv.getCatalog(currentCatalog).get();
defaultDatabase = catalog.getDefaultDatabase();
catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(defaultDatabase, this.tableName));
} catch (TableNotExistException e) {
throw new HoodieException("Create table " + this.tableName + " exception", e);
}
ObjectIdentifier tableId = ObjectIdentifier.of(currentCatalog, defaultDatabase, this.tableName);
return new TableDescriptor(tableId, catalogTable);
}

public DataStream<RowData> source(StreamExecutionEnvironment execEnv) {
TableDescriptor tableDescriptor = getTableDescriptor();
return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getResolvedCatalogTable());
}
}

private static String getCreateHoodieTableDDL(
String tableName,
List<String> fields,
Map<String, String> options,
String pkField,
List<String> partitionField) {
StringBuilder builder = new StringBuilder();
builder.append("create table ").append(tableName).append("(\n");
for (String field : fields) {
builder.append(" ").append(field).append(",\n");
}
builder.append(" PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n")
.append(")\n");
if (!partitionField.isEmpty()) {
String partitions = partitionField
.stream()
.map(partitionName -> "`" + partitionName + "`")
.collect(Collectors.joining(","));
builder.append("PARTITIONED BY (").append(partitons).append(")\n");
}
builder.append("with (\n"
+ " 'connector' = 'hudi'");
options.forEach((k, v) -> builder.append(",\n")
.append(" '").append(k).append("' = '").append(v).append("'"));
builder.append("\n)");
return builder.toString();
}
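
// Illustration only, not part of this change: for the builder example in the class javadoc
// ("f0 int", "f1 varchar(10)", "f2 varchar(20)", pk "f0,f1", partition "f2", no extra options),
// getCreateHoodieTableDDL would produce roughly:
//
//   create table myTable(
//     f0 int,
//     f1 varchar(10),
//     f2 varchar(20),
//     PRIMARY KEY(f0,f1) NOT ENFORCED
//   )
//   PARTITIONED BY (`f2`)
//   with (
//     'connector' = 'hudi'
//   )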

/**
* Low-level sink API for inserting a DataStream into a hoodie table described by options and schema.
*
* @param input The input DataStream
* @param tablePath The table path used to look the hoodie table up in the catalog
* @param catalogTable The catalog table describing the hoodie table schema
* @param isBounded Whether the input is bounded, i.e. whether to run in batch execution mode
* @return The data stream sink returned by inserting the data stream into the hoodie table
*/
private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
.consumeDataStream(input);
}

/**
* Low-level source API for reading a hoodie table, described by options and schema, as a DataStream source.
*
* @param execEnv The Flink stream execution environment
* @param tablePath The table path used to look the hoodie table up in the catalog
* @param catalogTable The catalog table describing the hoodie table schema
* @return The source data stream read from the hoodie table
*/
private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable) {
FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
.createDynamicTableSource(context))
.getScanRuntimeProvider(new ScanRuntimeProviderContext());
return dataStreamScanProvider.produceDataStream(execEnv);
}

/**
* A table descriptor holding the table id and the resolved catalog table.
*/
public static class TableDescriptor {
private ObjectIdentifier tableId;
private ResolvedCatalogTable resolvedCatalogTable;

public TableDescriptor(ObjectIdentifier tableId, ResolvedCatalogTable resolvedCatalogTable) {
this.tableId = tableId;
this.resolvedCatalogTable = resolvedCatalogTable;
}

public ObjectIdentifier getTableId() {
return tableId;
}

public ResolvedCatalogTable getResolvedCatalogTable() {
return resolvedCatalogTable;
}
}
}
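
For reference, the source side of the same builder could be driven as sketched below, under the same assumptions as the earlier sink example (an existing hoodie table at the 'path' location; 'read.streaming.enabled' is an assumed option key for continuous reads; the class name is hypothetical).

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

import org.apache.hudi.util.HoodiePipeline;

public class HoodiePipelineSourceDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<RowData> rows = HoodiePipeline.builder("my_table")
        .column("f0 int")
        .column("f1 varchar(10)")
        .column("f2 varchar(20)")
        .pk("f0", "f1")
        .partition("f2")
        .option("path", "file:///tmp/hudi/my_table") // assumed: existing hoodie table location
        .option("read.streaming.enabled", "true")    // assumed option key: keep streaming new commits
        .source(env);

    rows.print();
    env.execute("hoodie-pipeline-source-demo");
  }
}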
@@ -69,7 +69,7 @@
*/
public class ITTestDataStreamWrite extends TestLogger {

private static final Map<String, List<String>> EXPECTED = new HashMap<>();
public static final Map<String, List<String>> EXPECTED = new HashMap<>();
private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();
private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER = new HashMap<>();

