datasketches module updates #1991

merged 2 commits into from Nov 19, 2015
1 change: 1 addition & 0 deletions docs/content/development/datasketches-aggregators.md
@@ -5,6 +5,7 @@ layout: doc_page
## DataSketches aggregator
Druid aggregators based on the [datasketches](http://datasketches.github.io/) library. Note that sketch algorithms are approximate; see details in the "Accuracy" section of the datasketches doc.
At ingestion time, this aggregator creates the theta sketch objects that are stored in Druid segments. Logically speaking, a theta sketch object can be thought of as a Set data structure. At query time, sketches are read and aggregated (set-unioned) together. By default, you receive the estimated number of unique entries in the sketch object. You can also use post aggregators to perform union, intersection, or difference on sketch columns in the same row.
Note that you can use the `thetaSketch` aggregator on columns that were not ingested with it; in that case it returns the estimated cardinality of the column. Using it at ingestion time as well is recommended, because it makes querying faster.
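
To make the "sketch as a Set" analogy concrete, here is a minimal sketch that models the three set operations with exact `java.util.HashSet` values. It is only an illustration of the logic: it does not use the datasketches library, real theta sketches return approximate estimates rather than exact counts, and the class name `ExactSetModel` and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration: exact sets stand in for theta sketches.
final class ExactSetModel {
    // Union: what happens when sketches are merged at query time.
    static Set<String> union(Set<String> a, Set<String> b) {
        Set<String> out = new HashSet<>(a);
        out.addAll(b);
        return out;
    }

    // Intersection: entries present in both inputs.
    static Set<String> intersect(Set<String> a, Set<String> b) {
        Set<String> out = new HashSet<>(a);
        out.retainAll(b);
        return out;
    }

    // Difference: entries in a that are not in b.
    static Set<String> aNotB(Set<String> a, Set<String> b) {
        Set<String> out = new HashSet<>(a);
        out.removeAll(b);
        return out;
    }

    public static void main(String[] args) {
        Set<String> monday = new HashSet<>(Arrays.asList("alice", "bob", "carol"));
        Set<String> tuesday = new HashSet<>(Arrays.asList("bob", "carol", "dave"));

        System.out.println(union(monday, tuesday).size());     // prints 4
        System.out.println(intersect(monday, tuesday).size()); // prints 2
        System.out.println(aNotB(monday, tuesday).size());     // prints 1
    }
}
```

The size of each result plays the role of the unique-count estimate that the aggregator returns by default.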

### Aggregators

185 changes: 93 additions & 92 deletions extensions/datasketches/pom.xml
@@ -18,102 +18,103 @@
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.druid.extensions</groupId>
<artifactId>druid-datasketches</artifactId>
<name>druid-datasketches</name>
<description>Druid Aggregators based on datasketches lib http://datasketches.github.io/</description>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-datasketches</artifactId>
<name>druid-datasketches</name>
<description>Druid Aggregators based on datasketches lib http://datasketches.github.io/</description>

<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<dependencies>
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.1.1</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${druid.api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependencies>
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.2.2</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${druid.api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-smile-provider</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-smile-provider</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -19,17 +19,22 @@

package io.druid.query.aggregation.datasketches.theta;

import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.SetOpReturnState;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.ObjectColumnSelector;

import java.util.List;

public class SketchAggregator implements Aggregator
{
private static final Logger logger = new Logger(SketchAggregator.class);

private final ObjectColumnSelector selector;
private final String name;
private final int size;
@@ -48,21 +53,11 @@ public SketchAggregator(String name, ObjectColumnSelector selector, int size)
public void aggregate()
{
Object update = selector.get();

if(update == null) {
if (update == null) {
return;
}

SetOpReturnState success;
if (update instanceof Memory) {
success = union.update((Memory) update);
} else {
success = union.update((Sketch) update);
}

if(success != SetOpReturnState.Success) {
throw new IllegalStateException("Sketch Aggregation failed with state " + success);
}
updateUnion(union, update);
}

@Override
@@ -105,4 +100,31 @@ public void close()
{
union = null;
}

static void updateUnion(Union union, Object update)
{
if (update instanceof Memory) {
union.update((Memory) update);
} else if (update instanceof Sketch) {
union.update((Sketch) update);
} else if (update instanceof String) {
union.update((String) update);
} else if (update instanceof byte[]) {
union.update((byte[]) update);
} else if (update instanceof Double) {
union.update(((Double) update));
} else if (update instanceof Integer || update instanceof Long) {
union.update(((Number) update).longValue());
} else if (update instanceof int[]) {
union.update((int[]) update);
} else if (update instanceof long[]) {
union.update((long[]) update);
} else if (update instanceof List) {
for (Object entry : (List) update) {
union.update(entry.toString());
}
} else {
throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
}
}
}
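
The type dispatch in `updateUnion` above can be exercised in isolation with a stand-in for the union. The following is a minimal sketch under stated assumptions: `RecordingUnion` and `UpdateDispatchSketch` are hypothetical names, the stand-in only records which overload was reached, it covers a subset of the branches (String, Double, Integer/Long, List), and it throws `IllegalStateException` directly instead of the `ISE` used in the change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the instanceof chain; not the real Union class.
final class UpdateDispatchSketch {
    // Stand-in union that records which update overload was invoked.
    static final class RecordingUnion {
        final List<String> calls = new ArrayList<>();
        void update(String s) { calls.add("String:" + s); }
        void update(long v)   { calls.add("long:" + v); }
        void update(double v) { calls.add("double:" + v); }
    }

    static void updateUnion(RecordingUnion union, Object update) {
        if (update instanceof String) {
            union.update((String) update);
        } else if (update instanceof Double) {
            union.update(((Double) update).doubleValue());
        } else if (update instanceof Integer || update instanceof Long) {
            // Integer and Long are both widened to long, as in the change above.
            union.update(((Number) update).longValue());
        } else if (update instanceof List) {
            // Multi-value input: each entry is added via its string form.
            for (Object entry : (List<?>) update) {
                union.update(entry.toString());
            }
        } else {
            throw new IllegalStateException("Illegal type received [" + update.getClass() + "]");
        }
    }

    public static void main(String[] args) {
        RecordingUnion union = new RecordingUnion();
        updateUnion(union, 7);
        updateUnion(union, 1.5);
        updateUnion(union, Arrays.asList("a", 2));
        System.out.println(union.calls); // prints [long:7, double:1.5, String:a, String:2]
    }
}
```

Two properties of the chain are visible here: list entries are always added through `toString()`, so a numeric entry inside a list is hashed as a string rather than as a long, and any type without a branch (for example `Float`) falls through to the exception.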
@@ -19,13 +19,12 @@

package io.druid.query.aggregation.datasketches.theta;

import com.metamx.common.logger.Logger;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.memory.MemoryRegion;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.SetOpReturnState;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ObjectColumnSelector;
@@ -36,6 +35,8 @@

public class SketchBufferAggregator implements BufferAggregator
{
private static final Logger logger = new Logger(SketchBufferAggregator.class);

private final ObjectColumnSelector selector;
private final int size;
private final int maxIntermediateSize;
@@ -59,28 +60,19 @@ public void init(ByteBuffer buf, int position)
}

Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
unions.put(position, (Union) SetOperation.builder().setMemory(mem).build(size, Family.UNION));
unions.put(position, (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION));
}

@Override
public void aggregate(ByteBuffer buf, int position)
{
Object update = selector.get();
if(update == null) {
if (update == null) {
return;
}

Union union = getUnion(buf, position);
SetOpReturnState success;
if (update instanceof Memory) {
success = union.update((Memory) update);
} else {
success = union.update((Sketch) update);
}

if(success != SetOpReturnState.Success) {
throw new IllegalStateException("Sketch Buffer Aggregation failed with state " + update);
}
SketchAggregator.updateUnion(union, update);
}

@Override
@@ -98,7 +90,7 @@ public Object get(ByteBuffer buf, int position)
private Union getUnion(ByteBuffer buf, int position)
{
Union union = unions.get(position);
if(union == null) {
if (union == null) {
Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
union = (Union) SetOperation.wrap(mem);
unions.put(position, union);
@@ -119,7 +111,8 @@ public long getLong(ByteBuffer buf, int position)
}

@Override
public void close() {
public void close()
{
unions.clear();
}
