27 changes: 27 additions & 0 deletions docs/content/docs/development/query-table.md
@@ -110,6 +110,33 @@ SELECT * FROM MyTable$snapshots;
By querying a table's snapshots table, you can learn the commit and expiration
information about that table and time travel through the data.
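
As a minimal sketch of how this information can be used, the first query below reads the latest snapshot id from the snapshots table; the second is a hypothetical time-travel read that assumes the `scan.snapshot-id` dynamic table option is available in your Table Store version.

```sql
-- Find the most recent snapshot of MyTable.
SELECT MAX(snapshot_id) AS latest_snapshot FROM MyTable$snapshots;

-- Hypothetical time-travel read; assumes the 'scan.snapshot-id'
-- dynamic option is supported by your version.
SELECT * FROM MyTable /*+ OPTIONS('scan.snapshot-id' = '2') */;
```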

## Schemas Table

You can query the historical schemas of the table through Flink SQL.

```sql
SELECT * FROM MyTable$schemas;

+-----------+--------------------------------+----------------+--------------+---------+---------+
| schema_id | fields | partition_keys | primary_keys | options | comment |
+-----------+--------------------------------+----------------+--------------+---------+---------+
| 0 | [{"id":0,"name":"word","typ... | [] | ["word"] | {} | |
| 1 | [{"id":0,"name":"word","typ... | [] | ["word"] | {} | |
| 2 | [{"id":0,"name":"word","typ... | [] | ["word"] | {} | |
+-----------+--------------------------------+----------------+--------------+---------+---------+
3 rows in set
```
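
If you only need a single schema version, you can filter on the columns shown above; a minimal sketch:

```sql
-- Inspect one historical schema of MyTable.
SELECT schema_id, fields, primary_keys
FROM MyTable$schemas WHERE schema_id = 1;
```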

You can join the snapshots table and the schemas table to get the fields of a given snapshot.

```sql
SELECT s.snapshot_id, t.schema_id, t.fields
FROM MyTable$snapshots s JOIN MyTable$schemas t
ON s.schema_id = t.schema_id
WHERE s.snapshot_id = 100;
```

## Options Table

You can query the table's option information which is specified in the DDL
@@ -23,6 +23,7 @@
import org.junit.Test;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
import static org.assertj.core.api.Assertions.assertThat;
@@ -70,4 +71,56 @@ public void testCreateMetaTable() {
"Table name[%s] cannot contain '%s' separator",
"T$aa$bb", METADATA_TABLE_SPLITTER));
}

@Test
public void testSchemasTable() throws Exception {
sql(
"CREATE TABLE T(a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')");
sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')");

assertThat(sql("SHOW CREATE TABLE T$schemas").toString())
.isEqualTo(
"[+I[CREATE TABLE `TABLE_STORE`.`default`.`T$schemas` (\n"
+ " `schema_id` BIGINT NOT NULL,\n"
+ " `fields` VARCHAR(2147483647) NOT NULL,\n"
+ " `partition_keys` VARCHAR(2147483647) NOT NULL,\n"
+ " `primary_keys` VARCHAR(2147483647) NOT NULL,\n"
+ " `options` VARCHAR(2147483647) NOT NULL,\n"
+ " `comment` VARCHAR(2147483647)\n"
+ ") ]]");

List<Row> result = sql("SELECT * FROM T$schemas order by schema_id");

assertThat(result.toString())
.isEqualTo(
"[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+ "{\"id\":2,\"name\":\"c\",\"type\":\"VARCHAR(2147483647)\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"b.bb.bbb\":\"val2\"}, ], "
+ "+I[1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},"
+ "{\"id\":2,\"name\":\"c\",\"type\":\"VARCHAR(2147483647)\"}], [], [\"a\"], "
+ "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ]]");
}

@Test
public void testSnapshotsSchemasTable() throws Exception {
sql("CREATE TABLE T (a INT, b INT)");
sql("INSERT INTO T VALUES (1, 2)");
sql("INSERT INTO T VALUES (3, 4)");
sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')");
sql("INSERT INTO T VALUES (5, 6)");
sql("INSERT INTO T VALUES (7, 8)");

List<Row> result =
sql(
"SELECT s.snapshot_id, s.schema_id, t.fields FROM "
+ "T$snapshots s JOIN T$schemas t ON s.schema_id=t.schema_id");
assertThat(result.stream().map(Row::toString).collect(Collectors.toList()))
.containsExactlyInAnyOrder(
"+I[1, 0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]",
"+I[2, 0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]",
"+I[3, 1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]",
"+I[4, 1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]");
}
}
@@ -22,6 +22,7 @@
import org.apache.flink.table.store.table.Table;

import static org.apache.flink.table.store.table.metadata.OptionsTable.OPTIONS;
import static org.apache.flink.table.store.table.metadata.SchemasTable.SCHEMAS;
import static org.apache.flink.table.store.table.metadata.SnapshotsTable.SNAPSHOTS;

/** Loader to load metadata {@link Table}s. */
@@ -33,6 +34,8 @@ public static Table load(String metadata, Path location) {
return new SnapshotsTable(location);
case OPTIONS:
return new OptionsTable(location);
case SCHEMAS:
return new SchemasTable(location);
default:
throw new UnsupportedOperationException(
"Unsupported metadata table type: " + metadata);
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.store.table.metadata;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.IteratorRecordReader;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.SerializationUtils;
import org.apache.flink.table.store.table.Table;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.utils.ProjectedRowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;

import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;

/** A {@link Table} for showing the historical schemas of a table. */
public class SchemasTable implements Table {

private static final long serialVersionUID = 1L;

public static final String SCHEMAS = "schemas";

public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new RowType.RowField("schema_id", new BigIntType(false)),
new RowType.RowField("fields", SerializationUtils.newStringType(false)),
new RowType.RowField(
"partition_keys", SerializationUtils.newStringType(false)),
new RowType.RowField(
"primary_keys", SerializationUtils.newStringType(false)),
new RowType.RowField(
"options", SerializationUtils.newStringType(false)),
new RowType.RowField(
"comment", SerializationUtils.newStringType(true))));

private final Path location;

public SchemasTable(Path location) {
this.location = location;
}

@Override
public String name() {
return location.getName() + METADATA_TABLE_SPLITTER + SCHEMAS;
}

@Override
public RowType rowType() {
return TABLE_TYPE;
}

@Override
public TableScan newScan() {
return new SchemasScan();
}

@Override
public TableRead newRead() {
return new SchemasRead();
}

private class SchemasScan implements TableScan {

@Override
public TableScan withFilter(Predicate predicate) {
return this;
}

@Override
public Plan plan() {
return () -> Collections.singletonList(new SchemasSplit(location));
}
}

/** {@link Split} implementation for {@link SchemasTable}. */
private static class SchemasSplit implements Split {

private static final long serialVersionUID = 1L;

private final Path location;

private SchemasSplit(Path location) {
this.location = location;
}

@Override
public long rowCount() {
return new SchemaManager(location).listAllIds().size();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SchemasSplit that = (SchemasSplit) o;
return Objects.equals(location, that.location);
}

@Override
public int hashCode() {
return Objects.hash(location);
}
}

/** {@link TableRead} implementation for {@link SchemasTable}. */
private static class SchemasRead implements TableRead {

private int[][] projection;

@Override
public TableRead withFilter(Predicate predicate) {
return this;
}

@Override
public TableRead withProjection(int[][] projection) {
this.projection = projection;
return this;
}

@Override
public RecordReader<RowData> createReader(Split split) throws IOException {
if (!(split instanceof SchemasSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((SchemasSplit) split).location;
Iterator<TableSchema> schemas = new SchemaManager(location).listAll().iterator();
Iterator<RowData> rows = Iterators.transform(schemas, this::toRow);
if (projection != null) {
rows =
Iterators.transform(
rows, row -> ProjectedRowData.from(projection).replaceRow(row));
}
return new IteratorRecordReader<>(rows);
}

private RowData toRow(TableSchema schema) {
return GenericRowData.of(
schema.id(),
toJson(schema.fields()),
toJson(schema.partitionKeys()),
toJson(schema.primaryKeys()),
toJson(schema.options()),
StringData.fromString(schema.comment()));
}

private StringData toJson(Object obj) {
ObjectMapper objectMapper = new ObjectMapper();
try {
// Re-serialize through JsonNode to produce compact JSON (trims all whitespace)
JsonNode node = objectMapper.readTree(JsonSerdeUtil.toJson(obj));
return StringData.fromString(node.toString());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
}
@@ -84,6 +84,25 @@ public void testMetadataTable() {
.select("snapshot_id", "schema_id", "commit_user", "commit_kind")
.collectAsList();
assertThat(rows.toString()).isEqualTo("[[1,0,user,APPEND]]");

spark.sql("USE tablestore");
spark.sql(
"CREATE TABLE default.schemasTable (\n"
+ "a BIGINT,\n"
+ "b STRING) USING tablestore\n"
+ "COMMENT 'table comment'\n"
+ "TBLPROPERTIES ('primary-key' = 'a')");
spark.sql("ALTER TABLE default.schemasTable ADD COLUMN c STRING");
List<Row> schemas =
spark.table("tablestore.default.`schemasTable$schemas`").collectAsList();
List<?> fieldsList = schemas.stream().map(row -> row.get(1)).collect(Collectors.toList());
assertThat(fieldsList.toString())
.isEqualTo(
"[[{\"id\":0,\"name\":\"a\",\"type\":\"BIGINT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"VARCHAR(2147483647)\"}], "
+ "[{\"id\":0,\"name\":\"a\",\"type\":\"BIGINT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"b\",\"type\":\"VARCHAR(2147483647)\"},"
+ "{\"id\":2,\"name\":\"c\",\"type\":\"VARCHAR(2147483647)\"}]]");
}

@Test