diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc index 8093ada5111d..c6f42e026d63 100644 --- a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc @@ -44,6 +44,17 @@ static std::vector read_header(const std::string& file_name, return res_vec; } +static bool check_primary_key_type(std::shared_ptr data_type) { + if (data_type->Equals(arrow::int64()) || data_type->Equals(arrow::uint64()) || + data_type->Equals(arrow::int32()) || data_type->Equals(arrow::uint32()) || + data_type->Equals(arrow::utf8()) || + data_type->Equals(arrow::large_utf8())) { + return true; + } else { + return false; + } +} + static void put_delimiter_option(const LoadingConfig& loading_config, arrow::csv::ParseOptions& parse_options) { auto delimiter_str = loading_config.GetDelimiter(); @@ -319,7 +330,7 @@ static void set_vertex_properties(gs::ColumnBase* col, } } -template +template static void append_edges( std::shared_ptr src_col, std::shared_ptr dst_col, const LFIndexer& src_indexer, @@ -355,204 +366,11 @@ static void append_edges( size_t cur_ind = old_size; const auto& col = is_dst ? dst_col : src_col; const auto& indexer = is_dst ? dst_indexer : src_indexer; - if (col->type() == arrow::int64()) { - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto vid = indexer.get_index(Any::From(casted->Value(j))); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::uint64()) { - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto vid = indexer.get_index(Any::From(casted->Value(j))); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::int32()) { - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto vid = indexer.get_index(Any::From(casted->Value(j))); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::uint32()) { - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto vid = indexer.get_index(Any::From(casted->Value(j))); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::utf8()) { - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto str = casted->GetView(j); - std::string_view str_view(str.data(), str.size()); - auto vid = indexer.get_index(Any::From(str_view)); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::large_utf8()) { - auto casted = std::static_pointer_cast(col); - for (auto j = 0; j < casted->length(); ++j) { - auto str = casted->GetView(j); - std::string_view str_view(str.data(), str.size()); - auto vid = indexer.get_index(Any::From(str_view)); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else { - LOG(FATAL) << "Not support type: " << col->type()->ToString(); - } - }; - - auto src_col_thread = std::thread([&]() { _append(false); }); - auto dst_col_thread = std::thread([&]() { _append(true); }); - src_col_thread.join(); - dst_col_thread.join(); - - // if EDATA_T is grape::EmptyType, no need to read columns - if constexpr (!std::is_same::value) { - CHECK(edata_cols.size() == 1); - auto edata_col = edata_cols[0]; - CHECK(src_col->length() == edata_col->length()); - size_t cur_ind = old_size; - auto type = edata_col->type(); - if (type != TypeConverter::ArrowTypeValue()) { - LOG(FATAL) << "Inconsistent data type, expect " - << TypeConverter::ArrowTypeValue()->ToString() - << ", but got " << type->ToString(); - } - - using arrow_array_type = - typename gs::TypeConverter::ArrowArrayType; - // cast chunk to EDATA_T array - auto data = std::static_pointer_cast(edata_col); - for (auto j = 0; j < edata_col->length(); ++j) { - if constexpr (std::is_same::value || - std::is_same::value) { - std::get<2>(parsed_edges[cur_ind++]) = data->GetString(j); - } else { - std::get<2>(parsed_edges[cur_ind++]) = data->Value(j); - } - } - VLOG(10) << "Finish inserting: " << src_col->length() << " edges"; - } -} - -template -static void append_edges( - std::shared_ptr src_col, - std::shared_ptr dst_col, - const LFIndexer& src_indexer, const LFIndexer& dst_indexer, - std::vector>& edata_cols, - std::vector>& parsed_edges, - std::vector& ie_degree, std::vector& oe_degree) { - CHECK(src_col->length() == dst_col->length()); - auto indexer_check_lambda = - [](const LFIndexer& cur_indexer, - const std::shared_ptr& cur_col) { - if (cur_indexer.get_type() == PropertyType::kInt64) { - CHECK(cur_col->type() == arrow::int64()); - } else if (cur_indexer.get_type() == PropertyType::kString) { - CHECK(cur_col->type() == arrow::utf8() || - cur_col->type() == arrow::large_utf8()); - } else if (cur_indexer.get_type() == PropertyType::kInt32) { - CHECK(cur_col->type() == arrow::int32()); - } else if (cur_indexer.get_type() == PropertyType::kUInt32) { - CHECK(cur_col->type() == arrow::uint32()); - } else if (cur_indexer.get_type() == PropertyType::kUInt64) { - CHECK(cur_col->type() == arrow::uint64()); - } - }; - indexer_check_lambda(src_indexer, src_col); - indexer_check_lambda(dst_indexer, dst_col); - auto old_size = parsed_edges.size(); - parsed_edges.resize(old_size + src_col->length()); - VLOG(10) << "resize parsed_edges from" << old_size << " to " - << parsed_edges.size(); - auto _append = [&](bool is_dst) { - size_t cur_ind = old_size; - const auto& col = is_dst ? dst_col : src_col; - const auto& indexer = is_dst ? dst_indexer : src_indexer; - for (auto i = 0; i < col->num_chunks(); ++i) { - auto chunk = col->chunk(i); - CHECK(chunk->type() == col->type()); - if (col->type() == arrow::int64()) { - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - auto vid = indexer.get_index(Any::From(casted_chunk->Value(j))); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::uint64()) { - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - auto any = Any::From(casted_chunk->Value(j)); - auto vid = indexer.get_index(any); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::int32()) { - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - auto vid = indexer.get_index(Any::From(casted_chunk->Value(j))); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::uint32()) { - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - auto vid = indexer.get_index(Any::From(casted_chunk->Value(j))); - if (is_dst) { - std::get<1>(parsed_edges[cur_ind++]) = vid; - } else { - std::get<0>(parsed_edges[cur_ind++]) = vid; - } - is_dst ? ie_degree[vid]++ : oe_degree[vid]++; - } - } else if (col->type() == arrow::utf8()) { - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - auto str = casted_chunk->GetView(j); + if constexpr (std::is_same_v) { + if (col->type() == arrow::utf8()) { + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto str = casted->GetView(j); std::string_view str_view(str.data(), str.size()); auto vid = indexer.get_index(Any::From(str_view)); if (is_dst) { @@ -562,11 +380,11 @@ static void append_edges( } is_dst ? ie_degree[vid]++ : oe_degree[vid]++; } - } else if (col->type() == arrow::large_utf8()) { - auto casted_chunk = - std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - auto str = casted_chunk->GetView(j); + } else { + // must be large utf8 + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto str = casted->GetView(j); std::string_view str_view(str.data(), str.size()); auto vid = indexer.get_index(Any::From(str_view)); if (is_dst) { @@ -576,13 +394,21 @@ static void append_edges( } is_dst ? ie_degree[vid]++ : oe_degree[vid]++; } - } else { - LOG(FATAL) << "Not support type: " << col->type()->ToString(); + } + } else { + using arrow_array_type = typename gs::TypeConverter::ArrowArrayType; + auto casted = std::static_pointer_cast(col); + for (auto j = 0; j < casted->length(); ++j) { + auto vid = indexer.get_index(Any::From(casted->Value(j))); + if (is_dst) { + std::get<1>(parsed_edges[cur_ind++]) = vid; + } else { + std::get<0>(parsed_edges[cur_ind++]) = vid; + } + is_dst ? ie_degree[vid]++ : oe_degree[vid]++; } } }; - auto src_col_thread = std::thread([&]() { _append(false); }); - auto dst_col_thread = std::thread([&]() { _append(true); }); // if EDATA_T is grape::EmptyType, no need to read columns auto edata_col_thread = std::thread([&]() { @@ -590,151 +416,36 @@ static void append_edges( CHECK(edata_cols.size() == 1); auto edata_col = edata_cols[0]; CHECK(src_col->length() == edata_col->length()); - // iterate and put data size_t cur_ind = old_size; auto type = edata_col->type(); + if (type != TypeConverter::ArrowTypeValue()) { + LOG(FATAL) << "Inconsistent data type, expect " + << TypeConverter::ArrowTypeValue()->ToString() + << ", but got " << type->ToString(); + } using arrow_array_type = typename gs::TypeConverter::ArrowArrayType; - if (type->Equals(arrow::timestamp(arrow::TimeUnit::MILLI))) { - for (auto i = 0; i < edata_col->num_chunks(); ++i) { - auto chunk = edata_col->chunk(i); - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - std::get<2>(parsed_edges[cur_ind++]) = casted_chunk->Value(j); - } - } - } else if (type->Equals(arrow::large_utf8()) || - type->Equals(arrow::utf8())) { - for (auto i = 0; i < edata_col->num_chunks(); ++i) { - auto chunk = edata_col->chunk(i); - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - std::get<2>(parsed_edges[cur_ind++]) = casted_chunk->GetView(j); - } - } - } else { - for (auto i = 0; i < edata_col->num_chunks(); ++i) { - auto chunk = edata_col->chunk(i); - auto casted_chunk = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_chunk->length(); ++j) { - std::get<2>(parsed_edges[cur_ind++]) = casted_chunk->Value(j); - } + // cast chunk to EDATA_T array + auto data = std::static_pointer_cast(edata_col); + for (auto j = 0; j < edata_col->length(); ++j) { + if constexpr (std::is_same::value || + std::is_same::value) { + std::get<2>(parsed_edges[cur_ind++]) = data->GetView(j); + } else { + std::get<2>(parsed_edges[cur_ind++]) = data->Value(j); } } + VLOG(10) << "Finish inserting: " << src_col->length() << " edges"; } }); + auto src_col_thread = std::thread([&]() { _append(false); }); + auto dst_col_thread = std::thread([&]() { _append(true); }); src_col_thread.join(); dst_col_thread.join(); edata_col_thread.join(); - VLOG(10) << "Finish inserting: " << src_col->length() << " edges"; -} - -// Create VertexTableReader -std::shared_ptr -CSVFragmentLoader::createVertexTableReader(label_t v_label, - const std::string& v_file) { - // Create options. - arrow::csv::ConvertOptions convert_options; - arrow::csv::ReadOptions read_options; - arrow::csv::ParseOptions parse_options; - fillVertexReaderMeta(read_options, parse_options, convert_options, v_file, - v_label); - - auto read_result = arrow::io::ReadableFile::Open(v_file); - if (!read_result.ok()) { - LOG(FATAL) << "Fail to open: " << v_file - << " error: " << read_result.status().message(); - } - std::shared_ptr file = read_result.ValueOrDie(); - auto res = - arrow::csv::TableReader::Make(arrow::io::IOContext(), file, read_options, - parse_options, convert_options); - if (!res.ok()) { - LOG(FATAL) << "Fail to create StreamingReader for file: " << v_file - << " error: " << res.status().message(); - } - return res.ValueOrDie(); -} - -std::shared_ptr -CSVFragmentLoader::createVertexStreamReader(label_t v_label, - const std::string& v_file) { - arrow::csv::ConvertOptions convert_options; - arrow::csv::ReadOptions read_options; - arrow::csv::ParseOptions parse_options; - fillVertexReaderMeta(read_options, parse_options, convert_options, v_file, - v_label); - - auto read_result = arrow::io::ReadableFile::Open(v_file); - if (!read_result.ok()) { - LOG(FATAL) << "Fail to open: " << v_file - << " error: " << read_result.status().message(); - } - std::shared_ptr file = read_result.ValueOrDie(); - auto res = arrow::csv::StreamingReader::Make(arrow::io::IOContext(), file, - read_options, parse_options, - convert_options); - if (!res.ok()) { - LOG(FATAL) << "Fail to create StreamingReader for file: " << v_file - << " error: " << res.status().message(); - } - return res.ValueOrDie(); -} - -std::shared_ptr -CSVFragmentLoader::createEdgeStreamReader(label_t src_label_id, - label_t dst_label_id, - label_t label_id, - const std::string& e_file) { - arrow::csv::ConvertOptions convert_options; - arrow::csv::ReadOptions read_options; - arrow::csv::ParseOptions parse_options; - - fillEdgeReaderMeta(read_options, parse_options, convert_options, e_file, - src_label_id, dst_label_id, label_id); - - auto read_result = arrow::io::ReadableFile::Open(e_file); - if (!read_result.ok()) { - LOG(FATAL) << "Fail to open: " << e_file - << " error: " << read_result.status().message(); - } - std::shared_ptr file = read_result.ValueOrDie(); - auto res = arrow::csv::StreamingReader::Make(arrow::io::IOContext(), file, - read_options, parse_options, - convert_options); - if (!res.ok()) { - LOG(FATAL) << "Fail to create StreamingReader for file: " << e_file - << " error: " << res.status().message(); - } - return res.ValueOrDie(); -} - -std::shared_ptr -CSVFragmentLoader::createEdgeTableReader(label_t src_label_id, - label_t dst_label_id, label_t label_id, - const std::string& e_file) { - arrow::csv::ConvertOptions convert_options; - arrow::csv::ReadOptions read_options; - arrow::csv::ParseOptions parse_options; - - fillEdgeReaderMeta(read_options, parse_options, convert_options, e_file, - src_label_id, dst_label_id, label_id); - - auto read_result = arrow::io::ReadableFile::Open(e_file); - if (!read_result.ok()) { - LOG(FATAL) << "Fail to open: " << e_file - << " error: " << read_result.status().message(); - } - std::shared_ptr file = read_result.ValueOrDie(); - auto res = - arrow::csv::TableReader::Make(arrow::io::IOContext(), file, read_options, - parse_options, convert_options); - if (!res.ok()) { - LOG(FATAL) << "Fail to create TableReader for file: " << e_file - << " error: " << res.status().message(); - } - return res.ValueOrDie(); } template @@ -805,12 +516,7 @@ void CSVFragmentLoader::addVertexBatch( vids.reserve(row_num); _add_vertex()(primary_key_col, indexer, vids); - t += grape::GetCurrentTime(); - for (double tmp = convert_to_internal_vertex_time_; - !convert_to_internal_vertex_time_.compare_exchange_weak(tmp, tmp + t);) { - } - t = -grape::GetCurrentTime(); for (auto j = 0; j < property_cols.size(); ++j) { auto array = property_cols[j]; auto chunked_array = std::make_shared(array); @@ -819,193 +525,83 @@ void CSVFragmentLoader::addVertexBatch( chunked_array, vids); } - t += grape::GetCurrentTime(); - for (double tmp = basic_frag_loader_vertex_time_; - !basic_frag_loader_vertex_time_.compare_exchange_weak(tmp, tmp + t);) {} - VLOG(10) << "Insert rows: " << row_num; } -template -struct _add_vertex_chunk { - void operator()(const std::shared_ptr& col, - IdIndexer& indexer, std::vector& vids) { - size_t row_num = col->length(); - vid_t vid; - - if constexpr (!std::is_same::value) { - auto expected_type = gs::TypeConverter::ArrowTypeValue(); - using arrow_array_type = - typename gs::TypeConverter::ArrowArrayType; - if (col->type() != expected_type) { - LOG(FATAL) << "Inconsistent data type, expect " - << expected_type->ToString() << ", but got " - << col->type()->ToString(); +// Iterate over all record batches read from file. +void ForEachRecordBatch( + const std::string& path, const arrow::csv::ConvertOptions& convert_options, + const arrow::csv::ReadOptions& read_options, + const arrow::csv::ParseOptions& parse_options, + std::function, bool)> func, + bool stream) { + auto read_result = arrow::io::ReadableFile::Open(path); + if (!read_result.ok()) { + LOG(FATAL) << "Failed to open file: " << path + << " error: " << read_result.status().message(); + } + std::shared_ptr file = read_result.ValueOrDie(); + bool first_batch = true; + if (stream) { + auto res = arrow::csv::StreamingReader::Make( + arrow::io::default_io_context(), file, read_options, parse_options, + convert_options); + if (!res.ok()) { + LOG(FATAL) << "Failed to create streaming reader for file: " << path + << " error: " << res.status().message(); + } + auto reader = res.ValueOrDie(); + while (true) { + std::shared_ptr batch; + auto status = reader->ReadNext(&batch); + if (!status.ok()) { + LOG(FATAL) << "Failed to read batch from file: " << path + << " error: " << status.message(); } - for (auto i = 0; i < col->num_chunks(); ++i) { - auto chunk = col->chunk(i); - auto casted_array = std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_array->length(); ++j) { - if (!indexer.add(casted_array->Value(j), vid)) { - LOG(FATAL) << "Duplicate vertex id: " << casted_array->Value(j) - << " .. "; - } - vids.emplace_back(vid); - } + if (batch == nullptr) { + break; } - } else { - if (col->type() == arrow::utf8()) { - for (auto i = 0; i < col->num_chunks(); ++i) { - auto chunk = col->chunk(i); - auto casted_array = - std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_array->length(); ++j) { - auto str = casted_array->GetView(j); - std::string_view str_view(str.data(), str.size()); - if (!indexer.add(str_view, vid)) { - LOG(FATAL) << "Duplicate vertex id: " << str_view << " .. "; - } - vids.emplace_back(vid); - } - } - } else if (col->type() == arrow::large_utf8()) { - for (auto i = 0; i < col->num_chunks(); ++i) { - auto chunk = col->chunk(i); - auto casted_array = - std::static_pointer_cast(chunk); - for (auto j = 0; j < casted_array->length(); ++j) { - auto str = casted_array->GetView(j); - std::string_view str_view(str.data(), str.size()); - if (!indexer.add(str_view, vid)) { - LOG(FATAL) << "Duplicate vertex id: " << str_view << " .. "; - } - vids.emplace_back(vid); - } - } - } else { - LOG(FATAL) << "Not support type: " << col->type()->ToString(); + + func(batch, first_batch); + if (first_batch) { + first_batch = false; } } - } -}; - -template -void CSVFragmentLoader::addVertexBatch( - label_t v_label_id, IdIndexer& indexer, - std::shared_ptr& primary_key_col, - const std::vector>& property_cols) { - size_t row_num = primary_key_col->length(); - std::vector vids; - vids.reserve(row_num); - // check row num - auto col_num = property_cols.size(); - for (size_t i = 0; i < col_num; ++i) { - CHECK_EQ(property_cols[i]->length(), row_num); - } - - double t = -grape::GetCurrentTime(); - _add_vertex_chunk()(primary_key_col, indexer, vids); - - t += grape::GetCurrentTime(); - for (double tmp = convert_to_internal_vertex_time_; - !convert_to_internal_vertex_time_.compare_exchange_weak(tmp, tmp + t);) { - } - - t = -grape::GetCurrentTime(); - for (auto i = 0; i < property_cols.size(); ++i) { - auto array = property_cols[i]; - auto& table = basic_fragment_loader_.GetVertexTable(v_label_id); - auto& col_ptrs = table.column_ptrs(); - set_vertex_properties(col_ptrs[i], array, vids); - } - t += grape::GetCurrentTime(); - for (double tmp = basic_frag_loader_vertex_time_; - !basic_frag_loader_vertex_time_.compare_exchange_weak(tmp, tmp + t);) {} - - VLOG(10) << "Insert rows: " << row_num; -} + } else { + auto res = arrow::csv::TableReader::Make(arrow::io::default_io_context(), + file, read_options, parse_options, + convert_options); -template -void CSVFragmentLoader::addVerticesImplWithTableReader( - const std::string& v_file, label_t v_label_id, - IdIndexer& indexer) { - auto vertex_column_mappings = - loading_config_.GetVertexColumnMappings(v_label_id); - auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0]; - size_t primary_key_ind = std::get<2>(primary_key); - auto reader = createVertexTableReader(v_label_id, v_file); - std::shared_ptr table; - double t = -grape::GetCurrentTime(); - auto result = reader->Read(); - t += grape::GetCurrentTime(); - for (double tmp = read_vertex_table_time_; - !read_vertex_table_time_.compare_exchange_weak(tmp, tmp + t);) {} - - auto status = result.status(); - if (!status.ok()) { - LOG(FATAL) << "Failed to read next batch from file " << v_file - << status.message(); - } - table = result.ValueOrDie(); - if (table == nullptr) { - LOG(FATAL) << "Empty file: " << v_file; - } - auto header = table->schema()->field_names(); - auto schema_column_names = schema_.get_vertex_property_names(v_label_id); - CHECK(schema_column_names.size() + 1 == header.size()); - VLOG(10) << "Find header of size: " << header.size(); - - auto columns = table->columns(); - CHECK(primary_key_ind < columns.size()); - auto primary_key_column = columns[primary_key_ind]; - auto other_columns_array = columns; - other_columns_array.erase(other_columns_array.begin() + primary_key_ind); - VLOG(10) << "Reading record batch of size: " << table->num_rows(); - addVertexBatch(v_label_id, indexer, primary_key_column, other_columns_array); -} + if (!res.ok()) { + LOG(FATAL) << "Failed to create table reader for file: " << path + << " error: " << res.status().message(); + } + auto reader = res.ValueOrDie(); -template -void CSVFragmentLoader::addVerticesImplWithStreamReader( - const std::string& v_file, label_t v_label_id, - IdIndexer& indexer) { - auto vertex_column_mappings = - loading_config_.GetVertexColumnMappings(v_label_id); - auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0]; - auto primary_key_name = std::get<1>(primary_key); - size_t primary_key_ind = std::get<2>(primary_key); - auto reader = createVertexStreamReader(v_label_id, v_file); - std::shared_ptr record_batch; - bool first_batch = true; - while (true) { - double t = -grape::GetCurrentTime(); - auto status = reader->ReadNext(&record_batch); - t += grape::GetCurrentTime(); - for (double tmp = read_vertex_table_time_; - !read_vertex_table_time_.compare_exchange_weak(tmp, tmp + t);) {} + auto result = reader->Read(); + auto status = result.status(); if (!status.ok()) { - LOG(FATAL) << "Failed to read next batch from file " << v_file - << status.message(); - } - if (record_batch == nullptr) { - break; - } - if (first_batch) { - // get header - auto header = record_batch->schema()->field_names(); - auto schema_column_names = schema_.get_vertex_property_names(v_label_id); - CHECK(schema_column_names.size() + 1 == header.size()); - VLOG(10) << "Find header of size: " << header.size(); - first_batch = false; + LOG(FATAL) << "Failed to read table from file: " << path + << " error: " << status.message(); + } + std::shared_ptr table = result.ValueOrDie(); + + arrow::TableBatchReader batch_reader(*table); + while (true) { + std::shared_ptr batch; + auto status = batch_reader.ReadNext(&batch); + if (!status.ok()) { + LOG(FATAL) << "Failed to read batch from file: " << path + << " error: " << status.message(); + } + if (batch == nullptr) { + break; + } + func(batch, first_batch); + if (first_batch) { + first_batch = false; + } } - - auto columns = record_batch->columns(); - CHECK(primary_key_ind < columns.size()); - auto primary_key_column = columns[primary_key_ind]; - auto other_columns_array = columns; - other_columns_array.erase(other_columns_array.begin() + primary_key_ind); - VLOG(10) << "Reading record batch of size: " << record_batch->num_rows(); - addVertexBatch(v_label_id, indexer, primary_key_column, - other_columns_array); } } @@ -1018,11 +614,36 @@ void CSVFragmentLoader::addVerticesImpl(label_t v_label_id, << v_label_name; for (auto& v_file : v_files) { - if (loading_config_.GetIsBatchReader()) { - addVerticesImplWithStreamReader(v_file, v_label_id, indexer); - } else { - addVerticesImplWithTableReader(v_file, v_label_id, indexer); - } + arrow::csv::ConvertOptions convert_options; + arrow::csv::ReadOptions read_options; + arrow::csv::ParseOptions parse_options; + fillVertexReaderMeta(read_options, parse_options, convert_options, v_file, + v_label_id); + auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0]; + auto primary_key_name = std::get<1>(primary_key); + size_t primary_key_ind = std::get<2>(primary_key); + ForEachRecordBatch( + v_file, convert_options, read_options, parse_options, + [&](std::shared_ptr batch, bool first_batch) { + if (first_batch) { + auto header = batch->schema()->field_names(); + auto schema_column_names = + schema_.get_vertex_property_names(v_label_id); + CHECK(schema_column_names.size() + 1 == header.size()) + << "File header of size: " << header.size() + << " does not match schema column size: " + << schema_column_names.size() + 1; + } + auto columns = batch->columns(); + CHECK(primary_key_ind < columns.size()); + auto primary_key_column = columns[primary_key_ind]; + auto other_columns_array = columns; + other_columns_array.erase(other_columns_array.begin() + + primary_key_ind); + addVertexBatch(v_label_id, indexer, primary_key_column, + other_columns_array); + }, + loading_config_.GetIsBatchReader()); } VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label " @@ -1089,153 +710,6 @@ void CSVFragmentLoader::addVertices(label_t v_label_id, VLOG(10) << "Finish init vertices for label " << v_label_name; } -template -void CSVFragmentLoader::addEdgesImplWithTableReader( - const std::string& filename, label_t src_label_id, label_t dst_label_id, - label_t e_label_id, std::vector& ie_degree, - std::vector& oe_degree, - std::vector>& parsed_edges) { - const auto& src_indexer = basic_fragment_loader_.GetLFIndexer(src_label_id); - const auto& dst_indexer = basic_fragment_loader_.GetLFIndexer(dst_label_id); - auto reader = - createEdgeTableReader(src_label_id, dst_label_id, e_label_id, filename); - std::shared_ptr table; - double t = -grape::GetCurrentTime(); - auto result = reader->Read(); - t += grape::GetCurrentTime(); - for (double tmp = read_edge_table_time_; - !read_edge_table_time_.compare_exchange_weak(tmp, tmp + t);) {} - - auto status = result.status(); - if (!status.ok()) { - LOG(FATAL) << "Failed to read Table from file " << filename - << status.message(); - } - table = result.ValueOrDie(); - if (table == nullptr) { - LOG(FATAL) << "Empty file: " << filename; - } - auto header = table->schema()->field_names(); - auto schema_column_names = - schema_.get_edge_property_names(src_label_id, dst_label_id, e_label_id); - auto schema_column_types = - schema_.get_edge_properties(src_label_id, dst_label_id, e_label_id); - CHECK(schema_column_names.size() + 2 == header.size()); - CHECK(schema_column_types.size() + 2 == header.size()); - VLOG(10) << "Find header of size: " << header.size(); - - auto columns = table->columns(); - CHECK(columns.size() >= 2); - auto src_col = columns[0]; - auto dst_col = columns[1]; - auto src_col_type = src_col->type(); - auto dst_col_type = dst_col->type(); - CHECK(src_col_type == arrow::int64() || src_col_type == arrow::uint64() || - src_col_type == arrow::int32() || src_col_type == arrow::uint32() || - src_col_type == arrow::utf8() || src_col_type == arrow::large_utf8()) - << "unsupported src_col type: " << src_col_type->ToString(); - CHECK(dst_col_type == arrow::int64() || dst_col_type == arrow::uint64() || - dst_col_type == arrow::int32() || dst_col_type == arrow::uint32() || - dst_col_type == arrow::utf8() || dst_col_type == arrow::large_utf8()) - << "unsupported dst_col type: " << dst_col_type->ToString(); - - std::vector> property_cols; - for (auto i = 2; i < columns.size(); ++i) { - property_cols.emplace_back(columns[i]); - } - CHECK(property_cols.size() <= 1) - << "Currently only support at most one property on edge"; - { - CHECK(src_col->length() == dst_col->length()); - t = -grape::GetCurrentTime(); - append_edges(src_col, dst_col, src_indexer, dst_indexer, property_cols, - parsed_edges, ie_degree, oe_degree); - t += grape::GetCurrentTime(); - for (double tmp = convert_to_internal_edge_time_; - !convert_to_internal_edge_time_.compare_exchange_weak(tmp, tmp + t);) { - } - } -} - -template -void CSVFragmentLoader::addEdgesImplWithStreamReader( - const std::string& filename, label_t src_label_id, label_t dst_label_id, - label_t e_label_id, std::vector& ie_degree, - std::vector& oe_degree, - std::vector>& parsed_edges) { - const auto& src_indexer = basic_fragment_loader_.GetLFIndexer(src_label_id); - const auto& dst_indexer = basic_fragment_loader_.GetLFIndexer(dst_label_id); - auto reader = - createEdgeStreamReader(src_label_id, dst_label_id, e_label_id, filename); - std::shared_ptr record_batch; - // read first batch - bool first_batch = true; - while (true) { - double t = -grape::GetCurrentTime(); - auto status = reader->ReadNext(&record_batch); - t += grape::GetCurrentTime(); - for (double tmp = read_edge_table_time_; - !read_edge_table_time_.compare_exchange_weak(tmp, tmp + t);) {} - if (!status.ok()) { - LOG(FATAL) << "Failed to read next batch from file " << filename - << status.message(); - } - if (record_batch == nullptr) { - break; - } - if (first_batch) { - auto header = record_batch->schema()->field_names(); - auto schema_column_names = schema_.get_edge_property_names( - src_label_id, dst_label_id, e_label_id); - auto schema_column_types = - schema_.get_edge_properties(src_label_id, dst_label_id, e_label_id); - CHECK(schema_column_names.size() + 2 == header.size()) - << "schema size: " << schema_column_names.size() - << " header size: " << header.size(); - CHECK(schema_column_types.size() + 2 == header.size()) - << "schema size: " << schema_column_types.size() - << " header size: " << header.size(); - VLOG(10) << "Find header of size: " << header.size(); - first_batch = false; - } - - // copy the table to csr. - auto columns = record_batch->columns(); - // We assume the src_col and dst_col will always be put at front. - CHECK(columns.size() >= 2); - auto src_col = columns[0]; - auto dst_col = columns[1]; - auto src_col_type = src_col->type(); - auto dst_col_type = dst_col->type(); - CHECK(src_col_type == arrow::int64() || src_col_type == arrow::uint64() || - src_col_type == arrow::int32() || src_col_type == arrow::uint32() || - src_col_type == arrow::utf8() || src_col_type == arrow::large_utf8()) - << "unsupported src_col type: " << src_col_type->ToString(); - CHECK(dst_col_type == arrow::int64() || dst_col_type == arrow::uint64() || - dst_col_type == arrow::int32() || dst_col_type == arrow::uint32() || - dst_col_type == arrow::utf8() || dst_col_type == arrow::large_utf8()) - << "unsupported dst_col type: " << dst_col_type->ToString(); - - std::vector> property_cols; - for (auto i = 2; i < columns.size(); ++i) { - property_cols.emplace_back(columns[i]); - } - CHECK(property_cols.size() <= 1) - << "Currently only support at most one property on edge"; - { - // add edges to vector - CHECK(src_col->length() == dst_col->length()); - t = -grape::GetCurrentTime(); - append_edges(src_col, dst_col, src_indexer, dst_indexer, property_cols, - parsed_edges, ie_degree, oe_degree); - t += grape::GetCurrentTime(); - for (double tmp = convert_to_internal_edge_time_; - !convert_to_internal_edge_time_.compare_exchange_weak(tmp, tmp + t); - tmp = convert_to_internal_edge_time_) {} - } - } -} - template void CSVFragmentLoader::addEdgesImpl(label_t src_label_id, label_t dst_label_id, label_t e_label_id, @@ -1266,27 +740,78 @@ void CSVFragmentLoader::addEdgesImpl(label_t src_label_id, label_t dst_label_id, << " dst indexer size: " << dst_indexer.size(); for (auto filename : e_files) { - VLOG(10) << "processing " << filename << " with src_col_id " << src_col_ind - << " and dst_col_id " << dst_col_ind; - if (loading_config_.GetIsBatchReader()) { - VLOG(1) << "Using batch reader"; - addEdgesImplWithStreamReader(filename, src_label_id, dst_label_id, - e_label_id, ie_degree, oe_degree, - parsed_edges); - } else { - VLOG(1) << "Using table reader"; - addEdgesImplWithTableReader(filename, src_label_id, dst_label_id, - e_label_id, ie_degree, oe_degree, - parsed_edges); - } + arrow::csv::ConvertOptions convert_options; + arrow::csv::ReadOptions read_options; + arrow::csv::ParseOptions parse_options; + fillEdgeReaderMeta(read_options, parse_options, convert_options, filename, + src_label_id, dst_label_id, e_label_id); + ForEachRecordBatch( + filename, convert_options, read_options, parse_options, + [&](std::shared_ptr batch, bool first_batch) { + if (first_batch) { + auto header = batch->schema()->field_names(); + auto schema_column_names = schema_.get_edge_property_names( + src_label_id, dst_label_id, e_label_id); + auto schema_column_types = schema_.get_edge_properties( + src_label_id, dst_label_id, e_label_id); + CHECK(schema_column_names.size() + 2 == header.size()) + << "schema size: " << schema_column_names.size() + << " neq header size: " << header.size(); + } + // copy the table to csr. + auto columns = batch->columns(); + // We assume the src_col and dst_col will always be put at front. + CHECK(columns.size() >= 2); + auto src_col = columns[0]; + auto dst_col = columns[1]; + auto src_col_type = src_col->type(); + auto dst_col_type = dst_col->type(); + CHECK(check_primary_key_type(src_col_type)) + << "unsupported src_col type: " << src_col_type->ToString(); + CHECK(check_primary_key_type(dst_col_type)) + << "unsupported dst_col type: " << dst_col_type->ToString(); + CHECK(src_col_type == dst_col_type) + << "src_col type: " << src_col_type->ToString() + << " neq dst_col type: " << dst_col_type->ToString(); + + std::vector> property_cols; + for (auto i = 2; i < columns.size(); ++i) { + property_cols.emplace_back(columns[i]); + } + CHECK(property_cols.size() <= 1) + << "Currently only support at most one property on edge"; + + // add edges to vector + CHECK(src_col->length() == dst_col->length()); + if (src_col_type->Equals(arrow::int64())) { + append_edges(src_col, dst_col, src_indexer, + dst_indexer, property_cols, + parsed_edges, ie_degree, oe_degree); + } else if (src_col_type->Equals(arrow::uint64())) { + append_edges(src_col, dst_col, src_indexer, + dst_indexer, property_cols, + parsed_edges, ie_degree, oe_degree); + } else if (src_col_type->Equals(arrow::int32())) { + append_edges(src_col, dst_col, src_indexer, + dst_indexer, property_cols, + parsed_edges, ie_degree, oe_degree); + } else if (src_col_type->Equals(arrow::uint32())) { + append_edges(src_col, dst_col, src_indexer, + dst_indexer, property_cols, + parsed_edges, ie_degree, oe_degree); + } else { + // must be string + append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + parsed_edges, ie_degree, oe_degree); + } + }, + loading_config_.GetIsBatchReader()); } - double t = -grape::GetCurrentTime(); + basic_fragment_loader_.PutEdges(src_label_id, dst_label_id, e_label_id, parsed_edges, ie_degree, oe_degree); - t += grape::GetCurrentTime(); - // basic_frag_loader_edge_time_.fetch_add(t); - for (double tmp = basic_frag_loader_edge_time_; - !basic_frag_loader_edge_time_.compare_exchange_weak(tmp, tmp + t);) {} + VLOG(10) << "Finish putting: " << parsed_edges.size() << " edges"; } diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h index 842fd929822e..15c5b9c16268 100644 --- a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.h @@ -69,15 +69,6 @@ class CSVFragmentLoader : public IFragmentLoader { void addVerticesImpl(label_t v_label_id, const std::string& v_label_name, const std::vector v_file, IdIndexer& indexer); - template - void addVerticesImplWithStreamReader(const std::string& filename, - label_t v_label_id, - IdIndexer& indexer); - - template - void addVerticesImplWithTableReader(const std::string& filename, - label_t v_label_id, - IdIndexer& indexer); template void addVertexBatch( @@ -99,34 +90,6 @@ class CSVFragmentLoader : public IFragmentLoader { label_t e_label_id, const std::vector& e_files); - template - void addEdgesImplWithStreamReader( - const std::string& file_name, label_t src_label_id, label_t dst_label_id, - label_t e_label_id, std::vector& ie_degree, - std::vector& oe_degree, - std::vector>& edges); - - template - void addEdgesImplWithTableReader( - const std::string& filename, label_t src_label_id, label_t dst_label_id, - label_t e_label_id, std::vector& ie_degree, - std::vector& oe_degree, - std::vector>& edges); - - std::shared_ptr createVertexStreamReader( - label_t v_label, const std::string& v_file); - - std::shared_ptr createVertexTableReader( - label_t v_label, const std::string& v_file); - - std::shared_ptr createEdgeStreamReader( - label_t src_label_id, label_t dst_label_id, label_t e_label, - const std::string& e_file); - - std::shared_ptr createEdgeTableReader( - label_t src_label_id, label_t dst_label_id, label_t e_label, - const std::string& e_file); - void fillEdgeReaderMeta(arrow::csv::ReadOptions& read_options, arrow::csv::ParseOptions& parse_options, arrow::csv::ConvertOptions& convert_options,