## Avro
---

In [2]:
:cache 1000
:dep apache-avro
:dep serde = { features = ["derive"] }

cache: 1000 MiB


In [3]:
use apache_avro::{from_value, to_value, Reader, Schema, Writer};
use apache_avro::types::Value;
use serde::{Serialize, Deserialize};
use std::collections::BTreeMap;

---
### Key Value Records

In [4]:
let kv_schema_raw = r#"
    {
        "type": "record",
        "name": "kv",
        "fields": [
            {"name": "key", "type": "string"},
            {"name": "val", "type": "string"}
        ]
    }
"#;

In [5]:
let kv_schema = Schema::parse_str(kv_schema_raw).unwrap();
kv_schema

Record(RecordSchema { name: Name { name: "kv", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "key", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "val", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {"key": 0, "val": 1}, attributes: {} })

In [6]:
#[derive(Debug, Deserialize, Serialize)]
struct KV {
    key: String,
    val: String,
}

In [7]:
let mut writer = Writer::new(&kv_schema, Vec::new());
for indx in 1..=3 {
    writer.append_ser(KV {key: format!("k{indx}"), val: format!("v{indx}")}).unwrap();
}
let avro_dat = writer.into_inner().unwrap();
avro_dat

[79, 98, 106, 1, 4, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99, 8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 204, 1, 123, 34, 116, 121, 112, 101, 34, 58, 34, 114, 101, 99, 111, 114, 100, 34, 44, 34, 110, 97, 109, 101, 34, 58, 34, 107, 118, 34, 44, 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34, 107, 101, 121, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 44, 123, 34, 110, 97, 109, 101, 34, 58, 34, 118, 97, 108, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 93, 125, 0, 19, 206, 161, 228, 222, 51, 25, 116, 175, 143, 78, 166, 175, 95, 180, 79, 6, 36, 4, 107, 49, 4, 118, 49, 4, 107, 50, 4, 118, 50, 4, 107, 51, 4, 118, 51, 19, 206, 161, 228, 222, 51, 25, 116, 175, 143, 78, 166, 175, 95, 180, 79]

In [8]:
let reader = Reader::new(&avro_dat[..]).unwrap();
let mut dat = Vec::<KV>::new();
for kv in reader {
    println!("{:?}", from_value::<KV>(&kv.unwrap()).unwrap());
};

KV { key: "k1", val: "v1" }
KV { key: "k2", val: "v2" }
KV { key: "k3", val: "v3" }


---
### Map Record

In [9]:
let map_schema_raw = r#"
    {
        "type": "map",
        "values": "string"
    }
"#;

In [10]:
let map_schema = Schema::parse_str(map_schema_raw).unwrap();
map_schema

Map(String)

In [11]:
let mut src = BTreeMap::<String, String>::new();
src.insert("k1".into(), "v1".into());
src.insert("k2".into(), "v2".into());
src.insert("k3".into(), "v3".into());
src

{"k1": "v1", "k2": "v2", "k3": "v3"}

In [12]:
let mut writer = Writer::new(&map_schema, Vec::new());
writer.append_ser(src.clone()).unwrap();
let avro_dat = writer.into_inner().unwrap();
avro_dat

[79, 98, 106, 1, 4, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99, 8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 64, 123, 34, 116, 121, 112, 101, 34, 58, 34, 109, 97, 112, 34, 44, 34, 118, 97, 108, 117, 101, 115, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 0, 249, 161, 63, 78, 20, 182, 193, 192, 178, 119, 157, 90, 1, 246, 24, 97, 2, 40, 6, 4, 107, 49, 4, 118, 49, 4, 107, 50, 4, 118, 50, 4, 107, 51, 4, 118, 51, 0, 249, 161, 63, 78, 20, 182, 193, 192, 178, 119, 157, 90, 1, 246, 24, 97]

In [13]:
let reader = Reader::new(&avro_dat[..]).unwrap();
let map_dat = from_value::<BTreeMap<String, String>>(&reader.last().unwrap().unwrap()).unwrap();
map_dat

{"k1": "v1", "k2": "v2", "k3": "v3"}

---
### Mixed Records

In [14]:
let db_schema_raw = r#"
    {
        "type": "record",
        "name": "db",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "version", "type": "string"},
            {"name": "kv", "type": {"type": "map", "values": "string"}}
        ]
    }
"#;

In [15]:
let db_schema = Schema::parse_str(db_schema_raw).unwrap();
db_schema

Record(RecordSchema { name: Name { name: "db", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "version", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "kv", doc: None, aliases: None, default: None, schema: Map(String), order: Ascending, position: 2, custom_attributes: {} }], lookup: {"kv": 2, "name": 0, "version": 1}, attributes: {} })

In [16]:
#[derive(Debug, Default, Deserialize, Serialize)]
struct DB {
    name: String,
    version: String,
    kv: BTreeMap<String, String>,
}

In [17]:
let mut writer = Writer::new(&db_schema, Vec::new());
writer.append(to_value(DB {name: "test".into(), version: "1".into(), kv: src.clone()}).unwrap()).unwrap();
let avro_dat = writer.into_inner().unwrap();
avro_dat

[79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 194, 2, 123, 34, 116, 121, 112, 101, 34, 58, 34, 114, 101, 99, 111, 114, 100, 34, 44, 34, 110, 97, 109, 101, 34, 58, 34, 100, 98, 34, 44, 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34, 110, 97, 109, 101, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 44, 123, 34, 110, 97, 109, 101, 34, 58, 34, 118, 101, 114, 115, 105, 111, 110, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 44, 123, 34, 110, 97, 109, 101, 34, 58, 34, 107, 118, 34, 44, 34, 116, 121, 112, 101, 34, 58, 123, 34, 116, 121, 112, 101, 34, 58, 34, 109, 97, 112, 34, 44, 34, 118, 97, 108, 117, 101, 115, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 125, 93, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99, 8, 110, 117, 108, 108, 0, 127, 221, 167, 118, 67, 168, 232, 17, 242, 155, 63, 167, 64, 138, 201, 160, 2, 54, 8, 116, 101, 115, 116, 

In [18]:
let reader = Reader::new(&avro_dat[..]).unwrap();
let db_dat = from_value::<DB>(&reader.last().unwrap().unwrap()).unwrap();
db_dat

DB { name: "test", version: "1", kv: {"k1": "v1", "k2": "v2", "k3": "v3"} }

---
### Encrypted Records

In [19]:
let enc_schema_raw = r#"
    {
        "type": "record",
        "name": "enc",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "version", "type": "string"},
            {"name": "enc", "type": "bytes"}
        ]
    }
"#;

In [20]:
let enc_schema = Schema::parse_str(enc_schema_raw).unwrap();
enc_schema

Record(RecordSchema { name: Name { name: "enc", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "version", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "enc", doc: None, aliases: None, default: None, schema: Bytes, order: Ascending, position: 2, custom_attributes: {} }], lookup: {"enc": 2, "name": 0, "version": 1}, attributes: {} })

In [21]:
#[derive(Debug, Default, Deserialize, Serialize)]
struct Enc {
    name: String,
    version: String,
    enc: Vec<u8>,
}

In [22]:
let mut writer = Writer::new(&enc_schema, Vec::new());
let rec = Value::Record(vec![("name".into(), "test".into()), ("version".into(), "1".into()), ("enc".into(), vec![1u8, 2, 3].into())]);
writer.append(rec).unwrap();
let avro_dat = writer.into_inner().unwrap();
avro_dat

[79, 98, 106, 1, 4, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101, 99, 8, 110, 117, 108, 108, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 148, 2, 123, 34, 116, 121, 112, 101, 34, 58, 34, 114, 101, 99, 111, 114, 100, 34, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 110, 99, 34, 44, 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34, 110, 97, 109, 101, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 44, 123, 34, 110, 97, 109, 101, 34, 58, 34, 118, 101, 114, 115, 105, 111, 110, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 44, 123, 34, 110, 97, 109, 101, 34, 58, 34, 101, 110, 99, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 98, 121, 116, 101, 115, 34, 125, 93, 125, 0, 3, 64, 176, 144, 186, 163, 222, 4, 142, 67, 247, 255, 132, 36, 1, 182, 2, 22, 8, 116, 101, 115, 116, 2, 49, 6, 1, 2, 3, 3, 64, 176, 144, 186, 163, 222, 4, 142, 67, 247, 255, 132, 36, 1, 182]

In [31]:
let rec = Reader::new(&avro_dat[..]).unwrap().last().unwrap().unwrap();
rec

Record([("name", String("test")), ("version", String("1")), ("enc", Bytes([1, 2, 3]))])

In [32]:
from_value::<Enc>(&rec)

Err(Failed to deserialize Avro value into value: Expected an Array or Union, but got: Bytes([1, 2, 3]))

In [34]:
let mut unwrapped = vec![];
if let Value::Record(x) = rec {
    unwrapped = x;
}
unwrapped

[("name", String("test")), ("version", String("1")), ("enc", Bytes([1, 2, 3]))]

In [38]:
let name: String = unwrapped[0].1.into();

Error: the trait bound `std::string::String: From<Value>` is not satisfied