
load hive-site.xml for flink catalog #1558

Closed
zhangjun0x01 wants to merge 3 commits into apache:master from zhangjun0x01:load_hive_conf

Conversation

@zhangjun0x01
Contributor

To be compatible with the various Flink submission modes, FlinkCatalogFactory provides a variable hive-site-path to specify the path of hive-site.xml, which can be a local path or a path on non-local storage.

This is for issue #1437.

I submitted a PR (#1527) before, but it contained some conflicts and included other PRs. I did not handle it well, so I closed it and opened this new PR.

@rdblue could you help review it and give me some suggestions? Thank you.

@kbendick Thanks for your suggestions, I have updated the code accordingly.

String scheme = path.toUri().getScheme();
if (scheme == null) {
  // for case: /tmp/hive-site.xml
  return HIVE_SITE_SCHEME_FILE;
Contributor

@kbendick Oct 8, 2020

In most projects I've worked on, it's possible to set the default scheme. I know this is true for Flink as well, which has fs.default-scheme, which is used for paths that don't have a scheme specified.

Given that this is specifically for Flink, would it make more sense to fall back to that configuration value if it's available? Perhaps one of the more regular Flink contributors could chime in here. I'd be more likely to default to file as the scheme over HDFS, but I run all of my Flink deployments in containers so I'm likely biased.
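
A rough sketch of that fallback, assuming Flink's GlobalConfiguration is available on the classpath (the helper name and class are illustrative, not part of this PR):

    import org.apache.flink.configuration.GlobalConfiguration;
    import org.apache.hadoop.fs.Path;

    public class SchemeFallback {
      // Illustrative helper: prefer an explicit scheme on the path, then
      // Flink's fs.default-scheme from flink-conf.yaml, and finally "file".
      static String resolveScheme(Path path) {
        String scheme = path.toUri().getScheme();
        if (scheme != null) {
          return scheme;
        }
        return GlobalConfiguration.loadConfiguration()
            .getString("fs.default-scheme", "file");
      }
    }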

String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
switch (catalogType) {
  case "hive":
  case ICEBERG_CATALOG_TYPE_HIVE:
Contributor

These changes seem unrelated to loading hive-site.xml. Could you remove them?

Contributor Author

I think kbendick's suggestion is right: we should extract some constants, but that is unrelated to loading hive-site.xml, so I will remove these changes. If necessary, we can open another PR for it.

properties.add(HADOOP_WAREHOUSE_LOCATION);
properties.add(DEFAULT_DATABASE);
properties.add(BASE_NAMESPACE);
properties.add(HIVE_SITE_PATH);
Contributor

If I understand, this is trying to add the hive site path to the catalog's configuration properties? Why not load it from the environment? That's the typical way to load these configuration files.

}
}

private void downloadFromHdfs(Configuration configuration, Path hdfsHiveSitePath) {
Contributor

I've never seen code like this needed. Usually, hive-site.xml is loaded from the classpath. Here's some code that does it from our Spark build:

    val configFile = someClassLoader.getResource("hive-site.xml")
    if (configFile != null) {
      hadoopConfiguration.addResource(configFile)
    }

config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, isHadoopCatalog ? "hadoop" : "hive");
config.put(FlinkCatalogFactory.HADOOP_WAREHOUSE_LOCATION, "file:" + warehouse);
String path = this.getClass().getClassLoader().getResource("hive-site.xml").getPath();
config.put(FlinkCatalogFactory.HIVE_SITE_PATH, path);
Contributor

I think this only works because the file is in the local FS for tests, and the FS is assumed to be "file" when there is no scheme. I don't think this is right.

Contributor Author

I referred to the practice in Flink: if no scheme is specified, such as /tmp/abc, the path is converted to a local file path according to the operating system.
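
For illustration (not code from this PR), a Hadoop Path without a scheme reports a null scheme and is resolved against the default filesystem, which is file:/// unless fs.defaultFS says otherwise:

    import org.apache.hadoop.fs.Path;

    public class SchemeCheck {
      public static void main(String[] args) {
        Path path = new Path("/tmp/hive-site.xml");
        // prints "null": no scheme was given, so the path is resolved against
        // the default filesystem (file:/// unless fs.defaultFS is configured)
        System.out.println(path.toUri().getScheme());
      }
    }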

@rdblue
Contributor

rdblue commented Oct 8, 2020

@zhangjun0x01, @JingsongLi, how does Flink normally handle hive-site.xml files? This seems strange to me, so I'd like to understand if this is something that Flink would normally do.

@zhangjun0x01
Contributor Author

@zhangjun0x01, @JingsongLi, how does Flink normally handle hive-site.xml files? This seems strange to me, so I'd like to understand if this is something that Flink would normally do.

Hi @rdblue:
As far as I know, Flink provides a hiveConfDir to get hive-site.xml. This was fine before Flink 1.11, but Flink 1.11 introduced an application mode for submitting Flink jobs. In that mode, the user jar is parsed on the Flink jobmanager node, and that node is very likely to have no hive-site.xml file. If we only provide a local path for hive-site.xml, the program will not find it in application mode, so I looked for a way to be compatible with Flink's various submission modes: we allow a local or HDFS path for hive-site.xml. I described the possible problems in #1437.

I think this should also be an issue for Flink. What do you think, @JingsongLi?

@JingsongLi
Contributor

JingsongLi commented Oct 9, 2020

Hi @rdblue @zhangjun0x01

  • Spark has built-in Hive integration, so its Hadoop conf contains hive-site.xml. (As @rdblue said, in SharedState.loadHiveConfFile.)
  • But in Flink, Hive integration is optional, so HadoopUtils.getHadoopConfiguration(..) does not contain hive-site.xml. So, just like @zhangjun0x01 said, flink-connector-hive provides a simple way: users pass a hiveConfDir to Flink's HiveCatalog. (Currently the hiveConfDir must be a local path.)

I created https://issues.apache.org/jira/browse/FLINK-19541 to track it.

Personally, I think we can provide two ways in Flink:

  • From hiveConfDir: it should be a URI with a scheme and full path, so it can be loaded through the Hadoop FileSystem.
  • From the classpath, just like Spark does.

@zhangjun0x01 force-pushed the load_hive_conf branch 2 times, most recently from 9a77514 to dfa85e3 on October 10, 2020 09:12
@zhangjun0x01
Contributor Author

Hi @rdblue, @JingsongLi, I updated the PR to load hive-site.xml from the given path first; if the user does not provide a path, it is loaded from the classpath. Do you see any problems?

@openinx added this to the Java 0.10.0 Release milestone on Oct 10, 2020
@openinx
Member

openinx commented Oct 10, 2020

I think this patch should be included in the next release, 0.10.0, so I marked it in the milestone. I will take a look.

@rdblue
Contributor

rdblue commented Oct 10, 2020

From hiveConfDir: it should be a URI with a scheme and full path, so it can be loaded through the Hadoop FileSystem.

Can you help me understand Flink's behavior a bit more?

  • If hiveConfDir is a Flink option, then why doesn't Flink automatically load the hive-site.xml file when it creates a Configuration? It doesn't seem like this should be delegated to libraries that run inside Flink. I would expect Flink to create the Configuration based on application options and pass that to libraries.
  • If Flink allows the user to set a separate hiveConfDir, why not just add that directory to the classpath so that hive-site.xml can be loaded like normal? That's what happens in most Hadoop setup scripts: extra config folders are added to the classpath so that the regular method of loading config files works.

@zhangjun0x01
Contributor Author

Hi @rdblue:
I don't know much about the original design ideas of Flink; I'll just share my view.

To use Hadoop features (such as deploying Flink to a YARN cluster), Flink needs to load the Hadoop classpath, but Hive is not required for deploying Flink, so there is no need to load Hive configuration when loading the Hadoop Configuration.

In addition, Flink provides a unified catalog API to access external systems. HiveCatalog, JdbcCatalog, etc. are all subclasses used to access different external systems, so Hive is not a plugin or extension of Flink. Therefore, we cannot load Hive in Flink's startup scripts when Flink starts.

I don't know if my understanding is correct. @JingsongLi

@openinx
Member

openinx commented Oct 12, 2020

Can you help me understand Flink's behavior a bit more?

@rdblue, you may want to read this Flink document. There is an application mode in Flink, which means users can just upload the iceberg-flink-runtime.jar to HDFS and then run a command like the following:

bin/flink run-application -p 5 \
-d \
-t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://hadoopcluster/data/flink/libs/" \
hdfs://hadoopcluster/data/flink/user-lib/flink-kafka-iceberg.jar

This will bootstrap a separate Flink cluster on YARN for the current submission, and the started Flink jobmanager will execute the main() entry point from the user-provided jar. In that case, the classpath and environment inside the YARN container should not be modifiable by the user-provided jar.

Providing a Flink config key to indicate the hive-site.xml URI sounds good to me. Then we could submit the Flink job in application mode like the following:

bin/flink run-application -p 5 \
-d \
-t yarn-application \
-D iceberg.hive-site.path=hdfs://config/hive-site.xml \
-yD yarn.provided.lib.dirs="hdfs://hadoopcluster/data/flink/libs/" \
hdfs://hadoopcluster/data/flink/user-lib/flink-kafka-iceberg.jar

Above we are talking about a Flink DataStream job. For a Flink SQL job, we could do something similar to the HiveCatalog in Flink:

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'hive-conf-dir'='hdfs://config/hive-site.xml'
);

In most cases, we could just set the warehouse instead of specifying a hive-conf-dir, as Ryan suggests:

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://data-dir'
);

@zhangjun0x01
Contributor Author

Hi @rdblue, @openinx:
I added a warehouse option to the HiveCatalog (Flink, Spark). If the user provides a warehouse, it overrides the value in hive-site.xml.
Could you help review it again? Thanks.
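
Roughly, that override could look like the following sketch (the option name matches the "warehouse" constant used elsewhere in this PR; the helper itself is illustrative):

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    public class WarehouseOverride {
      // Sketch: a catalog-level "warehouse" option takes precedence over the
      // hive.metastore.warehouse.dir value loaded from hive-site.xml.
      static void applyWarehouse(Map<String, String> options, Configuration conf) {
        String warehouse = options.get("warehouse");
        if (warehouse != null) {
          conf.set("hive.metastore.warehouse.dir", warehouse);
        }
      }
    }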

@JingsongLi
Contributor

From hiveConfDir: it should be a URI with a scheme and full path, so it can be loaded through the Hadoop FileSystem.

Can you help me understand Flink's behavior a bit more?

  • If hiveConfDir is a Flink option, then why doesn't Flink automatically load the hive-site.xml file when it creates a Configuration? It doesn't seem like this should be delegated to libraries that run inside Flink. I would expect Flink to create the Configuration based on application options and pass that to libraries.
  • If Flink allows the user to set a separate hiveConfDir, why not just add that directory to the classpath so that hive-site.xml can be loaded like normal? That's what happens in most Hadoop setup scripts: extra config folders are added to the classpath so that the regular method of loading config files works.

Why not automatically load the hive-site.xml file when it creates a Configuration?
Flink just wants to get a Hadoop Configuration for connecting to YARN and reading HDFS.
The Hive connector is only an optional connector of Flink; it is not built-in, so the built-in Hadoop config loading does not process the Hive configuration (hive-site.xml). The hiveConfDir belongs to the Hive connector and has to be passed explicitly.

Why not just add that directory to the classpath so that hive-site.xml can be loaded like normal?
Right now, the only way is a system environment variable, like:

String hadoopHome = System.getenv("HADOOP_HOME");
if (hadoopHome != null) {
  // loadConfs stands for reading the *-site.xml files from the given directory
  loadConfs(hadoopHome + "/conf");
  loadConfs(hadoopHome + "/etc/hadoop");
}

zhangjun added 2 commits October 13, 2020 15:00
load hive-site.xml from path, then from classpath

fix checkstyle

fix checkstyle
update comment

fix test code
public static final String HADOOP_WAREHOUSE_LOCATION = "warehouse";

public static final String HIVE_SITE_PATH = "hive-site-path";
public static final String HIVE_SITE_SCHEME_FILE = "file";
Member

If we plan to support a Hadoop path, would we still need the file or hdfs scheme? Could we just load those configuration files through the Hadoop filesystem?

Contributor Author

I have tested this. If we specify a remote path, as in the following code:

    Configuration conf = new Configuration();   
    Path path = new Path("hdfs://localhost/data/flink/conf/hive-site.xml");
    conf.addResource(path);
    String warehouseLocation = conf.get("hive.metastore.warehouse.dir");
    System.out.println(warehouseLocation);

It is not able to get the properties in hive-site.xml; the result is null.

If you specify a local path,

 Path path = new Path("file:///tmp/hive-site.xml");

we can get it. So I use the scheme to make the decision: if it is a non-local path, download it to the local filesystem first, and then load it.
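
A minimal sketch of that download-then-load step using the Hadoop FileSystem API (the local target path and method name are illustrative, not the PR's exact implementation):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HiveSiteDownloader {
      // Copy a remote hive-site.xml to the local filesystem first, then
      // register the local copy so Configuration can actually parse it.
      static void loadRemoteHiveSite(Configuration conf, Path remoteHiveSite) throws IOException {
        FileSystem fs = remoteHiveSite.getFileSystem(conf);
        Path localCopy = new Path("file:///tmp/hive-site.xml");  // illustrative local target
        fs.copyToLocalFile(remoteHiveSite, localCopy);
        conf.addResource(localCopy);
      }
    }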

public static final String HIVE_CLIENT_POOL_SIZE = "clients";
public static final String HADOOP_WAREHOUSE_LOCATION = "warehouse";

public static final String HIVE_SITE_PATH = "hive-site-path";
Member

Could we align with the HiveCatalog in Flink by using hive-conf-dir?

Contributor Author

OK, I will update it.

@rdblue
Contributor

rdblue commented Oct 13, 2020

Okay, I understand why Flink won't load hive-site.xml: because Flink doesn't have direct Hive integration and that is the application's responsibility. Then, it also makes sense that the user might be creating a catalog using SQL, and would have no ability to set up the Hive config directly.

What I don't understand is why Iceberg should do anything other than ensure that hive-site.xml is added as a default resource from the classpath.

Most configuration for the HiveCatalog should come from catalog options, not from hive-site.xml. That config file would be primarily used for global defaults, so it makes more sense to me for it to be a default resource that is shared across all catalogs. If it is shared, then there is no need to load a path that is specific to each catalog. Would it be valuable to support multiple hive-site.xml configs, one per catalog?

If hive-site.xml only controls Hive tuning defaults, then why not load it the normal way from the application classpath? That gives the user a way to add folders (by adding to the classpath) or to bundle the config file into their application jar. When an application's jars are being localized, the config file can be handled just like other classpath artifacts. That's the normal way of distributing config files.

In summary, unless we need catalog-specific Hive configs, I don't see why we wouldn't use the "normal" ways to distribute and add a hive-site.xml file. It doesn't seem like something we need Iceberg to handle, beyond ensuring that the hive-site.xml from the classpath is added as a default resource. Is there a use case that I'm misunderstanding?
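
For reference, a sketch of what adding hive-site.xml as a default classpath resource can look like with the Hadoop API (an illustration, not code from this PR):

    import org.apache.hadoop.conf.Configuration;

    public class DefaultResourceExample {
      public static void main(String[] args) {
        // Register hive-site.xml as a default resource: every Configuration
        // created afterwards picks it up if the file is on the classpath.
        Configuration.addDefaultResource("hive-site.xml");

        Configuration conf = new Configuration();
        // prints the warehouse dir from hive-site.xml, or null if the file
        // is not on the classpath or does not set the property
        System.out.println(conf.get("hive.metastore.warehouse.dir"));
      }
    }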

@zhangjun0x01
Contributor Author

Let me share my thoughts. In Flink, the use of Hive is purely application behavior, so hive-site.xml and the Hive jars are not loaded at Flink startup; they are loaded by the Flink application.
If the user's Flink program is a jar, we can include hive-site.xml in the jar, but for users of the Flink SQL client this may not be easy to do. If we want to load hive-site.xml onto the classpath as a global configuration, it could only be loaded by Flink at startup, which is contrary to Flink's design.
In addition, if a user's application loads hive-site.xml onto the classpath and then uses it as a shared global configuration for other applications, I think that may not be easy to implement either.

@openinx
Member

openinx commented Oct 15, 2020

What I don't understand is why Iceberg should do anything other than ensure that hive-site.xml is added as a default resource from the classpath.

For Flink SQL, we usually build only one iceberg-flink-runtime.jar and put it on the Flink classpath. Different users use the same Flink distribution to run their Flink SQL jobs. If the iceberg-flink-runtime.jar is bundled with a specific hive-site.xml in its resources, then how could different users use the same distribution to access different Hive metastores? That would require different users to build different distributions.

For Hadoop configurations, we provide the HADOOP_HOME environment variable, or the fs.hdfs.hadoopconf config key in the Flink configuration file, or the HADOOP_CONF_DIR environment variable to load the Hadoop configurations (hdfs-site.xml, core-site.xml), so they don't have the Hive config issue. I created a patch to provide similar behavior here: https://github.com/apache/iceberg/pull/1586/files#diff-dfee8e9c94fb35806da6eea03a18614d2c5ad778563749493452829bcaec7cc1R95.

@zhangjun0x01, as for uploading the configuration files to HDFS and then downloading and loading them for a Flink streaming job in application mode, I think that is over-designed. Besides hive-site.xml, Hadoop configurations are loaded from the environment, from the classpath, or from a path configured in the Flink conf file; would the Flink module in Iceberg also need to download and load those? That does not make sense. A better way, following the current Flink design, is to bundle hive-site.xml and the other related config files into your Flink DataStream jar and upload that to the Flink cluster. Flink DataStream job submission is more flexible than Flink SQL, so it makes sense to build a separate bundled jar per job.

I provided a more reasonable patch (#1586) to handle hive-site.xml. (As the 0.10.0 release is coming and we hope to resolve this as soon as possible, I opened a PR for the same issue. Your discussion is valuable, @zhangjun0x01, I hope you don't mind. Thanks.)

@zhangjun0x01
Contributor Author

@openinx That's fine, I don't mind.

@rdblue
Contributor

rdblue commented Oct 21, 2020

It sounds like we should move forward with #1586, so I'm going to close this one. Thanks for the additional information to explain why we should add a conf dir property to catalogs. I think the logic in #1586 makes sense.

@rdblue closed this on Oct 21, 2020