Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31239][hive] Fix native sum function can't get the corrected value when the argument type is string #22031

Closed
wants to merge 5 commits into from

Conversation

lsyldliu
Copy link
Contributor

What is the purpose of the change

Currently, for the following case:

tableEnv.executeSql("create table test_sum_dec(a int, x string, z decimal(10, 5))");
tableEnv.executeSql(
                "insert into test_sum_dec values (1, 'b', null), "
                        + "(1, 'b', 1.2), "
                        + "(2, 'b', null), "
                        + "(2, 'b', null),"
                        + "(4, '1', null),"
                        + "(4, 'b', null)")
        .await();

List<Row> result =
        CollectionUtil.iteratorToList(
                tableEnv.executeSql("select a, sum(x) from test_sum_dec group by a")
                        .collect());
assertThat(result.toString()).isEqualTo("[+I[1,null], +I[2, null], +I[4, 1.0]]");

The native sum function return [+I[1,null], +I[2, null], +I[4, 1.0]], but hive sum function return [+I[1,0.0], +I[2,0.0], +I[4, 1.0]]. The native function return result is not consistent with hive, this is a bug, so we should fix it.

Brief change log

  • Fix native sum function can't get the corrected value when the argument type is string
  • Add some notes in document about table.exec.hive.native-agg-function.enabled option can't turned on per job when using it via SqlClient

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests in HiveDialectAggITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicabled)

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 27, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution, I left some comments

@@ -100,6 +100,7 @@ the option `table.exec.hive.native-agg-function.enabled`, which brings significa
</table>

<span class="label label-danger">Attention</span> The ability of the native aggregation functions doesn't fully align with Hive built-in aggregation functions now, for example, some data types are not supported. If performance is not a bottleneck, you don't need to turn on this option.
In addition, `table.exec.hive.native-agg-function.enabled` option can't be turned on per job when using it via SqlClient, currently, only the module level is supported. Users should turn on this option first and then load HiveModule. This issue will be fixed in [FLINK-31193](https://issues.apache.org/jira/browse/FLINK-31193).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do not need add the issue number in the doc.

};
}

@Override
public Expression getValueExpression() {
return sum;
return ifThenElse(isTrue(isEmpty), nullOf(getResultType()), sum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the hive behavior when input is empty?

btw, please add a test to cover this case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hive code as following:

@AggregationType(estimable = true)
    static class SumLongAgg extends SumAgg<Long> {
      @Override
      public int estimate() { return JavaDataModel.PRIMITIVES1 + JavaDataModel.PRIMITIVES2; }
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      SumLongAgg result = new SumLongAgg();
      reset(result);
      return result;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
      SumLongAgg myagg = (SumLongAgg) agg;
      myagg.empty = true;
      myagg.sum = 0L;
      myagg.uniqueObjects = new HashSet<ObjectInspectorObject>();
    }

    private boolean warned = false;

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      assert (parameters.length == 1);
      try {
        if (isEligibleValue((SumLongAgg) agg, parameters[0])) {
          ((SumLongAgg)agg).empty = false;
          ((SumLongAgg)agg).sum += PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI);
        }
      } catch (NumberFormatException e) {
        if (!warned) {
          warned = true;
          LOG.warn(getClass().getSimpleName() + " "
              + StringUtils.stringifyException(e));
        }
      }
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
      if (partial != null) {
        SumLongAgg myagg = (SumLongAgg) agg;
        myagg.empty = false;
        if (isWindowingDistinct()) {
          throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial");
        } else {
            myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
        }
      }
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      SumLongAgg myagg = (SumLongAgg) agg;
      if (myagg.empty) {
        return null;
      }
      result.set(myagg.sum);
      return result;
    }

It returns a null value if all elements are null.

@lsyldliu
Copy link
Contributor Author

lsyldliu commented Mar 3, 2023

@godfreyhe Thanks for your reviewing, I've addressed your comments. Could you help retain two commits when you merge?

Copy link
Contributor

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@godfreyhe godfreyhe closed this in 263555c Mar 3, 2023
godfreyhe pushed a commit that referenced this pull request Mar 3, 2023
…native-agg-function.enabled option can't turned on per job when using it via SqlClient

This closes #22031
godfreyhe pushed a commit that referenced this pull request Mar 3, 2023
…alue when the argument type is string

This closes #22031

(cherry picked from commit 263555c)
godfreyhe pushed a commit that referenced this pull request Mar 3, 2023
…native-agg-function.enabled option can't turned on per job when using it via SqlClient

This closes #22031

(cherry picked from commit 62a3b99)
lindong28 pushed a commit to lindong28/flink that referenced this pull request Mar 4, 2023
…alue when the argument type is string

This closes apache#22031

(cherry picked from commit 263555c)
lindong28 pushed a commit to lindong28/flink that referenced this pull request Mar 4, 2023
…native-agg-function.enabled option can't turned on per job when using it via SqlClient

This closes apache#22031

(cherry picked from commit 62a3b99)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants