Add k-hop for example and benchmark (#407)
* [GAIA] Example & Bench: add k-hop and graph index;

* [GAIA] example & bench: add correlated k-hop;

* [GAIA] import ahash;

* [GAIA] generate dot file for dataflow task;
bmmcq committed Jun 15, 2021
1 parent 5686805 commit 40b744f
Showing 104 changed files with 1,171 additions and 397 deletions.
5 changes: 4 additions & 1 deletion research/gaia/.gitignore
@@ -67,4 +67,7 @@ cmake-build-debug/
.settings

# Local build
scripts/bin
scripts/bin

# Dataflow dot
**/*.dot
3 changes: 2 additions & 1 deletion research/gaia/pegasus/Cargo.toml
@@ -6,7 +6,8 @@ members = [
"network",
"executor",
"pegasus",
"server"
"server",
"graph"
]

[profile.release]
15 changes: 15 additions & 0 deletions research/gaia/pegasus/graph/Cargo.toml
@@ -0,0 +1,15 @@
[package]
name = "pegasus_graph"
version = "0.1.0"
authors = ["chenqiang.mcq <chenqiang.mcq@alibaba-inc.com>"]
edition = "2018"

[dependencies]
byteorder = "1.3.0"
nohash-hasher = "0.2.0"
#snap = "1"
rand = "0.8.3"
memmap = "0.7.0"

[dev-dependencies]
structopt = "0.2"
23 changes: 23 additions & 0 deletions research/gaia/pegasus/graph/examples/graph_load.rs
@@ -0,0 +1,23 @@
use std::path::PathBuf;
use std::time::Instant;

use structopt::StructOpt;

#[derive(Debug, StructOpt)]
#[structopt(name = "graph_load", about = "load graph topology")]
struct Config {
    /// The path of the original graph data;
    #[structopt(long = "data", parse(from_os_str))]
    data_path: PathBuf,
}

fn main() {
    let config: Config = Config::from_args();
    let graph = pegasus_graph::load(&config.data_path).unwrap();
    // Sample one million vertices that have at least one neighbor, then measure
    // how long it takes to look up all of their neighbor counts.
    let samples = graph.sample_vertices(1_000_000);
    let start = Instant::now();
    let mut count = 0;
    for id in samples {
        count += graph.count_neighbors(id);
    }
    println!("searched 1 million vertices' neighbors, total touched {} edges, use: {:?};", count, start.elapsed());
}
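
A brief usage note (the data path below is a placeholder): assuming the standard Cargo example layout shown above, this example would typically be run from research/gaia/pegasus/graph with

    cargo run --release --example graph_load -- --data /path/to/graph.txt

where the input is the raw adjacency text file (or its pre-built .bin cache) that pegasus_graph::load accepts.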
235 changes: 235 additions & 0 deletions research/gaia/pegasus/graph/src/lib.rs
@@ -0,0 +1,235 @@
use std::sync::Arc;
use std::path::Path;
use std::io::{BufReader, BufRead, Write, Read};
use std::fs::{File, OpenOptions};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::time::Instant;
use std::ffi::OsStr;
use nohash_hasher::{IntMap, BuildNoHashHasher};
use std::collections::HashMap;
use rand::Rng;
use memmap::Mmap;
use std::ops::{DerefMut, Deref};

/// An in-memory graph topology: one adjacency list per vertex id, plus the
/// (min, max) range of vertex ids used for sampling.
pub struct Graph {
    topology: IntMap<u64, Arc<Vec<u64>>>,
    id_range: (u64, u64),
}

impl Graph {
    #[inline]
    pub fn get_neighbors(&self, id: u64) -> Neighbors {
        self.topology
            .get(&id)
            .map(|list| Neighbors::new(list))
            .unwrap_or_else(Neighbors::empty)
    }

    #[inline]
    pub fn count_neighbors(&self, id: u64) -> usize {
        self.topology.get(&id).map(|list| list.len()).unwrap_or(0)
    }

    /// Sample `size` vertex ids (with replacement) that have at least one neighbor.
    pub fn sample_vertices(&self, size: usize) -> Vec<u64> {
        let mut rng = rand::thread_rng();
        let mut sample = Vec::with_capacity(size);
        while sample.len() < size {
            let src = rng.gen_range(self.id_range.0..self.id_range.1);
            if self.count_neighbors(src) > 0 {
                sample.push(src);
            }
        }
        sample
    }
}

/// An owning iterator over one vertex's neighbor ids.
pub struct Neighbors {
    ptr: Arc<Vec<u64>>,
    cursor: usize,
}

impl Neighbors {
    pub fn new(ptr: &Arc<Vec<u64>>) -> Self {
        Neighbors { ptr: ptr.clone(), cursor: 0 }
    }

    pub fn empty() -> Self {
        Neighbors { ptr: Arc::new(vec![]), cursor: 0 }
    }
}

impl Iterator for Neighbors {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.cursor < self.ptr.len() {
            self.cursor += 1;
            Some(self.ptr[self.cursor - 1])
        } else {
            None
        }
    }
}


/// Load a graph from `path`. If a pre-built `.bin` file is given (or found next
/// to the raw file), it is memory-mapped and decoded directly; otherwise the raw
/// text file is parsed and a `.bin` cache is written alongside it.
pub fn load<P: AsRef<Path>>(path: P) -> std::io::Result<Graph> {
    let start = Instant::now();
    let graph = if path.as_ref().extension() == Some(OsStr::new("bin")) {
        println!("start load binary ... ");
        //let reader = BufReader::new(snap::read::FrameDecoder::new(File::open(path)?));
        let file = File::open(path)?;
        let mmap = unsafe { Mmap::map(&file)? };
        let reader = mmap.deref();
        load_bin(reader)
    } else {
        let p = path.as_ref().with_extension("bin");
        match File::open(p) {
            Ok(file) => {
                println!("start load binary ... ");
                //let reader = snap::read::FrameDecoder::new(file);
                let mmap = unsafe { Mmap::map(&file)? };
                let reader = mmap.deref();
                load_bin(reader)
            }
            Err(_) => {
                println!("start load raw ...");
                load_raw(path)
            }
        }
    }?;
    println!("load graph with {} vertices, cost {:?}", graph.topology.len(), start.elapsed());
    Ok(graph)
}

/// Decode the binary format: min_id, max_id, the vertex-id list, and then one
/// adjacency list per vertex, all as little-endian u64 with a length prefix
/// before each list.
fn load_bin<R: Read>(mut reader: R) -> std::io::Result<Graph> {
    let min_id = reader.read_u64::<LittleEndian>()?;
    let max_id = reader.read_u64::<LittleEndian>()?;
    let vertices = read_id_list(&mut reader)?;
    let mut map = HashMap::with_capacity_and_hasher(vertices.len(), BuildNoHashHasher::default());
    for src in vertices {
        let list = read_id_list(&mut reader)?;
        map.insert(src, Arc::new(list));
    }
    Ok(Graph { topology: map, id_range: (min_id, max_id) })
}

/// Parse a raw text adjacency file (one source id per line followed by its
/// neighbor ids, separated by the character in `GRAPH_SPLIT`, default a space),
/// and cache the result as a `.bin` file next to the input.
fn load_raw<P: AsRef<Path>>(path: P) -> std::io::Result<Graph> {
    let as_bin = path.as_ref().with_extension("bin");
    let reader = BufReader::new(File::open(path)?);
    let spt = std::env::var("GRAPH_SPLIT").unwrap_or_else(|_| " ".to_owned()).parse::<char>().unwrap();
    println!("use split '{}'", spt);
    let mut map: IntMap<u64, Vec<u64>> = IntMap::default();
    let mut count = 0;
    let mut max_id = 0;
    let mut min_id = !0u64;
    let start = Instant::now();
    for (i, edge) in reader.lines().enumerate() {
        match edge {
            Ok(e) => {
                let mut ids = e.split(spt).filter(|s| !s.is_empty());
                if let Some(src) = ids.next() {
                    match src.trim().parse::<u64>() {
                        Ok(src_id) => {
                            max_id = std::cmp::max(src_id, max_id);
                            min_id = std::cmp::min(src_id, min_id);
                            let nei = map.entry(src_id).or_insert_with(Vec::new);
                            for dst in ids {
                                match dst.trim().parse::<u64>() {
                                    Ok(dst_id) => {
                                        max_id = std::cmp::max(dst_id, max_id);
                                        min_id = std::cmp::min(dst_id, min_id);
                                        nei.push(dst_id);
                                        count += 1;
                                    }
                                    Err(e) => {
                                        eprintln!("can't parse neighbor id at line {}: {}", i, e);
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            eprintln!("can't parse source id at line {}: {}", i, e);
                        }
                    }
                } else {
                    eprintln!("invalid line {}: '{}'", i, e);
                }
            }
            Err(e) => {
                eprintln!("can't read line: {}", e);
            }
        }
        if count % 1_000_000 == 0 && count != 0 {
            println!("already load {} edges, use {:?}", count, start.elapsed());
        }
    }

    let size = map.len();
    let mut graph = HashMap::with_capacity_and_hasher(size, BuildNoHashHasher::default());
    let mut vertices = Vec::with_capacity(size);
    // The binary file needs room for min_id, max_id, the length-prefixed vertex
    // list, and one length-prefixed adjacency list per vertex.
    let mut length = 2 * size + 3;
    for (id, edges) in map.iter() {
        vertices.push(*id);
        length += edges.len();
    }

    length = (length * std::mem::size_of::<u64>()).next_power_of_two();
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(as_bin)?;
    file.set_len(length as u64)?;
    let mmap = unsafe { Mmap::map(&file) }?;
    let mut bytes = mmap.make_mut()?;
    let mut writer = bytes.deref_mut();
    writer.write_u64::<LittleEndian>(min_id)?;
    writer.write_u64::<LittleEndian>(max_id)?;
    write_id_list(&mut writer, &vertices)?;

    for (k, v) in map {
        write_id_list(&mut writer, &v)?;
        graph.insert(k, Arc::new(v));
    }

    bytes.flush()?;
    Ok(Graph { topology: graph, id_range: (min_id, max_id) })
}


#[inline]
fn write_id_list<W: Write>(writer: &mut W, list: &[u64]) -> std::io::Result<()> {
    // Length prefix followed by the ids, all little-endian to match the reader.
    writer.write_u64::<LittleEndian>(list.len() as u64)?;
    for &id in list {
        writer.write_u64::<LittleEndian>(id)?;
    }
    Ok(())
}

#[inline]
fn read_id_list<R: Read>(reader: &mut R) -> std::io::Result<Vec<u64>> {
    let size = reader.read_u64::<LittleEndian>()? as usize;
    if size > 0 {
        // Decode the ids directly into a u64 buffer; re-interpreting a byte
        // buffer's allocation as a Vec<u64> is not sound in general (alignment
        // and allocator layout are not guaranteed to match).
        let mut list = vec![0u64; size];
        reader.read_u64_into::<LittleEndian>(&mut list)?;
        Ok(list)
    } else {
        Ok(vec![])
    }
}
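
The commit message refers to a k-hop example and benchmark built on this crate. As an illustrative sketch only, not the code added in the pegasus example files (which are not shown above), a sequential k-hop expansion over the `Graph` API in this file could look like the following; the helper name `k_hop`, the hop count, and the data path are assumptions, while `load`, `sample_vertices`, and `get_neighbors` come from the diff.

use pegasus_graph::Graph;

// Hypothetical helper: expand `k` hops out from `src`, frontier by frontier,
// and count how many neighbor visits (paths) are touched along the way.
fn k_hop(graph: &Graph, src: u64, k: usize) -> u64 {
    let mut frontier = vec![src];
    let mut touched = 0u64;
    for _ in 0..k {
        let mut next = Vec::new();
        for id in frontier {
            for n in graph.get_neighbors(id) {
                touched += 1;
                next.push(n);
            }
        }
        frontier = next;
    }
    touched
}

fn main() {
    // "graph.txt" is a placeholder path for a raw adjacency file.
    let graph = pegasus_graph::load("graph.txt").expect("load graph");
    let total: u64 = graph
        .sample_vertices(10)
        .into_iter()
        .map(|src| k_hop(&graph, src, 3))
        .sum();
    println!("3-hop from 10 sampled sources touched {} paths", total);
}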
4 changes: 4 additions & 0 deletions research/gaia/pegasus/pegasus/Cargo.toml
@@ -9,6 +9,7 @@ pegasus_common = { path = "../common" }
pegasus_memory = { path = "../memory" }
pegasus_network = { path = "../network" }
pegasus_executor = { path = "../executor" }
pegasus_graph = { path = "../graph" }
crossbeam-channel = "0.3.6"
crossbeam-queue = "0.1"
crossbeam-utils = "0.6"
@@ -21,6 +22,8 @@ hibitset = "0.6.3"
enum_dispatch = "0.3"
toml = "0.5"
serde = { version = "1.0", features = ["derive"] }
ahash = "0.7.2"
dot = "0.1.4"

[features]
default = []
@@ -31,3 +34,4 @@ time = "0.1"
env_logger = { version = "0.6" }
structopt = "0.2"
rand = "0.8.3"

6 changes: 3 additions & 3 deletions research/gaia/pegasus/pegasus/benches/bench_tag.rs
@@ -1,12 +1,12 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.