Skip to content

Commit

Permalink
Take parameters for example
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Apr 29, 2022
2 parents 3f19e6e + cc1022f commit e625503
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions cpp/examples/arrow/rapidjson_row_converter.cc
Expand Up @@ -133,6 +133,7 @@ class RowBatchBuilder {
std::shared_ptr<arrow::Array> values = array.values();
RowBatchBuilder child_builder(values->length());
std::shared_ptr<arrow::Field> value_field = array.list_type()->value_field();
std::string value_field_name = value_field->name();
child_builder.SetField(value_field);
ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*values.get(), &child_builder));

Expand All @@ -142,12 +143,14 @@ class RowBatchBuilder {

int64_t values_i = 0;
for (int64_t i = 0; i < array.length(); ++i) {
auto array_len = array.value_length(i);

rapidjson::Value value;
value.SetArray();
value.Reserve(array.value_length(i), allocator);

for (int64_t j = 0; j < array.value_length(i); ++j) {
value.PushBack(rows[values_i][value_field->name().c_str()], allocator);
for (int64_t j = 0; j < array_len; ++j) {
value.PushBack(rows[values_i][value_field_name.c_str()], allocator);
++values_i;
}

Expand Down Expand Up @@ -488,6 +491,10 @@ class DocumentToArrowConverter : public arrow::FromRowConverter<rapidjson::Docum
}; // DocumentToArrowConverter

int main(int argc, char** argv) {
// Get sizes
int32_t num_rows = argc > 1 ? std::atoi(argv[1]) : 100;
int32_t batch_size = argc > 2 ? std::atoi(argv[2]) : 100;

//(Doc section: Convert to Arrow)
// Write JSON records
std::vector<std::string> json_records = {
Expand All @@ -496,11 +503,24 @@ int main(int argc, char** argv) {
R"({"pk": 3, "date_created": "2020-10-05", "data": {"deleted": false, "metrics": [{"key": "x", "value": 33}, {"key": "x", "value": 42}]}})"};

std::vector<rapidjson::Document> records;
for (const std::string& json : json_records) {
records.reserve(num_rows);
for (int32_t i = 0; i < num_rows; ++i) {
rapidjson::Document document;
document.Parse(json.c_str());
document.Parse(json_records[i % json_records.size()].c_str());
records.push_back(std::move(document));
}
// for (const std::string& json : json_records) {
// rapidjson::Document document;
// document.Parse(json.c_str());
// records.push_back(std::move(document));
// }
for (const rapidjson::Document& doc : records) {
rapidjson::StringBuffer sb;
rapidjson::Writer<rapidjson::StringBuffer> writer(sb);
// At batch_size >= 495, we segfault here:
doc.Accept(writer);
std::cout << sb.GetString() << std::endl;
}
auto tags_schema = arrow::list(arrow::struct_({
arrow::field("key", arrow::utf8()),
arrow::field("value", arrow::int64()),
Expand Down Expand Up @@ -532,13 +552,15 @@ int main(int argc, char** argv) {

// Convert table into document (row) iterator
arrow::Iterator<rapidjson::Document> document_iter =
to_doc_converter.ConvertToIterator(table, 100);
to_doc_converter.ConvertToIterator(table, batch_size);

// Print each row
for (arrow::Result<rapidjson::Document> doc_result : document_iter) {
rapidjson::Document doc = std::move(doc_result).ValueOrDie();
rapidjson::StringBuffer sb;
rapidjson::Writer<rapidjson::StringBuffer> writer(sb);
    // At batch_size between 495 and 506, and at 750, we segfault here.
    // These work however: 800, 900, 1000
doc.Accept(writer);
std::cout << sb.GetString() << std::endl;
}
Expand Down

0 comments on commit e625503

Please sign in to comment.