# Data Encoding, Decoding and Flow

> ### Updated (27 March 2025):
> This notebook has been split into 4 parts for ease of reference:
>
> 1. Apache Thrift - [`demo_thrift.ipynb`](notebooks/demo_thrift.ipynb)
> 2. Protocol Buffers (Protobufs) - [`demo_protobuf.ipynb`](notebooks/demo_protobuf.ipynb)
> 3. Apache Avro - [`demo_avro.ipynb`](notebooks/demo_avro.ipynb)
> 4. Apache Parquet, ORC and Arrow - [`demo_parquet_orc_arrow.ipynb`](notebooks/demo_parquet_orc_arrow.ipynb)
>


## Apache Thrift

The Thrift type system includes base types such as _bool, byte, double, string_ and several sizes of _integer_, special types such as _binary_ and _struct_ (similar to classes), and containers (_list, set, map_) that correspond to interfaces commonly available in most programming languages.

Base types:

- bool: A boolean value (true or false)
- byte: An 8-bit signed integer
- i16: A 16-bit signed integer
- i32: A 32-bit signed integer
- i64: A 64-bit signed integer
- double: A 64-bit floating point number
- string: A text string encoded using UTF-8 encoding

Thrift types are defined in `.thrift` files, and the Thrift compiler generates code in various languages from those files.

### Encoding

Let's use the following example record (JSON or dictionary-like) to encode:

```json
{
  "userName": "Martin",
  "favoriteNumber": 1337,
  "interests": ["daydreaming", "hacking"]
}
```

We can encode the record in Thrift using the following schema in the `.thrift` file:

```thrift
struct Person {
  1: required string userName,
  2: optional i64 favoriteNumber,
  3: optional list<string> interests
}
```

Thrift comes with a code generation tool that takes a schema definition like the ones shown here, and produces classes that implement the schema in various programming languages. Our code can call this generated code to encode or decode records of the schema.

The data encoded with this schema looks like this:
![thrift_binary_protocol](../assets/thrift_binary_protocol.png)

Each field has a type annotation (to indicate whether it is a string, integer, list, etc.) and, where required, a length indication (length of a string, number of items in a list). The strings that appear in the data (“Martin”, “daydreaming”, “hacking”) are encoded as UTF-8.

There are no field names (userName, favoriteNumber, interests). Instead, the encoded data contains _field tags_, which are numbers (1, 2, and 3). Those are the numbers that appear in the schema definition. Field tags are like aliases for fields—they are a compact way of saying what field we’re talking about, without having to spell out the field name.
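
To make the tag-based layout concrete, here is a minimal stdlib-only sketch of Thrift's BinaryProtocol encoding for the `Person` struct. It assumes the figure above shows BinaryProtocol: each field is one type byte, a 2-byte big-endian field tag, then the value, and a `0x00` byte ends the struct. The helper names are hypothetical; real code would use Thrift-generated classes or `thriftpy2`.

```python
import struct

# Thrift type IDs used by the binary protocol.
T_STOP, T_I64, T_STRING, T_LIST = 0, 10, 11, 15


def field_header(type_id: int, tag: int) -> bytes:
    # 1 type byte + 2-byte big-endian field tag; the field *name* is never written.
    return struct.pack(">bh", type_id, tag)


def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return struct.pack(">i", len(data)) + data  # 4-byte length prefix, then UTF-8 bytes


def encode_person(user_name, favorite_number=None, interests=None) -> bytes:
    out = bytearray()
    out += field_header(T_STRING, 1) + encode_string(user_name)
    if favorite_number is not None:  # unset optional fields are simply omitted
        out += field_header(T_I64, 2) + struct.pack(">q", favorite_number)
    if interests is not None:
        out += field_header(T_LIST, 3)
        out += struct.pack(">bi", T_STRING, len(interests))  # element type + item count
        for item in interests:
            out += encode_string(item)
    out += bytes([T_STOP])  # end-of-struct marker
    return bytes(out)


encoded = encode_person("Martin", 1337, ["daydreaming", "hacking"])
print(len(encoded))  # 59
```

Under these assumptions the record takes 59 bytes, and only the tags 1, 2 and 3 identify the fields.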

Next, let's add a service. A service is a collection of method interfaces that can be called remotely. A service is defined in a `.thrift` file like this:

```thrift
service School {
    Person teachCourse(1: required Person person, 2: required string course)
}
```

The first line declares a service called `School`. The second line declares a method called `teachCourse`, which takes two arguments, a `Person` record and a `string`, and returns a `Person` record.

### RPC

Now, let's look at how to make remote procedure calls. We will write code for both sides of a client-server application: the client initiates an RPC call and waits for a response, and the server executes the requested operation and returns a response to the client. Here we use `thriftpy2`, which parses the `.thrift` file at runtime instead of requiring a separate code-generation step.

Here, we use `%%writefile` magic command to write the code to a file instead of running it in the cell.

In [None]:
%%writefile ../schema/person.thrift

struct Person {
  1: required string userName,
  2: optional i64 favoriteNumber,
  3: optional list<string> interests
}

service School {
    Person teachCourse(1: required Person person, 2: required string course)
}

In [None]:
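%%writefile ../person_thrift_server.py
import thriftpy2
from thriftpy2.rpc import make_server

# Parse the schema at runtime; this relative path assumes the server is started from the project root.
person_thrift = thriftpy2.load("./schema/person.thrift", module_name="person_thrift")


class School(object):
    """Handler implementing the methods declared in the `School` service."""

    def teachCourse(self, person, course):
        # `interests` is optional, so it may arrive unset (None).
        if person.interests is None:
            person.interests = []
        person.interests.append(course)
        return person


# Serve on thriftpy2's default host and port, which make_client also uses by default.
server = make_server(person_thrift.School, School(), client_timeout=None)
server.serve()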

Then, from the project root (the directory containing `person_thrift_server.py` and `schema/`), run `python person_thrift_server.py` in a new terminal to start the server.

In [None]:
import thriftpy2
person_thrift = thriftpy2.load("../schema/person.thrift", module_name="person_thrift")

from thriftpy2.rpc import make_client

school = make_client(person_thrift.School, timeout=None)

In [None]:
martin = person_thrift.Person(
    userName="Martin", favoriteNumber=1337, interests=["daydreaming", "hacking"]
)

In [None]:
martin.interests

In [None]:
martin = school.teachCourse(martin, "coding")

In [None]:
martin.interests

> 1. Add a new field `grade` (0-100) with an appropriate type annotation to the `Person` struct. Then add a new method `assignGrade` to the `School` service that takes a `Person` record and a `grade` argument, assigns the `grade` to the `Person`, and returns the `Person`. Call the method with `martin` and a grade number, and print his grade.
>
> 2. Add a method `teachCourses` to `School` that adds a list of courses instead of just one course. Then pass `martin` and a list of courses, `["cooking", "sewing"]`, to the method, and print his new interests.

## Protocol Buffers (Protobuf)

Protobuf types include:

- double: double precision floating point number
- float: single precision floating point number
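- int32: signed integer, uses variable-length encoding; negative values always take 10 bytes
- int64: signed integer, uses variable-length encoding; inefficient for negative values
- uint32: unsigned integer, uses variable-length encoding
- uint64: unsigned integer, uses variable-length encoding
- sint32: signed integer, uses ZigZag variable-length encoding; more efficient than int32 for negative values
- sint64: signed integer, uses ZigZag variable-length encoding; more efficient than int64 for negative values
- fixed32: unsigned integer, always 4 bytes; more efficient than uint32 if values are often greater than 2^28
- fixed64: unsigned integer, always 8 bytes; more efficient than uint64 if values are often greater than 2^56
- sfixed32: signed integer, always 4 bytes
- sfixed64: signed integer, always 8 bytes
- bool: boolean value
- string: UTF-8 text string
- bytes: sequence of bytes
- enum: enumerated type
- message: nested message type
- map: key-value map field, declared as `map<key_type, value_type>`
- Any: well-known type (`google.protobuf.Any`) that holds an arbitrary serialized message together with a URL identifying its type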
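
The variable-length (varint) encoding used by the integer types above, and why `sint32`/`sint64` beat `int32`/`int64` for negative numbers, can be checked with a short pure-Python sketch. The function names are illustrative and not part of the protobuf library.

```python
def encode_varint(n: int) -> bytes:
    """Base-128 varint: 7 payload bits per byte, high bit set on every byte but the last."""
    if n < 0:
        n &= (1 << 64) - 1  # int32/int64 negatives are encoded as 64-bit two's complement
    out = bytearray()
    while True:
        byte, n = n & 0x7F, n >> 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def zigzag(n: int) -> int:
    """ZigZag mapping used by sint32/sint64 (0->0, -1->1, 1->2, -2->3, ...), valid for int64 values."""
    return (n << 1) ^ (n >> 63)


print(encode_varint(1337).hex())       # b90a
print(len(encode_varint(-1)))          # 10 bytes when sent as int32/int64
print(len(encode_varint(zigzag(-1))))  # 1 byte when sent as sint32/sint64
```

ZigZag interleaves negative and positive values so that numbers close to zero, of either sign, stay short on the wire.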

Protobuf schemas are defined in `.proto` files, and the Protobuf compiler (`protoc`) generates code in various languages from those files.

### Encoding

We can encode the previous example record in Protobuf using the following schema in the `.proto` file:

```protobuf
message Person {
  required string user_name = 1;
  optional int64 favorite_number = 2;
  repeated string interests = 3;
}
```

The data encoded with this schema looks like this:
![protobuf](../assets/protobuf.png)

Protocol Buffers handle lists in an interesting way. Instead of a dedicated list or array datatype, Protobuf marks a field as `repeated`, which is a third option alongside `required` and `optional`.

As depicted in the figure, a repeated field is simply represented by the same field tag appearing multiple times in the record. The advantage of this approach is that converting an optional (single-valued) field into a repeated (multi-valued) field is permissible. When new code reads old data, it interprets it as a list with either zero or one element, depending on whether the field was present. On the other hand, old code reading new data only perceives the last element of the list.
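
A minimal pure-Python sketch of Protobuf's wire format makes this visible: the repeated field is the same tag written twice, and the same bytes can be read either as a list (new code) or as an optional field where the last value wins (old code). The helper names are hypothetical, only the varint and length-delimited wire types are handled, and real applications would use the `protoc`-generated classes instead.

```python
def encode_varint(n: int) -> bytes:
    out = bytearray()
    while True:
        byte, n = n & 0x7F, n >> 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(buf: bytes, pos: int) -> tuple:
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return result, pos


def field_key(tag: int, wire_type: int) -> bytes:
    # Every field starts with (tag << 3) | wire_type; 0 = varint, 2 = length-delimited.
    return encode_varint((tag << 3) | wire_type)


def string_field(tag: int, s: str) -> bytes:
    data = s.encode("utf-8")
    return field_key(tag, 2) + encode_varint(len(data)) + data


def encode_person(user_name: str, favorite_number: int, interests: list) -> bytes:
    out = string_field(1, user_name) + field_key(2, 0) + encode_varint(favorite_number)
    for item in interests:  # repeated: the same tag is written once per element
        out += string_field(3, item)
    return out


def decode_fields(buf: bytes) -> list:
    """Return (tag, value) pairs in wire order; handles wire types 0 and 2 only."""
    fields, pos = [], 0
    while pos < len(buf):
        key, pos = decode_varint(buf, pos)
        tag, wire_type = key >> 3, key & 7
        if wire_type == 0:
            value, pos = decode_varint(buf, pos)
        elif wire_type == 2:
            length, pos = decode_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        else:
            raise ValueError(f"wire type {wire_type} is not handled in this sketch")
        fields.append((tag, value))
    return fields


data = encode_person("Martin", 1337, ["daydreaming", "hacking"])
fields = decode_fields(data)

# New code (field 3 declared repeated) collects every occurrence of tag 3.
new_view = [v.decode() for tag, v in fields if tag == 3]
# Old code (field 3 declared optional) overwrites on each occurrence, so the last value wins.
old_view = {}
for tag, v in fields:
    old_view[tag] = v

print(len(data), new_view, old_view[3])  # 33 ['daydreaming', 'hacking'] b'hacking'
```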

### gRPC

Now, let's look at how to use the generated code to make remote procedure calls. We will use gRPC, a high-performance RPC framework built on top of Protobuf. As in the Thrift example, the client initiates an RPC call and waits for a response, and the server executes the requested operation and returns a response to the client. Note that the schema below uses proto3 syntax, which no longer supports `required`, so `user_name` is declared without a label.

In [None]:
%%writefile ../schema/person.proto
syntax = "proto3";

message Person {
  string user_name = 1;
  optional int64 favorite_number = 2;
  repeated string interests = 3;
}

message CourseRequest {
  Person person = 1;
  string course = 2;
}


service School {
  rpc teachCourse(CourseRequest) returns (Person) {}
}

Then, from the project root, run the following command in a terminal to generate the Python code (this requires the `grpcio-tools` package):

```bash
python -m grpc_tools.protoc -I./schema --python_out=. --grpc_python_out=. ./schema/person.proto
```

This will generate the following files:

```bash
person_pb2.py
person_pb2_grpc.py
```

In [None]:
%%writefile ../person_protobuf_server.py
from concurrent import futures
import grpc
import person_pb2_grpc


class School(person_pb2_grpc.SchoolServicer):

  def teachCourse(self, request, context):
    request.person.interests.append(request.course)
    return request.person

server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
person_pb2_grpc.add_SchoolServicer_to_server(
    School(), server)
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()

Then, from the project root (where `person_pb2.py` and `person_pb2_grpc.py` were generated), run `python person_protobuf_server.py` in a new terminal to start the server.

In [None]:
import sys
sys.path.append('..')
import grpc
import person_pb2
import person_pb2_grpc


In [None]:
def teach_course(stub, person, course):
    person = stub.teachCourse(person_pb2.CourseRequest(person=person, course=course))
    return person


with grpc.insecure_channel('localhost:50051') as channel:
    martin = person_pb2.Person(user_name='Martin', favorite_number=1337, interests=["daydreaming", "hacking"])
    course = "coding"
    stub = person_pb2_grpc.SchoolStub(channel)
    martin = teach_course(stub, martin, course)
    print(martin.interests)

> Add a new field `grade` (0-100) with an appropriate type annotation to the `Person` message. Then add a new method `assignGrade` to the `School` service that takes a `GradeRequest` message (consisting of a `Person` record and a `grade` field), assigns the `grade` to the `Person`, and returns the `Person`. Call the method with the `martin` person and a grade number, and print his grade.

## Schema Evolution

**Schema evolution** is a key concept in **data serialization and storage formats** such as **Avro, Parquet, ORC, Protobuf, and Thrift**.

---

### 1. What is schema evolution?

**Schema evolution** is the ability of a data system to **handle changes to the schema** of your data **without breaking existing data or applications**.

* Schema = the structure of your data (fields, types, names).
* Evolution = **adding/removing/modifying fields over time**.

In other words:

> You can change your data format and still read old data, or write new data while old readers still work.

---

### 2. Why schema evolution matters

In real-world systems:

* Data pipelines are long-lived.
* Requirements change over time.
* You may add new columns, remove old ones, or rename them.

Without schema evolution, **changing a schema could break every reader of existing data** or force you to **rewrite everything** in the new format.

---

### 3. How different systems support it

#### Avro (very strong support)

* Every Avro file contains its **writer schema**.
* When you read it, you can provide a **reader schema**.
* Supports:

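| Change type    | Allowed?        | How it works                                                                                              |
| -------------- | --------------- | --------------------------------------------------------------------------------------------------------- |
| Add a field    | ✅              | Give the new field a default, which is used when reading old data                                        |
| Remove a field | ✅              | Readers ignore fields missing from their schema; the removed field needs a default so old readers can still read new data |
| Rename a field | ✅ (with alias) | Add `"aliases": ["old_name"]` to the field in the new (reader) schema                                    |
| Change type    | ⚠               | Only promotable types (e.g. int → long, float → double)                                                   |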

Example:

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["null", "int"], "default": null},
    {"name": "email", "type": "string", "default": ""}
  ]
}
```

* Adding `email` later is safe if a default is provided.

---

#### Protobuf

* Supports **adding new fields** and **ignoring unknown fields**.
* Each field has a **unique tag number**.
* Rules:

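1. Add new fields → OK (give them new tag numbers)
2. Remove fields → OK, but never reuse their tag numbers (Protobuf lets you mark them `reserved`)
3. Change types → only between compatible types (e.g. `int32`, `uint32`, `int64`, `uint64` and `bool`), and values may be truncated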

---

#### Thrift

* Similar to Protobuf.
* Uses **field IDs** for compatibility.
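* Optional vs required matters: a `required` field can never be safely added or removed, so fields added later must be `optional` (or have a default).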

---

#### Parquet / ORC

* Schema evolution is supported **column-wise**.
* Adding new columns → OK
* Removing columns → OK
* Type changes → sometimes tricky; need compatible types

---

### 4. Real-world analogy

Imagine a database table with these columns:

| name | age |
| ---- | --- |

Later you want to add `email` and remove `age`:

* **With schema evolution:**

  * Old rows without `email` → default value
  * Applications reading old files → still work

* **Without schema evolution:**

  * Old data is broken or unreadable

---

### 5. Key points

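* Schema changes can be **backward compatible** (new code reads old data), **forward compatible** (old code reads new data), or both, depending on the change and the format's rules.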
* Essential for **streaming systems** (Kafka + Avro), **data lakes** (Parquet, ORC).
* Works best when **default values and optional fields** are used.
* Helps **avoid rewriting or migrating massive datasets** when the schema changes.

---

### 6. Visual summary

```
Old schema:           name | age
New schema:           name | age | email
Forward compatible:   old readers still read name & age from new data, ignoring email
Backward compatible:  new readers fill in a default for the missing email in old data
```

---


## Schema Evolution for Thrift and Protobuf

Let's explore how Thrift and Protocol Buffers manage schema changes to ensure both _backward_ and _forward compatibility_.

From the examples above, an encoded record comprises concatenated encoded fields. Each field is identified by a tag number (e.g., 1, 2, 3 in the sample schemas) and annotated with a datatype (such as string or integer). If a field value is not set, it is simply omitted from the encoded record. It's important to note that field tags play a crucial role in determining the meaning of the encoded data. Although you can modify a field's name in the schema without issues, changing a field's tag would render all existing encoded data invalid.

Adding new fields to the schema is permissible, but it requires assigning each field a new tag number. Old code, which is unaware of the new tag numbers, can still read data written by new code, and if it encounters an unrecognized tag number, it can safely ignore that field. The datatype annotation allows the parser to determine the necessary bytes to skip, thus maintaining _forward compatibility_.

Regarding _backward compatibility_, as long as each field keeps its unique tag number, new code can read old data because the tag numbers retain their original meaning. However, a newly added field cannot be made required: old data never contains it, so new code reading old data would fail the required check. Hence, to ensure backward compatibility, **any fields added after the initial schema deployment must be optional or have default values**.
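
Both compatibility directions can be illustrated with a small pure-Python reader over Protobuf's wire format (an assumption; in Thrift the per-field type annotation plays the same role). The reader knows only some tags, skips the rest using the wire type, and fills missing fields from reader-side defaults. The helper names and the `grade` field with tag 4 are hypothetical.

```python
def encode_varint(n: int) -> bytes:
    out = bytearray()
    while True:
        byte, n = n & 0x7F, n >> 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(buf: bytes, pos: int) -> tuple:
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return result, pos


def varint_field(tag: int, n: int) -> bytes:
    return encode_varint(tag << 3) + encode_varint(n)  # wire type 0


def bytes_field(tag: int, data: bytes) -> bytes:
    return encode_varint((tag << 3) | 2) + encode_varint(len(data)) + data  # wire type 2


def read_record(buf: bytes, known_tags: dict, defaults: dict) -> dict:
    """Decode the tags this reader knows; skip unknown ones using their wire type."""
    record = dict(defaults)  # fields absent from the data keep their default
    pos = 0
    while pos < len(buf):
        key, pos = decode_varint(buf, pos)
        tag, wire_type = key >> 3, key & 7
        if wire_type == 0:
            value, pos = decode_varint(buf, pos)
        elif wire_type == 2:  # the length prefix tells the reader how many bytes to skip
            length, pos = decode_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        else:
            raise ValueError("this sketch only handles wire types 0 and 2")
        if tag in known_tags:
            record[known_tags[tag]] = value
        # Unknown tag: its bytes were consumed above, so the value is simply dropped.
    return record


OLD_TAGS = {1: "user_name", 2: "favorite_number"}
NEW_TAGS = {**OLD_TAGS, 4: "grade"}

old_data = bytes_field(1, b"Martin") + varint_field(2, 1337)
new_data = old_data + varint_field(4, 90)  # new code also writes tag 4

# Forward compatibility: old code reads new data and skips tag 4.
print(read_record(new_data, OLD_TAGS, {}))
# Backward compatibility: new code reads old data; the missing grade gets a default.
print(read_record(old_data, NEW_TAGS, {"grade": 0}))
```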

Removing a field follows the same principle as adding one, with the backward and forward compatibility concerns reversed. You can remove only optional fields, never required ones, and a removed tag number must never be reused, because data written with the old tag may still exist and would be misinterpreted.
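
The danger of reusing a tag is that nothing fails loudly. In the pure-Python sketch below (hypothetical helpers and a hypothetical new schema), old data written with tag 2 as `favorite_number` is decoded by code that reused tag 2 for a 0-100 `grade`, and it silently yields an impossible grade. Protobuf lets you guard against this by declaring retired field numbers and names as `reserved` in the `.proto` file, so the compiler rejects any reuse.

```python
def encode_varint(n: int) -> bytes:
    out = bytearray()
    while True:
        byte, n = n & 0x7F, n >> 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(buf: bytes, pos: int) -> tuple:
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return result, pos


# Data written under the old schema, where tag 2 meant favorite_number (varint).
old_data = encode_varint(2 << 3) + encode_varint(1337)

# Hypothetical new schema that removed favorite_number and reused tag 2 for grade.
NEW_SCHEMA = {2: "grade"}

key, pos = decode_varint(old_data, 0)
value, _ = decode_varint(old_data, pos)
decoded = {NEW_SCHEMA[key >> 3]: value}
print(decoded)  # {'grade': 1337}: no error, just a wrong value
```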

Changing the datatype of a field is possible, but it carries the risk of losing precision or truncating values. For instance, converting a 32-bit integer into a 64-bit integer may lead to data loss when old code reads data written by new code, as the old code uses a 32-bit variable to hold the value. If the 64-bit value exceeds the capacity of a 32-bit variable, it will be truncated. Thus, careful consideration is necessary when altering field datatypes, and consulting the documentation is advisable to understand potential implications.
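
To see the truncation risk, the hypothetical helper below mimics old code that stores a value written by new code as a 64-bit integer in a 32-bit signed variable, keeping only the low 32 bits (C-style two's-complement wrapping). The exact behavior depends on the language and library, so treat this as an illustration rather than a specification.

```python
def as_int32(value: int) -> int:
    """Keep the low 32 bits of `value` and reinterpret them as a signed 32-bit integer."""
    value &= 0xFFFFFFFF
    return value - (1 << 32) if value >= (1 << 31) else value


for written in (1337, 2**31, 2**40 + 5):
    print(written, "->", as_int32(written))
# 1337 -> 1337
# 2147483648 -> -2147483648
# 1099511627781 -> 5
```

Values that fit in 32 bits survive unchanged; larger ones come back as unrelated numbers rather than raising an error.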

## Apache Avro

Avro has the following types:

- null: no value
- boolean: a binary value
- int: 32-bit signed integer
- long: 64-bit signed integer
- float: single precision (32-bit) IEEE 754 floating-point number
- double: double precision (64-bit) IEEE 754 floating-point number
- bytes: sequence of 8-bit unsigned bytes
- string: Unicode character sequence
- record: ordered collection of named fields
- enum: enumeration of string values
- array: ordered collection of values
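- map: collection of key-value pairs with string keys
- union: a value that may be any one of several listed types (e.g. `["null", "long"]`)
- fixed: fixed-size sequence of bytes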

It has two schema languages: one (`Avro IDL`) intended for human editing, and one (based on JSON) that is more easily machine-readable.

### Encoding

We can describe the previous example record in Avro IDL (usually stored in an `.avdl` file) with the following schema:

```avro
record Person {
  string userName;
  union { null, long } favoriteNumber = null;
  array<string> interests;
}
```

The equivalent JSON representation of that schema (the format used in `.avsc` files) is as follows:

```json
{
  "type": "record",
  "name": "Person",
  "fields": [
    { "name": "userName", "type": "string" },
    { "name": "favoriteNumber", "type": ["null", "long"], "default": null },
    { "name": "interests", "type": { "type": "array", "items": "string" } }
  ]
}
```

The data encoded with this schema looks like this:
![avro](../assets/avro.png)

First, note that the schema contains no tag numbers. Encoding our sample record with this schema yields an Avro binary encoding of just _32 bytes_, the most compact of the encodings shown in this notebook.

Examining the byte sequence, one can readily discern the _absence of field identifiers or datatype markers_. The encoding solely comprises concatenated values. For instance, a string is represented by a length prefix followed by UTF-8 bytes, but there are no explicit indicators within the encoded data to specify that it is, indeed, a string. In fact, it could be interpreted as an integer or any other data type altogether. Similarly, an integer is encoded using a variable-length encoding.
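
A pure-Python sketch of Avro's binary encoding for the `Person` schema above shows that the output is nothing but concatenated values. The helpers are hypothetical; real code would use an Avro library such as `fastavro`. Integers and lengths are ZigZag varints, the union is written as the index of its branch followed by the value, and the array is written as a block count, the items, and a terminating 0.

```python
def zigzag_varint(n: int) -> bytes:
    """Avro int/long encoding: ZigZag mapping, then base-128 varint."""
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte, n = n & 0x7F, n >> 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data  # length prefix only, no type marker


def encode_person(user_name, favorite_number, interests) -> bytes:
    # Fields are written in schema order, with no tags and no type markers.
    out = encode_string(user_name)
    if favorite_number is None:
        out += zigzag_varint(0)  # union branch 0: null
    else:
        out += zigzag_varint(1) + zigzag_varint(favorite_number)  # branch 1: long
    if interests:
        out += zigzag_varint(len(interests))  # one block holding all items
        for item in interests:
            out += encode_string(item)
    out += zigzag_varint(0)  # a block count of 0 ends the array
    return out


encoded = encode_person("Martin", 1337, ["daydreaming", "hacking"])
print(len(encoded), encoded.hex())
```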

To parse the binary data correctly, you must go through the fields in the order they appear in the schema and _refer to the schema_ itself to determine the datatype of each field. Consequently, the binary data can be decoded correctly only if the reading code knows the exact schema with which the data was written; any mismatch between the schema the reader assumes and the writer's schema would result in incorrectly decoded data.

With Avro, data encoding and decoding are based on two schemas: the `writer's schema` used during data encoding and the `reader's schema` employed during data decoding. These schemas do not necessarily have to be identical but should be compatible. When decoding data, the Avro library compares the writer's and reader's schemas, resolving any discrepancies between them.

The Avro specification ensures that fields in different orders between the writer's and reader's schemas pose no issues during resolution since schema matching occurs based on field names. If the reader's schema lacks a field present in the writer's schema, it is simply ignored. Conversely, if the reader's schema expects a field that the writer's schema does not contain, the missing field is filled in with a default value declared in the reader's schema. This allows for flexible schema evolution while maintaining data compatibility.
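
These resolution rules can be sketched in pure Python on records that have already been decoded into dictionaries. This is a simplification: real Avro libraries (for example `fastavro`, via its `reader_schema` argument) resolve schemas while decoding the bytes and also handle type promotion and nested types. The function name and the reader schema below are hypothetical.

```python
_MISSING = object()


def resolve_record(datum: dict, reader_schema: dict) -> dict:
    """Project a record decoded with the writer's schema onto the reader's schema.

    Simplified sketch: fields are matched by name or alias, writer-only fields are
    dropped, and reader-only fields take their declared default.
    """
    resolved = {}
    for field in reader_schema["fields"]:
        names = [field["name"], *field.get("aliases", [])]
        value = next((datum[n] for n in names if n in datum), _MISSING)
        if value is _MISSING:
            if "default" not in field:
                raise ValueError(f"field {field['name']!r} is missing and has no default")
            value = field["default"]
        resolved[field["name"]] = value
    return resolved


# A record as decoded with the writer's (old) Person schema.
old_record = {"userName": "Martin", "favoriteNumber": 1337, "interests": ["daydreaming", "hacking"]}

# Hypothetical newer reader schema: different field order, userName renamed,
# favoriteNumber dropped, email added with a default.
reader_schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "interests", "type": {"type": "array", "items": "string"}},
        {"name": "name", "type": "string", "aliases": ["userName"]},
        {"name": "email", "type": "string", "default": ""},
    ],
}

print(resolve_record(old_record, reader_schema))
# {'interests': ['daydreaming', 'hacking'], 'name': 'Martin', 'email': ''}
```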

### Reading (Decoding) a File

Instead of demonstrating RPC, let's decode data from a file containing a real-world dataset: genomic variation data (1,000 records) from the [OpenCGA](http://docs.opencb.org/display/opencga/Welcome+to+OpenCGA) project.

In [None]:
import fastavro
import copy
import json
from pprint import pprint

In [None]:
with open('../data/1k.variants.avro', 'rb') as f:
    reader = fastavro.reader(f)
    genomic_var_1k = [sample for sample in reader]
    metadata = copy.deepcopy(reader.metadata)
    writer_schema = copy.deepcopy(reader.writer_schema)
    schema_from_file = json.loads(metadata['avro.schema'])

In [None]:
len(genomic_var_1k)

In [None]:
pprint(writer_schema)

In [None]:
pprint(schema_from_file)

In [None]:
pprint(genomic_var_1k[0])

In [None]:
for f in schema_from_file["fields"]:
    print(f["name"])

## Apache Parquet, ORC and Arrow

We can easily read (decode) and write (encode) data from and to Parquet, ORC and Arrow files interchangeably. The `pyarrow` library allows us to read a Parquet or ORC file into a `pyarrow.Table` object, which is a columnar data structure that can be converted to a Pandas DataFrame. We can also write a `pyarrow.Table` to a Parquet or ORC file.

Parquet has the following physical (storage) types:

- boolean: 1-bit boolean
- int32: 32-bit signed ints
- int64: 64-bit signed ints
- int96: 96-bit signed ints (deprecated; historically used for timestamps)
- float: IEEE 32-bit floating point values
- double: IEEE 64-bit floating point values
- byte_array: arbitrarily long byte arrays
- fixed_len_byte_array: fixed-length byte arrays

Logical types annotate these physical types, for example:

- string: UTF-8 encoded strings (stored as byte_array)
- enum: enumeration of strings (stored as byte_array)
- date, time and timestamp: temporal logical types

ORC has the following types:

- boolean: 1 bit boolean
- tinyint: 8 bit signed ints
- smallint: 16 bit signed ints
- int: 32 bit signed ints
- bigint: 64 bit signed ints
- float: IEEE 32-bit floating point values
- double: IEEE 64-bit floating point values
- string: UTF-8 encoded strings
- char: fixed-length strings
- varchar: variable-length strings with a declared maximum length
- binary: byte arrays
- timestamp: date and time
- date: calendar date
- decimal: arbitrary precision decimals
- list: an ordered collection of objects
- map: a collection of key-value pairs
- struct: an ordered collection of named fields
- union: a value that may be one of several listed types

### Reading (Decoding) and Writing (Encoding) a Parquet File

Let's look at how to decode and encode a Parquet file with mock customer data.

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq

In [None]:
table = pq.read_table('../data/userdata1.parquet')

In [None]:
table

In [None]:
table.schema

In [None]:
metadata = pq.read_metadata('../data/userdata1.parquet')

metadata

In [None]:
metadata.schema

In [None]:
metadata.row_group(0).column(10)

Select the first 3 rows of the table:

In [None]:
table.take([0,1,2])

Convert a Table to a DataFrame:

In [None]:
df = table.to_pandas()

In [None]:
df

You can convert the DataFrame back to a Table (note that `from_pandas` is called on `pa.Table`, i.e. the core `pyarrow` module, not `pq`):

In [None]:
new_table = pa.Table.from_pandas(df)

new_table

You can write the table back to a Parquet file:

In [None]:
pq.write_table(new_table, "../data/userdata2.parquet")

> 1. How many males and females are there?
>
> 2. What is the average salary for customers from China?
>
> 3. Create a new column `full_name` which combines `first_name` and `last_name` with a space in between in the dataframe. Then convert it back to a new Table and write it to a Parquet file.

### Reading (Decoding) and Writing (Encoding) an ORC File

Let's look at how to decode and encode an ORC file with mock customer data.

In [None]:
import pyarrow as pa
from pyarrow import orc

In [None]:
table2 = orc.read_table('../data/userdata1.1.orc')

In [None]:
table2

The ORC file does not store meaningful column names, so we need to set them manually. Here we reuse the column names from the Parquet Table above.

In [None]:
table2 = table2.rename_columns(table.column_names)

In [None]:
table2

In [None]:
df2 = table2.to_pandas()

df2

You can write the table back to an ORC file:

In [None]:
orc.write_table(table2, "../data/userdata2.orc")

> 1. How many males and females are there from China?
>
> 2. Create a new column `age` which is computed from the birthdate in the dataframe. Then convert it back to a new Table and write it to an ORC file.