Harish Butani edited this page Nov 2, 2012 · 1 revision

The hive-rt branch

SQL supported

  • a tableSource in a Windowing Query can be:
    • a Hive Query (i.e., a SubQuery in HQL terms)
    • a Hive Table
    • or a PTF invocation
  • A PTF invocation is of the form:
funcName(tableSource
         [partition by ...
           [order by ...]
         ]
         funcArgs...
         )
  • partition by means the same thing as distribute by in HQL.
  • order by really means sort by, as used in HQL.
  • We don't support cluster by syntactically, but we support it semantically by making order by optional.
  • The input to a PTF can be a tableSource, so PTFs can be chained.
    • But only the first (innermost) tableSource in the chain can be a Hive Query or Table.
  • Each PTF may optionally specify a partition and an order.
  • However, the first PTF in the chain (the one reading the Hive Query or Table) must specify a partition clause.
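To make the chaining rules concrete, here is a minimal Java sketch of the tableSource grammar. `TableSource`, `HiveInput`, `PTFInvocation`, and `validate` are hypothetical names, not the hive-rt translator's classes. The sketch assumes that order by may only appear inside a partition clause (as the grammar nests it) and that the partition requirement applies to the PTF that reads the Hive Table or Query.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the tableSource grammar; not the hive-rt translator classes.
final class PtfChainSketch {
    interface TableSource {}

    // A Hive Table or Hive Query (SubQuery): only legal as the innermost input of a chain.
    static final class HiveInput implements TableSource {
        final String name;
        HiveInput(String name) { this.name = name; }
    }

    // funcName(tableSource [partition by ... [order by ...]] funcArgs...)
    static final class PTFInvocation implements TableSource {
        final String funcName;
        final TableSource input;
        final List<String> partitionBy; // empty: no partition clause
        final List<String> orderBy;     // empty: no order clause
        PTFInvocation(String funcName, TableSource input, List<String> partitionBy, List<String> orderBy) {
            this.funcName = funcName;
            this.input = input;
            this.partitionBy = partitionBy;
            this.orderBy = orderBy;
        }
    }

    // Walks from the outermost call inwards and returns function names in execution
    // order (innermost first), rejecting invocations that break the rules above.
    static List<String> validate(TableSource source) {
        List<String> executionOrder = new ArrayList<>();
        TableSource cur = source;
        while (cur instanceof PTFInvocation) {
            PTFInvocation ptf = (PTFInvocation) cur;
            if (ptf.partitionBy.isEmpty() && !ptf.orderBy.isEmpty()) {
                throw new IllegalArgumentException(ptf.funcName + ": order by is only allowed inside a partition clause");
            }
            boolean readsHiveInput = !(ptf.input instanceof PTFInvocation);
            if (readsHiveInput && ptf.partitionBy.isEmpty()) {
                throw new IllegalArgumentException(ptf.funcName + ": the first PTF in a chain must specify partition by");
            }
            executionOrder.add(ptf.funcName);
            cur = ptf.input;
        }
        Collections.reverse(executionOrder);
        return executionOrder;
    }
}
```

Walking from the outside in and reversing at the end mirrors how a nested invocation is written (outermost first) versus how it executes (innermost first).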

Window Clauses:

  • A SelectExpression may be over a window; such an expression must be a UDAF expression.
  • The query may define named windows that can be referenced in the window clauses.
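A row-based window frame can be sketched in Java as follows, assuming the rows of one Partition are already in order. `RowWindowSketch` and `sumOverRows` are hypothetical names. The sketch covers only rows between frames (such as s1 and w1 in Eg 1), not value-based range windows, and it recomputes each frame naively rather than incrementally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not hive-rt code): sum(col) over a row-based window frame,
// evaluated for every row of one already partitioned and ordered Partition.
final class RowWindowSketch {
    // Stands in for "unbounded preceding" / "unbounded following".
    static final int UNBOUNDED = Integer.MAX_VALUE;

    // Frame: rows between <preceding> preceding and <following> following.
    // "current row" is expressed as 0 on either side.
    static List<Long> sumOverRows(List<Integer> values, int preceding, int following) {
        int n = values.size();
        List<Long> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // Clamp the frame to the partition boundaries; frames never cross partitions.
            int lo = (preceding == UNBOUNDED) ? 0 : Math.max(0, i - preceding);
            int hi = (following == UNBOUNDED) ? n - 1 : (int) Math.min(n - 1L, (long) i + following);
            long sum = 0;
            for (int j = lo; j <= hi; j++) {
                sum += values.get(j);
            }
            out.add(sum);
        }
        return out;
    }
}
```

Under these assumptions, s1 corresponds to `sumOverRows(pSize, UNBOUNDED, 0)` and a sum over w1 to `sumOverRows(pSize, 2, 2)`.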

Expressions:

  • The expression part of the Windowing grammar matches the Hive grammar.
  • There are a couple of additions: the lead and lag functions. These are usable in Select Expressions, UDAF argument expressions, and the Where clause.
  • Lead and Lag let you reference columns from surrounding rows in the partition relative to the current row.
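The lead/lag semantics can be illustrated with a small Java sketch over one ordered partition. The helper names are hypothetical, and returning null when the referenced row falls outside the partition is an assumption, not documented hive-rt behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of lead/lag within one ordered partition.
// Assumption: a reference that falls outside the partition yields null.
final class LeadLagSketch {
    static Integer lag(List<Integer> column, int currentRow, int amt) {
        return valueAt(column, currentRow - amt);
    }

    static Integer lead(List<Integer> column, int currentRow, int amt) {
        return valueAt(column, currentRow + amt);
    }

    private static Integer valueAt(List<Integer> column, int row) {
        return (row >= 0 && row < column.size()) ? column.get(row) : null;
    }

    // Evaluates "p_size - lag(p_size, 1)" (deltaSz in Eg 1) for every row of a partition.
    static List<Integer> deltaSz(List<Integer> pSize) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < pSize.size(); i++) {
            Integer prev = lag(pSize, i, 1);
            if (prev == null) {
                out.add(null); // the first row of the partition has no preceding row
            } else {
                out.add(pSize.get(i) - prev);
            }
        }
        return out;
    }
}
```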

Examples:

/* Eg 1 */
select  p_mfgr,p_name, p_size,
  count(*) as c,
  count(distinct p_size) as cd,
  p_size - lag(p_size,1) as deltaSz,
  first_value(p_size, true) over w1 as fv2,
  sum(p_size) over rows between unbounded preceding and current row as s1,
  sum(p_size) over range between p_size 5 less and current row as s2
from part_demo
partition by p_mfgr
order by p_mfgr
window w1 as rows between 2 preceding and 2 following

/* NPath Eg */
select origin_city_name, fl_num, year, month, day_of_month, sz, tpath
from npath( 
  flights_tiny 
  partition by fl_num 
  order by year, month, day_of_month, 
  'ONTIME.LATE+', 
  'LATE', arr_delay > 15, 
  'EARLY', arr_delay < 0,
  'ONTIME', arr_delay >=0 and arr_delay <= 15,
  'origin_city_name, fl_num, year, month, day_of_month, size(tpath) as sz, tpath as tpath'
) 
  • Example 1 shows windowing clauses with Row-based and Value-based (range) windows, the use of lag, and windows defined globally at the Query level.
  • Example 2 shows the NPath table function.
  • More examples can be found in the java/test and groovy/test directories of the GitHub repository.
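The NPath example can be approximated with the Java sketch below. It classifies rows with the symbol predicates from the NPath example and matches 'ONTIME.LATE+' (one ONTIME row followed by one or more LATE rows). This is not the hive-rt NPath implementation: it assumes a match is attempted from every row, handles only '.' separators and a trailing '+', and consumes '+' greedily without backtracking. The returned row indexes play the role of tpath, so size(tpath) is the size of each inner list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

// Hypothetical NPath-style matcher over one ordered partition (not hive-rt code).
final class NPathSketch {
    static final class Elem {
        final String symbol;
        final boolean oneOrMore;
        Elem(String symbol, boolean oneOrMore) {
            this.symbol = symbol;
            this.oneOrMore = oneOrMore;
        }
    }

    // 'ONTIME.LATE+' -> [ONTIME, LATE+]
    static List<Elem> parse(String pattern) {
        List<Elem> elems = new ArrayList<>();
        for (String tok : pattern.split("\\.")) {
            boolean plus = tok.endsWith("+");
            elems.add(new Elem(plus ? tok.substring(0, tok.length() - 1) : tok, plus));
        }
        return elems;
    }

    // For each start row where the pattern matches, returns the matched row indexes (tpath).
    static List<List<Integer>> match(int[] arrDelay, Map<String, IntPredicate> symbols, String pattern) {
        List<Elem> elems = parse(pattern);
        List<List<Integer>> matches = new ArrayList<>();
        for (int start = 0; start < arrDelay.length; start++) {
            List<Integer> tpath = new ArrayList<>();
            int pos = start;
            boolean matched = true;
            for (Elem e : elems) {
                IntPredicate symbol = symbols.get(e.symbol);
                if (pos >= arrDelay.length || !symbol.test(arrDelay[pos])) {
                    matched = false;
                    break;
                }
                tpath.add(pos++);
                // '+': keep consuming rows while the same symbol still holds.
                while (e.oneOrMore && pos < arrDelay.length && symbol.test(arrDelay[pos])) {
                    tpath.add(pos++);
                }
            }
            if (matched) {
                matches.add(tpath);
            }
        }
        return matches;
    }
}
```

With arr_delay values 5, 20, 30, -3, 10, 16 and the LATE/EARLY/ONTIME predicates from the example, the sketch finds two paths: rows 0 to 2 and rows 4 to 5.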

What’s missing

  • Queries can only have this form: no joins, group bys, or having clauses.
    • So use an embedded Hive Query to feed the output of a complex data flow into a PTF chain.
    • However, the output of PTF processing can only be written to a Table/Partition or an HDFS location; it cannot flow into a further operation.
  • All windows in the Window clauses must have the same partition and order specification.
  • Even if the first input is already partitioned/bucketed, you still need to specify the partition clause.

The meaning of a Windowing Query.

  • A Windowing Query implies the processing of a PTF chain.
  • Each step may optionally specify how to partition the input.
  • The first step is required to specify how to partition the Input.
  • The windowing clauses are translated into the execution of a special WindowingTableFunction PTF, which is appended to the PTF chain.
  • For example:
select .., udaf1() over w1, udaf2() over w1,...
from ptf3(
       ptf2(
             ptf1(tableSource 
                  p1 
                  o1, 
                  args...
                  )
             args...
             )
        p3
        o3
        args...
       )

=>

tableSource -> (p1, o1) -> ptf1 -> ptf2 -> (p3, o3) -> ptf3 -> windowptf
tableSource -> ptf1 -> (p1, o1) -> ptf1 -> ptf2 -> (p3, o3) -> ptf3 -> windowptf

  • (p1, o1) means that the data is partitioned and ordered before it is fed into the next function in the chain.
  • In addition, a PTF may operate on the input data before it is partitioned and ordered. The second chain shows the flow if ptf1 has map-side processing. A PTF that operates on the data on the map side is allowed to reshape the data.
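The chain semantics can be modelled in memory. In this hypothetical Java sketch, every (p, o) is fixed to "partition by key order by value", a step without a partition spec consumes the previous step's partitions unchanged, and a map-side function runs over the whole input before partitioning (a real job would run it per input split). `Row`, `Step`, and `run` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

// Hypothetical in-memory model of PTF chain semantics (not hive-rt code).
final class ChainFlowSketch {
    static final class Row {
        final String key;
        final int value;
        Row(String key, int value) { this.key = key; this.value = value; }
        @Override public String toString() { return key + ":" + value; }
    }

    // One PTF in the chain; partitioned == true models a (p, o) spec in front of it.
    static final class Step {
        final boolean partitioned;
        final UnaryOperator<List<Row>> mapSide; // may be null
        final UnaryOperator<List<Row>> fn;
        Step(boolean partitioned, UnaryOperator<List<Row>> mapSide, UnaryOperator<List<Row>> fn) {
            if (mapSide != null && !partitioned) {
                throw new IllegalArgumentException("a map-side PTF must specify a partition clause");
            }
            this.partitioned = partitioned;
            this.mapSide = mapSide;
            this.fn = fn;
        }
    }

    static List<List<Row>> run(List<Row> input, List<Step> chain) {
        if (chain.isEmpty() || !chain.get(0).partitioned) {
            throw new IllegalArgumentException("the first step must specify how to partition the input");
        }
        List<List<Row>> partitions = Collections.singletonList(input);
        for (Step step : chain) {
            if (step.partitioned) {
                List<Row> all = new ArrayList<>();
                for (List<Row> part : partitions) {
                    all.addAll(part);
                }
                if (step.mapSide != null) {
                    all = step.mapSide.apply(all); // may reshape rows before partitioning
                }
                partitions = partitionAndOrder(all);
            }
            // Without a (p, o) spec, the function consumes the previous partitions as-is.
            List<List<Row>> next = new ArrayList<>();
            for (List<Row> part : partitions) {
                next.add(step.fn.apply(part));
            }
            partitions = next;
        }
        return partitions;
    }

    // (p, o): group by key (TreeMap only for deterministic output order), then sort by value.
    private static List<List<Row>> partitionAndOrder(List<Row> rows) {
        Map<String, List<Row>> groups = new TreeMap<>();
        for (Row r : rows) {
            groups.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        }
        List<List<Row>> out = new ArrayList<>();
        for (List<Row> g : groups.values()) {
            g.sort(Comparator.comparingInt((Row r) -> r.value));
            out.add(g);
        }
        return out;
    }
}
```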

Physical Plan

  • A PTF chain needs to be broken up into a separate Job at each place where the input needs to be partitioned.
  • Note that a PTF with map-side processing must specify a partition clause; the semantics don't make sense otherwise.
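The splitting rule can be sketched as a small planner: a new job starts at every step with a partition spec, a map-side step contributes to that job's map phase, and steps without a spec join the reduce phase of the current job. The names are hypothetical; this is not the hive-rt planner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical job splitter for a PTF chain (not hive-rt code).
final class JobSplitSketch {
    static final class Step {
        final String name;
        final boolean hasPartitionSpec;
        final boolean hasMapSide;
        Step(String name, boolean hasPartitionSpec, boolean hasMapSide) {
            this.name = name;
            this.hasPartitionSpec = hasPartitionSpec;
            this.hasMapSide = hasMapSide;
        }
    }

    static final class Job {
        final List<String> mapSide = new ArrayList<>();    // runs before the shuffle
        final List<String> reduceSide = new ArrayList<>(); // runs per partition after the shuffle
        @Override public String toString() { return "map" + mapSide + " reduce" + reduceSide; }
    }

    static List<Job> split(List<Step> chain) {
        List<Job> jobs = new ArrayList<>();
        for (Step s : chain) {
            if (s.hasMapSide && !s.hasPartitionSpec) {
                throw new IllegalArgumentException(s.name + ": a map-side PTF must specify a partition clause");
            }
            if (s.hasPartitionSpec) {
                // Re-partitioning the input means a shuffle, hence a new job.
                Job job = new Job();
                if (s.hasMapSide) {
                    job.mapSide.add(s.name);
                }
                jobs.add(job);
            } else if (jobs.isEmpty()) {
                throw new IllegalArgumentException(s.name + ": the first PTF must specify a partition clause");
            }
            jobs.get(jobs.size() - 1).reduceSide.add(s.name);
        }
        return jobs;
    }
}
```

Applied to the map-side chain shown earlier (ptf1 with a map side and (p1, o1), ptf2, ptf3 with (p3, o3), then windowptf), the sketch yields two jobs: map[ptf1] reduce[ptf1, ptf2] and map[] reduce[ptf3, windowptf].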

The Partition and PTF interface

  • A Partition is the unit that is exchanged between Functions in the PTF chain.
  • A Partition is a disk spillable collection of rows. It has:
    • An InputObjectInspector used when adding rows to it.
    • An output SerDe, used when spilling to disk and when rows are read from it.
    • It provides a PartitionIterator: a Java Iterator that can also do lead/lag relative to the current position.
  • A PTF has the following contract:
    • During translation, it is given its input OI and argument expressions, and is asked for its OutputOI.
    • During execution, it is given a Partition with a SerDe matching the input OI, and is asked to return an output Partition.
    • A PTF may optionally have map-side logic. In this case, during translation it must provide the map-side OutputOI, and during execution it must compute the map-side Partition from an input Partition.
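The contract can be summarized in a simplified Java form. This is a hypothetical rendering, not the hive-rt interfaces: rows are `Object[]`, a schema is a list of column names standing in for ObjectInspectors, the Partition is purely in memory (no SerDe, no spilling), and `RowNumberPTF` is an invented example PTF.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, simplified rendering of the Partition/PTF contract (not hive-rt classes).
final class PartitionSketch {
    // In-memory stand-in; the real Partition uses an input OI and a spillable SerDe.
    static final class Partition {
        private final List<Object[]> rows = new ArrayList<>();
        void append(Object[] row) { rows.add(row); }
        int size() { return rows.size(); }
        Object[] getAt(int i) { return rows.get(i); }
        PartitionIterator iterator() { return new PartitionIterator(this); }
    }

    // A java.util.Iterator that can also reach rows relative to the current row.
    static final class PartitionIterator implements Iterator<Object[]> {
        private final Partition partition;
        private int current = -1; // index of the row last returned by next()
        PartitionIterator(Partition partition) { this.partition = partition; }
        @Override public boolean hasNext() { return current + 1 < partition.size(); }
        @Override public Object[] next() {
            if (!hasNext()) throw new NoSuchElementException();
            return partition.getAt(++current);
        }
        // Assumption: references outside the partition return null.
        Object[] lead(int amt) { return rowAt(current + amt); }
        Object[] lag(int amt) { return rowAt(current - amt); }
        private Object[] rowAt(int i) { return (i >= 0 && i < partition.size()) ? partition.getAt(i) : null; }
    }

    interface PTF {
        // Translation: input schema and argument expressions -> output schema.
        List<String> outputSchema(List<String> inputSchema, List<String> args);
        // Execution: one input Partition -> one output Partition.
        Partition execute(Partition input);
        // Optional map-side logic; the defaults mean "no map side".
        default boolean hasMapSide() { return false; }
        default List<String> mapSideOutputSchema(List<String> inputSchema, List<String> args) {
            throw new UnsupportedOperationException("no map-side logic");
        }
        default Partition executeMapSide(Partition input) {
            throw new UnsupportedOperationException("no map-side logic");
        }
    }

    // Example PTF: appends a 1-based row number column to every row of the partition.
    static final class RowNumberPTF implements PTF {
        @Override public List<String> outputSchema(List<String> inputSchema, List<String> args) {
            List<String> out = new ArrayList<>(inputSchema);
            out.add("rn");
            return out;
        }
        @Override public Partition execute(Partition input) {
            Partition output = new Partition();
            PartitionIterator it = input.iterator();
            int rn = 0;
            while (it.hasNext()) {
                Object[] row = it.next();
                Object[] extended = Arrays.copyOf(row, row.length + 1);
                extended[row.length] = ++rn;
                output.append(extended);
            }
            return output;
        }
    }
}
```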

Execution of a PTF chain as a Hive Operator Chain.

PTF Operator

  • We introduced a new Hive Operator: PTFOperator.
  • This is the bridge from Hive Operator execution to our PTF chain.
  • The PTF Operator behaves like the Group By Operator:
    • it collects rows with the same key into a Partition and hands it to the PTF chain.
    • When the operator is closed, it processes the last Partition.
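The Group By-like behavior amounts to buffering rows until the key changes. A minimal Java sketch, assuming rows reach the operator already clustered by key; `PTFOperatorSketch` is a hypothetical name and a `Consumer` stands in for the PTF chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical sketch of the PTFOperator's Group By-like behavior (not hive-rt code).
// Assumes rows arrive clustered by key, as on the reduce side after ReduceSink -> Extract.
final class PTFOperatorSketch<K, R> {
    private final Consumer<List<R>> ptfChain; // stands in for handing a Partition to the PTF chain
    private List<R> partition = new ArrayList<>();
    private K currentKey;

    PTFOperatorSketch(Consumer<List<R>> ptfChain) {
        this.ptfChain = ptfChain;
    }

    // Called once per input row.
    void process(K key, R row) {
        // A new key means the previous Partition is complete.
        if (!partition.isEmpty() && !Objects.equals(key, currentKey)) {
            flush();
        }
        currentKey = key;
        partition.add(row);
    }

    // The last Partition never sees a key change, so it is handed over on close.
    void close() {
        if (!partition.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        ptfChain.accept(partition);
        partition = new ArrayList<>();
    }
}
```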

Hive Operator Chain

  • There are 2 chain patterns.
  • When there is no map side processing the chain is:
TableScan -> ReduceSink -> Extract -> PTFOp -> FileSink
  • When there is map side processing the chain is:
TableScan -> PTFOp -> ReduceSink -> Extract -> PTFOp -> FileSink
  • The overall chain is broken up into jobs based on the rules listed above. This code has not been brought into the hive-rt branch yet.

Folding into HQL

  • Some thoughts:
    • bringing the grammar elements into the Hive grammar seems straightforward:
      • introduce Query-level window clauses
      • allow window clauses in select expressions
      • introduce the concept of a PTF; allow a fromSource to be a PTF invocation.
    • a QueryStat that has a PTF as the fromSource cannot contain joins.
      • this is not required, but it makes translation easier to think about. Use the SubQuery style if joining is needed.
      • The innermost PTF in a PTF chain must have a partition specified.
        • Except if the data is already partitioned/bucketed. Need to think about what this means: do we run the PTF chain on the map side?

    • Possible cases:
      • Query with PTF chain + Windowing clauses
      • Query with PTF + Windowing clauses + outer Partition/Group By
      • Query with Windowing clauses only.
    • Do we need the PTF Operator, or can we extend the Group By Operator?
