
Conversation

@xishuaidelin
Contributor

What is the purpose of the change

  • This pull request extends CompiledPlan to read from/write to Flink's FileSystem, contributing to FLINK-31791.

Brief change log

  • Modify CompilePlanOperation and ExecutePlanOperation to support Flink's FileSystem, which can be verified by SqlOtherToOperationConverterTest
  • Modify PlanReference and InternalPlan to support paths on Flink's FileSystem
  • Modify TableEnv#compilePlanAndWrite and verify it via TableEnvironmentTest#testCompileAndExecutePlanWithFlinkFilesystem (see the usage sketch below)
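
As an illustrative usage sketch of what this enables (the hdfs:// URI is a hypothetical placeholder; MyTable and MySink are the table names used in the tests and are assumed to be registered via DDL beforehand):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanFileSystemExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // MyTable and MySink are assumed to be registered already (e.g. via DDL).
        // Write the compiled plan to a (hypothetical) remote FileSystem path ...
        tableEnv.executeSql(
                "COMPILE PLAN 'hdfs:///plans/q1.json' "
                        + "FOR INSERT INTO MySink SELECT * FROM MyTable");
        // ... and execute it from that remote path later.
        tableEnv.executeSql("EXECUTE PLAN 'hdfs:///plans/q1.json'");
    }
}
```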

Verifying this change

This change can be verified by testSqlExecutePlanPath and testSqlcompilePlanPath in SqlOtherToOperationConverterTest, and by testCompileAndExecutePlanWithFlinkFilesystem in TableEnvironmentTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented May 6, 2023

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Contributor

@LadyForest LadyForest left a comment

Hi, @xishuaidelin, thanks for your contribution.
I have briefly reviewed the code; my review may be inaccurate, but I think the interface should remain the same. What do you think?

@xishuaidelin
Contributor Author

@flinkbot run azure

Contributor

@LadyForest LadyForest left a comment

Hi, @xishuaidelin, thanks for your update.
I think we may reuse org.apache.flink.table.resource.ResourceManager to take over the URI check and resource download/cleanup. You can take the impl for the CREATE FUNCTION USING JAR statement as a reference.

@xishuaidelin
Contributor Author

@flinkbot run azure

Contributor

@LadyForest LadyForest left a comment

Thanks for your update.

However, it doesn't work as expected.

I tried on the s3 & local file systems, but both failed; please check.
[Two screenshots attached showing the failures on the S3 and local file systems.]

            filePath, TableConfigOptions.PLAN_FORCE_RECOMPILE.key()));
        }
    }

Contributor

revert this back

      if (ifNotExists) {
    -     return loadPlan(PlanReference.fromFile(filePath));
    +     if (fs.isDistributedFS()) {
    +         URL localUrl = resourceManager.downloadResource(filePath);
Contributor

It's a little unnatural to do so. Can we wrap up this logic?
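
For instance, something along these lines (a sketch only: it assumes ResourceManager#downloadResource(Path) returns a local URL as in the diff above, and the helper name resolvePlanReference is hypothetical):

```java
// Hypothetical helper that hides the local-vs-distributed branching in one place.
// Assumed imports: org.apache.flink.core.fs.Path, org.apache.flink.core.fs.FileSystem,
// org.apache.flink.table.api.PlanReference, java.io.File, java.io.IOException, java.net.URL.
private PlanReference resolvePlanReference(String filePath) throws IOException {
    Path path = new Path(filePath);
    FileSystem fs = path.getFileSystem();
    if (fs.isDistributedFS()) {
        // Download the remote plan into the local resource directory first.
        URL localUrl = resourceManager.downloadResource(path);
        return PlanReference.fromFile(new File(localUrl.getPath()));
    }
    return PlanReference.fromFile(new File(filePath));
}
```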

@LadyForest
Contributor

LadyForest commented May 28, 2023

By the way, you can inspect the table.resources.download-dir parameter with the SET; command to verify that the remote plan is downloaded under this directory and is cleaned up when the session closes.

Nit: you can use MinIO to mock S3. A quick way to check the download directory is sketched below.
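
For example (a hypothetical snippet; it assumes table.resources.download-dir is exposed as TableConfigOptions.RESOURCES_DOWNLOAD_DIR and that a plain SET statement is supported via executeSql):

```java
// List the effective session configuration; table.resources.download-dir
// should show the directory where remote plans are staged locally.
tableEnv.executeSql("SET").print();

// Or read the option programmatically.
String downloadDir =
        tableEnv.getConfig().get(TableConfigOptions.RESOURCES_DOWNLOAD_DIR);
```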


    - compiledPlan.writeToFile(file, false);
    + compiledPlan.writeToFile(localPath, false);
    + resourceManager.updateFilePath(filePath);
Contributor

Correct me if I'm wrong, but I think we should not and probably cannot write files to a remote FileSystem.

Even though users may have configured the related settings in flink-conf.yaml, such as accessKey and accessSecret, having permission to read files under a bucket does not imply having the same write permission.

If users feel it is necessary, they can upload files manually. Implementing this through the framework may have many limitations.

    }

    /**
     * register the filePath of flink filesystem. If it is remote filesystem and the file exists
Contributor

Nit: JavaDoc comments should start with a capitalized sentence. You can take https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html as a reference.
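
For example, the JavaDoc excerpt above could start like this (a suggested rewording, not the final text):

```java
/**
 * Registers the file path on Flink's FileSystem. If it is a remote file system and the file
 * exists, the plan is first downloaded to the local resource directory.
 */
```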

Contributor

@LadyForest LadyForest left a comment

Thank you for your update. However, the code quality is not yet at a level where it can be merged. As time is tight, I will take over your work and continue to make improvements. Can we review the changes together at a later time?

    sql = String.format(
      "COMPILE and EXECUTE plan '%s' FOR INSERT INTO MySink SELECT * FROM MyTable",
      path)
    tableEnv.executeSql(sql)
Contributor

What's the purpose of this test? It does not assert anything.
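
For example, the test could wait for the job and then verify the sink contents (a sketch in Java for brevity, although the surrounding test is Scala; it assumes MySink is backed by a connector whose results can be read back, and uses AssertJ's assertThat plus org.apache.flink.util.CollectionUtil):

```java
// Submit the compiled plan and wait for the job to finish.
tableEnv.executeSql(sql).await();

// Assert on the sink contents instead of only submitting the statement.
List<Row> results = CollectionUtil.iteratorToList(
        tableEnv.executeSql("SELECT * FROM MySink").collect());
assertThat(results).isNotEmpty();
```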

    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    execEnv.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)
Contributor

What's the purpose of re-creating a tableEnv?

"Child operation of CompileAndExecuteOperation must be either a "
+ "ModifyOperation or a StatementSetOperation.");
this.filePath = filePath;
this.filePath = new Path(filePath);
Contributor

Perhaps we don't need to change this.

      public class ExecutePlanOperation implements Operation {

    -     private final String filePath;
    +     private final Path filePath;
Contributor

ditto

          operation instanceof StatementSetOperation || operation instanceof ModifyOperation,
          "child operation of CompileOperation must be either a ModifyOperation or a StatementSetOperation");
    -     this.filePath = filePath;
    +     this.filePath = new Path(filePath);
Contributor

ditto

    public void runSQL(String sqlPath, Map<String, String> varsMap) throws Exception {
        try (ClusterController clusterController = flink.startCluster(1)) {
            List<String> sqlLines = initializeSqlLines(sqlPath, varsMap);
            executeSqlStatements(clusterController, sqlLines);
Contributor

Why duplicate the method and assert nothing?

    import java.util.Map;

    /** End-to-End tests for compile and execute remote file. */
    public class CompileAndExecuteRemoteFileITCase extends SqlITCaseBase {
Contributor

This test shares the same HDFS configuration with UsingRemoteJarITCase; it may deserve a common base class (see the sketch below).
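
A rough sketch of what such a base class could extract (the class name HdfsBackedSqlITCaseBase is hypothetical; the MiniDFSCluster setup mirrors the snippet below):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;

/** Hypothetical shared base for ITCases that need a MiniDFSCluster. */
public abstract class HdfsBackedSqlITCaseBase extends SqlITCaseBase {

    protected static MiniDFSCluster hdfsCluster;

    @BeforeClass
    public static void createHdfs() throws IOException {
        Configuration hdConf = new Configuration();
        hdfsCluster = new MiniDFSCluster.Builder(hdConf).build();
    }

    @AfterClass
    public static void destroyHdfs() {
        if (hdfsCluster != null) {
            hdfsCluster.shutdown();
        }
    }
}
```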

    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
    hdfsCluster = builder.build();

    hdPath = new org.apache.hadoop.fs.Path("/test.json");
Contributor

If the purpose of declaring this variable is to delete this path in the after method, then it can be turned into a local variable in after.

        hdfs = hdPath.getFileSystem(hdConf);

    } catch (Throwable e) {
        e.printStackTrace();
Contributor

I don't think it's a good practice to do so...
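
One common alternative (a sketch; failing fast keeps a half-initialized MiniDFSCluster from surfacing as confusing errors later):

```java
} catch (Throwable t) {
    // Propagate instead of swallowing the error with printStackTrace().
    throw new RuntimeException("Failed to set up MiniDFSCluster", t);
}
```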


    @Test
    public void testCompilePlanRemoteFile() throws Exception {
        runSQL("compile_plan_use_remote_file_e2e.sql", generateReplaceVars());
Contributor

The test does not assert anything...

     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    SET execution.runtime-mode = $MODE;
Contributor

The COMPILE PLAN statement is not supported in batch mode.
