Skip to content

Update read trait #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/graph_impl/static_graph/edge_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,5 +592,4 @@ mod test {

assert_eq!(edges.edges, vec![3_u32, 3, 3, 0, 1, 2, 2]);
}

}
4 changes: 2 additions & 2 deletions src/io/csv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ pub fn read_from_csv<Id, NL, EL, G, P>(
is_flexible: bool,
) where
for<'de> Id: IdType + Serialize + Deserialize<'de>,
for<'de> NL: Hash + Eq + Serialize + Deserialize<'de>,
for<'de> EL: Hash + Eq + Serialize + Deserialize<'de>,
for<'de> NL: Hash + Eq + Serialize + Deserialize<'de> + 'static,
for<'de> EL: Hash + Eq + Serialize + Deserialize<'de> + 'static,
G: MutGraphTrait<Id, NL, EL>,
P: AsRef<Path>,
{
Expand Down
79 changes: 42 additions & 37 deletions src/io/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use generic::{IdType, Iter};
use io::csv::record::{EdgeRecord, NodeRecord, PropEdgeRecord, PropNodeRecord};
use io::csv::JsonValue;
use io::{ReadGraph, ReadGraphTo};
use itertools::Itertools;
use serde::Deserialize;
use serde_json::{from_str, to_value};

Expand Down Expand Up @@ -67,15 +68,21 @@ impl<Id: IdType, NL: Hash + Eq, EL: Hash + Eq> Clone for CSVReader<Id, NL, EL> {

impl<Id: IdType, NL: Hash + Eq, EL: Hash + Eq> CSVReader<Id, NL, EL> {
pub fn new<P: AsRef<Path>>(path_to_nodes: Vec<P>, path_to_edges: Vec<P>) -> Self {
let mut path_to_nodes: Vec<PathBuf> = path_to_nodes
.into_iter()
.flat_map(|p| list_files(p))
.collect_vec();
path_to_nodes.sort();

let mut path_to_edges: Vec<PathBuf> = path_to_edges
.into_iter()
.flat_map(|p| list_files(p))
.collect_vec();
path_to_edges.sort();

CSVReader {
path_to_nodes: path_to_nodes
.into_iter()
.flat_map(|p| list_files(p))
.collect(),
path_to_edges: path_to_edges
.into_iter()
.flat_map(|p| list_files(p))
.collect(),
path_to_nodes,
path_to_edges,
separator: b',',
has_headers: true,
is_flexible: false,
Expand Down Expand Up @@ -113,20 +120,20 @@ impl<Id: IdType, NL: Hash + Eq, EL: Hash + Eq> CSVReader<Id, NL, EL> {
}
}

impl<Id: IdType, NL: Hash + Eq, EL: Hash + Eq> ReadGraph<Id, NL, EL> for CSVReader<Id, NL, EL>
impl<Id: IdType, NL: Hash + Eq + 'static, EL: Hash + Eq + 'static> ReadGraph<Id, NL, EL>
for CSVReader<Id, NL, EL>
where
for<'de> Id: Deserialize<'de>,
for<'de> NL: Deserialize<'de>,
for<'de> EL: Deserialize<'de>,
{
fn node_iter(&self) -> Iter<(Id, Option<NL>)> {
let vec = self.path_to_nodes.clone();
fn get_node_iter(&self, idx: usize) -> Option<Iter<(Id, Option<NL>)>> {
let node_file = self.path_to_nodes.get(idx).cloned();
let has_headers = self.has_headers;
let is_flexible = self.is_flexible;
let separator = self.separator;

let iter = vec
.into_iter()
node_file
.map(move |path_to_nodes| {
let str_node_path = path_to_nodes.as_path().to_str().unwrap();
info!("Reading nodes from {}", str_node_path);
Expand All @@ -152,19 +159,16 @@ where
}
})
})
.flat_map(|x| x);

Iter::new(Box::new(iter))
.map(|iter| Iter::new(Box::new(iter)))
}

fn edge_iter(&self) -> Iter<(Id, Id, Option<EL>)> {
let vec = self.path_to_edges.clone();
fn get_edge_iter(&self, idx: usize) -> Option<Iter<(Id, Id, Option<EL>)>> {
let edge_file = self.path_to_edges.get(idx).cloned();
let has_headers = self.has_headers;
let is_flexible = self.is_flexible;
let separator = self.separator;

let iter = vec
.into_iter()
edge_file
.map(move |path_to_edges| {
info!(
"Reading edges from {}",
Expand Down Expand Up @@ -192,21 +196,18 @@ where
}
})
})
.flat_map(|x| x);

Iter::new(Box::new(iter))
.map(|iter| Iter::new(Box::new(iter)))
}

fn prop_node_iter(&self) -> Iter<(Id, Option<NL>, JsonValue)> {
fn get_prop_node_iter(&self, idx: usize) -> Option<Iter<(Id, Option<NL>, JsonValue)>> {
assert!(self.has_headers);

let vec = self.path_to_nodes.clone();
let node_file = self.path_to_nodes.get(idx).cloned();
let has_headers = self.has_headers;
let is_flexible = self.is_flexible;
let separator = self.separator;

let iter = vec
.into_iter()
node_file
.map(move |path_to_nodes| {
info!(
"Reading nodes from {}",
Expand All @@ -232,21 +233,18 @@ where
(record.id, record.label, prop)
})
})
.flat_map(|x| x);

Iter::new(Box::new(iter))
.map(|iter| Iter::new(Box::new(iter)))
}

fn prop_edge_iter(&self) -> Iter<(Id, Id, Option<EL>, JsonValue)> {
fn get_prop_edge_iter(&self, idx: usize) -> Option<Iter<(Id, Id, Option<EL>, JsonValue)>> {
assert!(self.has_headers);

let vec = self.path_to_edges.clone();
let edge_file = self.path_to_edges.get(idx).cloned();
let has_headers = self.has_headers;
let is_flexible = self.is_flexible;
let separator = self.separator;

let iter = vec
.into_iter()
edge_file
.map(move |path_to_edges| {
info!(
"Reading edges from {}",
Expand All @@ -272,13 +270,20 @@ where
(record.src, record.dst, record.label, prop)
})
})
.flat_map(|x| x);
.map(|iter| Iter::new(Box::new(iter)))
}

fn num_of_node_files(&self) -> usize {
self.path_to_nodes.len()
}

Iter::new(Box::new(iter))
fn num_of_edge_files(&self) -> usize {
self.path_to_edges.len()
}
}

impl<Id: IdType, NL: Hash + Eq, EL: Hash + Eq> ReadGraphTo<Id, NL, EL> for CSVReader<Id, NL, EL>
impl<Id: IdType, NL: Hash + Eq + 'static, EL: Hash + Eq + 'static> ReadGraphTo<Id, NL, EL>
for CSVReader<Id, NL, EL>
where
for<'de> Id: Deserialize<'de>,
for<'de> NL: Deserialize<'de>,
Expand Down
4 changes: 2 additions & 2 deletions src/io/hdfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ pub fn read_from_hdfs<Id, NL, EL, G, P>(
is_flexible: bool,
) where
for<'de> Id: IdType + Serialize + Deserialize<'de>,
for<'de> NL: Hash + Eq + Serialize + Deserialize<'de>,
for<'de> EL: Hash + Eq + Serialize + Deserialize<'de>,
for<'de> NL: Hash + Eq + Serialize + Deserialize<'de> + 'static,
for<'de> EL: Hash + Eq + Serialize + Deserialize<'de> + 'static,
G: MutGraphTrait<Id, NL, EL>,
P: AsRef<Path>,
{
Expand Down
54 changes: 26 additions & 28 deletions src/io/hdfs/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,20 @@ impl<'a, Id: IdType, NL: Hash + Eq, EL: Hash + Eq> HDFSReader<'a, Id, NL, EL> {
}
}

impl<'a, Id: IdType, NL: Hash + Eq, EL: Hash + Eq> ReadGraph<Id, NL, EL>
impl<'a, Id: IdType, NL: Hash + Eq + 'static, EL: Hash + Eq + 'static> ReadGraph<Id, NL, EL>
for HDFSReader<'a, Id, NL, EL>
where
for<'de> Id: Deserialize<'de>,
for<'de> NL: Deserialize<'de>,
for<'de> EL: Deserialize<'de>,
{
fn node_iter(&self) -> Iter<(Id, Option<NL>)> {
let vec = self.path_to_nodes.clone();
fn get_node_iter(&self, idx: usize) -> Option<Iter<(Id, Option<NL>)>> {
let node_file = self.path_to_nodes.get(idx).cloned();
let has_headers = self.has_headers;
let is_flexible = self.is_flexible;
let separator = self.separator;

let iter = vec
.into_iter()
node_file
.map(move |path_to_nodes| {
let str_node_path = path_to_nodes.as_path().to_str().unwrap();
info!("Reading nodes from {}", str_node_path);
Expand Down Expand Up @@ -175,18 +174,16 @@ where
}
})
})
.flat_map(|x| x);;
Iter::new(Box::new(iter))
.map(|iter| Iter::new(Box::new(iter)))
}

fn edge_iter(&self) -> Iter<(Id, Id, Option<EL>)> {
let vec = self.path_to_edges.clone();
fn get_edge_iter(&self, idx: usize) -> Option<Iter<(Id, Id, Option<EL>)>> {
let edge_file = self.path_to_edges.get(idx).cloned();
let has_headers = self.has_headers;
let is_flexible = self.is_flexible;
let separator = self.separator;

let iter = vec
.into_iter()
edge_file
.map(move |path_to_edges| {
let str_edge_path = path_to_edges.as_path().to_str().unwrap();
info!("Reading edges from {}", str_edge_path);
Expand Down Expand Up @@ -216,21 +213,18 @@ where
}
})
})
.flat_map(|x| x);

Iter::new(Box::new(iter))
.map(|iter| Iter::new(Box::new(iter)))
}

fn prop_node_iter(&self) -> Iter<(Id, Option<NL>, JsonValue)> {
fn get_prop_node_iter(&self, idx: usize) -> Option<Iter<(Id, Option<NL>, JsonValue)>> {
assert!(self.has_headers);

let vec = self.path_to_nodes.clone();
let node_file = self.path_to_nodes.get(idx).cloned();
let has_headers = self.has_headers;
let is_flexible = self.is_flexible;
let separator = self.separator;

let iter = vec
.into_iter()
node_file
.map(move |path_to_nodes| {
let str_node_path = path_to_nodes.as_path().to_str().unwrap();
info!("Reading nodes from {}", str_node_path);
Expand Down Expand Up @@ -258,19 +252,16 @@ where
(record.id, record.label, prop)
})
})
.flat_map(|x| x);

Iter::new(Box::new(iter))
.map(|iter| Iter::new(Box::new(iter)))
}

fn prop_edge_iter(&self) -> Iter<(Id, Id, Option<EL>, JsonValue)> {
let vec = self.path_to_edges.clone();
fn get_prop_edge_iter(&self, idx: usize) -> Option<Iter<(Id, Id, Option<EL>, JsonValue)>> {
let edge_file = self.path_to_edges.get(idx).cloned();
let has_headers = self.has_headers;
let is_flexible = self.is_flexible;
let separator = self.separator;

let iter = vec
.into_iter()
edge_file
.map(move |path_to_edges| {
let str_edge_path = path_to_edges.as_path().to_str().unwrap();
info!("Reading edges from {}", str_edge_path);
Expand Down Expand Up @@ -298,13 +289,19 @@ where
(record.src, record.dst, record.label, prop)
})
})
.flat_map(|x| x);
.map(|iter| Iter::new(Box::new(iter)))
}

Iter::new(Box::new(iter))
fn num_of_node_files(&self) -> usize {
self.path_to_nodes.len()
}

fn num_of_edge_files(&self) -> usize {
self.path_to_edges.len()
}
}

impl<'a, Id: IdType, NL: Hash + Eq, EL: Hash + Eq> ReadGraphTo<Id, NL, EL>
impl<'a, Id: IdType, NL: Hash + Eq + 'static, EL: Hash + Eq + 'static> ReadGraphTo<Id, NL, EL>
for HDFSReader<'a, Id, NL, EL>
where
for<'de> Id: Deserialize<'de>,
Expand Down Expand Up @@ -346,5 +343,6 @@ fn list_hdfs_files<P: AsRef<Path>>(p: P) -> Vec<PathBuf> {
}
}
}

fold_path_vec
}
52 changes: 46 additions & 6 deletions src/io/read_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,62 @@
*/
use generic::{IdType, Iter, MutGraphTrait};
use io::csv::JsonValue;
use itertools::Itertools;
use serde::Deserialize;
use std::hash::Hash;

pub trait ReadGraph<Id: IdType, NL: Hash + Eq, EL: Hash + Eq>
pub trait ReadGraph<Id: IdType, NL: Hash + Eq + 'static, EL: Hash + Eq + 'static>
where
for<'de> Id: Deserialize<'de>,
for<'de> NL: Deserialize<'de>,
for<'de> EL: Deserialize<'de>,
{
fn node_iter(&self) -> Iter<(Id, Option<NL>)>;
fn edge_iter(&self) -> Iter<(Id, Id, Option<EL>)>;
fn prop_node_iter(&self) -> Iter<(Id, Option<NL>, JsonValue)>;
fn prop_edge_iter(&self) -> Iter<(Id, Id, Option<EL>, JsonValue)>;
fn get_node_iter(&self, idx: usize) -> Option<Iter<(Id, Option<NL>)>>;
fn get_edge_iter(&self, idx: usize) -> Option<Iter<(Id, Id, Option<EL>)>>;
fn get_prop_node_iter(&self, idx: usize) -> Option<Iter<(Id, Option<NL>, JsonValue)>>;
fn get_prop_edge_iter(&self, idx: usize) -> Option<Iter<(Id, Id, Option<EL>, JsonValue)>>;
fn num_of_node_files(&self) -> usize;
fn num_of_edge_files(&self) -> usize;

fn node_iter(&self) -> Iter<(Id, Option<NL>)> {
let iter_vec = (0..self.num_of_node_files())
.map(|i| self.get_node_iter(i).unwrap())
.collect_vec();
let iter = iter_vec.into_iter().flat_map(|x| x);

Iter::new(Box::new(iter))
}

fn edge_iter(&self) -> Iter<(Id, Id, Option<EL>)> {
let iter_vec = (0..self.num_of_edge_files())
.map(|i| self.get_edge_iter(i).unwrap())
.collect_vec();
let iter = iter_vec.into_iter().flat_map(|x| x);

Iter::new(Box::new(iter))
}

fn prop_node_iter(&self) -> Iter<(Id, Option<NL>, JsonValue)> {
let iter_vec = (0..self.num_of_node_files())
.map(|i| self.get_prop_node_iter(i).unwrap())
.collect_vec();
let iter = iter_vec.into_iter().flat_map(|x| x);

Iter::new(Box::new(iter))
}

fn prop_edge_iter(&self) -> Iter<(Id, Id, Option<EL>, JsonValue)> {
let iter_vec = (0..self.num_of_edge_files())
.map(|i| self.get_prop_edge_iter(i).unwrap())
.collect_vec();
let iter = iter_vec.into_iter().flat_map(|x| x);

Iter::new(Box::new(iter))
}
}

pub trait ReadGraphTo<Id: IdType, NL: Hash + Eq, EL: Hash + Eq>: ReadGraph<Id, NL, EL>
pub trait ReadGraphTo<Id: IdType, NL: Hash + Eq + 'static, EL: Hash + Eq + 'static>:
ReadGraph<Id, NL, EL>
where
for<'de> Id: Deserialize<'de>,
for<'de> NL: Deserialize<'de>,
Expand Down