Accessing Hive and Spark on Hadoop from open-source PolarDB and PostgreSQL with hdfs_fdw

Author

digoal

Date

2023-07-05

Tags

PostgreSQL , PolarDB , hive , spark , hadoop , hdfs , hdfs_fdw


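Background

With hdfs_fdw, PostgreSQL and PolarDB can directly access data stored in Hadoop. The FDW also pushes query conditions down to the remote side, so PostgreSQL and PolarDB can use Hive and Spark directly to extend their storage and compute capacity.

This removes an ETL step and the work of stitching data together across databases in the application, which lowers development cost.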

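1. What is Apache Hadoop?
https://hadoop.apache.org/

The Apache™ Hadoop® project develops open-source software for reliable, scalable, distributed computing. The Apache Hadoop software library is a framework that allows distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale from a single server to thousands of machines, each offering local computation and storage. Rather than relying on hardware to deliver high availability, the library itself is designed to detect and handle failures at the application layer, so that a highly available service can be delivered on top of a cluster of computers, each of which may be prone to failure. Details are available at https://hadoop.apache.org/. Hadoop can be downloaded from http://hadoop.apache.org/releases.html and installed by following the steps at https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html.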

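2. What is Apache Hive?
https://hive.apache.org/

The Apache Hive™ data warehouse software facilitates querying and managing large datasets that reside in distributed storage. Hive provides a mechanism to project structure onto this data and to query it using a SQL-like language called HiveQL. The language also lets traditional Map/Reduce programmers plug in custom mappers and reducers when expressing the logic in HiveQL is inconvenient or inefficient.

Hive comes in two versions, HiveServer1 and HiveServer2, which can be downloaded from https://hive.apache.org/downloads.html. The FDW supports only HiveServer2.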

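3. What is Apache Spark?
http://spark.apache.org/

Apache Spark™ is a general-purpose distributed computing framework that supports a wide variety of use cases. It provides real-time streaming as well as batch processing, and it is fast, easy to use, and capable of sophisticated analytics. Spark does not provide a storage layer; it relies on third-party storage providers such as Hadoop, HBase, Cassandra, and S3. Spark integrates seamlessly with Hadoop and can process existing data. Spark SQL is 100% compatible with HiveQL, and the Spark Thrift Server can be used as a replacement for HiveServer2.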

Usage reference

https://github.com/EnterpriseDB/hdfs_fdw/blob/master/README.md

Authentication Support

The FDW supports the NOSASL and LDAP authentication modes. To use
NOSASL, do not specify any OPTIONS when creating the user mapping. For
LDAP, the username and password must be specified in OPTIONS when
creating the user mapping.
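
As a minimal sketch of the two modes, assuming a foreign server named hdfs_server already exists (the name used in the examples later in this document). The role postgres and the credentials 'ldap_user' and 'ldap_password' are placeholders, not values from the README:

```sql
-- NOSASL: create the user mapping without any OPTIONS.
CREATE USER MAPPING FOR postgres
	SERVER hdfs_server;

-- LDAP: supply the credentials in OPTIONS instead.
-- A role can have only one mapping per server, so the NOSASL mapping
-- is dropped first. 'ldap_user' / 'ldap_password' are placeholders.
DROP USER MAPPING FOR postgres SERVER hdfs_server;
CREATE USER MAPPING FOR postgres
	SERVER hdfs_server
	OPTIONS (username 'ldap_user', password 'ldap_password');
```

Because the FDW infers the authentication type from the username when auth_type is not set on the server, switching a mapping between these two forms also switches the mode.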

Usage

When creating the foreign server object for HDFS FDW, the following can
be specified in OPTIONS:

  • host: IP Address or hostname of the Hive Thrift Server OR Spark
    Thrift Server. Defaults to localhost.
  • port: Port number of the Hive Thrift Server OR Spark Thrift
    Server. Defaults to 10000.
  • client_type: hiveserver2 or spark. Hive and Spark both support
    HiveQL and are compatible, but there are a few differences, such as the
    behaviour of the ANALYZE command and the connection string for the
    NOSASL case. Default is hiveserver2.
  • auth_type: NOSASL or LDAP. Specify which authentication type
    is required while connecting to the Hive or Spark server. Default is
    unspecified and the FDW uses the username option in the user mapping to
    infer the auth_type. If the username is empty or not specified it uses
    NOSASL.
  • connect_timeout: Connection timeout, default value is 300 seconds.
  • query_timeout: Query timeout is not supported by the Hive JDBC
    driver.
  • fetch_size: A user-specified value that is provided as a parameter
    to the JDBC API setFetchSize. The default value is 10000.
  • log_remote_sql: If true, logging will include SQL commands
    executed on the remote hive server and the number of times that a scan
    is repeated. The default is false.
  • use_remote_estimate: If true, the server uses EXPLAIN commands on
    the remote server when estimating processing costs. The default is
    false, in which case remote tables are assumed to have 1000 rows.
  • enable_join_pushdown: If true, pushes the join between two foreign
    tables from the same foreign server, instead of fetching all the rows
    for both the tables and performing a join locally. This option can also
    be set for an individual table, and if any of the tables involved in the
    join has set it to false then the join will not be pushed down. The
    table-level value of the option takes precedence over the server-level
    option value. Default is true.
  • enable_aggregate_pushdown: If true, pushes aggregate operations to the
    foreign server instead of performing them locally. This option can also
    be set for an individual table, and if any of the tables involved in the
    query has set it to false then the operations will not be pushed down. The
    table-level value of the option takes precedence over the server-level
    option value. Default is true.
  • enable_order_by_pushdown: If true, pushes the ORDER BY clause to the
    foreign server instead of performing a sort locally. This option can also
    be set for an individual table, and if any of the tables involved in the
    query has set it to false then the ORDER BY will not be pushed down. The
    table-level value of the option takes precedence over the server-level
    option value. Default is true.
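
To illustrate how these server options fit together, here is a hedged sketch. It assumes the hdfs_fdw extension has already been created. The server name hdfs_server_tuned and every option value are illustrative assumptions; only the option names come from the list above.

```sql
-- All option values are passed as quoted strings.
CREATE SERVER hdfs_server_tuned
	FOREIGN DATA WRAPPER hdfs_fdw
	OPTIONS (
		host '127.0.0.1',
		port '10000',
		client_type 'hiveserver2',
		auth_type 'NOSASL',
		connect_timeout '60',        -- seconds
		fetch_size '5000',
		log_remote_sql 'true',
		use_remote_estimate 'true'
	);

-- SET changes an option that is already present on the server;
-- ADD introduces one that was not specified at creation time.
ALTER SERVER hdfs_server_tuned
	OPTIONS (SET fetch_size '10000', ADD enable_order_by_pushdown 'false');
```

PostgreSQL raises an error if SET is used on an option that is not yet present, or ADD on one that is, so it helps to check the current options (for example with \des+ in psql) before altering a server.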

When creating a user mapping, the following options can be provided:

  • username: The name of the user for authentication on the Hive server.
  • password: The password of the user for authentication on the Hive
    server.

HDFS can be used through either Hive or Spark. In both cases the
metadata is stored in the configured metastore, where databases and
tables can be created using HiveQL. When creating a foreign table
object for the foreign server, the following can be specified in
OPTIONS:

  • dbname: Name of the metastore database to query. Default is
    'default'.
  • table_name: Name of the metastore table. Default is the same as
    foreign table name.
  • enable_join_pushdown: Similar to the server-level option, but can be
    configured at table level as well. Default is true.
  • enable_aggregate_pushdown: Similar to the server-level option, but can
    be configured at table level as well. Default is true.
  • enable_order_by_pushdown: Similar to the server-level option, but can
    be configured at table level as well. Default is true.
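
The table-level override can be sketched as follows, assuming a foreign table named weblogs on an hdfs_fdw server (as created later in this document) that has no table-level pushdown options yet:

```sql
-- Disable join pushdown for this table only. The table-level value
-- takes precedence over the server-level one.
ALTER FOREIGN TABLE weblogs
	OPTIONS (ADD enable_join_pushdown 'false');

-- Re-enable it later. SET is used because the option now exists on the table.
ALTER FOREIGN TABLE weblogs
	OPTIONS (SET enable_join_pushdown 'true');

-- Or remove the table-level setting so the server-level value applies again.
ALTER FOREIGN TABLE weblogs
	OPTIONS (DROP enable_join_pushdown);
```

The same ADD / SET / DROP pattern applies to enable_aggregate_pushdown and enable_order_by_pushdown.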

GUC variables:

  • hdfs_fdw.enable_join_pushdown: If true, pushes the join between two
    foreign tables from the same foreign server, instead of fetching all the
    rows for both the tables and performing a join locally. Default is true.

  • hdfs_fdw.enable_aggregate_pushdown: If true, pushes aggregate
    operations to the foreign server, instead of fetching rows from the
    foreign server and performing the operations locally. Default is true.

  • hdfs_fdw.enable_order_by_pushdown: If true, pushes the order by
    operation to the foreign server, instead of fetching rows from the
    foreign server and performing the sort locally. Default is false.
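
A short sketch of toggling these GUCs for one session, for example to compare plans with and without pushdown. This assumes the GUCs can be changed with SET by the connected role; if they are restricted in your build, set them in postgresql.conf instead.

```sql
-- Enable ORDER BY pushdown for the current session and confirm the value.
SET hdfs_fdw.enable_order_by_pushdown = on;
SHOW hdfs_fdw.enable_order_by_pushdown;

-- Turn aggregate pushdown off for this session only.
SET hdfs_fdw.enable_aggregate_pushdown = off;

-- Return both settings to their configured values.
RESET hdfs_fdw.enable_order_by_pushdown;
RESET hdfs_fdw.enable_aggregate_pushdown;
```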

Using HDFS FDW with Apache Hive on top of Hadoop

Step 1: Download [weblogs_parse][8] and follow instructions from this
[site][9].

Step 2: Upload the weblogs_parse.txt file using these commands:

hadoop fs -mkdir /weblogs  
hadoop fs -mkdir /weblogs/parse  
hadoop fs -put weblogs_parse.txt /weblogs/parse/part-00000  

Step 3: Start HiveServer2, if it is not already running, using one of the following commands:

$HIVE_HOME/bin/hiveserver2  

or

$HIVE_HOME/bin/hive --service hiveserver2  

Step 4: Connect to HiveServer2 using hive beeline client.

e.g.

$ beeline  
Beeline version 1.0.1 by Apache Hive  
beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl  

Step 5: Create Table in Hive

CREATE TABLE weblogs  
	(  
		client_ip           STRING,  
		full_request_date   STRING,  
		day                 STRING,  
		month               STRING,  
		month_num           INT,  
		year                STRING,  
		hour                STRING,  
		minute              STRING,  
		second              STRING,  
		timezone            STRING,  
		http_verb           STRING,  
		uri                 STRING,  
		http_status_code    STRING,  
		bytes_returned      STRING,  
		referrer            STRING,  
		user_agent          STRING  
	)  
	row format delimited  
	fields terminated by '\t';  

Step 6: Load data into the weblogs table:

hadoop fs -cp /weblogs/parse/part-00000 /user/hive/warehouse/weblogs/  
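
Before moving on to PostgreSQL, it may be worth confirming from beeline that Hive can read the copied file. A minimal HiveQL check, run in the same beeline session against the default database (no particular output is assumed here):

```sql
-- Should return a non-zero count if the file landed in the table directory.
SELECT count(*) FROM weblogs;

-- Spot-check that the tab-delimited columns were split as expected.
SELECT client_ip, http_verb, uri FROM weblogs LIMIT 5;
```

If the columns come back NULL or merged into one field, the field delimiter in the CREATE TABLE statement probably does not match the file.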

Step 7: Access data from PostgreSQL:

Now we are ready to use the weblogs table in PostgreSQL. Once connected
using psql, follow these steps:

-- set the GUC variables appropriately, e.g. :  
hdfs_fdw.jvmpath='/home/edb/Projects/hadoop_fdw/jdk1.8.0_111/jre/lib/amd64/server/'  
hdfs_fdw.classpath='/usr/local/edbas/lib/postgresql/HiveJdbcClient-1.0.jar:  
                    /home/edb/Projects/hadoop_fdw/hadoop/share/hadoop/common/hadoop-common-2.6.4.jar:  
                    /home/edb/Projects/hadoop_fdw/apache-hive-1.0.1-bin/lib/hive-jdbc-1.0.1-standalone.jar'  
  
-- load extension first time after install  
CREATE EXTENSION hdfs_fdw;  
  
-- create server object  
CREATE SERVER hdfs_server  
	FOREIGN DATA WRAPPER hdfs_fdw  
	OPTIONS (host '127.0.0.1');  
  
-- create user mapping  
CREATE USER MAPPING FOR postgres  
	SERVER hdfs_server OPTIONS (username 'hive_username', password 'hive_password');  
  
-- create foreign table  
CREATE FOREIGN TABLE weblogs  
	(  
		client_ip                TEXT,  
		full_request_date        TEXT,  
		day                      TEXT,  
		month                    TEXT,  
		month_num                INTEGER,  
		year                     TEXT,  
		hour                     TEXT,  
		minute                   TEXT,  
		second                   TEXT,  
		timezone                 TEXT,  
		http_verb                TEXT,  
		uri                      TEXT,  
		http_status_code         TEXT,  
		bytes_returned           TEXT,  
		referrer                 TEXT,  
		user_agent               TEXT  
	)  
	SERVER hdfs_server  
	OPTIONS (dbname 'default', table_name 'weblogs');  
  
-- select from table  
SELECT DISTINCT client_ip IP, count(*)  
	FROM weblogs GROUP BY IP HAVING count(*) > 5000 ORDER BY 1;  
       ip        | count   
-----------------+-------  
 13.53.52.13     |  5494  
 14.323.74.653   | 16194  
 322.6.648.325   | 13242  
 325.87.75.336   |  6500  
 325.87.75.36    |  6498  
 361.631.17.30   | 64979  
 363.652.18.65   | 10561  
 683.615.622.618 | 13505  
(8 rows)  
  
-- EXPLAIN output showing WHERE clause being pushed down to remote server.  
EXPLAIN (VERBOSE, COSTS OFF)  
	SELECT client_ip, full_request_date, uri FROM weblogs  
	WHERE http_status_code = '200';  
                                                   QUERY PLAN                                                     
----------------------------------------------------------------------------------------------------------------  
 Foreign Scan on public.weblogs  
   Output: client_ip, full_request_date, uri  
   Remote SQL: SELECT client_ip, full_request_date, uri FROM default.weblogs WHERE ((http_status_code = '200'))  
(3 rows)  
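
The same technique can be used to check the other pushdown options. The queries below are a sketch against the weblogs foreign table defined above. No plan output is shown because it depends on the FDW version and the option and GUC settings in effect.

```sql
-- Aggregate pushdown: when it applies, the Remote SQL line is expected to
-- contain the GROUP BY and count(*) rather than a plain column scan.
EXPLAIN (VERBOSE, COSTS OFF)
	SELECT http_verb, count(*) FROM weblogs GROUP BY http_verb;

-- ORDER BY pushdown: when it applies, the Remote SQL line is expected to
-- contain the ORDER BY and no local Sort node should appear above the scan.
EXPLAIN (VERBOSE, COSTS OFF)
	SELECT client_ip, uri FROM weblogs ORDER BY client_ip;
```

Setting log_remote_sql to 'true' on the server is another way to see what was actually sent to Hive.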

Using HDFS FDW with Apache Spark on top of Hadoop

Step 1: Download & install Apache Spark in local mode.

Step 2: In the folder $SPARK_HOME/conf create a file
spark-defaults.conf containing the following line

spark.sql.warehouse.dir hdfs://localhost:9000/user/hive/warehouse  

By default Spark uses Derby for both the metadata and the data itself
(called the warehouse in Spark). To have Spark use Hadoop as the
warehouse, we have to add this property.

Step 3: Start Spark Thrift Server

./start-thriftserver.sh  

Step 4: Check the log file to make sure the Spark Thrift Server is running.

Step 5: Create a local file /tmp/names.txt with the following data:

$ cat /tmp/names.txt  
1,abcd  
2,pqrs  
3,wxyz  
4,a_b_c  
5,p_q_r  
,  

Step 6: Connect to the Spark Thrift Server using the Spark beeline client.

e.g.

$ beeline  
Beeline version 1.2.1.spark2 by Apache Hive  
beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl org.apache.hive.jdbc.HiveDriver  

Step 7: Getting the sample data ready on spark:

Run the following commands in the beeline command-line tool:

./beeline  
Beeline version 1.2.1.spark2 by Apache Hive  
beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl org.apache.hive.jdbc.HiveDriver  
Connecting to jdbc:hive2://localhost:10000/default;auth=noSasl  
Enter password for jdbc:hive2://localhost:10000/default;auth=noSasl:   
Connected to: Spark SQL (version 2.1.1)  
Driver: Hive JDBC (version 1.2.1.spark2)  
Transaction isolation: TRANSACTION_REPEATABLE_READ  
0: jdbc:hive2://localhost:10000> create database my_test_db;  
+---------+--+  
| Result  |  
+---------+--+  
+---------+--+  
No rows selected (0.379 seconds)  
0: jdbc:hive2://localhost:10000> use my_test_db;  
+---------+--+  
| Result  |  
+---------+--+  
+---------+--+  
No rows selected (0.03 seconds)  
0: jdbc:hive2://localhost:10000> create table my_names_tab(a int, name string)  
                                 row format delimited fields terminated by ',';  
+---------+--+  
| Result  |  
+---------+--+  
+---------+--+  
No rows selected (0.11 seconds)  
0: jdbc:hive2://localhost:10000>  
  
0: jdbc:hive2://localhost:10000> load data local inpath '/tmp/names.txt'  
                                 into table my_names_tab;  
+---------+--+  
| Result  |  
+---------+--+  
+---------+--+  
No rows selected (0.33 seconds)  
0: jdbc:hive2://localhost:10000> select * from my_names_tab;  
+-------+---------+--+  
|   a   |  name   |  
+-------+---------+--+  
| 1     | abcd    |  
| 2     | pqrs    |  
| 3     | wxyz    |  
| 4     | a_b_c   |  
| 5     | p_q_r   |  
| NULL  | NULL    |  
+-------+---------+--+  

Here are the corresponding files in hadoop:

$ hadoop fs -ls /user/hive/warehouse/  
Found 1 items  
drwxrwxrwx   - org.apache.hive.jdbc.HiveDriver supergroup          0 2020-06-12 17:03 /user/hive/warehouse/my_test_db.db  
  
$ hadoop fs -ls /user/hive/warehouse/my_test_db.db/  
Found 1 items  
drwxrwxrwx   - org.apache.hive.jdbc.HiveDriver supergroup          0 2020-06-12 17:03 /user/hive/warehouse/my_test_db.db/my_names_tab  

Step 8: Access data from PostgreSQL:

Connect to Postgres using psql:

-- set the GUC variables appropriately, e.g. :  
hdfs_fdw.jvmpath='/home/edb/Projects/hadoop_fdw/jdk1.8.0_111/jre/lib/amd64/server/'  
hdfs_fdw.classpath='/usr/local/edbas/lib/postgresql/HiveJdbcClient-1.0.jar:  
                    /home/edb/Projects/hadoop_fdw/hadoop/share/hadoop/common/hadoop-common-2.6.4.jar:  
                    /home/edb/Projects/hadoop_fdw/apache-hive-1.0.1-bin/lib/hive-jdbc-1.0.1-standalone.jar'  
  
-- load extension first time after install  
CREATE EXTENSION hdfs_fdw;  
  
-- create server object  
CREATE SERVER hdfs_server  
	FOREIGN DATA WRAPPER hdfs_fdw  
	OPTIONS (host '127.0.0.1', port '10000', client_type 'spark', auth_type 'NOSASL');  
  
-- create user mapping  
CREATE USER MAPPING FOR postgres  
	SERVER hdfs_server OPTIONS (username 'spark_username', password 'spark_password');  
  
-- create foreign table  
CREATE FOREIGN TABLE f_names_tab( a int, name varchar(255)) SERVER hdfs_server  
	OPTIONS (dbname 'my_test_db', table_name 'my_names_tab');  
  
-- select the data from foreign server  
SELECT * FROM f_names_tab;  
 a |  name   
---+--------  
 1 | abcd  
 2 | pqrs  
 3 | wxyz  
 4 | a_b_c  
 5 | p_q_r  
 0 |  
(6 rows)  
  
-- EXPLAIN output showing WHERE clause being pushed down to remote server.  
EXPLAIN (verbose, costs off)  
	SELECT name FROM f_names_tab  
	WHERE a > 3;  
                                QUERY PLAN                                  
--------------------------------------------------------------------------  
 Foreign Scan on public.f_names_tab  
   Output: name  
   Remote SQL: SELECT name FROM my_test_db.my_names_tab WHERE ((a > '3'))  
(3 rows)  

Note that the same port is used when creating the foreign server,
because the Spark Thrift Server is compatible with the Hive Thrift
Server. Applications using HiveServer2 work with Spark, except for the
behaviour of the ANALYZE command and the connection string in the
NOSASL case. If Hive is to be replaced with Spark, it is better to use
ALTER SERVER and change the client_type option.
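
A hedged sketch of that switch, assuming the server is named hdfs_server as in the examples above. Whether ADD or SET is needed depends on whether client_type was given when the server was created:

```sql
-- If client_type was never set on the server (as in the Hive example,
-- which only specifies host), add it:
ALTER SERVER hdfs_server
	OPTIONS (ADD client_type 'spark');

-- If client_type was set explicitly at creation time, change it instead:
-- ALTER SERVER hdfs_server OPTIONS (SET client_type 'spark');
```

Existing foreign tables and user mappings stay attached to the server, so they do not need to be recreated after the change.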
