Skip to content

Latest commit

 

History

History
510 lines (387 loc) · 23.5 KB

9.md

File metadata and controls

510 lines (387 loc) · 23.5 KB

九、使用 HiveQL 的查询

Hive 的最大驱动因素是 HiveQL 的广泛功能,以及它对于任何有 SQL 经验的人来说都很容易采用。识别、加载和转换数据的复杂性可以在开发或运营团队中隔离开来,这将使分析师可以使用熟悉的语法自由地查询大量数据。

HiveQL 随着 Hive 的新版本不断扩展,该语言甚至已经集成到 Apache Spark 中,因此内存中的大数据工作负载也可以基于 HiveQL。

到目前为止,我们看到的语言语句与它们的 SQL 语句基本相似,对于更高级的 HiveQL 特性也是如此。我们将在本章中介绍这些功能,以及 Hive 提供的一些功能和在 Hive 中合并自定义功能的机制。

HiveQL 支持内部、外部、交叉和半连接。但是,联接并不像在 SQL 数据库中那样得到充分的支持,因为 Hive 只支持比较相等值的联接——您不能基于 Hive 中具有不同值(x <> y)的列来联接表。

基本连接使用标准的 SQL 语法——代码清单 93 连接服务器和 server _ logs 表,并返回第一个日志条目。

93:内部连接

  > select
  s.name, s.ipaddresses[0], l.loggedat, l.loglevel from server_logs l join
  servers s on l.serverid = s.name limit 1;
  +---------+--------------+-------------+-------------+--+
  | s.name 
  |     _c1      | l.loggedat  | l.loglevel  |
  +---------+--------------+-------------+-------------+--+
  | SCSVR1  |
  192.168.2.1  | 1439546226  | W           |
  +---------+--------------+-------------+-------------+--+
  > select
  s.name, s.ipaddresses[0], l.loggedat, l.loglevel from server_logs l, servers
  s where l.serverid = s.name limit 1;
  +---------+--------------+-------------+-------------+--+
  | s.name 
  |     _c1      | l.loggedat  | l.loglevel  |
  +---------+--------------+-------------+-------------+--+
  | SCSVR1  |
  192.168.2.1  | 1439546226  | W           |
  +---------+--------------+-------------+-------------+--+

两个查询返回相同的结果,因为显式语法(join … on)可以与隐式语法(其中表是命名的,联接规范在 where 子句中)互换。

类似地,外部连接的指定方式与 SQL 数据库相同,对输出也有相同的影响——代码清单 94 返回从未记录过任何日志的所有服务器。

94:左外部连接

  > select
  s.name, s.site["dc"] from servers s left outer join server_logs l
  on s.name = l.serverid where l.serverid is null; 
  +---------+---------+--+
  | s.name  |  
  _c1   |
  +---------+---------+--+
  | SCSVR2  |
  london  |
  | SCSVR3  |
  dublin  |
  +---------+---------+--+

交叉连接语句返回表的笛卡尔乘积,就像在 SQL 中一样,只有左半连接是不常见的。大多数 SQL 数据库都支持这种连接,但是它们不使用显式子句。这种连接相当于获取一个表中的所有行,而另一个表中存在匹配的列值。

在 SQL 数据库中,这通常在 where exists 子句中完成,但是 HiveQL 有一个显式的连接类型。代码清单 95 显示了它的外观,因为它只返回那些记录了日志的服务器。

95:左半连接

  >
  select s.name, s.site["dc"] from servers s left semi join
  server_logs l on s.name = l.serverid; 
  +---------+---------+--+
  | s.name  |  
  c1    |
  +---------+---------+--+
  | SCSVR1  |
  london  |
  +---------+---------+--+

只要您的连接有效,您就可以连接任何数据库对象,无论它们使用什么存储引擎。到目前为止,示例已经将外部表服务器与内部表服务器日志结合在一起。代码清单 96 连接了一个内部表(all_devices)、一个 JSON 格式的外部 HDFS 表(devices)和一个外部 HBase 表的视图(device_events_period)。

96:连接配置单元和 HBase 表

  > select
  de.period, de.eventname, d.device.deviceclass from device_events_period de
  join all_devices ad on ad.deviceid = de.deviceid join devices d on
  ad.deviceclass = d.device.deviceclass where de.period like '201601%';
  +------------+---------------+--------------+--+
  | de.period 
  | de.eventname  | deviceclass  |
  +------------+---------------+--------------+--+
  | 20160128  
  | power.off     | tablet       |
  +------------+---------------+--------------+--+

与任何 SQL 数据库一样,连接大型表会影响性能。Hive 尽可能优化连接。随着 0.11 版本的发布,Hive 查询引擎在连接优化方面变得越来越复杂,尽管 HiveQL 支持显式优化的查询提示,但通常并不需要。

例如,如果在映射任务中将整个表加载到内存中,Hive 可以更有效地连接到小表。这可以在带有 mapjoin 提示的查询中显式请求,但是如果设置 hive.auto.convert.join 为 true,则 Hive 可以自动请求。

带有 union 条款的连接结果将是我们将讨论的最后一个连接。这让我们可以用相同的列结构组合两个结果集。当前版本的 Hive (1.2.1)支持 union all(包括重复行)和 union distinct(省略重复行)。

代码清单 97 显示了两个 union 子句的结果,还演示了一个带有子查询(union)的外部查询(count),表明子查询必须在 HiveQL 中命名。

97:联合所有和联合不同

  >
  select count(1) from (select * from devices a union all select * from devices
  b) s;
  +-----+--+
  | c0  |
  +-----+--+
  | 4   |
  +-----+--+
  >
  select count(1) from (select * from devices a union distinct select * from
  devices b) s;
  +-----+--+
  | c0  |
  +-----+--+
  | 2   |
  +-----+--+

Hive 支持按一列或多列对结果进行分组的基本聚合,以及更高级的窗口函数。

Hive 中的基本聚合是通过 group by 子句完成的,该子句定义了一个或多个要聚合的列,并支持标准的 SQL 聚合函数,如 sum、avg、count、min 和 max。在同一个查询中,可以对不同的列使用几个函数。

在代码清单 98 中,我们按照生成 syslog 条目的进程对它们进行分组,只选择条目超过 1,000 个的进程,并显示进程名称、条目数量以及进程记录的最大消息的大小。

98:按聚合分组

  > select
  process, count(process) as entryCount, max(length(message)) largestMessage
  from syslogs group by process having count(process) > 1000 order by
  largestMessage desc;
  ...
  +-----------------+-------------+-----------------+--+
  |    
  process     | entrycount  | largestmessage  |
  +-----------------+-------------+-----------------+--+
  |
  NetworkManager  | 1863        | 201             |
  |
  kernel          | 7444        | 191             |
  |
  thermald        | 2224        | 136             |
  |
  systemd         | 1232        | 93              |
  +-----------------+-------------+-----------------+--+

| | 提示:不能在查询的 order by 子句中包含 count 和 max 之类的函数,但是如果在 select 子句中对这些函数的结果进行别名化,则可以按别名进行排序。在这个例子中,我们使用了 order by largestMessage,但是如果我们尝试 order by max(长度(消息)),我们会从 Hive 编译器得到一个错误。 |

与在 SQL 中一样,除非列在 group 子句中或者是分组集的聚合函数,否则不能通过查询将列包括在分组中。SQL:2003 规范定义了窗口函数,允许您在单个查询中聚合数据集的多个分区。

| | 注意:Hive 支持 SQL:2003 窗口,但是要注意术语冲突——窗口函数中的分区与 Hive 的表分区无关;它们是同名的独立功能。 |

使用分析函数,您可以在同一个查询中获得行级数据和聚合。代码清单 99 显示了对 syslogs 的查询,它告诉我们哪些进程和进程 id 的日志条目包含单词“CRON”(带有缩写结果)。

99:分区查询

  > select
  date_format(loggedat, 'HH:mm'), process, message, count() over(partition by
  message order by loggedat) from syslogs where upper(message) like '%CRON%';
  …
  +--------+----------+------------------------------------------------------
  |   c0   |
  process  |                                          
  message                                           | c3  |
  +--------+----------+---------------------------------------------------------------------------------------------+-----+--+
  | 19:53  |
  cron     | (CRON) INFO (Running @reboot
  jobs)                                                          | 5   |
  ...
  | 19:53  |
  anacron  | Job `cron.daily' started                                                                   
  | 1   |
  | 19:53  |
  anacron  | Job `cron.daily'
  terminated                                                                 |
  2   |
  | 19:52  |
  anacron  | Job `cron.daily' terminated                                                                 |
  2   |

这里我们从一个简单的查询中得到复杂的结果。查询选择消息文本和关于日志条目的信息,按照消息文本进行划分,并在该级别进行计数。结果显示了与按文本分组和计数相同的聚合,但它们包括行级别的详细信息——我们可以看到“anacron”处理了同一条消息两次,并且可以看到它被记录的不同时间。

分析分区可以出现在多个列上,这是一种有趣的数据呈现方式。在 over 子句中为上述查询添加 order by,可以让我们按时间顺序查看结果,同时查看到目前为止已经记录了多少条相同的消息,如代码清单 100 所示。

100:分区和有序查询

  > select
  date_format(loggedat, 'HH:mm:ss'), process, message, count() over(partition
  by message order by loggedat) from syslogs where upper(message) like
  '%CRON%REBOOT%';
  ...
  +-----------+----------+-------------------------------------+-----+--+
  |    c0     |
  process  |               message               | c3  |
  +-----------+----------+-------------------------------------+-----+--+
  | 19:52:21  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 1   |
  | 19:52:43  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 2   |
  | 19:53:21  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 3   |
  | 19:53:32  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 4   |
  | 19:54:09  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 5   |
  +-----------+----------+-------------------------------------+-----+--+

按消息划分和按时间戳排序为我们提供了有关消息何时被记录的增量视图。我们可以使用窗口函数更进一步。

窗口函数使您可以生成一个结果集,其中一行的值与当前行之前或之后的行中的值进行比较。您可以为窗口显式设置范围,也可以使用隐式定义窗口的函数(通常默认为当前行两侧的单行)。

您可以在同一个查询中组合标量值、行级聚合和窗口函数。代码清单 101 重复了前面的查询,但是对于每一行,它都指定了从这一行到前一行和后一行的时间距离。

101:带滞后和超前的窗口

  > select
  date_format(loggedat, 'HH:mm:ss'), process, message, count() over(partition
  by message order by loggedat), lag(loggedat) over(partition by message order
  by loggedat) - loggedat, lead(loggedat) over(partition by message order by
  loggedat) - loggedat from syslogs where upper(message) like '%CRON%REBOOT%';
  ...
  +-----------+----------+-------------------------------------+-----+-------
  |    c0     |
  process  |               message               | c3  |           c4          
  |          c5           |
  +-----------+----------+-------------------------------------+-----+-------
  | 19:52:21  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 1   | NULL                  
  | 0 00:00:21.455000000  |
  | 19:52:43  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 2   | -0 00:00:21.455000000 
  | 0 00:00:37.773000000  |
  | 19:53:21  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 3   | -0 00:00:37.773000000 
  | 0 00:00:10.958000000  |
  | 19:53:32  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 4   | -0 00:00:10.958000000 
  | 0 00:00:37.695000000  |
  | 19:54:09  |
  cron     | (CRON) INFO (Running @reboot jobs)  | 5   | -0 00:00:37.695000000 
  | NULL                  |
  +-----------+----------+-------------------------------------+-----+-------

这里,lag 函数获取前一行的 loggedAt 值(第一行为空),lead 获取下一行的 loggedAt 值。减去当前行的 loggedAt 值,得到结果集中的时间距离。

开窗函数是获得强大结果的一种相对简单的方法,例如查找值在一段时间内的变化,识别趋势,以及计算百分位数和等级。

我们已经看到了许多 HiveQL 内置函数的例子,它们通常都有很好的名称和语法,所以它们无需太多介绍。Hive 有一套随运行时一起提供的内置函数,这意味着无论使用哪个平台,它们都是相同的。

内置函数的语言手册是一份综合参考资料,按照适用的数据类型列出了所有可用的函数,并包括引入这些函数的版本。

Hive 包括 150 多个内置功能;这里我将介绍一些最有用的。

ETL 和 ELT 过程几乎总是包含日期转换,Hive 很好地支持高保真 TIMESTAMP 类型和其他常见的日期字符串或数字表示之间的转换。

在代码清单 102 中,我们获取当前的 UNIX 时间戳(以秒为单位,使用系统时钟),并在字符串和长整型日期值之间进行转换。

102:日期转换功能

  > select
  unix_timestamp() from_clock, unix_timestamp('2016-02-07 21:00:00')
  from_string, from_unixtime(1454878996L) from_long;
  +-------------+--------------+----------------------+--+
  | from_clock 
  | from_string  |      from_long       |
  +-------------+--------------+----------------------+--+
  | 1454879183 
  | 1454878800   | 2016-02-07 21:03:16  |
  +-------------+--------------+----------------------+--+

一旦我们有了时间戳,我们就可以提取它的一部分,添加或减去其他日期,或者得到日期之间的天数,如代码清单 103 所示。

103:日期操作功能

  > select weekofyear(to_date(current_timestamp)) week,
  date_add('2016-02-07 21:00:00', 10) addition, datediff('2016-02-07',
  '2016-01-31') difference;
  +-------+-------------+-------------+--+
  | week  | 
  addition   | difference  |
  +-------+-------------+-------------+--+
  | 5     |
  2016-02-17  | 7           |
  +-------+-------------+-------------+--+

Hive 包含了在字符串中查找文本(instr)、拆分字符串(substr)和连接字符串(concat)的所有常用函数。它还包括一些有用的重载函数,这些函数允许用一条语句执行常见任务,如代码清单 104 所示,它操作一个 URL。

104:字符串操作功能

  > select concat_ws('.', 'blog', 'sixeyed', 'com') as blog,
  parse_url('https://blog.sixeyed.com', 'PROTOCOL') as protocol; 
  +-------------------+-----------+--+
  |      
  blog        | protocol  |
  +-------------------+-----------+--+
  |
  blog.sixeyed.com  | https     |
  +-------------------+-----------+--+

用于丰富语义分析的字符串函数(ngrams 和 context-ngrams 为句子中的词频提供文本分析)和标准语言处理函数也存在。代码清单 105 显示了两个单词之间的距离(使用常见的 Levenshtein 相似性度量)和一个单词的语音表示。

105:语言处理功能

  > select levenshtein('hive', 'hbase'), soundex('hive');
  +------+-------+--+
  | _c0  | 
  _c1  |
  +------+-------+--+
  | 3    |
  H100  |
  +------+-------+--+

与字符串一样,Hive 支持所有常见的数学函数(圆、地板、abs),以及更不寻常的函数——如三角函数(sin、cos、tan)。具有广泛的内置数学函数使得简单查询的复杂分析成为可能。

代码清单 106 显示了一些有用的数学函数。

106:数学函数

  > select pmod(14, 3) modulus, sqrt(91) root, factorial(6) factorial;
  +----------+--------------------+------------+--+
  | modulus 
  |        root        | factorial  |
  +----------+--------------------+------------+--+
  | 2        |
  9.539392014169456  | 720        |
  +----------+--------------------+------------+--+

通过支持集合数据类型,Hive 提供了处理数组和映射的功能。只提供了少量功能,但它们涵盖了您可能需要的所有功能。

代码清单 107 展示了如何从数组中提取一个值并对数组进行排序,然后将它们组合起来找到最大的值(数组函数用于定义文字数组)。

107:收集功能

  > select array(26, 54, 43)[0], sort_array(array(26, 54, 43)),
  sort_array(array(26, 54, 43))[size(array(26, 54, 43))-1];
  +------+-------------+------+--+
  | _c0  |    
  _c1     | _c2  |
  +------+-------------+------+--+
  | 26   |
  [26,43,54]  | 54   |
  +------+-------------+------+--+

类似的函数可以用于映射列类型,这样您也可以将键集或值集提取为数组。适合这两种类型的一个关键函数是 explode,它从一个集合中生成一个表,如代码清单 108 所示。

108:分解收集数据

  > select explode(split('Hive Succinctly', ' '));
  +-------------+--+
  |     col    
  |
  +-------------+--+
  | Hive       
  |
  | Succinctly 
  |
  +-------------+--+

使用 explode,您可以从单个列值中提取多行,并将其用于与其他表连接。

标准的 SQL 函数在 Hive 中是可用的,例如在类型之间转换(强制转换)、检查是否有空值(isnull、isnotnull)以及进行比较(if)。从一个范围返回一个值的函数也是标准的 SQL,如代码清单 109 所示。

109:在值之间选择

  > select nvl(NULL, 'default') `nvl`, coalesce('first',
  'second', NULL) `coalesce`, case true when cast(1 as boolean) then 'expected'
  else 'unexpected' end `case`;
  +----------+-----------+-----------+--+
  |   nvl    |
  coalesce  |   case    |
  +----------+-----------+-----------+--+
  | default  |
  first     | expected  |
  +----------+-----------+-----------+--+

更不寻常的函数偶尔也会有用,如代码清单 110 所示。

110:其他功能

  > select pi() `pi`, current_user() `whoami`,
  hash('@eltonstoneman') `hash`;
  +--------------------+---------+-------------+--+
  |        
  pi         | whoami  |    hash     |
  +--------------------+---------+-------------+--+
  |
  3.141592653589793  | root    | 1326977505  |
  +--------------------+---------+-------------+--+

Hive 继续通过 AES 加密、SHA 和 MD5 哈希以及 Hive 2.0.0 中提供的 CRC32 计算向内置集中添加函数。

您可以用自己的用户定义函数来扩展 Hive。这是一种将常用逻辑封装在 HiveQL 之外的语言中的简洁方式。本机语言是 Java,但是 Hive 支持 Hadoop 流,因此您可以用任何语言编写可以通过操作系统命令行调用的函数。

这意味着您可以用自己喜欢的语言编写函数,并通过单元测试和版本控制来确保定制组件的质量。如果您可以在从标准输入读取和向标准输出写入的命令行中包装函数,您也可以使用现有的代码库。

一个 UDF 包含两个方面——使这个库对 Hive 运行时可用,以及注册这个函数以便在 Hive 查询中使用它。

Java UDFs 是最简单的操作。您编写一个扩展 org . Apache . Hadoop . hive . QL . exec . UDF 的类,然后构建一个 JAR 并将其复制到 Hive 的辅助文件夹(用 HIVE_AUX_JARS_PATH 设置指定)。接下来,使用创建临时函数[别名]将该类注册为[UDF _ 类名]。这样,您可以使用 UDF 别名调用该函数。

流媒体控制台应用更复杂一些。我们将使用 Python,一个简单的将增值税加到整数的应用,如代码清单 111 所示。

111:一个简单的 Python 脚本

  #!/usr/bin/python
  import sys
  for line in
  sys.stdin:
    line =
  line.strip()
    print
  int(line) * 1.2

接下来我们需要复制。使用代码清单 112 中的添加文件命令将文件复制到 Hive。

112:将 Python 脚本添加到配置单元

  > add file /tmp/add_vat_udf.py;
  INFO  : Added
  resources: [/tmp/add_vat_udf.py]

现在我们可以使用 transform … using 子句访问 UDF,该子句将为 Hive 查询的 ResultSet 中的每一行调用一次命令。代码清单 113 显示了被调用的 UDF。

113:在 HiveQL 中调用 Python 脚本

  > select transform(input) using 'python add_vat_udf.py' as
  vat_added from (select explode(array(10, 120, 1400)) as input) a;
  +------------+--+
  | vat_added 
  |
  +------------+--+
  | 12.0      
  |
  | 144.0     
  |
  | 1680.0    
  |

| | 注意:不能包含结果集中的列和转换后的列,最终结果必须完全来自转换。因此,如果我想在查询中包含原始净值,我不能在转换前将其添加到 select 语句中,我需要在 Python 脚本中写出输入,并将其与输出分开。 |

对 SQL 的熟悉,加上复杂的分析函数和自己编写函数的能力,使 HiveQL 成为一种强大且可扩展的查询语言。

在大多数情况下,您可以对任何数据源运行完全相同的查询,无论是内部 Hive 表、包含数千个 TSV 文件的 HDFS 文件夹结构,还是包含数十亿行的 HBase 表。

Hive 编译器生成它能生成的最有效的映射/缩减作业集,以便以一种可以在 Hadoop 上执行的格式来表示 HiveQL 查询。呈现一个可以在幕后触发数百个计算小时而无需进一步用户交互的简单界面,使 Hive 成为大数据堆栈中一个有吸引力的组件。

下一步

我们在这本小书里谈了很多,但还有很多要学的。为了简洁地说明 Hive,我将重点放在了语言和运行时的功能部分,这意味着我甚至没有涉及性能、查询优化或 Hive 在其他客户端上的使用。如果你想了解更多关于 Hive 的知识,这些是显而易见的下一步。

简洁的 Docker 图像是一个很好的开始。它是在 Hadoop 以伪分布式模式运行的情况下设置的,并针对 SHART 进行了配置,这意味着它适用于测试长时间运行的查询。您可以使用 Docker 的复制命令将您自己的数据放到容器中,然后使用我们在第 6 章使用 Hive 的 ETL 中看到的工具将其加载到 Hive 中。

之后,如果您想在生产环境中试用 Hive,看看它如何以更大的规模处理您的数据,您可以轻松地启动一个在云中运行 Hive 的 Hadoop 集群。亚马逊网络服务中的弹性地图缩减平台和微软 Azure 中的 HDInsight 平台都支持 Hive,两者都可以让您在短时间内启动并运行。