Skip to content

Commit

Permalink
Project structure, pom, Recordinality implementation, gitignore, and …
Browse files Browse the repository at this point in the history
…test data.
  • Loading branch information
cscotta committed Aug 19, 2013
1 parent 1734319 commit f431468
Show file tree
Hide file tree
Showing 4 changed files with 17,310 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -0,0 +1,5 @@
.DS_Store
*.iml
.idea/
target/
*.class
108 changes: 108 additions & 0 deletions pom.xml
@@ -0,0 +1,108 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cscotta</groupId>
<artifactId>recordinality</artifactId>
<version>0.1</version>
<name>recordinality</name>
<url>https://www.github.com/cscotta/recordinality</url>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>12.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.8.1</version>
<configuration>
<useFile>false</useFile>
<useSystemClassLoader>false</useSystemClassLoader>
<argLine>-Xmx512m</argLine>
<includes>
<exclude>**/*Test.java</exclude>
<include>**/*Spec.java</include>
</includes>
<excludes>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>junit:junit</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>package</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>central</id>
<name>Maven repository</name>
<url>http://repo1.maven.org/maven2</url>
</repository>
</repositories>
</project>
157 changes: 157 additions & 0 deletions src/main/java/com/cscotta/recordinality/Recordinality.java
@@ -0,0 +1,157 @@
/*
* Copyright 2013, C. Scott Andreas (@cscotta / scott@paradoxica.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.cscotta.recordinality;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.hash.Hashing;
import com.google.common.hash.HashFunction;
import com.google.common.collect.ImmutableSet;

public class Recordinality {

private final int sampleSize;
private final HashFunction hash = Hashing.murmur3_128();
private final AtomicLong modifications = new AtomicLong(0);
private final AtomicLong cachedMin = new AtomicLong(Long.MIN_VALUE);
private final ConcurrentSkipListMap<Long, Element> kMap =
new ConcurrentSkipListMap<Long, Element>();

/*
* Initializes a new Recordinality instance with a configurable 'k'-size.
*/
public Recordinality(int sampleSize) {
this.sampleSize = sampleSize;
}

/*
* Observes a value in a stream.
*/
public void observe(String element) {
boolean inserted = insertIfFits(element);
if (inserted) modifications.incrementAndGet();
}

/*
* Returns an estimate of the stream's cardinality. For a description
* of this estimator, see see §2 Theorem 1 in the Recordinality paper:
* http://www-apr.lip6.fr/~lumbroso/Publications/HeLuMaVi12.pdf
*/
public long estimateCardinality() {
synchronized (this) {
long pow = modifications.get() - sampleSize + 1;
double estimate = (sampleSize * (Math.pow(1 + (1.0 / sampleSize), pow))) - 1;
return (long) estimate;
}
}

/*
* Returns an estimate of Recordinality's error for this stream,
* expressed in terms of standard error. For a description of this estimator,
* see §2 Theorem 2: http://www-apr.lip6.fr/~lumbroso/Publications/HeLuMaVi12.pdf
*/
public double estimateError() {
synchronized (this) {
double estCardinality = estimateCardinality();
double error = Math.sqrt(Math.pow(
(estCardinality / (sampleSize * Math.E)), (1.0 / sampleSize)) - 1);
return error;
}
}

/*
* Returns the current set of k-records observed in the stream.
*/
public Set<Element> getSample() {
synchronized (this) {
return ImmutableSet.copyOf(kMap.values());
}
}

/*
* Inserts a record into our k-set if it fits.
*/
private boolean insertIfFits(String element) {
long hashedValue = hash.hashString(element).asLong();

// Short-circuit if our k-set is saturated. Common case.
if (kMap.size() >= sampleSize && hashedValue < cachedMin.get())
return false;

synchronized (this) {
int mapSize = kMap.size();
assert(mapSize <= sampleSize);

if (mapSize < sampleSize || hashedValue >= cachedMin.get()) {
Element existing = kMap.get(hashedValue);
if (existing != null) {
existing.count.incrementAndGet();
return false;
} else {
kMap.put(hashedValue, new Element(element));
long lowestKey = kMap.firstKey();
cachedMin.set(lowestKey);
if (mapSize == sampleSize) kMap.remove(lowestKey);
return true;
}
}

return false;
}
}

@Override
public String toString() {
synchronized (this) {
return "Recordinality{" +
"sampleSize=" + sampleSize +
", hash=" + hash +
", modifications=" + modifications.get() +
", cachedMin=" + cachedMin.get() +
", mapSize=" + kMap.size() +
", estCardinality=" + estimateCardinality() +
", estError=" + estimateError() +
'}';
}
}

/*
* Inner class representing a pair of an observed k-record in the stream
* along with the number of times it has been observed.
*/
public class Element {

public final String value;
public final AtomicLong count;

public Element(String value) {
this.value = value;
this.count = new AtomicLong(1);
}

@Override
public String toString() {
return "Element{" +
"value='" + value + '\'' +
", count=" + count +
'}';
}
}

}

0 comments on commit f431468

Please sign in to comment.