From 1e9b3ac4c34f97f5ccf3a639cc74b7081eeaab37 Mon Sep 17 00:00:00 2001 From: jerryyue Date: Wed, 27 Apr 2022 20:41:27 +0800 Subject: [PATCH] [HUDI-3953]Flink Hudi module should support low-level source and sink api --- .../org/apache/hudi/sink/utils/Pipelines.java | 5 + .../org/apache/hudi/util/HoodiePipeline.java | 263 ++++++++++++++++++ .../hudi/sink/ITTestDataStreamWrite.java | 2 +- .../hudi/sink/utils/ITTestHoodiePipeline.java | 220 +++++++++++++++ .../hudi/table/ITTestHoodieDataSource.java | 2 +- 5 files changed, 490 insertions(+), 2 deletions(-) create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java create mode 100644 hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ITTestHoodiePipeline.java diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 3b2ee39528a8b..b5bd9e763165b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -41,6 +41,7 @@ import org.apache.hudi.sink.partitioner.BucketIndexPartitioner; import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.util.HoodiePipeline; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -48,16 +49,20 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; import java.util.HashMap; +import java.util.List; import java.util.Map; /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java new file mode 100644 index 0000000000000..58d17b681be63 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.HoodieTableFactory; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A tool class to construct hoodie flink pipeline. + * + *
+ * <p>How to use?
+ *
+ * <p>Method {@link #builder(String)} returns a pipeline builder. The builder
+ * can then define the hudi table columns, primary keys and partitions.
+ *
+ * <p>An example:
+ * <pre>
+ *    HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
+ *    DataStreamSink sinkStream = builder
+ *        .column("f0 int")
+ *        .column("f1 varchar(10)")
+ *        .column("f2 varchar(20)")
+ *        .pk("f0,f1")
+ *        .partition("f2")
+ *        .sink(input, false);
+ * </pre>
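+ *
+ * <p>Reading the table back as a DataStream works the same way; this is an illustrative
+ * sketch that assumes {@code env} is an existing StreamExecutionEnvironment:
+ * <pre>
+ *    DataStream rowDataStream = builder.source(env);
+ * </pre>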
+ */ +public class HoodiePipeline { + + private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class); + + /** + * Returns the builder for hoodie pipeline construction. + */ + public static Builder builder(String tableName) { + return new Builder(tableName); + } + + /** + * Builder for hudi source/sink pipeline construction. + */ + public static class Builder { + private final String tableName; + private final List columns; + private final Map options; + + private String pk; + private List partitions; + + public Builder self() { + return this; + } + + private Builder(String tableName) { + this.tableName = tableName; + this.columns = new ArrayList<>(); + this.options = new HashMap<>(); + this.partitions = new ArrayList<>(); + } + + /** + * Add a table column definition. + * + * @param column the column format should be in the form like 'f0 int' + */ + public Builder column(String column) { + this.columns.add(column); + return self(); + } + + /** + * Add primary keys. + */ + public Builder pk(String... pks) { + this.pk = String.join(",", pks); + return self(); + } + + /** + * Add partition fields. + */ + public Builder partition(String... partitions) { + this.partitions = new ArrayList<>(Arrays.asList(partitions)); + return self(); + } + + /** + * Add a config option. + */ + public Builder option(ConfigOption option, Object val) { + this.options.put(option.key(), val.toString()); + return self(); + } + + public Builder option(String key, Object val) { + this.options.put(key, val.toString()); + return self(); + } + + public Builder options(Map options) { + this.options.putAll(options); + return self(); + } + + public DataStreamSink sink(DataStream input, boolean bounded) { + TableDescriptor tableDescriptor = getTableDescriptor(); + return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded); + } + + public TableDescriptor getTableDescriptor() { + EnvironmentSettings environmentSettings = EnvironmentSettings + .newInstance() + .build(); + TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(environmentSettings); + String sql = getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions); + tableEnv.executeSql(sql); + String currentCatalog = tableEnv.getCurrentCatalog(); + ResolvedCatalogTable catalogTable = null; + String defaultDatabase = null; + try { + Catalog catalog = tableEnv.getCatalog(currentCatalog).get(); + defaultDatabase = catalog.getDefaultDatabase(); + catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(defaultDatabase, this.tableName)); + } catch (TableNotExistException e) { + throw new HoodieException("Create table " + this.tableName + " exception", e); + } + ObjectIdentifier tableId = ObjectIdentifier.of(currentCatalog, defaultDatabase, this.tableName); + return new TableDescriptor(tableId, catalogTable); + } + + public DataStream source(StreamExecutionEnvironment execEnv) { + TableDescriptor tableDescriptor = getTableDescriptor(); + return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getResolvedCatalogTable()); + } + } + + private static String getCreateHoodieTableDDL( + String tableName, + List fields, + Map options, + String pkField, + List partitionField) { + StringBuilder builder = new StringBuilder(); + builder.append("create table ").append(tableName).append("(\n"); + for (String field : fields) { + builder.append(" ").append(field).append(",\n"); + } + builder.append(" PRIMARY KEY(").append(pkField).append(") NOT 
ENFORCED\n")
+        .append(")\n");
+    if (!partitionField.isEmpty()) {
+      String partitions = partitionField
+          .stream()
+          .map(partitionName -> "`" + partitionName + "`")
+          .collect(Collectors.joining(","));
+      builder.append("PARTITIONED BY (").append(partitions).append(")\n");
+    }
+    builder.append("with (\n"
+        + "  'connector' = 'hudi'");
+    options.forEach((k, v) -> builder.append(",\n")
+        .append("  '").append(k).append("' = '").append(v).append("'"));
+    builder.append("\n)");
+    return builder.toString();
+  }
+
+  /**
+   * Low-level sink API that writes the given data stream into a hoodie table described by the options and schema.
+   *
+   * @param input        The input data stream
+   * @param tablePath    The table path used to look up the hoodie table in the catalog
+   * @param catalogTable The catalog table describing the hoodie table schema
+   * @param isBounded    Whether the input stream is bounded, i.e. whether the pipeline runs in batch execution mode
+   * @return The data stream sink created by writing the input data stream into the hoodie table
+   */
+  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
+    FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
+    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
+    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
+        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
+        .consumeDataStream(input);
+  }
+
+  /**
+   * Low-level source API that reads a hoodie table, described by the options and schema, as a data stream source.
+   *
+   * @param execEnv      The Flink stream execution environment
+   * @param tablePath    The table path used to look up the hoodie table in the catalog
+   * @param catalogTable The catalog table describing the hoodie table schema
+   * @return The source data stream read from the hoodie table
+   */
+  private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable) {
+    FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
+    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
+    DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
+        .createDynamicTableSource(context))
+        .getScanRuntimeProvider(new ScanRuntimeProviderContext());
+    return dataStreamScanProvider.produceDataStream(execEnv);
+  }
+
+  /**
+   * A table descriptor holding the table identifier and the resolved catalog table.
+ */ + public static class TableDescriptor { + private ObjectIdentifier tableId; + private ResolvedCatalogTable resolvedCatalogTable; + + public TableDescriptor(ObjectIdentifier tableId, ResolvedCatalogTable resolvedCatalogTable) { + this.tableId = tableId; + this.resolvedCatalogTable = resolvedCatalogTable; + } + + public ObjectIdentifier getTableId() { + return tableId; + } + + public ResolvedCatalogTable getResolvedCatalogTable() { + return resolvedCatalogTable; + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index b9deb43a97c53..c8aca909aaa4e 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -69,7 +69,7 @@ */ public class ITTestDataStreamWrite extends TestLogger { - private static final Map> EXPECTED = new HashMap<>(); + public static final Map> EXPECTED = new HashMap<>(); private static final Map> EXPECTED_TRANSFORMER = new HashMap<>(); private static final Map> EXPECTED_CHAINED_TRANSFORMER = new HashMap<>(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ITTestHoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ITTestHoodiePipeline.java new file mode 100644 index 0000000000000..95e7ed98c71e2 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ITTestHoodiePipeline.java @@ -0,0 +1,220 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.HoodiePipeline; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; +import org.apache.hudi.utils.TestUtils; +import org.apache.hudi.utils.source.ContinuousFileSource; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.io.FilePathFilter; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.sink.ITTestDataStreamWrite.EXPECTED; +import static org.apache.hudi.utils.TestData.assertRowDataEquals; + +/** + * IT test for test hoodie pipeline. 
+ */ +public class ITTestHoodiePipeline { + + public static final List RESULT = new ArrayList<>(); + @TempDir + File tempFile; + private StreamExecutionEnvironment execEnv; + private boolean isMor; + private Map options = new HashMap<>(); + private TableEnvironment streamTableEnv; + + @BeforeEach + void beforEach() { + this.execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.getConfig().disableObjectReuse(); + execEnv.setParallelism(4); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); + execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + + options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE"); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4"); + options.put("table.type", HoodieTableType.MERGE_ON_READ.name()); + options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id"); + options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); + options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); + options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString()); + + this.isMor = options.get(FlinkOptions.TABLE_TYPE.key()).equals(HoodieTableType.MERGE_ON_READ.name()); + + EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); + streamTableEnv = TableEnvironmentImpl.create(settings); + streamTableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + Configuration execConf = streamTableEnv.getConfig().getConfiguration(); + execConf.setString("execution.checkpointing.interval", "2s"); + // configure not to retry after failure + execConf.setString("restart-strategy", "fixed-delay"); + execConf.setString("restart-strategy.fixed-delay.attempts", "0"); + } + + @Test + public void testSink() throws Exception { + DataStream dataStream = inputDataStream(this.execEnv, options, isMor); + + HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink") + .column("uuid string not null") + .column("name string") + .column("age int") + .column("`ts` timestamp(3)") + .column("`partition` string") + .pk("uuid") + .partition("partition") + .options(this.options); + builder.sink(dataStream, false); + + execute(); + TestData.checkWrittenFullData(tempFile, EXPECTED); + } + + @Test + public void testSource() throws Exception { + insertHoodie(); + String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()); + options.clear(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit); + + HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source") + .column("uuid string not null") + .column("name string") + .column("age int") + .column("`ts` timestamp(3)") + .column("`partition` string") + .pk("uuid") + .partition("partition") + .options(this.options); + DataStream rowDataDataStream = builder.source(execEnv); + + rowDataDataStream.addSink(new SinkFunction() { + @Override + public void invoke(RowData value, Context context) throws Exception { + RESULT.add(value); + } + }); + execEnv.execute(); + TimeUnit.SECONDS.sleep(2);//sleep 2 second for collect data + assertRowDataEquals(RESULT, TestData.dataSetInsert(5, 6)); + } + + public void execute() throws Exception { + JobClient client = this.execEnv.executeAsync("hoodiePipeline"); + 
if (isMor) { + if (client.getJobStatus().get() != JobStatus.FAILED) { + try { + TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish + client.cancel(); + } catch (Throwable var1) { + // ignored + } + } + } else { + // wait for the streaming job to finish + client.getJobExecutionResult().get(); + } + } + + public void insertHoodie() throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.TABLE_NAME, "t1"); + conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ"); + + // write 3 batches of data set + TestData.writeData(TestData.dataSetInsert(1, 2), conf); + TestData.writeData(TestData.dataSetInsert(3, 4), conf); + TestData.writeData(TestData.dataSetInsert(5, 6), conf); + } + + public DataStream inputDataStream(StreamExecutionEnvironment execEnv, Map options, Boolean isMor) { + // Read from file source + RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(Configuration.fromMap(options))).getLogicalType(); + + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), false, true, TimestampFormat.ISO_8601); + String sourcePath = Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_source.data")).toString(); + + + DataStream dataStream; + if (isMor) { + TextInputFormat format = new TextInputFormat(new Path(sourcePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); + TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + format.setCharsetName("UTF-8"); + + dataStream = execEnv + // use PROCESS_CONTINUOUSLY mode to trigger checkpoint + .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(1); + } else { + dataStream = execEnv + // use continuous file source to trigger checkpoint + .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 1)) + .name("continuous_file_source").setParallelism(1) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(4); + } + return dataStream; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 786a45cac7ac9..a9b36667f40aa 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -1276,7 +1276,7 @@ private static Stream indexAndPartitioningParams() { return Stream.of(data).map(Arguments::of); } - private void execInsertSql(TableEnvironment tEnv, String insert) { + public static void execInsertSql(TableEnvironment tEnv, String insert) { TableResult tableResult = tEnv.executeSql(insert); // wait to finish try {