docs/content/docs/how-to/writing-tables.md (110 additions, 0 deletions)
@@ -26,6 +26,59 @@ under the License.

# Writing Tables

You can use the `INSERT` statement to insert new rows into a table
or to overwrite the existing data in a table. The inserted rows can
be specified by value expressions or by the result of a query.

## Syntax

```sql
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }
```
- part_spec

An optional parameter that specifies a comma-separated list of key-value pairs for partitions.
Note that a typed literal (e.g., date'2019-01-02') can be used in the partition spec.

Syntax: PARTITION ( partition_col_name = partition_col_val [ , ... ] )

- column_list

An optional parameter that specifies a comma-separated list of columns belonging to the
table identified by table_identifier.

Syntax: (col_name1 [, col_name2, ...])

{{< hint info >}}

All specified columns must exist in the table and must not be duplicated. The list
includes all columns except the static partition columns.

The size of the column list must exactly match the size of the data produced by the VALUES clause or the query.

{{< /hint >}}

- value_expr

Specifies the values to be inserted. Either an explicitly specified value or a NULL can be
inserted. A comma must be used to separate each value in the clause. More than one set of
values can be specified to insert multiple rows.

Syntax: VALUES ( { value | NULL } [ , ... ] ) [ , ( ... ) ]

{{< hint info >}}

Currently, Flink doesn't support using NULL directly; a NULL must be cast to the actual
data type with `CAST(NULL AS data_type)`.

{{< /hint >}}
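
For example, the following statement is a minimal sketch combining the elements above: a static partition spec, a column list, and a `VALUES` clause with a cast `NULL`. The table `MyTable` and its columns are hypothetical, assumed to be defined as in the comment:

```sql
-- assumed definition: CREATE TABLE MyTable (k INT, v STRING, dt STRING) PARTITIONED BY (dt);
INSERT INTO MyTable PARTITION (dt = '2019-01-02') (k, v)
VALUES (1, 'hi'), (2, CAST(NULL AS STRING));
```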

For more information, see the syntax documentation for each engine:

[Flink INSERT Statement](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/)

[Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)

## Applying Records/Changes to Tables

{{< tabs "insert-into-example" >}}
@@ -87,3 +140,60 @@ INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
{{< /tab >}}

{{< /tabs >}}

## Purging tables

You can use `INSERT OVERWRITE` to purge a table by overwriting it with an empty result set.

{{< tabs "purge-tables-syntax" >}}

{{< tab "Flink" >}}

```sql
INSERT OVERWRITE MyTable SELECT * FROM MyTable WHERE false
```

{{< /tab >}}

{{< /tabs >}}

## Purging a Partition

In particular, you can use `INSERT OVERWRITE` to purge the data of a partition by overwriting the partition with an empty result set:

{{< tabs "purge-partition-syntax" >}}

{{< tab "Flink" >}}

```sql
INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable WHERE false
```

{{< /tab >}}

{{< /tabs >}}

The following SQL is an example:

{{< tabs "purge-partition-example" >}}

{{< tab "Flink" >}}

```sql
-- table definition
CREATE TABLE MyTable (
k0 INT,
k1 INT,
v STRING
) PARTITIONED BY (k0, k1);

-- you can use
INSERT OVERWRITE MyTable PARTITION (k0 = 0) SELECT k1, v FROM MyTable WHERE false

-- or
INSERT OVERWRITE MyTable PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE false
```

{{< /tab >}}

{{< /tabs >}}
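
To double-check a purge, you can read the overwritten partition back; it should return no rows, while the other partitions keep their data (a sketch against the table defined above):

{{< tabs "purge-partition-verify" >}}

{{< tab "Flink" >}}

```sql
-- after the purge, this returns no rows; partitions with k0 <> 0 are untouched
SELECT * FROM MyTable WHERE k0 = 0;
```

{{< /tab >}}

{{< /tabs >}}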
@@ -25,6 +25,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -41,6 +42,7 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;

import org.junit.Test;

@@ -381,6 +383,76 @@ public void testBatchWriteNonPartitionedRecordsWithoutPk() throws Exception {
changelogRow("+I", "US Dollar")));
}

@Test
public void testPurgeTableUsingBatchOverWrite() throws Exception {
    TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    tEnv.executeSql(
            String.format(
                    "CREATE CATALOG test_catalog WITH ("
                            + "'type'='table-store', 'warehouse'='%s')",
                    TEMPORARY_FOLDER.newFolder().toURI()));
    tEnv.useCatalog("test_catalog");
    tEnv.executeSql("CREATE TABLE test (k1 INT, k2 STRING, v STRING)");

    validateOverwriteResult(tEnv, "test", "", "*", Collections.emptyList());
}

@Test
public void testPurgePartitionUsingBatchOverWrite() throws Exception {
    TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    tEnv.executeSql(
            String.format(
                    "CREATE CATALOG test_catalog WITH ("
                            + "'type'='table-store', 'warehouse'='%s')",
                    TEMPORARY_FOLDER.newFolder().toURI()));
    tEnv.useCatalog("test_catalog");

    // single partition key
    tEnv.executeSql(
            "CREATE TABLE test_single (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1)");

    validateOverwriteResult(
            tEnv,
            "test_single",
            "PARTITION (k1 = 0)",
            "k2, v",
            Arrays.asList(
                    changelogRow("+I", 1, "2023-01-01", "flink"),
                    changelogRow("+I", 1, "2023-01-02", "table"),
                    changelogRow("+I", 1, "2023-01-02", "store")));

    // multiple partition keys and overwrite one partition key
    tEnv.executeSql(
            "CREATE TABLE test_multiple0 (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1, k2)");

    validateOverwriteResult(
            tEnv,
            "test_multiple0",
            "PARTITION (k1 = 0)",
            "k2, v",
            Arrays.asList(
                    changelogRow("+I", 1, "2023-01-01", "flink"),
                    changelogRow("+I", 1, "2023-01-02", "table"),
                    changelogRow("+I", 1, "2023-01-02", "store")));

    // multiple partition keys and overwrite all partition keys
    tEnv.executeSql(
            "CREATE TABLE test_multiple1 (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1, k2)");

    validateOverwriteResult(
            tEnv,
            "test_multiple1",
            "PARTITION (k1 = 0, k2 = '2023-01-01')",
            "v",
            Arrays.asList(
                    changelogRow("+I", 0, "2023-01-02", "world"),
                    changelogRow("+I", 1, "2023-01-01", "flink"),
                    changelogRow("+I", 1, "2023-01-02", "table"),
                    changelogRow("+I", 1, "2023-01-02", "store")));
}

@Test
public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
// input is dailyRatesChangelogWithoutUB()
@@ -1507,6 +1579,47 @@ public void testStreamingInsertOverwrite() throws Exception {

// ------------------------ Tools ----------------------------------

private void validateOverwriteResult(
        TableEnvironment tEnv,
        String table,
        String partitionSpec,
        String selectSpec,
        List<Row> expected)
        throws Exception {
    // seed the table with six rows spread across two values of each partition key
    tEnv.executeSql(
                    String.format("INSERT INTO %s VALUES ", table)
                            + "(0, '2023-01-01', 'hi'), "
                            + "(0, '2023-01-01', 'hello'), "
                            + "(0, '2023-01-02', 'world'), "
                            + "(1, '2023-01-01', 'flink'), "
                            + "(1, '2023-01-02', 'table'), "
                            + "(1, '2023-01-02', 'store')")
            .await();

    // sanity check: all six rows are present before the overwrite
    assertThat(
            CollectionUtil.iteratorToList(
                    tEnv.executeSql("SELECT * FROM " + table).collect()))
            .containsExactlyInAnyOrderElementsOf(
                    Arrays.asList(
                            changelogRow("+I", 0, "2023-01-01", "hi"),
                            changelogRow("+I", 0, "2023-01-01", "hello"),
                            changelogRow("+I", 0, "2023-01-02", "world"),
                            changelogRow("+I", 1, "2023-01-01", "flink"),
                            changelogRow("+I", 1, "2023-01-02", "table"),
                            changelogRow("+I", 1, "2023-01-02", "store")));

    // purge the table (or the given partition) by overwriting it with an empty query
    tEnv.executeSql(
                    String.format(
                            "INSERT OVERWRITE %s %s SELECT %s FROM %s WHERE false",
                            table, partitionSpec, selectSpec, table))
            .await();

    // only rows outside the overwritten scope should remain
    assertThat(
            CollectionUtil.iteratorToList(
                    tEnv.executeSql("SELECT * FROM " + table).collect()))
            .containsExactlyInAnyOrderElementsOf(expected);
}

private String collectAndCheckBatchReadWrite(
        boolean partitioned,
        boolean hasPk,