Skip to content

Commit

Permalink
Fix TDIGEST.QUANTILE TDIGEST.MERGE TDIGEST.CDF
Browse files Browse the repository at this point in the history
  • Loading branch information
dengliming committed Oct 4, 2022
1 parent 76389f4 commit 430d75a
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 dengliming.
* Copyright 2020-2022 dengliming.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,6 +67,12 @@ public static void notEmpty(int[] array, String message) {
}
}

public static void notEmpty(double[] array, String message) {
if (array == null || array.length == 0) {
throw new IllegalArgumentException(message);
}
}

public static void isTrue(boolean value, String message) {
if (!value) {
throw new IllegalArgumentException(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 dengliming.
* Copyright 2021-2022 dengliming.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -141,32 +141,45 @@ public RFuture<Double> getMaxAsync() {
/**
* Returns an estimate of the cutoff such that a specified fraction of the data added to this TDigest would be less than or equal to the cutoff.
* <p>
* TDIGEST.QUANTILE {key} {quantile}
* TDIGEST.QUANTILE {key} quantile [quantile ...]
*
* @return
*/
public Double getQuantile(double quantile) {
return get(getQuantileAsync(quantile));
public List<Double> getQuantile(double... quantiles) {
return get(getQuantileAsync(quantiles));
}

public RFuture<Double> getQuantileAsync(double quantile) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TDIGEST_QUANTILE, getName(), quantile);
public RFuture<List<Double>> getQuantileAsync(double... quantiles) {
RAssert.notEmpty(quantiles, "quantiles must not be empty");

List<Object> params = new ArrayList<>(quantiles.length + 1);
params.add(getName());
for (double quantile : quantiles) {
params.add(quantile);
}
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TDIGEST_QUANTILE, params.toArray());
}

/**
* Returns the fraction of all points added which are <= value.
* <p>
* TDIGEST.CDF {key} {value}
* TDIGEST.CDF {key} value [value ...]
*
* @param value
* @param values
* @return
*/
public Double getCdf(double value) {
return get(getCdfAsync(value));
public List<Double> getCdf(double... values) {
return get(getCdfAsync(values));
}

public RFuture<Double> getCdfAsync(double value) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TDIGEST_CDF, getName(), value);
public RFuture<List<Double>> getCdfAsync(double... values) {
RAssert.notEmpty(values, "values must not be empty");
List<Object> params = new ArrayList<>(values.length + 1);
params.add(getName());
for (double value : values) {
params.add(value);
}
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TDIGEST_CDF, params.toArray());
}

/**
Expand All @@ -180,7 +193,7 @@ public boolean mergeTo(String toKey) {
}

public RFuture<Boolean> mergeToAsync(String toKey) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TDIGEST_MERGE, toKey, getName());
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TDIGEST_MERGE, toKey, 1, getName());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 dengliming.
* Copyright 2020-2022 dengliming.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,7 +84,7 @@ public interface RedisCommands {
RedisCommand TDIGEST_INFO = new RedisCommand("TDIGEST.INFO", new ListMultiDecoder2(new TDigestDecoder()));
RedisCommand TDIGEST_MIN = new RedisCommand("TDIGEST.MIN", new DoubleReplayConvertor());
RedisCommand TDIGEST_MAX = new RedisCommand("TDIGEST.MAX", new DoubleReplayConvertor());
RedisCommand TDIGEST_QUANTILE = new RedisCommand("TDIGEST.QUANTILE", new DoubleReplayConvertor());
RedisCommand TDIGEST_CDF = new RedisCommand("TDIGEST.CDF", new DoubleReplayConvertor());
RedisCommand<List<Double>> TDIGEST_QUANTILE = new RedisCommand("TDIGEST.QUANTILE", new ObjectListReplayDecoder(), new DoubleReplayConvertor());
RedisCommand<List<Double>> TDIGEST_CDF = new RedisCommand("TDIGEST.CDF", new ObjectListReplayDecoder(), new DoubleReplayConvertor());
RedisCommand TDIGEST_MERGE = new RedisCommand("TDIGEST.MERGE", new BooleanReplayConvertor());
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 dengliming.
* Copyright 2021-2022 dengliming.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,10 +17,11 @@
package io.github.dengliming.redismodule.redisbloom;

import io.github.dengliming.redismodule.redisbloom.model.TDigestInfo;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -72,32 +73,33 @@ public void testMinAndMax() {
public void testQuantile() {
TDigest tDigest = getRedisBloomClient().getTDigest("t-digest");
assertThat(tDigest.create(500)).isTrue();

List<AbstractMap.SimpleEntry<Double, Double>> vals = new ArrayList<>();
for (int i = 1; i <= 10000; i++) {
vals.add(new AbstractMap.SimpleEntry(i * 0.01, 1.0));
}
List<AbstractMap.SimpleEntry<Double, Double>> vals = Arrays.asList(
new AbstractMap.SimpleEntry(1.0, 1.0),
new AbstractMap.SimpleEntry(2.0, 1.0),
new AbstractMap.SimpleEntry(3.0, 1.0)
);

assertThat(tDigest.add(vals)).isTrue();

assertThat(tDigest.getQuantile(0.01)).isCloseTo(1.0, Offset.offset(0.01));
assertThat(tDigest.getQuantile(0.99)).isCloseTo(99.0, Offset.offset(0.01));
assertThat(tDigest.getQuantile(1.0).get(0)).isEqualTo(3.0);
assertThat(tDigest.getQuantile(0).get(0)).isEqualTo(1.0);
}

@Test
public void testCdf() {
TDigest tDigest = getRedisBloomClient().getTDigest("t-digest");
assertThat(tDigest.create(500)).isTrue();

List<AbstractMap.SimpleEntry<Double, Double>> vals = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
vals.add(new AbstractMap.SimpleEntry(i, 1.0));
}
List<AbstractMap.SimpleEntry<Double, Double>> vals = Arrays.asList(
new AbstractMap.SimpleEntry(1.0, 1.0),
new AbstractMap.SimpleEntry(2.0, 1.0),
new AbstractMap.SimpleEntry(3.0, 1.0)
);

assertThat(tDigest.add(vals)).isTrue();

assertThat(tDigest.getCdf(1.0)).isCloseTo(0.01, Offset.offset(0.01));
assertThat(tDigest.getCdf(99.0)).isCloseTo(0.99, Offset.offset(0.01));
assertThat(tDigest.getCdf(10.0).get(0)).isEqualTo(1.0);
assertThat(tDigest.getCdf(0.0).get(0)).isEqualTo(0.0);
}

@Test
Expand All @@ -114,7 +116,7 @@ public void testReset() {

TDigestInfo tDigestInfo = tDigest.getInfo();
assertThat(tDigestInfo).isNotNull();
assertThat(tDigestInfo.getUnmergedNodes()).isEqualTo(100);
assertThat(tDigestInfo.getUnmergedNodes()).isEqualTo(200);

assertThat(tDigest.reset()).isTrue();
tDigestInfo = tDigest.getInfo();
Expand Down Expand Up @@ -145,6 +147,6 @@ public void testMerge() {

TDigestInfo toDigestInfo = toDigest.getInfo();
assertThat(toDigestInfo).isNotNull();
assertThat(toDigestInfo.getMergedWeight() + toDigestInfo.getUnmergedWeight()).isEqualTo(1100);
assertThat(toDigestInfo.getMergedWeight() + toDigestInfo.getUnmergedWeight()).isEqualTo(400);
}
}

0 comments on commit 430d75a

Please sign in to comment.