Skip to content
2 changes: 1 addition & 1 deletion docs/content/filesystems/hdfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ You may not have to do anything, if you are in a hadoop environment. Otherwise p
configure your HDFS:

1. Set environment variable `HADOOP_HOME` or `HADOOP_CONF_DIR`.
2. Configure `'fs.hdfs.hadoopconf'` in the paimon catalog.
2. Configure `'hadoop-conf-dir'` in the paimon catalog.

The first approach is recommended.

Expand Down
2 changes: 2 additions & 0 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ Paimon Hive catalog in Flink relies on Flink Hive connector bundled jar. You sho

The following Flink SQL registers and uses a Paimon Hive catalog named `my_hive`. Metadata and table files are stored under `hdfs://path/to/warehouse`. In addition, metadata is also stored in Hive metastore.

If your Hive requires security authentication such as Kerberos, LDAP, Ranger, and so on, you can specify the `hive-conf-dir` parameter pointing to the directory that contains your `hive-site.xml` file.

```sql
CREATE CATALOG my_hive WITH (
'type' = 'paimon',
Expand Down
6 changes: 6 additions & 0 deletions docs/content/maintenance/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ Options for paimon catalog.

{{< generated/catalog_configuration >}}

### HiveCatalogOptions

Options for Hive catalog.

{{< generated/hive_catalog_configuration >}}

### FlinkConnectorOptions

Flink connector options for paimon.
Expand Down
24 changes: 24 additions & 0 deletions docs/layouts/shortcodes/generated/hive_catalog_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>hadoop-conf-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>File directory of the core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml. Currently, only local file system paths are supported.</td>
</tr>
<tr>
<td><h5>hive-conf-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>File directory of the hive-site.xml, used to create HiveMetastoreClient and security authentication, such as Kerberos, LDAP, Ranger and so on.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class HadoopUtils {
public static final String HADOOP_CONF_ENV = "HADOOP_CONF_DIR";

/** Path to Hadoop configuration. */
public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
public static final String PATH_HADOOP_CONFIG = "hadoop-conf-dir";

public static Configuration getHadoopConfiguration(Options options) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void loadOverlappingConfig() throws Exception {
final String k5 = "key5";

final String v1 = "from HADOOP_CONF_DIR";
final String v2 = "from Paimon config `fs.hdfs.hadoopconf`";
final String v2 = "from Paimon config `hadoop-conf-dir`";
final String v4 = "from HADOOP_HOME/etc/hadoop";
final String v5 = "from HADOOP_HOME/conf";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation(
"paimon-flink/paimon-flink-common", "org.apache.paimon.flink"),
new OptionsClassLocation(
"paimon-flink/paimon-flink-common", "org.apache.paimon.flink.kafka")
"paimon-flink/paimon-flink-common", "org.apache.paimon.flink.kafka"),
new OptionsClassLocation(
"paimon-hive/paimon-hive-catalog", "org.apache.paimon.hive")
};
static final String DEFAULT_PATH_PREFIX = "src/main/java";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.TableType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.hive.LegacyHiveClasses;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -53,7 +52,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -68,6 +74,7 @@
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.utils.Preconditions.checkState;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;

/** A catalog implementation for Hive. */
public class HiveCatalog extends AbstractCatalog {
Expand All @@ -83,13 +90,15 @@ public class HiveCatalog extends AbstractCatalog {
private static final String STORAGE_HANDLER_CLASS_NAME =
"org.apache.paimon.hive.PaimonStorageHandler";

public static final String HIVE_SITE_FILE = "hive-site.xml";

private final HiveConf hiveConf;
private final String clientClassName;
private final IMetaStoreClient client;

public HiveCatalog(FileIO fileIO, Configuration hadoopConfig, String clientClassName) {
public HiveCatalog(FileIO fileIO, HiveConf hiveConf, String clientClassName) {
super(fileIO);
this.hiveConf = new HiveConf(hadoopConfig, HiveConf.class);
this.hiveConf = hiveConf;
this.clientClassName = clientClassName;
this.client = createClient(hiveConf, clientClassName);
}
Expand Down Expand Up @@ -521,9 +530,100 @@ static IMetaStoreClient createClient(HiveConf hiveConf, String clientClassName)
} catch (Exception e) {
throw new RuntimeException(e);
}
return StringUtils.isNullOrWhitespaceOnly(
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
return isNullOrWhitespaceOnly(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
? client
: HiveMetaStoreClient.newSynchronizedClient(client);
}

/**
 * Creates a {@link HiveConf} from the optionally configured conf directories.
 *
 * <p>Resolution order: hadoop conf files loaded from {@code hadoopConfDir} (if given) form the
 * base configuration; {@code hive-site.xml} from {@code hiveConfDir} (if given) is layered on
 * top of it. Static conf-file locations that {@link HiveConf} may have picked up from the
 * classpath are explicitly cleared first so only the requested files are used.
 *
 * @param hiveConfDir directory containing {@code hive-site.xml}, or null to skip
 * @param hadoopConfDir directory containing hadoop {@code *-site.xml} files, or null to skip
 * @return the assembled Hive configuration
 * @throws RuntimeException if {@code hadoopConfDir} is set but none of the expected hadoop conf
 *     files can be loaded from it, or if {@code hive-site.xml} cannot be read
 */
public static HiveConf createHiveConf(
        @Nullable String hiveConfDir, @Nullable String hadoopConfDir) {
    // create HiveConf from hadoop configuration with hadoop conf directory configured.
    Configuration hadoopConf = null;
    if (!isNullOrWhitespaceOnly(hadoopConfDir)) {
        hadoopConf = getHadoopConfiguration(hadoopConfDir);
        if (hadoopConf == null) {
            String possibleUsedConfFiles =
                    "core-site.xml | hdfs-site.xml | yarn-site.xml | mapred-site.xml";
            throw new RuntimeException(
                    "Failed to load the hadoop conf from specified path:" + hadoopConfDir,
                    new FileNotFoundException(
                            "Please check the path none of the conf files ("
                                    + possibleUsedConfFiles
                                    + ") exist in the folder."));
        }
    }
    if (hadoopConf == null) {
        hadoopConf = new Configuration();
    }
    // ignore all the static conf file URLs that HiveConf may have set
    HiveConf.setHiveSiteLocation(null);
    HiveConf.setLoadMetastoreConfig(false);
    HiveConf.setLoadHiveServer2Config(false);
    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);

    LOG.info("Setting hive conf dir as {}", hiveConfDir);

    if (hiveConfDir != null) {
        org.apache.hadoop.fs.Path hiveSite =
                new org.apache.hadoop.fs.Path(hiveConfDir, HIVE_SITE_FILE);
        if (!hiveSite.toUri().isAbsolute()) {
            // treat a relative path as a local file system path
            hiveSite = new org.apache.hadoop.fs.Path(new File(hiveSite.toString()).toURI());
        }
        try (InputStream inputStream = hiveSite.getFileSystem(hadoopConf).open(hiveSite)) {
            hiveConf.addResource(inputStream, hiveSite.toString());
            // trigger a read from the conf to avoid input stream is closed
            isEmbeddedMetastore(hiveConf);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to load hive-site.xml from specified path:" + hiveSite, e);
        }
        // also register the path itself so later re-reads resolve it lazily
        hiveConf.addResource(hiveSite);
    }

    return hiveConf;
}

/**
 * Checks whether the given Hive configuration points at an embedded metastore, i.e. no remote
 * metastore URIs are configured.
 *
 * @param hiveConf the Hive configuration to inspect
 * @return true when {@code hive.metastore.uris} is unset or blank
 */
public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
    String metastoreUris = hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS);
    return isNullOrWhitespaceOnly(metastoreUris);
}

/**
 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured.
 *
 * @param hadoopConfDir Hadoop conf directory path.
 * @return A Hadoop configuration instance, or null when the directory does not exist or none
 *     of the expected conf files are present in it.
 */
public static Configuration getHadoopConfiguration(String hadoopConfDir) {
    if (!new File(hadoopConfDir).exists()) {
        return null;
    }
    // Probe the standard hadoop conf files. mapred-site.xml is included because we need to
    // read configurations like compression codec.
    List<File> possibleConfFiles = new ArrayList<>();
    for (String fileName :
            new String[] {
                "core-site.xml", "hdfs-site.xml", "yarn-site.xml", "mapred-site.xml"
            }) {
        File confFile = new File(hadoopConfDir, fileName);
        if (confFile.exists()) {
            possibleConfFiles.add(confFile);
        }
    }
    if (possibleConfFiles.isEmpty()) {
        // signal to the caller that the directory held no usable conf files
        return null;
    }
    Configuration hadoopConfiguration = new Configuration();
    for (File confFile : possibleConfFiles) {
        hadoopConfiguration.addResource(
                new org.apache.hadoop.fs.Path(confFile.getAbsolutePath()));
    }
    return hadoopConfiguration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.utils.Preconditions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

import static org.apache.paimon.hive.HiveCatalog.createHiveConf;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;

/** Factory to create {@link HiveCatalog}. */
public class HiveCatalogFactory implements CatalogFactory {

private static final String IDENTIFIER = "hive";

private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
ConfigOptions.key("metastore.client.class")
.stringType()
Expand All @@ -60,12 +62,17 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
+ IDENTIFIER
+ " catalog");

Configuration hadoopConfig = new Configuration();
context.options().toMap().forEach(hadoopConfig::set);
hadoopConfig.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
hadoopConfig.set(
HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse.toUri().toString());
String hiveConfDir = context.options().get(HIVE_CONF_DIR);
String hadoopConfDir = context.options().get(HADOOP_CONF_DIR);
HiveConf hiveConf = createHiveConf(hiveConfDir, hadoopConfDir);

// always using user-set parameters overwrite hive-site.xml parameters
context.options().toMap().forEach(hiveConf::set);
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse.toUri().toString());

String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);

return new HiveCatalog(fileIO, hadoopConfig, context.options().get(METASTORE_CLIENT_CLASS));
return new HiveCatalog(fileIO, hiveConf, clientClassName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.hive;

import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;

/** Options for hive catalog. */
public final class HiveCatalogOptions {

    /** Catalog metastore identifier for the Hive catalog factory. */
    public static final String IDENTIFIER = "hive";

    /** Directory holding {@code hive-site.xml}; enables metastore client security setup. */
    public static final ConfigOption<String> HIVE_CONF_DIR =
            ConfigOptions.key("hive-conf-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File directory of the hive-site.xml, used to create HiveMetastoreClient and security authentication, such as Kerberos, LDAP, Ranger and so on.");

    /** Directory holding hadoop {@code *-site.xml} files; local file system paths only. */
    public static final ConfigOption<String> HADOOP_CONF_DIR =
            ConfigOptions.key("hadoop-conf-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File directory of the core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml. Currently, only local file system paths are supported.");

    // Utility holder of constants; never instantiated.
    private HiveCatalogOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public void testCustomMetastoreClient() throws Exception {
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'hive-conf-dir' = '"
+ hiveShell.getBaseDir().getRoot().getPath()
+ HIVE_CONF
+ "',",
" 'metastore.client.class' = '"
+ TestHiveMetaStoreClient.class.getName()
+ "'",
Expand All @@ -96,6 +100,10 @@ public void testCreateExistTableInHive() throws Exception {
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'hive-conf-dir' = '"
+ hiveShell.getBaseDir().getRoot().getPath()
+ HIVE_CONF
+ "',",
" 'metastore.client.class' = '"
+ CreateFailHiveMetaStoreClient.class.getName()
+ "'",
Expand Down Expand Up @@ -127,6 +135,10 @@ public void testAlterTableFailedInHive() throws Exception {
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'hive-conf-dir' = '"
+ hiveShell.getBaseDir().getRoot().getPath()
+ HIVE_CONF
+ "',",
" 'metastore.client.class' = '"
+ AlterFailHiveMetaStoreClient.class.getName()
+ "'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public void testCustomMetastoreClient() throws Exception {
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'hive-conf-dir' = '"
+ hiveShell.getBaseDir().getRoot().getPath()
+ HIVE_CONF
+ "',",
" 'metastore.client.class' = '"
+ TestHiveMetaStoreClient.class.getName()
+ "'",
Expand All @@ -96,6 +100,10 @@ public void testCreateExistTableInHive() throws Exception {
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'hive-conf-dir' = '"
+ hiveShell.getBaseDir().getRoot().getPath()
+ HIVE_CONF
+ "',",
" 'metastore.client.class' = '"
+ CreateFailHiveMetaStoreClient.class.getName()
+ "'",
Expand Down Expand Up @@ -127,6 +135,10 @@ public void testAlterTableFailedInHive() throws Exception {
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'hive-conf-dir' = '"
+ hiveShell.getBaseDir().getRoot().getPath()
+ HIVE_CONF
+ "',",
" 'metastore.client.class' = '"
+ AlterFailHiveMetaStoreClient.class.getName()
+ "'",
Expand Down
Loading