
[FLINK-30377][table-planner] applySinkProvider use ClientWrapperClassLoader while extracting Type from KeyedStream #21497

Merged
dannycranmer merged 1 commit into apache:master from Samrat002:FLINK-30377
Jan 27, 2023

Conversation

@Samrat002
Contributor

@Samrat002 Samrat002 commented Dec 13, 2022

Brief change log

  • Modifies the applySinkProvider function to accept a ClassLoader
  • Uses that ClassLoader as the current thread's context class loader inside applySinkProvider
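For illustration only, the second point can be sketched as a manual save, set, and restore of the thread context class loader. This is not the code from this PR: the class name `ContextClassLoaderSwap`, the helper `runWithClassLoader`, and the empty `URLClassLoader` standing in for the user class loader are all hypothetical, and only JDK calls are used.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextClassLoaderSwap {

    /**
     * Runs the action with userClassLoader installed as the thread context
     * class loader, then restores the previous loader even if the action throws.
     * Hypothetical helper, not part of Flink.
     */
    static <T> T runWithClassLoader(ClassLoader userClassLoader, Supplier<T> action) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a loader that can see user jars (assumption for the demo).
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSwap.class.getClassLoader());

        ClassLoader seen =
                runWithClassLoader(userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("action saw user loader: " + (seen == userLoader));
        System.out.println(
                "restored afterwards: "
                        + (Thread.currentThread().getContextClassLoader() != userLoader));
    }
}
```

The `finally` block matters here: without it, an exception during type extraction would leave the user loader installed on the thread.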

Verifying this change

/usr/lib/flink/bin/sql-client.sh -j /usr/lib/hudi/hudi-flink-bundle.jar
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-mapreduce/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/etc/tez/conf:/usr/lib/tez/hadoop-shim-0.10.2-amzn-0.jar:/usr/lib/tez/hadoop-shim-2.8-0.10.2-amzn-0.jar:/usr/lib/tez/lib:/usr/lib/tez/LICENSE:/usr/lib/tez/LICENSE-BSD-3clause:/usr/lib/tez/LICENSE-CDDLv1.1-GPLv2_withCPE:/usr/lib/tez/LICENSE-MIT:/usr/lib/tez/LICENSE-SIL_OpenFontLicense-v1.1:/usr/lib/tez/NOTICE:/usr/lib/tez/tez-api-0.10.2-amzn-0.jar:/usr/lib/tez/tez-aux-services-0.10.2-amzn-0.jar:/usr/lib/tez/tez-build-tools-0.10.2-amzn-0.jar:/usr/lib/tez/tez-common-0.10.2-amzn-0.jar:/usr/lib/tez/tez-dag-0.10.2-amzn-0.jar:/usr/lib/tez/tez-examples-0.10.2-amzn-0.jar:/usr/lib/tez/tez-history-parser-0.10.2-amzn-0.jar:/usr/lib/tez/tez-javadoc-tools-0.10.2-amzn-0.jar:/usr/lib/tez/tez-job-analyzer-0.10.2-amzn-0.jar:/usr/lib/tez/tez-mapreduce-0.10.2-amzn-0.jar:/usr/lib/tez/tez-protobuf-history-plugin-0.10.2-amzn-0.jar:/usr/lib/tez/tez-runtime-internals-0.10.2-amzn-0.jar:/usr/lib/tez/tez-runtime-library-0.10.2-amzn-0.jar:/usr/lib/tez/tez-tests-0.10.2-amzn-0.jar:/usr/lib/tez/tez-ui-0.10.2-amzn-0.war:/usr/lib/tez/tez-yarn-timeline-cache-plugin-0.10.2-amzn-0.jar:/usr/lib/tez/tez-yarn-timeline-history-0.10.2-amzn-0.jar:/usr/lib/tez/tez-yarn-timeline-history-with-acls-0.10.2-amzn-0.jar:/usr/lib/tez/tez-yarn-timeline-history-with-fs-0.10.2-amzn-0.jar:/usr/lib/tez/lib/async-http-client-2.12.3.jar:/usr/lib/tez/lib/commons-cli-1.2.jar:/usr/lib/tez/lib/commons-codec-1.11.jar:/usr/lib/tez/lib/commons-collections4-4.1.jar:/usr/lib/tez/lib/commons-io-2.8.0.jar:/usr/lib/tez/lib/commons-lang-2.6.jar:/usr/lib/tez/lib/guava-31.1-jre.jar:/usr/lib/tez/lib/hadoop-annotations.jar:/usr/lib/tez/lib/hadoop-auth.jar:/usr/lib/tez/lib/hadoop-hdfs-client-3.3.3-amzn-0.jar:/usr/lib/tez/lib/hadoop-mapreduce-client-commo
n-3.3.3-amzn-0.jar:/usr/lib/tez/lib/hadoop-mapreduce-client-common.jar:/usr/lib/tez/lib/hadoop-mapreduce-client-core-3.3.3-amzn-0.jar:/usr/lib/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.3.3-amzn-0.jar:/usr/lib/tez/lib/hadoop-yarn-server-web-proxy.jar:/usr/lib/tez/lib/hhadoop-mapreduce-client-core.jar:/usr/lib/tez/lib/javax.servlet-api-3.1.0.jar:/usr/lib/tez/lib/jersey-client-1.19.jar:/usr/lib/tez/lib/jersey-json-1.19.jar:/usr/lib/tez/lib/jettison-1.3.4.jar:/usr/lib/tez/lib/jsr305-3.0.0.jar:/usr/lib/tez/lib/metrics-core-3.1.0.jar:/usr/lib/tez/lib/netty-all-4.1.72.Final.jar:/usr/lib/tez/lib/protobuf-java-2.5.0.jar:/usr/lib/tez/lib/RoaringBitmap-0.7.45.jar:/usr/lib/tez/lib/slf4j-api-1.7.36.jar:/usr/lib/hadoop-lzo/lib/hadoop-lzo-0.4.19.jar:/usr/lib/hadoop-lzo/lib/hadoop-lzo.jar:/usr/lib/hadoop-lzo/lib/native:/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle-1.12.331.jar:/usr/share/aws/aws-java-sdk/LICENSE.txt:/usr/share/aws/aws-java-sdk/NOTICE.txt:/usr/share/aws/aws-java-sdk/README.md:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/animal-sniffer-annotations-1.14.jar:/usr/share/aws/emr/emrfs/lib/annotations-16.0.2.jar:/usr/share/aws/emr/emrfs/lib/aopalliance-1.0.jar:/usr/share/aws/emr/emrfs/lib/bcprov-ext-jdk15on-1.66.jar:/usr/share/aws/emr/emrfs/lib/checker-qual-2.5.2.jar:/usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.54.0.jar:/usr/share/aws/emr/emrfs/lib/error_prone_annotations-2.1.3.jar:/usr/share/aws/emr/emrfs/lib/findbugs-annotations-3.0.1.jar:/usr/share/aws/emr/emrfs/lib/ion-java-1.0.2.jar:/usr/share/aws/emr/emrfs/lib/j2objc-annotations-1.1.jar:/usr/share/aws/emr/emrfs/lib/javax.inject-1.jar:/usr/share/aws/emr/emrfs/lib/jmespath-java-1.12.331.jar:/usr/share/aws/emr/emrfs/lib/jsr305-3.0.2.jar:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/cloudwatch-sink-2.3.0.jar:
/usr/share/aws/emr/cloudwatch-sink/lib/cloudwatch-sink.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2022-12-13 06:32:55,021 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /var/lib/flink/yarn/.yarn-properties-hadoop.
2022-12-13 06:32:55,021 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /var/lib/flink/yarn/.yarn-properties-hadoop.

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /home/hadoop/.flink-sql-history

Flink SQL> CREATE TABLE IF NOT EXISTS hudi_flink_sql_mor(
>     uuid VARCHAR(20),
>     name VARCHAR(10),
>     age INT,
>     ts TIMESTAMP(3),
>     `partition` VARCHAR(20)
> )
>

Flink SQL> CREATE TABLE IF NOT EXISTS hudi_flink_sql_mor(
>      uuid VARCHAR(20),
>      name VARCHAR(10),
>      age INT,
>      ts TIMESTAMP(3),
>      `partition` VARCHAR(20)
>  )
>  PARTITIONED BY (`partition`)
>  WITH (
>    'connector' = 'hudi',
>    'path' = 's3://table/hudi-flink-tables/hudi_flink_sql_mor_0/hudi_flink_sql_mor',
>    'hive_sync.enable' = 'true',
>    'hive_sync.mode' = 'hms',
>    'hive_sync.table' = 'hudi_flink_sql_mor',
>    'hive_sync.db' = 'default',
>    'compaction.delta_commits' = '1',
>    'hive_sync.metastore.uris' = 'thrift://ip-172-1-2-3.us-west-210.compute.internal:9083',
>    'hive_sync.partition_fields' = 'partition',
>    'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
>    'table.type' = 'MERGE_ON_READ'
>  );
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO hudi_flink_sql_mor VALUES
>      ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
>      ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
>      ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
>      ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
>      ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
>      ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
>      ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
>      ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
[INFO] Submitting SQL update statement to the cluster...
2022-12-13 06:33:58,717 INFO  org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at ip-172-1-2-3.us-west-210.compute.internal/172.1.2.3:8032
2022-12-13 06:33:58,896 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at ip-172-1.2.3.us-west-210.compute.internal/172.1.2.3:10200
2022-12-13 06:33:58,904 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-12-13 06:33:58,906 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2022-12-13 06:33:59,009 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface ip-172-1.2.3.us-west-210.compute.internal:39759 of application 'application_1670912378850_0001'.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 27f05875b6aae41c57c0f7efa9f0d179

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no) no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) no
  • The serializers: (yes / no / don't know) no
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
  • The S3 file system connector: (yes / no / don't know) no

Documentation

  • Does this pull request introduce a new feature? (yes / no) no
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) not applicable

@Samrat002 Samrat002 marked this pull request as draft December 13, 2022 05:32
@flinkbot
Collaborator

flinkbot commented Dec 13, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

throw new TableException("Unsupported sink runtime provider.");
ExecNodeConfig config,
ClassLoader classLoader) {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
Contributor

Can we use TemporaryClassLoaderContext since this seems to be the standard way to flip out the classloader?

Contributor Author

Yes, I have made the changes accordingly.
Please review the updated changes.
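For context on the suggestion in this thread: Flink's `TemporaryClassLoaderContext.of(classLoader)` returns an `AutoCloseable`, so the swap and the restore can be scoped with try-with-resources instead of a hand-written try/finally. The sketch below is not the merged change. To stay runnable without Flink on the classpath it uses a hypothetical stand-in, `ScopedContextClassLoader`, that mimics the pattern; every name in it is an assumption.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ScopedClassLoaderDemo {

    /** Hypothetical stand-in that mimics the scoped class loader swap pattern. */
    static final class ScopedContextClassLoader implements AutoCloseable {
        private final Thread thread;
        private final ClassLoader original;

        private ScopedContextClassLoader(Thread thread, ClassLoader original) {
            this.thread = thread;
            this.original = original;
        }

        /** Installs the temporary loader and remembers the one it replaced. */
        static ScopedContextClassLoader of(ClassLoader temporary) {
            Thread thread = Thread.currentThread();
            ClassLoader original = thread.getContextClassLoader();
            thread.setContextClassLoader(temporary);
            return new ScopedContextClassLoader(thread, original);
        }

        /** Restores the loader that was active before of(...) was called. */
        @Override
        public void close() {
            thread.setContextClassLoader(original);
        }
    }

    /** Returns the context class loader observed inside the scoped block. */
    static ClassLoader loaderSeenInside(ClassLoader temporary) {
        try (ScopedContextClassLoader ignored = ScopedContextClassLoader.of(temporary)) {
            // Work that depends on the context class loader would go here.
            return Thread.currentThread().getContextClassLoader();
        }
    }

    public static void main(String[] args) {
        // Stand-in for the class loader passed into the planner (assumption).
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ScopedClassLoaderDemo.class.getClassLoader());

        System.out.println("inside scope: " + (loaderSeenInside(userLoader) == userLoader));
        System.out.println(
                "after scope: " + (Thread.currentThread().getContextClassLoader() != userLoader));
    }
}
```

With Flink available, the body of `loaderSeenInside` would presumably use `TemporaryClassLoaderContext.of(temporary)` in the same try-with-resources position.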

…pperClassLoader while extracting Type from KeyedStream
@Samrat002
Contributor Author

@dannycranmer please review the updated changes ^

@dannycranmer
Contributor

LGTM, thanks @Samrat002

@dannycranmer dannycranmer merged commit 18a0ea9 into apache:master Jan 27, 2023