# IPC: Streaming & File Formats

> **Level:** Intermediate  
> **Spec:** [IPC Streaming Format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format) · [IPC File Format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)  
> **PyArrow docs:** [Streaming, serialization and IPC](https://arrow.apache.org/docs/python/ipc.html)

## What you will learn

1. IPC message wire format: continuation marker, metadata length, metadata, body
2. End-of-stream (EOS) marker bytes
3. Streaming format vs file format (`ARROW1` magic, footer)
4. Random access in file format with `get_batch(i)`
5. Compression with lz4 and zstd

In [1]:
import io
import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

---
## 1. Streaming Format Wire Layout

> **Spec:** [IPC streaming format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format)

Each message follows this layout:
```
[ 0xFFFFFFFF (4 bytes) ]        ← continuation marker
[ metadata_size (int32, LE) ]   ← length of metadata plus padding
[ metadata bytes ]              ← Flatbuffers Message
[ padding to 8-byte alignment ]
[ body bytes ]                  ← raw buffer data
```
End-of-stream: `0xFFFFFFFF 0x00000000`, that is, a continuation marker followed by a metadata length of zero.
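
The 8-byte prefix described above can be decoded with the standard library alone. The sketch below uses a hypothetical helper, `parse_prefix`, on hand-built bytes rather than a real Arrow stream. It only classifies the prefix; the body length lives inside the Flatbuffers metadata, which this helper does not parse.

```python
import struct

CONTINUATION = 0xFFFFFFFF

def parse_prefix(data: bytes, offset: int = 0):
    """Decode the 8-byte prefix of an encapsulated IPC message.

    Hypothetical helper for illustration. Returns ('eos', 0) for the
    end-of-stream marker, otherwise ('message', metadata_size).
    """
    # uint32 marker followed by a signed int32 length, both little-endian
    marker, metadata_size = struct.unpack_from('<Ii', data, offset)
    if marker != CONTINUATION:
        raise ValueError(f'expected continuation marker, got {marker:#010x}')
    if metadata_size == 0:
        return 'eos', 0
    return 'message', metadata_size

# Synthetic bytes: a prefix announcing 16 metadata bytes, then an EOS marker.
fake_message = struct.pack('<Ii', CONTINUATION, 16) + bytes(16)
fake_eos = struct.pack('<Ii', CONTINUATION, 0)

print(parse_prefix(fake_message))   # ('message', 16)
print(parse_prefix(fake_eos))       # ('eos', 0)
```

The same helper should accept the first 8 bytes of a real stream, such as `raw` in the next cell.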

In [2]:
schema = pa.schema([pa.field('x', pa.int32()), pa.field('y', pa.utf8())])
batches = [
    pa.record_batch({'x': [1,2,3], 'y': ['a','b','c']}, schema=schema),
    pa.record_batch({'x': [4,5],   'y': ['d','e']},     schema=schema),
    pa.record_batch({'x': [6],     'y': ['f']},         schema=schema),
]

# Write to an in-memory stream
buf = io.BytesIO()
writer = ipc.new_stream(buf, schema)
for b in batches:
    writer.write_batch(b)
writer.close()
raw = buf.getvalue()

print(f'Total bytes: {len(raw)}')
print(f'First 4 bytes (continuation marker): {raw[:4].hex()}  (expect ffffffff)')
# Find EOS marker
eos_idx = raw.rfind(b'\xff\xff\xff\xff\x00\x00\x00\x00')
print(f'EOS marker at byte offset: {eos_idx}')
print(f'EOS bytes: {raw[eos_idx:eos_idx+8].hex()}')

Total bytes: 904
First 4 bytes (continuation marker): ffffffff  (expect ffffffff)
EOS marker at byte offset: 896
EOS bytes: ffffffff00000000


In [3]:
# Read back
reader = ipc.open_stream(io.BytesIO(raw))
print('Schema:', reader.schema)
for i, batch in enumerate(reader):
    print(f'Batch {i}: {batch.to_pydict()}')

Schema: x: int32
y: string
Batch 0: {'x': [1, 2, 3], 'y': ['a', 'b', 'c']}
Batch 1: {'x': [4, 5], 'y': ['d', 'e']}
Batch 2: {'x': [6], 'y': ['f']}


---
## 2. File Format: Magic Bytes & Footer

> **Spec:** [IPC file format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)

The file format wraps the streaming format with:
- Magic bytes `ARROW1` at the start (padded to 8 bytes) and at the end (unpadded)
- A **footer** (Flatbuffers) at the end recording batch offsets for random access

```
[ b'ARROW1\0\0' ]          ← magic, padded to 8 bytes
[ streaming messages ... ]
[ footer bytes ]
[ footer length (int32) ]
[ b'ARROW1' ]              ← 6-byte magic, no padding
```

In [4]:
file_buf = io.BytesIO()
fwriter = ipc.new_file(file_buf, schema)
for b in batches:
    fwriter.write_batch(b)
fwriter.close()
fraw = file_buf.getvalue()

print(f'File size       : {len(fraw)} bytes')
print(f'Magic (start)   : {fraw[:6]}') 
print(f'Magic (end -6)  : {fraw[-6:]}')

# Random access
freader = ipc.open_file(io.BytesIO(fraw))
print(f'\nNum record batches: {freader.num_record_batches}')
print(f'Batch[2] directly  : {freader.get_batch(2).to_pydict()}')

File size       : 1178 bytes
Magic (start)   : b'ARROW1'
Magic (end -6)  : b'ARROW1'

Num record batches: 3
Batch[2] directly  : {'x': [6], 'y': ['f']}


---
## 3. Compression

> **PyArrow docs:** [IPC write options](https://arrow.apache.org/docs/python/generated/pyarrow.ipc.IpcWriteOptions.html)

Both LZ4 (frame format, spelled `'lz4'` in PyArrow) and `zstd` are supported as body compression codecs.  
The codec is recorded in each record batch message's metadata, so readers detect it automatically.

In [None]:
big_schema = pa.schema([pa.field('v', pa.string())])
categories = ['Paris', 'Montpellier', 'Toulouse', 'Angers', 'Rennes', 'Nantes', 'Nice']
rng = np.random.default_rng(0)
# One million strings drawn uniformly from the seven categories
data = [categories[i] for i in rng.integers(0, len(categories), 1_000_000)]
big_batch = pa.record_batch({'v': data}, schema=big_schema)

def ipc_file_size(batch, codec=None):
    buf = io.BytesIO()
    opts = ipc.IpcWriteOptions(compression=codec) if codec else ipc.IpcWriteOptions()
    w = ipc.new_file(buf, batch.schema, options=opts)
    w.write_batch(batch)
    w.close()
    return len(buf.getvalue())

s_plain = ipc_file_size(big_batch)
s_lz4   = ipc_file_size(big_batch, 'lz4')
s_zstd  = ipc_file_size(big_batch, 'zstd')

print(f'Plain : {s_plain:>10,} bytes')
print(f'lz4   : {s_lz4:>10,} bytes  ({s_plain/s_lz4:.2f}x smaller)')
print(f'zstd  : {s_zstd:>10,} bytes  ({s_plain/s_zstd:.2f}x smaller)')

Plain :    800,442 bytes
lz4   :    800,530 bytes  (1.00x smaller)
zstd  :    767,258 bytes  (1.04x smaller)


---
## IPC vs C Data Interface

| Mechanism | When to use | Copy? | Cross-process? |
|-----------|-------------|-------|---------------|
| **IPC streaming** | Persist to disk, send over network | Yes (serialise) | Yes |
| **IPC file** | Random-access on disk | Yes | Yes |
| **C Data Interface** | Same-process, different libraries | **No** | No |

> The C Data Interface is covered in depth in notebook 07.

---
## Summary

| Format | Use case | Random access | Magic |
|--------|----------|--------------|-------|
| Streaming | Sockets, pipes, incremental | No | None |
| File | Disk, object stores | Yes (`get_batch(i)`) | `ARROW1` |

- Messages: `0xFFFFFFFF` continuation + int32 metadata length + Flatbuffers + body
- EOS: `0xFFFFFFFF 0x00000000`
- Compression: lz4 or zstd per message, recorded in metadata

**Next ⏭️** [Extension types](06_extension_types.ipynb): custom types with metadata