Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ARCTIC-1171][FLINK] When reading partial fields from Logstore, the number of fields does not match #1172

Merged
merged 7 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -62,7 +65,7 @@
/**
* This is a log source table api, create log queue consumer e.g. {@link LogKafkaSource}
*/
public class LogDynamicSource implements ScanTableSource, SupportsWatermarkPushDown {
public class LogDynamicSource implements ScanTableSource, SupportsWatermarkPushDown, SupportsProjectionPushDown {

private static final Logger LOG = LoggerFactory.getLogger(LogDynamicSource.class);

Expand All @@ -71,6 +74,7 @@ public class LogDynamicSource implements ScanTableSource, SupportsWatermarkPushD
private final ReadableConfig tableOptions;
private final Optional<String> consumerChangelogMode;
private final boolean logRetractionEnable;
private int[] projectedFields;

/**
* Watermark strategy that is used to generate per-partition watermark.
Expand Down Expand Up @@ -136,6 +140,14 @@ protected LogKafkaSource createKafkaSource() {
projectedSchema = new Schema(Arrays.stream(valueProjection).mapToObj(columns::get).collect(Collectors.toList()));
}

if (projectedFields != null) {
List<NestedField> projectedSchemaColumns = projectedSchema.columns();
projectedSchema = new Schema(Arrays.stream(projectedFields)
.mapToObj(projectedSchemaColumns::get)
.collect(Collectors.toList()));
}
LOG.info("Schema used for create KafkaSource is: {}", projectedSchema);

LogKafkaSourceBuilder kafkaSourceBuilder = LogKafkaSource.builder(projectedSchema, arcticTable.properties());
kafkaSourceBuilder.setProperties(properties);

Expand All @@ -150,6 +162,14 @@ protected LogPulsarSource createPulsarSource() {
projectedSchema = new Schema(Arrays.stream(valueProjection).mapToObj(columns::get).collect(Collectors.toList()));
}

if (projectedFields != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about abstracting the common logic to a private method?

List<NestedField> projectedSchemaColumns = projectedSchema.columns();
projectedSchema = new Schema(Arrays.stream(projectedFields)
.mapToObj(projectedSchemaColumns::get)
.collect(Collectors.toList()));
}
LOG.info("Schema used for create PulsarSource is: {}", projectedSchema);

LogPulsarSourceBuilder pulsarSourceBuilder = LogPulsarSource.builder(projectedSchema, arcticTable.properties());
pulsarSourceBuilder.setProperties(properties);

Expand Down Expand Up @@ -240,4 +260,18 @@ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}

/**
 * Nested (multi-level) projection is not supported by this source;
 * {@code applyProjection} rejects any field path with more than one element.
 */
@Override
public boolean supportsNestedProjection() {
return false;
}

/**
 * Records the projected top-level field indices pushed down by the planner.
 * Each entry of {@code projectFields} is a field path; only single-element
 * (top-level) paths are accepted, matching {@code supportsNestedProjection()}.
 *
 * @param projectFields field paths chosen by the planner, one per projected column
 */
@Override
public void applyProjection(int[][] projectFields) {
  int[] fields = new int[projectFields.length];
  int pos = 0;
  for (int[] fieldPath : projectFields) {
    // Nested projection is declared unsupported, so every path must be length 1.
    Preconditions.checkArgument(fieldPath.length == 1,
        "Don't support nested projection now.");
    fields[pos++] = fieldPath[0];
  }
  this.projectedFields = fields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,68 @@ public void testUnpartitionLogSinkSource() throws Exception {
result.getJobClient().ifPresent(TestUtil::cancelJob);
}

@Test
public void testUnpartitionLogSinkSourceWithSelectedFields() throws Exception {
  // Source rows; duplicates are intentional — the comparison below is set-based.
  List<Object[]> data = new LinkedList<>();
  data.add(new Object[]{1000004, "a", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000015, "b", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000011, "c", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000014, "d", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000015, "d", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000007, "e", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000007, "e", LocalDateTime.parse("2022-06-18T10:10:11.0")});

  List<ApiExpression> rows = DataUtil.toRows(data);

  Table input = getTableEnv().fromValues(DataTypes.ROW(
      DataTypes.FIELD("id", DataTypes.INT()),
      DataTypes.FIELD("name", DataTypes.STRING()),
      DataTypes.FIELD("op_time", DataTypes.TIMESTAMP())
      ),
      rows
  );
  getTableEnv().createTemporaryView("input", input);

  sql("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props));

  tableProperties.put(LOCATION, tableDir.getAbsolutePath() + "/" + TABLE);
  sql("CREATE TABLE IF NOT EXISTS arcticCatalog." + db + "." + TABLE + "(" +
      " id INT, name STRING, op_time TIMESTAMP, PRIMARY KEY (id) NOT ENFORCED) WITH %s", toWithClause(tableProperties));

  // Write all three columns into the log store.
  sql("insert into arcticCatalog." + db + "." + TABLE + " /*+ OPTIONS(" +
      "'arctic.emit.mode'='log'" +
      ", 'log.version'='v1'" +
      ") */" +
      " select * from input");

  // Read back only (id, op_time) — exercises projection push-down into the log source.
  TableResult result = exec("select id, op_time from arcticCatalog." + db + "." + TABLE +
      "/*+ OPTIONS(" +
      "'arctic.read.mode'='log'" +
      ", 'scan.startup.mode'='earliest'" +
      ")*/" +
      "");

  // Drain exactly data.size() rows from the unbounded source; row order is not
  // deterministic, so collect into a set. (Indexed loop: only the count matters.)
  Set<Row> actual = new HashSet<>();
  try (CloseableIterator<Row> iterator = result.collect()) {
    for (int i = 0; i < data.size(); i++) {
      actual.add(iterator.next());
    }
  }

  // Expected rows carry only the projected columns (id, op_time).
  List<Object[]> expected = new LinkedList<>();
  expected.add(new Object[]{1000004, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000015, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000011, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000014, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000015, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000007, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000007, LocalDateTime.parse("2022-06-18T10:10:11.0")});

  Assert.assertEquals(DataUtil.toRowSet(expected), actual);
  result.getJobClient().ifPresent(TestUtil::cancelJob);
}

@Test
public void testUnPartitionDoubleSink() throws Exception {
List<Object[]> data = new LinkedList<>();
Expand Down Expand Up @@ -538,6 +600,68 @@ public void testPartitionLogSinkSource() throws Exception {
result.getJobClient().ifPresent(TestUtil::cancelJob);
}

@Test
public void testPartitionLogSinkSourceWithSelectedFields() throws Exception {
  // Source rows; duplicates are intentional — the comparison below is set-based.
  List<Object[]> data = new LinkedList<>();
  data.add(new Object[]{1000004, "a", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000015, "b", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000011, "c", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000014, "d", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000015, "d", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000007, "e", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000007, "e", LocalDateTime.parse("2022-06-18T10:10:11.0")});

  List<ApiExpression> rows = DataUtil.toRows(data);

  Table input = getTableEnv().fromValues(DataTypes.ROW(
      DataTypes.FIELD("id", DataTypes.INT()),
      DataTypes.FIELD("name", DataTypes.STRING()),
      DataTypes.FIELD("op_time", DataTypes.TIMESTAMP())
      ),
      rows
  );
  getTableEnv().createTemporaryView("input", input);

  sql("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props));

  tableProperties.put(LOCATION, tableDir.getAbsolutePath() + "/" + TABLE);
  sql("CREATE TABLE IF NOT EXISTS arcticCatalog." + db + "." + TABLE + "(" +
      " id INT, name STRING, op_time TIMESTAMP, PRIMARY KEY (id) NOT ENFORCED " +
      ") PARTITIONED BY(op_time) WITH %s", toWithClause(tableProperties));

  // Write all three columns into the log store.
  sql("insert into arcticCatalog." + db + "." + TABLE + " /*+ OPTIONS(" +
      "'arctic.emit.mode'='log'" +
      ", 'log.version'='v1'" +
      ") */" +
      " select * from input");

  // Read back only (id, op_time) — exercises projection push-down into the log source.
  TableResult result = exec("select id, op_time from arcticCatalog." + db + "." + TABLE +
      "/*+ OPTIONS(" +
      "'arctic.read.mode'='log'" +
      ", 'scan.startup.mode'='earliest'" +
      ")*/" +
      "");

  // Drain exactly data.size() rows from the unbounded source; row order is not
  // deterministic, so collect into a set. (Indexed loop: only the count matters.)
  Set<Row> actual = new HashSet<>();
  try (CloseableIterator<Row> iterator = result.collect()) {
    for (int i = 0; i < data.size(); i++) {
      actual.add(iterator.next());
    }
  }

  // Expected rows carry only the projected columns (id, op_time).
  List<Object[]> expected = new LinkedList<>();
  expected.add(new Object[]{1000004, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000015, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000011, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000014, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000015, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000007, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000007, LocalDateTime.parse("2022-06-18T10:10:11.0")});

  Assert.assertEquals(DataUtil.toRowSet(expected), actual);
  result.getJobClient().ifPresent(TestUtil::cancelJob);
}

@Test
public void testPartitionDoubleSink() throws Exception {
List<Object[]> data = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,67 @@ public void testLogSinkSource() throws Exception {
result.getJobClient().ifPresent(TestUtil::cancelJob);
}

@Test
public void testUnpartitionLogSinkSourceWithSelectedFields() throws Exception {
  // Source rows; duplicates are intentional — the comparison below is set-based.
  List<Object[]> data = new LinkedList<>();
  data.add(new Object[]{1000004, "a", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000015, "b", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000011, "c", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000014, "d", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000015, "d", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000007, "e", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000007, "e", LocalDateTime.parse("2022-06-18T10:10:11.0")});

  List<ApiExpression> rows = DataUtil.toRows(data);
  Table input = getTableEnv().fromValues(DataTypes.ROW(
      DataTypes.FIELD("id", DataTypes.INT()),
      DataTypes.FIELD("name", DataTypes.STRING()),
      DataTypes.FIELD("op_time", DataTypes.TIMESTAMP())
      ),
      rows
  );
  getTableEnv().createTemporaryView("input", input);

  sql("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props));

  tableProperties.put(LOCATION, tableDir.getAbsolutePath() + "/" + TABLE);
  sql("CREATE TABLE IF NOT EXISTS arcticCatalog." + db + "." + TABLE + "(" +
      " id INT, name STRING, op_time TIMESTAMP) WITH %s", toWithClause(tableProperties));

  // Write all three columns into the log store.
  sql("insert into arcticCatalog." + db + "." + TABLE + " /*+ OPTIONS(" +
      "'arctic.emit.mode'='log'" +
      ", 'log.version'='v1'" +
      ") */" +
      " select * from input");

  // Read back only (id, op_time) — exercises projection push-down into the log source.
  TableResult result = exec("select id, op_time from arcticCatalog." + db + "." + TABLE +
      "/*+ OPTIONS(" +
      "'arctic.read.mode'='log'" +
      ", 'scan.startup.mode'='earliest'" +
      ")*/" +
      "");

  // Drain exactly data.size() rows from the unbounded source; row order is not
  // deterministic, so collect into a set. (Indexed loop: only the count matters.)
  Set<Row> actual = new HashSet<>();
  try (CloseableIterator<Row> iterator = result.collect()) {
    for (int i = 0; i < data.size(); i++) {
      actual.add(iterator.next());
    }
  }

  // Expected rows carry only the projected columns (id, op_time).
  List<Object[]> expected = new LinkedList<>();
  expected.add(new Object[]{1000004, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000015, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000011, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000014, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000015, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000007, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000007, LocalDateTime.parse("2022-06-18T10:10:11.0")});

  Assert.assertEquals(DataUtil.toRowSet(expected), actual);

  result.getJobClient().ifPresent(TestUtil::cancelJob);
}

@Test
public void testUnPartitionDoubleSink() throws Exception {
List<Object[]> data = new LinkedList<>();
Expand Down Expand Up @@ -595,6 +656,67 @@ public void testPartitionLogSinkSource() throws Exception {
result.getJobClient().ifPresent(TestUtil::cancelJob);
}

@Test
public void testPartitionLogSinkSourceWithSelectedFields() throws Exception {
  // Source rows; duplicates are intentional — the comparison below is set-based.
  List<Object[]> data = new LinkedList<>();
  data.add(new Object[]{1000004, "a", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000015, "b", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000011, "c", LocalDateTime.parse("2022-06-17T10:10:11.0")});
  data.add(new Object[]{1000014, "d", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000015, "d", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000007, "e", LocalDateTime.parse("2022-06-18T10:10:11.0")});
  data.add(new Object[]{1000007, "e", LocalDateTime.parse("2022-06-18T10:10:11.0")});

  List<ApiExpression> rows = DataUtil.toRows(data);
  Table input = getTableEnv().fromValues(DataTypes.ROW(
      DataTypes.FIELD("id", DataTypes.INT()),
      DataTypes.FIELD("name", DataTypes.STRING()),
      DataTypes.FIELD("op_time", DataTypes.TIMESTAMP())
      ),
      rows
  );
  getTableEnv().createTemporaryView("input", input);

  sql("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props));

  tableProperties.put(LOCATION, tableDir.getAbsolutePath() + "/" + TABLE);
  sql("CREATE TABLE IF NOT EXISTS arcticCatalog." + db + "." + TABLE + "(" +
      " id INT, name STRING, op_time TIMESTAMP) PARTITIONED BY (op_time) WITH %s", toWithClause(tableProperties));

  // Write all three columns into the log store.
  sql("insert into arcticCatalog." + db + "." + TABLE + " /*+ OPTIONS(" +
      "'arctic.emit.mode'='log'" +
      ", 'log.version'='v1'" +
      ") */" +
      " select * from input");

  // Read back only (id, op_time) — exercises projection push-down into the log source.
  TableResult result = exec("select id, op_time from arcticCatalog." + db + "." + TABLE +
      "/*+ OPTIONS(" +
      "'arctic.read.mode'='log'" +
      ", 'scan.startup.mode'='earliest'" +
      ")*/" +
      "");

  // Drain exactly data.size() rows from the unbounded source; row order is not
  // deterministic, so collect into a set. (Indexed loop: only the count matters.)
  Set<Row> actual = new HashSet<>();
  try (CloseableIterator<Row> iterator = result.collect()) {
    for (int i = 0; i < data.size(); i++) {
      actual.add(iterator.next());
    }
  }

  // Expected rows carry only the projected columns (id, op_time).
  List<Object[]> expected = new LinkedList<>();
  expected.add(new Object[]{1000004, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000015, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000011, LocalDateTime.parse("2022-06-17T10:10:11.0")});
  expected.add(new Object[]{1000014, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000015, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000007, LocalDateTime.parse("2022-06-18T10:10:11.0")});
  expected.add(new Object[]{1000007, LocalDateTime.parse("2022-06-18T10:10:11.0")});

  Assert.assertEquals(DataUtil.toRowSet(expected), actual);

  result.getJobClient().ifPresent(TestUtil::cancelJob);
}

@Test
public void testPartitionDoubleSink() throws Exception {
List<Object[]> data = new LinkedList<>();
Expand Down
Loading