Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ class PlannerModule {
// flink-table-runtime or flink-dist itself
"org.codehaus.janino",
"org.codehaus.commons",
"org.apache.commons.lang3",
// Used by org.reflections
"javassist"))
"org.apache.commons.lang3"))
.toArray(String[]::new);

private static final String[] COMPONENT_CLASSPATH = new String[] {"org.apache.flink"};
Expand Down
26 changes: 0 additions & 26 deletions flink-table/flink-table-planner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,6 @@ under the License.
<artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<!-- Utility to scan classpaths -->
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Test dependencies -->

<dependency>
Expand Down Expand Up @@ -372,9 +354,6 @@ under the License.

<!-- For legacy string expressions in Table API -->
<include>org.scala-lang.modules:scala-parser-combinators_${scala.binary.version}</include>

<!-- ReflectionsUtil -->
<include>org.reflections:reflections</include>
</includes>
</artifactSet>
<relocations>
Expand Down Expand Up @@ -410,11 +389,6 @@ under the License.
<shadedPattern>org.apache.flink.table.shaded.com.jayway</shadedPattern>
</relocation>

<!-- Other table dependencies -->
<relocation>
<pattern>org.reflections</pattern>
<shadedPattern>org.apache.flink.table.shaded.org.reflections</shadedPattern>
</relocation>
<relocation>
<!-- icu4j's dependencies -->
<pattern>com.ibm.icu</pattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,34 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;

import java.util.List;

import static org.apache.flink.table.planner.plan.nodes.exec.ExecNode.FIELD_NAME_TYPE;

/**
* The representation of execution information for a {@link FlinkPhysicalRel}.
*
* @param <T> The type of the elements that result from this node.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "class")
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = FIELD_NAME_TYPE,
visible = true)
@JsonTypeIdResolver(ExecNodeTypeIdResolver.class)
@Internal
public interface ExecNode<T> extends ExecNodeTranslator<T> {

String FIELD_NAME_ID = "id";
String FIELD_NAME_TYPE = "type";
String FIELD_NAME_DESCRIPTION = "description";
String FIELD_NAME_INPUT_PROPERTIES = "inputProperties";
String FIELD_NAME_OUTPUT_TYPE = "outputType";

/** Gets the ID of this node. */
@JsonProperty(value = FIELD_NAME_ID)
/** The unique ID of the node. */
@JsonProperty(value = FIELD_NAME_ID, index = 0)
int getId();

/** Returns a string which describes this node. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.table.planner.plan.nodes.exec;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfig;
Expand All @@ -31,6 +30,7 @@

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -46,54 +46,44 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class ExecNodeBase<T> implements ExecNode<T> {

/** The unique identifier for each ExecNode in the json plan. */
@JsonIgnore private final int id;
private final String description;

@JsonIgnore private final String description;
private final LogicalType outputType;

@JsonIgnore private final LogicalType outputType;
private final List<InputProperty> inputProperties;

@JsonIgnore private final List<InputProperty> inputProperties;
private List<ExecEdge> inputEdges;

@JsonIgnore private List<ExecEdge> inputEdges;
private transient Transformation<T> transformation;

@JsonIgnore private transient Transformation<T> transformation;
/** Holds the context information (id, name, version) as deserialized from a JSON plan. */
@JsonProperty(value = FIELD_NAME_TYPE, access = JsonProperty.Access.WRITE_ONLY)
private final ExecNodeContext context;

/** This is used to assign a unique ID to every ExecNode. */
private static Integer idCounter = 0;

/** Generate an unique ID for ExecNode. */
public static int getNewNodeId() {
idCounter++;
return idCounter;
}

/** Reset the id counter to 0. */
@VisibleForTesting
public static void resetIdCounter() {
idCounter = 0;
/**
* Retrieves the default context from the {@link ExecNodeMetadata} annotation to be serialized
* into the JSON plan.
*/
@JsonProperty(value = FIELD_NAME_TYPE, access = JsonProperty.Access.READ_ONLY, index = 1)
protected final ExecNodeContext getContextFromAnnotation() {
return ExecNodeContext.newContext(this.getClass()).withId(getId());
}

// used for json creator
protected ExecNodeBase(
int id,
ExecNodeContext context,
List<InputProperty> inputProperties,
LogicalType outputType,
String description) {
this.id = id;
this.context = checkNotNull(context).withId(id);
this.inputProperties = checkNotNull(inputProperties);
this.outputType = checkNotNull(outputType);
this.description = checkNotNull(description);
}

protected ExecNodeBase(
List<InputProperty> inputProperties, LogicalType outputType, String description) {
this(getNewNodeId(), inputProperties, outputType, description);
}

@Override
public final int getId() {
return id;
return context.getId();
}

@Override
Expand Down Expand Up @@ -191,7 +181,7 @@ protected String getOperatorDescription(Configuration config) {
protected String getFormattedOperatorDescription(String description, Configuration config) {
if (config.getBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_SIMPLIFY_OPERATOR_NAME_ENABLED)) {
return String.format("[%d]:%s", id, description);
return String.format("[%d]:%s", getId(), description);
}
return description;
}
Expand All @@ -200,7 +190,7 @@ protected String getFormattedOperatorName(
String detailName, String simplifiedName, Configuration config) {
if (config.getBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_SIMPLIFY_OPERATOR_NAME_ENABLED)) {
return String.format("%s[%d]", simplifiedName, id);
return String.format("%s[%d]", simplifiedName, getId());
}
return detailName;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;

import javax.annotation.Nullable;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Helper class that holds the necessary identifier fields that are used for JSON plan serialization
* and deserialization. It is instantiated using {@link ExecNodeContext#newContext(Class)} when
* creating a new instance of an {@link ExecNode}, so that is contains the info from the {@link
* ExecNodeMetadata} annotation of the class with the latest {@link ExecNodeMetadata#version()}. It
* can also be instantiated with {@link ExecNodeContext#ExecNodeContext(String)} automatically when
* the {@link ExecNode} is deserialized from a JSON Plan, and in this case the {@link
* ExecNodeContext} contains the version that is read from the JSON Plan and not the latest one. The
* serialization format is {@code <name>_<version>}, see {@link ExecNodeContext#getTypeAsString()}.
*/
@Internal
public final class ExecNodeContext {

/** This is used to assign a unique ID to every ExecNode. */
private static final AtomicInteger idCounter = new AtomicInteger(0);

/** Generate an unique ID for ExecNode. */
public static int newNodeId() {
return idCounter.incrementAndGet();
}

/** Reset the id counter to 0. */
@VisibleForTesting
public static void resetIdCounter() {
idCounter.set(0);
}

private final Integer id;
private final String name;
private final Integer version;

private ExecNodeContext() {
this(null, null, null);
}

private ExecNodeContext(String name, Integer version) {
this(null, name, version);
}

/**
* @param id The unique id of the {@link ExecNode}. See {@link ExecNode#getId()}. It can be null
* initially and then later set by using {@link #withId(int)} which creates a new instance
* of {@link ExecNodeContext} since it's immutable. This way we can satisfy both the {@link
* ExecNodeBase#ExecNodeBase(int, ExecNodeContext, List, LogicalType, String)} ctor, which
* is used for the {@link JsonCreator} ctors, where the {@code id} and the {@code context}
* are read separately, and the {@link ExecNodeBase#getContextFromAnnotation()} which
* creates a new context with a new id provided by: {@link #newNodeId()}.
* @param name The name of the {@link ExecNode}. See {@link ExecNodeMetadata#name()}.
* @param version The version of the {@link ExecNode}. See {@link ExecNodeMetadata#version()}.
*/
private ExecNodeContext(@Nullable Integer id, String name, Integer version) {
this.id = id;
this.name = name;
this.version = version;
}

@JsonCreator
public ExecNodeContext(String value) {
this.id = null;
String[] split = value.split("_");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain the format in the class JavaDoc.

this.name = split[0];
this.version = Integer.valueOf(split[1]);
}

/** The unique identifier for each ExecNode in the JSON plan. */
int getId() {
return checkNotNull(id);
}

/** The type identifying an ExecNode in the JSON plan. See {@link ExecNodeMetadata#name()}. */
public String getName() {
return name;
}

/** The version of the ExecNode in the JSON plan. See {@link ExecNodeMetadata#version()}. */
public Integer getVersion() {
return version;
}

/**
* Set the unique ID of the node, so that the {@link ExecNodeContext}, together with the type
* related {@link #name} and {@link #version}, stores all the necessary info to uniquely
* reconstruct the {@link ExecNode}, and avoid storing the {@link #id} independently as a field
* in {@link ExecNodeBase}.
*/
public ExecNodeContext withId(int id) {
return new ExecNodeContext(id, this.name, this.version);
}

/**
* Returns the {@link #name} and {@link #version}, to be serialized into the JSON plan as one
* string, which in turn will be parsed by {@link ExecNodeContext#ExecNodeContext(String)} when
* deserialized from a JSON plan or when needed by {@link
* ExecNodeTypeIdResolver#typeFromId(DatabindContext, String)}.
*/
@JsonValue
public String getTypeAsString() {
return name + "_" + version;
}

@Override
public String toString() {
return getId() + "_" + getTypeAsString();
}

public static <T extends ExecNode<?>> ExecNodeContext newContext(Class<T> execNodeClass) {
ExecNodeMetadata metadata = ExecNodeMetadataUtil.latestAnnotation(execNodeClass);
if (metadata == null) {
if (!ExecNodeMetadataUtil.isUnsupported(execNodeClass)) {
throw new IllegalStateException(
String.format(
"ExecNode: %s is not listed in the unsupported classes since it is not annotated with: %s.",
execNodeClass.getCanonicalName(),
ExecNodeMetadata.class.getSimpleName()));
}
return new ExecNodeContext();
}
if (!ExecNodeMetadataUtil.execNodes().contains(execNodeClass)) {
throw new IllegalStateException(
String.format(
"ExecNode: %s is not listed in the supported classes and yet is annotated with: %s.",
execNodeClass.getCanonicalName(),
ExecNodeMetadata.class.getSimpleName()));
}
return new ExecNodeContext(metadata.name(), metadata.version());
}
}
Loading