Skip to content

Commit

Permalink
MinHashDeduper structure that computes the disjoint-set
Browse files Browse the repository at this point in the history
I had a really hard time making the generic code in IdContainer
work together with the union-find, so I ended up just making the vec public
and reading it directly
  • Loading branch information
ZJaume committed Aug 30, 2023
1 parent ec2f769 commit ebeb181
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 5 deletions.
1 change: 1 addition & 0 deletions gaoya/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ pub mod minhash;
pub mod simhash;
pub mod text;
pub mod clustering;
pub mod unionfind;
23 changes: 20 additions & 3 deletions gaoya/src/minhash/id_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashSet;
use std::hash::{BuildHasher, Hash};
use std::ops::Index;
use std::slice::Iter;
use std::iter::Iterator;
use smallvec::{Array, SmallVec};
use crate::minhash::MinHashType;

Expand Down Expand Up @@ -68,10 +69,20 @@ impl<T: Hash + Eq + Send + Sync + Clone> IdContainer<T> for HashSetContainer<T>
/// Removes `item` from the backing set; nothing happens if it is not stored.
fn remove(&mut self, item: &T) {
    // `remove` on the set returns a presence flag that callers of this trait method do not need.
    let _ = self.set.remove(item);
}

}

/// Consuming iteration over the ids held by a `HashSetContainer`.
impl<T: Hash + Eq + Send + Sync + Clone> IntoIterator for HashSetContainer<T> {
    type Item = T;
    type IntoIter = std::collections::hash_set::IntoIter<T>;

    /// Moves the inner set out of the container and iterates over its owned elements.
    fn into_iter(self) -> Self::IntoIter {
        IntoIterator::into_iter(self.set)
    }
}


/// Id container backed by a plain `Vec<T>`.
pub struct VecContainer<T> {
vec: Vec<T>
// The backing vector is exposed publicly so that other modules (e.g. the
// union-find based clustering) can read the stored ids directly.
pub vec: Vec<T>
}

impl<T: Hash + Eq + Send + Sync + Clone> IdContainer<T> for VecContainer<T> {
Expand All @@ -98,14 +109,21 @@ impl<T: Hash + Eq + Send + Sync + Clone> IdContainer<T> for VecContainer<T> {
}



/// Removes the first stored element equal to `item`, if there is one.
///
/// The removal uses `swap_remove`, so the last element takes over the freed
/// slot and the relative order of the remaining ids is not preserved.
fn remove(&mut self, item: &T) {
    let found = self.vec.iter().position(|candidate| candidate == item);
    if let Some(slot) = found {
        self.vec.swap_remove(slot);
    }
}
}

/// Consuming iteration over the ids held by a `VecContainer`.
impl<T: Hash + Eq + Send + Sync + Clone> IntoIterator for VecContainer<T> {
    type Item = T;
    type IntoIter = std::vec::IntoIter<T>;

    /// Moves the inner vector out of the container and iterates over its owned elements.
    fn into_iter(self) -> Self::IntoIter {
        IntoIterator::into_iter(self.vec)
    }
}


/// SmallVecContainer uses SmallVec backed up by an array
pub struct SmallVecContainer<T, const N: usize> {
Expand Down Expand Up @@ -136,7 +154,6 @@ impl<T: Hash + Eq + Send + Sync + Clone, const N: usize> IdContainer<T> for Smal
}



fn remove(&mut self, item: &T) {
if let Some(index) = self.vec.iter().position(|x| x == item) {
self.vec.swap_remove(index);
Expand Down
240 changes: 239 additions & 1 deletion gaoya/src/minhash/minhash_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use std::ops::{Index, Range};
use ahash::{AHasher, AHashMap, AHashSet, RandomState};
use itertools::Itertools;
use crate::clustering::QueryIndex;
use crate::minhash::id_container::{HashSetContainer, IdContainer};
use crate::minhash::id_container::{HashSetContainer, IdContainer, VecContainer};
use crate::unionfind::UnionFind;


/// BandKey contains the hash of the band.
Expand Down Expand Up @@ -845,6 +846,243 @@ where
}


/// Data structure that indexes MinHash signatures into bands and extracts the
/// near-duplicate disjoint-set.
///
/// This is a copy of `MinHashIndex` that does not store the `<id, signature>` collection.
/// It adds a method called `find_clusters` that goes over all the bands and computes
/// the near-duplicate disjoint-set with union-find.
pub struct MinHashDeduper<T>
where
T: MinHashType,
{
/// Band tables; ids are plain `usize` values kept in `VecContainer<usize>` buckets.
bands: Vec<MinHashBand<T, usize, VecContainer<usize>>>,
/// Jaccard threshold passed to the constructor.
threshold: f64,
/// Rows per band (the constructor's `band_width`).
r: usize,
/// Number of bands (the constructor's `num_bands`).
b: usize,
/// Expected signature length, `num_bands * band_width`.
num_hashes: usize,
/// Count of inserted items; reset to zero by `clear`.
size: usize,
}

/// Human-readable one-line summary of the deduper's configuration and size.
impl<T> fmt::Display for MinHashDeduper<T>
where
    T: MinHashType,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The hash count shown is derived from the banding layout (bands * rows per band).
        let total_hashes = self.b * self.r;
        write!(
            f,
            "MinHashDeduper<{}> {{ threshold = {}, num_hashes = {}, bands = {}, rows_per_band = {}, size = {} }}",
            type_name::<T>(),
            self.threshold,
            total_hashes,
            self.b,
            self.r,
            self.size
        )
    }
}

impl<T> MinHashDeduper<T>
where
T: MinHashType,
{
/// Create a new MinHashDeduper
pub fn new_index(num_bands: usize,
band_width: usize,
jaccard_threshold: f64,
band_id: isize) -> Self {
let build_hasher = RandomState::with_seed(42);
let mut bands = Vec::new();
if band_id < 0 {
for i in 0..num_bands {
let (start, end) = (i * band_width, (i + 1) * band_width);
bands.push(MinHashBand::<T, usize, VecContainer<usize>>::new(start, end, build_hasher.clone()));
}
} else {
// Index with only one of all possible bands for partitioned indexing
let uband_id: usize = band_id.try_into().unwrap();
let (start, end) = (uband_id * band_width, (uband_id + 1) * band_width);
bands.push(MinHashBand::<T, usize, VecContainer<usize>>::new(start, end, build_hasher.clone()));
}
Self {
bands,
threshold: jaccard_threshold,
b: num_bands,
r: band_width,
num_hashes: num_bands * band_width,
size: 0,
}

}

/// Creates a new `MinHashDeduper` with pre-sized bands.
///
/// `expected_similarity_ratio` is the expected ratio of similar documents to the total.
/// Every band is created with an initial capacity of
/// `initial_capacity * (1.0 - expected_similarity_ratio)`, truncated to `usize`; for
/// example, a ratio of 0.9 reserves room for 0.1 of `initial_capacity` in each band.
/// All bands are always indexed and share one randomly seeded hasher.
pub fn new_with_capacity(num_bands: usize, band_width: usize,
                         jaccard_threshold: f64,
                         initial_capacity: usize,
                         expected_similarity_ratio: f64) -> Self {
    let build_hasher = RandomState::new();
    let dissimilar_share = 1.0 - expected_similarity_ratio;
    let band_capacity = (initial_capacity as f64 * dissimilar_share) as usize;

    let bands: Vec<_> = (0..num_bands)
        .map(|band_index| {
            let start = band_index * band_width;
            let end = (band_index + 1) * band_width;
            MinHashBand::<T, usize, VecContainer<usize>>::new_with_capacity(
                start,
                end,
                band_capacity,
                build_hasher.clone(),
            )
        })
        .collect();

    Self {
        bands,
        threshold: jaccard_threshold,
        b: num_bands,
        r: band_width,
        num_hashes: num_bands * band_width,
        size: 0,
    }
}


/// Indexes `signature` under `id` in every band and counts one more inserted item.
///
/// # Panics
/// Panics if `signature.len()` differs from `num_hashes()`.
#[inline]
pub fn insert(&mut self, id: usize, signature: Vec<T>) {
    assert_eq!(self.num_hashes(), signature.len());
    // `usize` is `Copy`, so the id is handed to each band by value.
    self.bands.iter_mut().for_each(|band| {
        band.insert(id, &signature);
    });
    self.size += 1;
}

/// Inserts many `(id, signature)` pairs, processing the bands in parallel.
///
/// `ids[i]` is the id of `signatures[i]`. If the two vectors differ in length, only the
/// first `min(ids.len(), signatures.len())` pairs are inserted, and `size` grows by
/// exactly that number.
///
/// # Panics
/// Panics if the first signature's length differs from `num_hashes()`.
pub fn par_bulk_insert(&mut self, ids: Vec<usize>, signatures: Vec<Vec<T>>)
where T: Send + Sync,
{
    if let Some(first) = signatures.first() {
        assert_eq!(self.num_hashes(), first.len());
    }

    self.bands.par_iter_mut().for_each(|band| {
        for (signature, &id) in signatures.iter().zip(ids.iter()) {
            band.insert(id, signature);
        }
    });
    // `zip` stops at the shorter input, so count only the pairs that were really inserted
    // (previously `ids.len()` was added even when fewer signatures were supplied).
    self.size += ids.len().min(signatures.len());
}

/// Inserts every `(id, signature)` pair into every band, processing the bands in parallel,
/// and grows `size` by the number of pairs.
pub fn par_bulk_insert_pairs(&mut self, id_signature_pairs: Vec<(usize, Vec<T>)>)
where T: Send + Sync,
{
    self.bands.par_iter_mut().for_each(|band| {
        for (id, signature) in &id_signature_pairs {
            band.insert(*id, signature);
        }
    });
    self.size += id_signature_pairs.len();
}

/// Calls `shrink_to_fit` on every band, in parallel.
pub fn shrink_to_fit(&mut self)
where T: Send + Sync
{
    let bands = self.bands.par_iter_mut();
    bands.for_each(|band| {
        band.shrink_to_fit();
    });
}

/// Calls `shrink_to(min_capacity)` on every band, in parallel.
pub fn shrink_to(&mut self, min_capacity: usize)
where T: Send + Sync
{
    let bands = self.bands.par_iter_mut();
    bands.for_each(|band| band.shrink_to(min_capacity));
}


/// Clears every band and resets the inserted-item counter to zero.
pub fn clear(&mut self) {
    for band in &mut self.bands {
        band.clear();
    }
    self.size = 0;
}

/// Queries every band with `query_signature` and returns the collected ids,
/// borrowed from the index.
///
/// # Panics
/// Panics if `query_signature.len()` differs from `num_hashes()`.
pub fn query(&self, query_signature: &Vec<T>) -> HashSet<&usize> {
    assert_eq!(self.num_hashes(), query_signature.len());
    let mut match_ids = HashSet::with_capacity(10);
    self.bands.iter().for_each(|band| {
        band.query(query_signature, &mut match_ids);
    });
    match_ids
}

/// Same as `query`, but the matching ids are returned as owned values.
///
/// # Panics
/// Panics if `query_signature.len()` differs from `num_hashes()`.
pub fn query_owned(&self, query_signature: &Vec<T>) -> HashSet<usize> {
    assert_eq!(self.num_hashes(), query_signature.len());
    let mut match_ids = HashSet::with_capacity(10);
    self.bands.iter().for_each(|band| {
        band.query_to_owned(query_signature, &mut match_ids);
    });
    match_ids
}

/// Runs `query_owned` for every signature in parallel and returns one result set per
/// input signature.
pub fn par_bulk_query(&self, signatures: &Vec<Vec<T>>) -> Vec<HashSet<usize>>
where
    T: Send + Sync
{
    let query_one = |signature: &Vec<T>| self.query_owned(signature);
    signatures.par_iter().map(query_one).collect()
}

/// Builds the near-duplicate disjoint-set from the contents of every band.
///
/// A `UnionFind` with `size()` elements is created, and all ids that share a bucket in
/// any band are merged into the same set. Ids are used directly as union-find indices,
/// so every id found in a multi-element bucket must be smaller than `size()`; a larger
/// id makes the union-find index out of bounds and panic.
pub fn find_clusters(&self) -> UnionFind {
    let mut uf = UnionFind::new(self.size());

    for band in &self.bands {
        for (_, cluster) in &band.hash_table {
            // Buckets with a single id carry no duplicate information.
            if cluster.len() > 1 {
                // The first id of the bucket is an arbitrary representative; every
                // other id in the bucket is merged with it.
                let anchor = cluster.vec[0];
                for &member in &cluster.vec[1..] {
                    uf.union(anchor, member);
                }
            }
        }
    }
    uf
}

/// Number of items counted by the insert methods since creation or the last `clear`.
pub fn size(&self) -> usize {
self.size
}

//TODO min or max of all bands capacity?
//pub fn capacity(&self) -> usize {
// self.id_signatures.capacity()
//}

/// Signature length expected by this index (`num_bands * band_width`, as set by the
/// constructors).
pub fn num_hashes(&self) -> usize {
self.num_hashes
}

/// Half-open range of signature positions covered by band number `band_index`.
fn band_range(&self, band_index: usize) -> Range<usize> {
    let start = band_index * self.r;
    let end = (band_index + 1) * self.r;
    start..end
}

/// Reports the number of entries in each band's hash table, together with the
/// smallest and largest of those counts.
///
/// When the index has no bands at all, `sizes` is empty and both `min_size` and
/// `max_size` are reported as 0 instead of panicking.
pub fn band_sizes(&self) -> BandStats {
    let sizes: Vec<usize> = self.bands.iter()
        .map(|band| band.hash_table.len())
        .collect();
    // `min`/`max` yield `None` for an empty list; fall back to 0 in that case.
    let min_size = sizes.iter().copied().min().unwrap_or(0);
    let max_size = sizes.iter().copied().max().unwrap_or(0);
    BandStats {
        min_size,
        max_size,
        sizes,
    }
}

}

#[derive(Debug)]
pub struct BandStats {
pub min_size: usize,
Expand Down
3 changes: 2 additions & 1 deletion gaoya/src/minhash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use self::min_hasher::MinHasher8;
pub use self::min_hasher::MinHasher16;
pub use self::min_hasher::MinHasher32;
pub use self::minhash_index::MinHashIndex;
pub use self::minhash_index::MinHashDeduper;
pub use self::string_index::MinHashStringIndex;
pub use self::id_container::IdContainer;
pub use self::id_container::HashSetContainer;
Expand Down Expand Up @@ -328,4 +329,4 @@ mod tests {
assert!(sum_similarity_from_centroid > s);
}
}
}
}
63 changes: 63 additions & 0 deletions gaoya/src/unionfind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::vec::Vec;

/// Disjoint-set (union-find) over the elements `0..length`.
pub struct UnionFind {
    /// `parents[i]` is the parent of element `i`; a root is its own parent.
    pub parents: Vec<usize>,
    /// Number of elements the structure was created with.
    pub length: usize,
}

// Implementation of the Union Find algorithm to obtain all the connected duplicates
impl UnionFind {
    /// Creates `length` singleton sets: every element starts as its own parent.
    pub fn new(length: usize) -> Self {
        Self {
            parents: (0..length).collect(),
            length,
        }
    }

    /// Returns the root (representative) of the set that contains `x`.
    ///
    /// After the root is located, every node visited on the way from `x` to the root
    /// is re-pointed directly at the root (full path compression), so subsequent finds
    /// for any of those nodes take a single jump.
    /// Takes `&mut self` because compression rewrites `parents`.
    ///
    /// # Panics
    /// Panics if `x` is not smaller than the number of elements.
    pub fn find(&mut self, x: usize) -> usize {
        // First pass: walk up to the root.
        let mut root = x;
        while self.parents[root] != root {
            root = self.parents[root];
        }
        // Second pass: re-point each node on the path at the root.
        let mut node = x;
        while node != root {
            node = std::mem::replace(&mut self.parents[node], root);
        }
        root
    }

    /// Merges the sets containing `x` and `y`; the root of `y`'s set is attached under
    /// the root of `x`'s set.
    ///
    /// # Panics
    /// Panics if `x != y` and either index is not smaller than the number of elements.
    pub fn union(&mut self, x: usize, y: usize) {
        if x == y {
            return;
        }
        let par_x = self.find(x);
        let par_y = self.find(y);
        self.parents[par_y] = par_x;
    }
}


// Unit tests for `UnionFind`.
#[cfg(test)]
mod tests {
use super::*;

// Two unions that chain 2 under 3 and 3 under 4; all other elements stay their own parent.
#[test]
fn union_find() {
let mut uf = UnionFind::new(6);
uf.union(3,2);
uf.union(4,2);

assert_eq!(uf.parents, [0, 1, 3, 4, 4, 5]);
}

// After the same unions, `find(2)` returns the root 4 and re-points element 2 directly at it.
#[test]
fn union_find_path_compression() {
let mut uf = UnionFind::new(6);
uf.union(3,2);
uf.union(4,2);

assert_eq!(uf.find(2), 4);
assert_eq!(uf.parents, [0, 1, 4, 4, 4, 5]);
}
}

0 comments on commit ebeb181

Please sign in to comment.