how read/write REPEATED #1886

Closed
liyongjing opened this issue Jun 16, 2022 · 1 comment
Labels
parquet Changes to the parquet crate question Further information is requested

liyongjing commented Jun 16, 2022

Which part is this question about

use std::{fs::File, path::Path, sync::Arc};

use parquet::{
    basic::Compression,
    data_type::{ByteArray, ByteArrayType, Int32Type},
    file::{
        properties::{WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
        writer::SerializedFileWriter,
    },
    record::{Row, RowAccessor},
    schema::parser::parse_message_type,
};

const MESSAGE_TYPE: &'static str = "
message Log {
  OPTIONAL INT32 eventType;
  REPEATED BYTE_ARRAY category;
}
";

pub struct Item {
    pub event_type: i32,
    pub category: Vec<String>,
}

pub struct Batch {
    pub event_types: Vec<i32>,
    pub categories: Vec<ByteArray>,
}

fn data() -> Batch {
    let items = vec![
        Item {
            event_type: 1,
            category: vec!["test11".to_string(), "test12".to_string()],
        },
        Item {
            event_type: 2,
            category: vec!["test21".to_string(), "test22".to_string()],
        },
    ];
    let mut b = Batch {
        event_types: vec![],
        categories: vec![],
    };

    for item in &items {
        b.event_types.push(item.event_type);
        for cate in &item.category {
            b.categories.push(ByteArray::from(cate.as_str()));
        }
    }
    b
}

fn write() {
    let path = Path::new("sample.parquet");
    let file = File::create(&path).unwrap();
    let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());

    let props = Arc::new(
        WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build(),
    );

    let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    let batch = data();
    // column 0
    let mut col_writer = row_group_writer
        .next_column()
        .expect("next column")
        .unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(&batch.event_types, None, None)
        .expect("writing column");
    col_writer.close().expect("close column");
    //  question1 column 1 how write REPEATED?
    let mut col_writer = row_group_writer
        .next_column()
        .expect("next column")
        .unwrap();
    col_writer
        .typed::<ByteArrayType>()
        .write_batch(&batch.categories, None, None)
        .expect("writing column");
    col_writer.close().expect("close column");

    let rg_md = row_group_writer.close().expect("close row group");
    println!("total rows written: {}", rg_md.num_rows());

    writer.close().unwrap();
}

fn read() {
    let path = Path::new("sample.parquet");
    let file = File::open(path).expect("Unable to open file");
    let reader = SerializedFileReader::new(file).expect("Unable to read file");

    let iter = reader.get_row_iter(None).expect("get iterator");
    for record in iter {
        let event_type = record.get_int(0).unwrap();
        read_category(&record, 1);
        println!("event_type{}", event_type);
    }
}

// public static List<String> getCategory(Group value) {
//     List<String> categoryList = new ArrayList<>();
//     try {
//       int count = value.getFieldRepetitionCount("category");
//       if (count > 0) {
//         int index = 0;
//         while (index < count) {
//           categoryList.add(value.getString("category", index++).trim());
//         }
//       }
//     } catch (Exception e) {
//     }
//     return categoryList;
//   }
fn read_category(record: &Row, i: usize) {
    // question2 where is getFieldRepetitionCount, how to read REPEATED?
    match record.get_bytes(i) {
        Ok(v) => println!("{:?}", v.as_utf8()),
        Err(_) => {}
    };
}

Describe your question
How do I read and write the REPEATED `category` field using the parquet crate?
Question 1: how do I write the REPEATED column (column 1)?
Question 2: where is the equivalent of getFieldRepetitionCount, i.e. how do I read a REPEATED field?

@liyongjing liyongjing added the question Further information is requested label Jun 16, 2022
Contributor

tustvold commented Jun 18, 2022

Hi, I'm not very familiar with parquet-mr which your example appears to be based on, nor am I hugely knowledgeable about the record APIs for reading parquet, but I'll try to help out here 😅

Perusing the docs it would appear you can use https://docs.rs/parquet/latest/parquet/file/reader/trait.FileReader.html#tymethod.get_row_iter to get a row iterator, and then call https://docs.rs/parquet/latest/parquet/record/trait.RowAccessor.html#tymethod.get_list on the row to read a repeated field.
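To make the connection to getFieldRepetitionCount a bit more concrete: a repeated field is stored as a flat run of values plus definition and repetition levels, and a per-row list is rebuilt from those levels, so the repetition count of a row is simply the length of its list. The following is a minimal, standard-library-only sketch of that reassembly for a top-level REPEATED primitive column like `category` (maximum definition and repetition level both 1). The function name `assemble_rows` is hypothetical and not part of the parquet crate; it only illustrates the idea and is not the crate's actual reader code.

```rust
/// Rebuild per-row lists from a flattened REPEATED column.
/// Assumes a top-level REPEATED primitive field, so both levels are 0 or 1.
/// `assemble_rows` is a hypothetical helper, not a parquet crate API.
fn assemble_rows(values: &[String], def_levels: &[i16], rep_levels: &[i16]) -> Vec<Vec<String>> {
    let mut rows: Vec<Vec<String>> = Vec::new();
    let mut next_value = values.iter();
    for (&def, &rep) in def_levels.iter().zip(rep_levels) {
        // Repetition level 0 marks the start of a new row.
        if rep == 0 {
            rows.push(Vec::new());
        }
        // Definition level 1 means a value is present at this position;
        // level 0 means the row's list is empty and no value was stored.
        if def == 1 {
            if let (Some(row), Some(v)) = (rows.last_mut(), next_value.next()) {
                row.push(v.clone());
            }
        }
    }
    rows
}

fn main() {
    let values: Vec<String> = ["test11", "test12", "test21", "test22"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let rows = assemble_rows(&values, &[1, 1, 1, 1], &[0, 1, 0, 1]);
    for (i, row) in rows.iter().enumerate() {
        // row.len() plays the role of getFieldRepetitionCount("category").
        println!("row {}: count = {}, category = {:?}", i, row.len(), row);
    }
}
```

With the record API, the list obtained from `get_list` should give you the same per-row grouping without handling levels yourself.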

FWIW I would strongly encourage you to consider trying out the arrow interface, it should be faster, better tested and better documented than the record APIs which are somewhat orphaned at the moment...
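On question 1 (writing the REPEATED column with the low-level column writer): the two `None` arguments passed to `write_batch` in the example are the definition and repetition levels, and a repeated column needs both so that the reader can tell where each row's list starts and ends. Below is a minimal sketch, using only the standard library, of how those levels could be derived from nested data. `RepeatedColumn` and `flatten_repeated` are hypothetical names introduced for illustration, and the sketch assumes a top-level REPEATED primitive field (maximum definition and repetition level both 1).

```rust
/// Flattened data for a top-level REPEATED primitive column.
/// Hypothetical helper type, not part of the parquet crate.
#[derive(Debug, Default, PartialEq)]
struct RepeatedColumn {
    values: Vec<String>,
    def_levels: Vec<i16>,
    rep_levels: Vec<i16>,
}

/// Flatten one list per row into values plus definition/repetition levels.
fn flatten_repeated(rows: &[Vec<String>]) -> RepeatedColumn {
    let mut col = RepeatedColumn::default();
    for row in rows {
        if row.is_empty() {
            // An empty list still takes one level slot but stores no value.
            col.def_levels.push(0);
            col.rep_levels.push(0);
            continue;
        }
        for (i, value) in row.iter().enumerate() {
            col.values.push(value.clone());
            // Definition level 1: the value is present.
            col.def_levels.push(1);
            // Repetition level 0 starts a new row, 1 continues the current list.
            col.rep_levels.push(if i == 0 { 0 } else { 1 });
        }
    }
    col
}

fn main() {
    let rows = vec![
        vec!["test11".to_string(), "test12".to_string()],
        vec!["test21".to_string(), "test22".to_string()],
    ];
    let col = flatten_repeated(&rows);
    println!("values     = {:?}", col.values);
    println!("def_levels = {:?}", col.def_levels);
    println!("rep_levels = {:?}", col.rep_levels);
}
```

The resulting slices would then replace the two `None` arguments, along the lines of `write_batch(&values, Some(&col.def_levels), Some(&col.rep_levels))`, after converting the strings to `ByteArray` as the example already does. The OPTIONAL `eventType` column most likely needs definition levels as well (one `1` per non-null value), since its maximum definition level is also 1.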

@alamb alamb added the parquet Changes to the parquet crate label Jun 23, 2022