Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-2226]: Rename Flink Model Arctic to Amoro #2812

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;

import org.apache.amoro.flink.table.ArcticDynamicSource;
import org.apache.amoro.flink.table.AmoroDynamicSource;
import org.apache.amoro.flink.table.FlinkSource;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.PrimaryKeySpec;
Expand Down Expand Up @@ -116,7 +116,7 @@ public static TableSchema toSchema(
}

/**
* Add watermark info to help {@link FlinkSource} and {@link ArcticDynamicSource} distinguish the
* Add watermark info to help {@link FlinkSource} and {@link AmoroDynamicSource} distinguish the
* watermark field. For now, it can only be used in the case of Arctic as a dim-table.
*/
public static TableSchema getPhysicalSchemaForDimTable(TableSchema tableSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
import org.apache.amoro.flink.table.DynamicTableFactory;
import org.apache.amoro.flink.table.descriptors.ArcticValidator;
import org.apache.amoro.flink.util.ArcticUtils;
import org.apache.amoro.flink.util.AmoroUtils;
import org.apache.amoro.mixed.MixedFormatCatalog;
import org.apache.amoro.scan.CombinedScanTask;
import org.apache.amoro.scan.KeyedTableScanTask;
Expand Down Expand Up @@ -207,7 +207,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)

List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
return CatalogTable.of(
toSchema(arcticSchema, ArcticUtils.getPrimaryKeys(table), arcticProperties).toSchema(),
toSchema(arcticSchema, AmoroUtils.getPrimaryKeys(table), arcticProperties).toSchema(),
null,
partitionKeys,
arcticProperties);
Expand Down Expand Up @@ -740,7 +740,7 @@ private CatalogTable toCatalogTable(MixedTable table, TableIdentifier tableIdent

List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
return new CatalogTableImpl(
toSchema(arcticSchema, ArcticUtils.getPrimaryKeys(table), arcticProperties),
toSchema(arcticSchema, AmoroUtils.getPrimaryKeys(table), arcticProperties),
partitionKeys,
arcticProperties,
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.amoro.flink.lookup;

import org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction;
import org.apache.amoro.flink.table.ArcticTableLoader;
import org.apache.amoro.flink.table.AmoroTableLoader;
import org.apache.amoro.hive.io.reader.AbstractAdaptHiveKeyedDataReader;
import org.apache.amoro.table.MixedTable;
import org.apache.flink.configuration.Configuration;
Expand All @@ -35,17 +35,17 @@
import java.util.function.Predicate;

/** A lookup function for {@link RowData} type. */
public class ArcticRowDataLookupFunction extends LookupFunction {
public class AmoroRowDataLookupFunction extends LookupFunction {
private static final long serialVersionUID = -7694050999266540499L;
private final BasicLookupFunction<RowData> basicLookupFunction;

public ArcticRowDataLookupFunction(
public AmoroRowDataLookupFunction(
TableFactory<RowData> tableFactory,
MixedTable mixedTable,
List<String> joinKeys,
Schema projectSchema,
List<Expression> filters,
ArcticTableLoader tableLoader,
AmoroTableLoader tableLoader,
Configuration config,
Predicate<RowData> predicate,
AbstractAdaptHiveKeyedDataReader<RowData> flinkArcticMORDataReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.amoro.flink.lookup;

import static org.apache.amoro.flink.table.descriptors.ArcticValidator.LOOKUP_RELOADING_INTERVAL;
import static org.apache.amoro.flink.util.ArcticUtils.loadArcticTable;
import static org.apache.amoro.flink.util.AmoroUtils.loadArcticTable;
import static org.apache.flink.util.Preconditions.checkArgument;

import org.apache.amoro.flink.read.MixedIncrementalLoader;
import org.apache.amoro.flink.read.hybrid.enumerator.MergeOnReadIncrementalPlanner;
import org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction;
import org.apache.amoro.flink.table.ArcticTableLoader;
import org.apache.amoro.flink.table.AmoroTableLoader;
import org.apache.amoro.hive.io.reader.AbstractAdaptHiveKeyedDataReader;
import org.apache.amoro.table.MixedTable;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -65,7 +65,7 @@ public class BasicLookupFunction<T> implements Serializable {
private final List<String> joinKeys;
private final Schema projectSchema;
private final List<Expression> filters;
private final ArcticTableLoader loader;
private final AmoroTableLoader loader;
private long nextLoadTime = Long.MIN_VALUE;
private final long reloadIntervalSeconds;
private MixedIncrementalLoader<T> incrementalLoader;
Expand All @@ -85,7 +85,7 @@ public BasicLookupFunction(
List<String> joinKeys,
Schema projectSchema,
List<Expression> filters,
ArcticTableLoader tableLoader,
AmoroTableLoader tableLoader,
Configuration config,
Predicate<T> predicate,
AbstractAdaptHiveKeyedDataReader<T> flinkArcticMORDataReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
import org.apache.amoro.flink.read.hybrid.assigner.ShuffleSplitAssigner;
import org.apache.amoro.flink.read.hybrid.assigner.SplitAssigner;
import org.apache.amoro.flink.read.hybrid.assigner.StaticSplitAssigner;
import org.apache.amoro.flink.read.hybrid.enumerator.ArcticSourceEnumState;
import org.apache.amoro.flink.read.hybrid.enumerator.ArcticSourceEnumStateSerializer;
import org.apache.amoro.flink.read.hybrid.enumerator.ArcticSourceEnumerator;
import org.apache.amoro.flink.read.hybrid.enumerator.StaticArcticSourceEnumerator;
import org.apache.amoro.flink.read.hybrid.reader.ArcticSourceReader;
import org.apache.amoro.flink.read.hybrid.enumerator.AmoroSourceEnumState;
import org.apache.amoro.flink.read.hybrid.enumerator.AmoroSourceEnumStateSerializer;
import org.apache.amoro.flink.read.hybrid.enumerator.AmoroSourceEnumerator;
import org.apache.amoro.flink.read.hybrid.enumerator.StaticAmoroSourceEnumerator;
import org.apache.amoro.flink.read.hybrid.reader.AmoroSourceReader;
import org.apache.amoro.flink.read.hybrid.reader.ReaderFunction;
import org.apache.amoro.flink.read.hybrid.split.ArcticSplit;
import org.apache.amoro.flink.read.hybrid.split.ArcticSplitSerializer;
import org.apache.amoro.flink.read.source.ArcticScanContext;
import org.apache.amoro.flink.table.ArcticTableLoader;
import org.apache.amoro.flink.read.hybrid.split.AmoroSplit;
import org.apache.amoro.flink.read.hybrid.split.AmoroSplitSerializer;
import org.apache.amoro.flink.read.source.AmoroScanContext;
import org.apache.amoro.flink.table.AmoroTableLoader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
Expand All @@ -42,29 +42,29 @@
import org.apache.flink.core.io.SimpleVersionedSerializer;

/**
* Arctic Source based of FLIP-27.
* Amoro Source based on FLIP-27.
*
* <p>If ArcticSource is used as a build table in lookup join, it will be implemented by temporal
* join. Two source should use processing time as watermark. ArcticSource will generate watermark
* after first splits planned by ArcticSourceEnumerator having been finished.
* <p>If AmoroSource is used as a build table in a lookup join, it will be implemented by temporal
* join. Both sources should use processing time as the watermark. AmoroSource will generate a
* watermark after the first splits planned by AmoroSourceEnumerator have finished.
*/
public class ArcticSource<T>
implements Source<T, ArcticSplit, ArcticSourceEnumState>, ResultTypeQueryable<T> {
public class AmoroSource<T>
implements Source<T, AmoroSplit, AmoroSourceEnumState>, ResultTypeQueryable<T> {
private static final long serialVersionUID = 1L;
private final ArcticScanContext scanContext;
private final AmoroScanContext scanContext;
private final ReaderFunction<T> readerFunction;
private final TypeInformation<T> typeInformation;
private final ArcticTableLoader loader;
private final AmoroTableLoader loader;
private final String tableName;
/**
* generate arctic watermark. This is only for lookup join arctic table, and arctic table is used
* as build table, i.e. right table.
* Generates the Amoro watermark. This is only used when lookup-joining an Amoro table, where the
* Amoro table is used as the build table, i.e. the right table.
*/
private final boolean dimTable;

public ArcticSource(
ArcticTableLoader loader,
ArcticScanContext scanContext,
public AmoroSource(
AmoroTableLoader loader,
AmoroScanContext scanContext,
ReaderFunction<T> readerFunction,
TypeInformation<T> typeInformation,
String tableName,
Expand All @@ -83,45 +83,44 @@ public Boundedness getBoundedness() {
}

@Override
public SourceReader<T, ArcticSplit> createReader(SourceReaderContext readerContext) {
return new ArcticSourceReader<>(
public SourceReader<T, AmoroSplit> createReader(SourceReaderContext readerContext) {
return new AmoroSourceReader<>(
readerFunction, readerContext.getConfiguration(), readerContext, dimTable);
}

@Override
public SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
SplitEnumeratorContext<ArcticSplit> enumContext) {
public SplitEnumerator<AmoroSplit, AmoroSourceEnumState> createEnumerator(
SplitEnumeratorContext<AmoroSplit> enumContext) {
return createEnumerator(enumContext, null);
}

private SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
SplitEnumeratorContext<ArcticSplit> enumContext, ArcticSourceEnumState enumState) {
private SplitEnumerator<AmoroSplit, AmoroSourceEnumState> createEnumerator(
SplitEnumeratorContext<AmoroSplit> enumContext, AmoroSourceEnumState enumState) {
SplitAssigner splitAssigner;
if (scanContext.isStreaming()) {
splitAssigner = new ShuffleSplitAssigner(enumContext, tableName, enumState);
return new ArcticSourceEnumerator(
return new AmoroSourceEnumerator(
enumContext, splitAssigner, loader, scanContext, enumState, dimTable);
} else {
splitAssigner = new StaticSplitAssigner(enumState);
return new StaticArcticSourceEnumerator(
enumContext, splitAssigner, loader, scanContext, null);
return new StaticAmoroSourceEnumerator(enumContext, splitAssigner, loader, scanContext, null);
}
}

@Override
public SplitEnumerator<ArcticSplit, ArcticSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<ArcticSplit> enumContext, ArcticSourceEnumState checkpoint) {
public SplitEnumerator<AmoroSplit, AmoroSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<AmoroSplit> enumContext, AmoroSourceEnumState checkpoint) {
return createEnumerator(enumContext, checkpoint);
}

@Override
public SimpleVersionedSerializer<ArcticSplit> getSplitSerializer() {
return new ArcticSplitSerializer();
public SimpleVersionedSerializer<AmoroSplit> getSplitSerializer() {
return new AmoroSplitSerializer();
}

@Override
public SimpleVersionedSerializer<ArcticSourceEnumState> getEnumeratorCheckpointSerializer() {
return new ArcticSourceEnumStateSerializer();
public SimpleVersionedSerializer<AmoroSourceEnumState> getEnumeratorCheckpointSerializer() {
return new AmoroSourceEnumStateSerializer();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.amoro.flink.read;

import org.apache.amoro.data.DataFileType;
import org.apache.amoro.flink.read.hybrid.split.ArcticSplit;
import org.apache.amoro.flink.read.hybrid.split.AmoroSplit;
import org.apache.amoro.flink.read.hybrid.split.ChangelogSplit;
import org.apache.amoro.flink.read.hybrid.split.MergeOnReadSplit;
import org.apache.amoro.flink.read.hybrid.split.SnapshotSplit;
Expand Down Expand Up @@ -59,7 +59,7 @@ public class FlinkSplitPlanner {

private FlinkSplitPlanner() {}

public static List<ArcticSplit> planFullTable(KeyedTable keyedTable, AtomicInteger splitCount) {
public static List<AmoroSplit> planFullTable(KeyedTable keyedTable, AtomicInteger splitCount) {
CloseableIterable<CombinedScanTask> combinedScanTasks = keyedTable.newScan().planTasks();
BaseAndChangeTask baseAndChangeTask = BaseAndChangeTask.of(combinedScanTasks);
return planFullTable(baseAndChangeTask, splitCount);
Expand All @@ -72,10 +72,10 @@ public static List<ArcticSplit> planFullTable(KeyedTable keyedTable, AtomicInteg
* @param keyedTable The {@link KeyedTable} to scan.
* @param filters Optional list of filters to apply to the scan.
* @param splitCount The atomic integer to track the split count.
* @return The list of planned {@link ArcticSplit} included {@link SnapshotSplit}, {@link
* @return The list of planned {@link AmoroSplit}, including {@link SnapshotSplit} and {@link
* ChangelogSplit}.
*/
public static List<ArcticSplit> planFullTable(
public static List<AmoroSplit> planFullTable(
KeyedTable keyedTable, List<Expression> filters, AtomicInteger splitCount) {
KeyedTableScan keyedTableScan = keyedTable.newScan();
if (filters != null) {
Expand All @@ -86,10 +86,10 @@ public static List<ArcticSplit> planFullTable(
return planFullTable(baseAndChangeTask, splitCount);
}

private static List<ArcticSplit> planFullTable(
private static List<AmoroSplit> planFullTable(
BaseAndChangeTask baseAndChangeTask, AtomicInteger splitCount) {
Collection<MixedFileScanTask> baseTasks = baseAndChangeTask.allBaseTasks();
List<ArcticSplit> allSplits =
List<AmoroSplit> allSplits =
baseTasks.stream()
.map(
arcticFileScanTask ->
Expand All @@ -98,7 +98,7 @@ private static List<ArcticSplit> planFullTable(
.collect(Collectors.toList());

Collection<TransactionTask> changeTasks = baseAndChangeTask.transactionTasks();
List<ArcticSplit> changeSplits = planChangeTable(changeTasks, splitCount);
List<AmoroSplit> changeSplits = planChangeTable(changeTasks, splitCount);
allSplits.addAll(changeSplits);

return allSplits;
Expand All @@ -111,16 +111,16 @@ private static List<ArcticSplit> planFullTable(
* @param keyedTable The {@link KeyedTable} to scan.
* @param filters Optional list of filters to apply to the scan.
* @param splitCount The atomic integer to track the split count.
* @return The list of planned {@link ArcticSplit} included {@link MergeOnReadSplit}.
* @return The list of planned {@link AmoroSplit}, including {@link MergeOnReadSplit}.
*/
public static List<ArcticSplit> mergeOnReadPlan(
public static List<AmoroSplit> mergeOnReadPlan(
KeyedTable keyedTable, List<Expression> filters, AtomicInteger splitCount) {
KeyedTableScan keyedTableScan = keyedTable.newScan();
if (filters != null) {
filters.forEach(keyedTableScan::filter);
}
CloseableIterable<CombinedScanTask> combinedScanTasks = keyedTableScan.planTasks();
List<ArcticSplit> morSplits = Lists.newArrayList();
List<AmoroSplit> morSplits = Lists.newArrayList();
try (CloseableIterator<CombinedScanTask> initTasks = combinedScanTasks.iterator()) {

while (initTasks.hasNext()) {
Expand All @@ -137,16 +137,16 @@ public static List<ArcticSplit> mergeOnReadPlan(
return morSplits;
}

public static List<ArcticSplit> planChangeTable(
public static List<AmoroSplit> planChangeTable(
ChangeTableIncrementalScan tableIncrementalScan, AtomicInteger splitCount) {
CloseableIterable<FileScanTask> tasks = tableIncrementalScan.planFiles();
BaseAndChangeTask baseAndChangeTask = BaseAndChangeTask.ofIceberg(tasks);
return planChangeTable(baseAndChangeTask.transactionTasks(), splitCount);
}

private static List<ArcticSplit> planChangeTable(
private static List<AmoroSplit> planChangeTable(
Collection<TransactionTask> transactionTasks, AtomicInteger splitCount) {
List<ArcticSplit> changeTasks = new ArrayList<>(transactionTasks.size());
List<AmoroSplit> changeTasks = new ArrayList<>(transactionTasks.size());
transactionTasks.forEach(
transactionTask -> {
PartitionAndNodeGroup partitionAndNodeGroup =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

package org.apache.amoro.flink.read;

import org.apache.amoro.flink.read.hybrid.enumerator.ArcticEnumeratorOffset;
import org.apache.amoro.flink.read.hybrid.enumerator.AmoroEnumeratorOffset;
import org.apache.amoro.flink.read.hybrid.enumerator.ContinuousEnumerationResult;
import org.apache.amoro.flink.read.hybrid.enumerator.ContinuousSplitPlanner;
import org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction;
import org.apache.amoro.flink.read.hybrid.split.ArcticSplit;
import org.apache.amoro.flink.read.hybrid.split.AmoroSplit;
import org.apache.amoro.hive.io.reader.AbstractAdaptHiveKeyedDataReader;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterator;
Expand Down Expand Up @@ -52,8 +52,8 @@ public class MixedIncrementalLoader<T> implements AutoCloseable {
private final DataIteratorReaderFunction<T> readerFunction;
private AbstractAdaptHiveKeyedDataReader<T> flinkArcticMORDataReader;
private final List<Expression> filters;
private final AtomicReference<ArcticEnumeratorOffset> enumeratorPosition;
private final Queue<ArcticSplit> splitQueue;
private final AtomicReference<AmoroEnumeratorOffset> enumeratorPosition;
private final Queue<AmoroSplit> splitQueue;

public MixedIncrementalLoader(
ContinuousSplitPlanner continuousSplitPlanner,
Expand Down Expand Up @@ -100,7 +100,7 @@ public boolean hasNext() {
}

public CloseableIterator<T> next() {
ArcticSplit split = splitQueue.poll();
AmoroSplit split = splitQueue.poll();
if (split == null) {
throw new IllegalStateException("next() called, but no more valid splits");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.amoro.flink.read;

import org.apache.amoro.flink.read.hybrid.split.ArcticSplit;
import org.apache.amoro.flink.read.hybrid.split.AmoroSplit;
import org.apache.amoro.flink.read.hybrid.split.ChangelogSplit;
import org.apache.amoro.scan.MixedFileScanTask;

Expand All @@ -32,7 +32,7 @@

/**
* This is a group of the partitions and nodes of the Amoro table; it can plan different nodes and
* different partitions into different {@link ArcticSplit}.
* different partitions into different {@link AmoroSplit}.
*/
public class PartitionAndNodeGroup {
AtomicInteger splitCount = new AtomicInteger();
Expand All @@ -54,12 +54,12 @@ public PartitionAndNodeGroup splitCount(AtomicInteger splitCount) {
return this;
}

List<ArcticSplit> planSplits() {
List<AmoroSplit> planSplits() {
Map<String, Map<Long, Node>> nodes = new HashMap<>();
plan(true, nodes);
plan(false, nodes);

List<ArcticSplit> splits = new ArrayList<>();
List<AmoroSplit> splits = new ArrayList<>();

nodes
.values()
Expand Down