# Market Data Compression: From JSON to Delta Events

This notebook demonstrates how bulky market data snapshots of open orders in the EVE Online universe can be reduced by applying multiple stages of compression:

1. **Stage 1: JSON → Parquet**: columnar storage with Snappy compression
2. **Stage 2: Event Sourcing**: storing deltas instead of full snapshots


## Problem description

EVE Online is a massively multiplayer online game set in space. It has a unique in-game economy in which almost every item, such as ships, weapons, and fuel, is produced by players. At any given time, approximately 30,000 players are online.

The goal is to observe and analyze the in-game trading market. For this purpose, EVE provides public REST endpoints that reveal some of this information.

By observing the open market orders (buy and sell side), we can analyze the traded volume and the prices paid. This data is not public, yet it is very useful for successful market trading.

Open orders are refreshed every 5 minutes. The most active trading region is "The Forge", with approximately 400k open orders at any time. This demonstration focuses on that region.

A raw JSON snapshot is more than 100 MB. At 288 snapshots per day (one every 5 minutes), the accumulated data volume cannot realistically be handled.
Even after converting JSON to Parquet, a single snapshot is still 8-10 MB, which adds up quickly if we want to keep the order book history available for analysis.
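To put these numbers in perspective, the daily and yearly volumes can be estimated with a few lines of Python. This is a back-of-the-envelope sketch: the per-snapshot sizes (100 MB for JSON, 8 MB for Parquet) are the rounded lower bounds quoted above, and `daily_volume_gb` is a hypothetical helper, not part of the project's code.

```python
# Back-of-the-envelope storage estimate for one region (The Forge).
# Snapshot sizes are rounded lower bounds (assumed inputs, not measurements).
SNAPSHOT_INTERVAL_MIN = 5
SNAPSHOTS_PER_DAY = 24 * 60 // SNAPSHOT_INTERVAL_MIN  # 288


def daily_volume_gb(snapshot_mb: float, snapshots_per_day: int = SNAPSHOTS_PER_DAY) -> float:
    """Storage per day in GB (1 GB = 1024 MB) if every snapshot is kept."""
    return snapshot_mb * snapshots_per_day / 1024


json_gb = daily_volume_gb(100)   # raw JSON, ~100 MB per snapshot
parquet_gb = daily_volume_gb(8)  # Parquet, ~8 MB per snapshot

print(f"{SNAPSHOTS_PER_DAY} snapshots/day")
print(f"JSON:    {json_gb:.1f} GB/day, {json_gb * 365 / 1024:.1f} TB/year")
print(f"Parquet: {parquet_gb:.2f} GB/day, {parquet_gb * 365:.0f} GB/year")
```

Even the Parquet variant ends up in the hundreds of gigabytes per year for a single region, which motivates the second stage.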

## Setup

Import required libraries and initialize our data pipeline components.

In [1]:
import json
from pathlib import Path
from datetime import datetime
import polars as pl

# Import our utilities
from src.event_extractor.event_detector_polars import PolarsEventDetector

print("Imports successful")

Imports successful


---

## Stage 1: JSON → Parquet Compression

### Data Preparation

**Before proceeding**, run the data fetching script to prepare the demonstration files:

```bash
python src/utils/fetch_and_convert.py
```

This script will:
1. Fetch ~400k market orders from the EVE Online ESI API (~60 seconds)
2. Save the raw data as JSON
3. Convert to Parquet format with Snappy compression
4. Store both files in `./data/demo/`
---

### Why Parquet over JSON?

**Parquet** is a columnar storage format optimized for analytics that provides massive compression advantages:

#### Columnar Layout
- Stores data **by column** instead of row-by-row
- All values for one field are stored together (e.g., all `order_id` values in sequence)
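- Well suited to analytics, where queries typically touch a few specific columns rather than entire rows
- Similar values sit next to each other, which makes each column much easier to compress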
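The difference between the two layouts can be illustrated in plain Python: the JSON file is a list of row records, while a columnar format conceptually regroups them into one list per field. `rows_to_columns` is a hypothetical illustration with made-up order IDs; real Parquet writers additionally split the data into row groups and pages and encode each column chunk separately.

```python
from typing import Any


def rows_to_columns(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Regroup row records into one list per field (assumes a shared schema)."""
    if not rows:
        return {}
    fields = list(rows[0])  # field order taken from the first record
    return {field: [row[field] for row in rows] for field in fields}


# Three abbreviated orders in the JSON row layout (illustrative values)
rows = [
    {"order_id": 1, "type_id": 570, "is_buy_order": False},
    {"order_id": 2, "type_id": 570, "is_buy_order": True},
    {"order_id": 3, "type_id": 34, "is_buy_order": False},
]
columns = rows_to_columns(rows)
print(columns["type_id"])       # [570, 570, 34]
print(columns["is_buy_order"])  # [False, True, False]
```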

#### Built-in Compression Layers

**1. Dictionary Encoding:**
- Replaces repeated values with small integer references
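- Example: the `range` field only takes a handful of distinct values (e.g. `"region"` or `"station"`), so each distinct value is stored once in a dictionary and each of the ~400k rows only holds a small integer reference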
- Extremely effective for fields with limited unique values (type IDs, locations, buy/sell flags)
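A minimal sketch of the idea in plain Python, using values of the `range` field from the sample order structure. `dictionary_encode` is a hypothetical helper; Parquet's actual implementation additionally packs the integer indices into as few bits as possible and run-length encodes repeats.

```python
from collections.abc import Hashable


def dictionary_encode(values: list[Hashable]) -> tuple[list[Hashable], list[int]]:
    """Store each distinct value once; replace every value by its index."""
    dictionary: list[Hashable] = []
    index_of: dict[Hashable, int] = {}
    indices: list[int] = []
    for value in values:
        if value not in index_of:
            index_of[value] = len(dictionary)
            dictionary.append(value)
        indices.append(index_of[value])
    return dictionary, indices


ranges = ["region", "station", "region", "region", "solarsystem", "region"]
dictionary, indices = dictionary_encode(ranges)
print(dictionary)  # ['region', 'station', 'solarsystem']
print(indices)     # [0, 1, 0, 0, 2, 0]
assert [dictionary[i] for i in indices] == ranges  # decoding restores the column
```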

**2. Snappy Compression:**
- Fast, general-purpose compression algorithm developed by Google, applied after encoding
- Prioritizes speed over maximum compression (typically still a 2-4× reduction)
- Compared with gzip or zstd, it trades some compression ratio for very fast compression and decompression, which suits analytics workloads

**3. Type Efficiency:**
- Binary encoding vs. text representation
- Example: The number `123456789` takes:
  - **9 bytes** in JSON (9 ASCII characters)
  - **4 bytes** in Parquet (32-bit integer)
- Timestamps, floats, and booleans are all stored in compact binary form
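The byte counts above can be checked with the standard library: `json.dumps` gives the text representation and `struct.pack` a fixed-width binary one. The second part uses the `order_id` from the sample order, which exceeds the 32-bit range and therefore needs a 64-bit integer. This only illustrates fixed-width integers; it is not Parquet's encoder.

```python
import json
import struct

value = 123456789
as_json = json.dumps(value).encode("ascii")  # text: one byte per digit
as_int32 = struct.pack("<i", value)          # binary: 32-bit little-endian int
print(len(as_json), len(as_int32))           # 9 4

# The sample order_id no longer fits into 32 bits and needs a 64-bit integer
order_id = 7186939904
as_int64 = struct.pack("<q", order_id)
print(len(json.dumps(order_id)), len(as_int64))  # 10 8
assert struct.unpack("<q", as_int64)[0] == order_id  # lossless round trip
```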

### Load the generated files

In [2]:
# Load the expected demo files
json_path = Path("./data/demo/market_orders.json")
parquet_path = Path("./data/demo/market_orders.parquet")

# Check if files exist
if not json_path.exists() or not parquet_path.exists():
    raise FileNotFoundError(
        "Demo data files not found in ./data/demo/\n"
        "Please run: python src/utils/fetch_and_convert.py"
    )

print(f"Loading demonstration files:")
print(f"   JSON:    {json_path.name}")
print(f"   Parquet: {parquet_path.name}")

# Load JSON to get order count
with open(json_path, 'r') as f:
    orders = json.load(f)

print(f"\nLoaded {len(orders):,} market orders")
print(f"\nSample order structure:")
print(json.dumps(orders[0], indent=2))

Loading demonstration files:
   JSON:    market_orders.json
   Parquet: market_orders.parquet

Loaded 402,300 market orders

Sample order structure:
{
  "duration": 90,
  "is_buy_order": false,
  "issued": "2025-11-15T20:13:29Z",
  "location_id": 60003760,
  "min_volume": 1,
  "order_id": 7186939904,
  "price": 232700.0,
  "range": "region",
  "system_id": 30000142,
  "type_id": 570,
  "volume_remain": 3,
  "volume_total": 5
}


### Analyze JSON file size

In [3]:
# Measure JSON file size
json_size_bytes = json_path.stat().st_size
json_size_mb = json_size_bytes / (1024 * 1024)

print(f"JSON file size: {json_size_mb:.2f} MB ({json_size_bytes:,} bytes)")
print(f"Average bytes per order: {json_size_bytes / len(orders):.1f} bytes")

JSON file size: 120.93 MB (126,801,130 bytes)
Average bytes per order: 315.2 bytes


### Analyze Parquet file size and compression ratio

In [4]:
# Measure Parquet file size
parquet_size_bytes = parquet_path.stat().st_size
parquet_size_mb = parquet_size_bytes / (1024 * 1024)

# Calculate compression ratio
compression_ratio = json_size_bytes / parquet_size_bytes

print(f"Compression Results (Stage 1: JSON → Parquet)")
print(f"{'='*60}")
print(f"Raw JSON:        {json_size_mb:8.2f} MB")
print(f"Parquet:         {parquet_size_mb:8.2f} MB")
print(f"Compression:     {compression_ratio:8.1f}× reduction")
print(f"Avg per order:   {parquet_size_bytes / len(orders):8.1f} bytes (Parquet)")
print(f"                 vs {json_size_bytes / len(orders):.1f} bytes (JSON)")
print(f"{'='*60}")


Compression Results (Stage 1: JSON → Parquet)
Raw JSON:          120.93 MB
Parquet:             9.52 MB
Compression:         12.7× reduction
Avg per order:       24.8 bytes (Parquet)
                 vs 315.2 bytes (JSON)


### Result

Converting the snapshots to Parquet lets us retain a couple of days of snapshot data on the server (about 2.7 GB per day). However, local disk space will run out quickly, and we would either need paid S3 storage in the cloud or have to restrict how much history we keep.

Further compression is required.

---

## Stage 2: Event Extraction

Instead of storing full snapshots every 5 minutes (2.7 GB/day), we extract **events** (changes) between snapshots.

With this approach we can reconstruct the order book at any point in time. The events themselves also carry valuable insights into market dynamics.

The possible events are:

```python
TRADE = "trade"                    # Volume was traded (partial fill: volume reduced)
ORDER_OPENED = "order_opened"      # New order appeared
ORDER_CLOSED = "order_closed"      # Order with volume=0 disappeared (fully filled)
ORDER_CANCELLED = "order_cancelled" # Order manually cancelled (disappeared, not scheduled to expire)
ORDER_EXPIRED = "order_expired"    # Order expired naturally (scheduled expiration time reached)
PRICE_CHANGED = "price_changed"    # Order price was modified
```

Trade events reveal the executed prices and traded volumes on both the buy and the sell side.
Price change events reveal the competition between traders as they adjust their orders to offer the best price.
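Reconstructing the order book from these events amounts to replaying them in timestamp order. The sketch below assumes a hypothetical event schema (plain dicts with `timestamp`, `event_type` and `order_id`, where `order_opened` carries the full order and `trade`/`price_changed` carry the new `volume_remain`/`price`); the event format actually produced by `PolarsEventDetector` may differ.

```python
from datetime import datetime
from typing import Any, Optional

Order = dict[str, Any]
Event = dict[str, Any]

# Events that remove an order from the book
REMOVING_EVENTS = {"order_closed", "order_cancelled", "order_expired"}


def replay(events: list[Event], until: Optional[datetime] = None) -> dict[int, Order]:
    """Rebuild the order book by applying events in timestamp order (hypothetical schema)."""
    book: dict[int, Order] = {}
    for event in sorted(events, key=lambda e: e["timestamp"]):
        if until is not None and event["timestamp"] > until:
            break  # later events are not part of the requested state
        kind, oid = event["event_type"], event["order_id"]
        if kind == "order_opened":
            book[oid] = dict(event["order"])  # copy so replays never mutate the log
        elif kind == "trade":
            book[oid]["volume_remain"] = event["volume_remain"]
        elif kind == "price_changed":
            book[oid]["price"] = event["price"]
        elif kind in REMOVING_EVENTS:
            book.pop(oid, None)
        else:
            raise ValueError(f"unknown event type: {kind}")
    return book


t0 = datetime(2025, 10, 22, 7, 0)
t1 = datetime(2025, 10, 22, 7, 5)
events = [
    {"timestamp": t0, "event_type": "order_opened", "order_id": 1,
     "order": {"price": 232700.0, "volume_remain": 5}},
    {"timestamp": t0, "event_type": "order_opened", "order_id": 2,
     "order": {"price": 10.0, "volume_remain": 100}},
    {"timestamp": t1, "event_type": "trade", "order_id": 1, "volume_remain": 3},
    {"timestamp": t1, "event_type": "order_cancelled", "order_id": 2},
]
print(replay(events, until=t0))  # order book as of T0 (both orders)
print(replay(events))            # {1: {'price': 232700.0, 'volume_remain': 3}}
```

The `until` parameter is what makes "the order book at any point in time" possible: the same event log answers queries for every 5-minute state.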


To reconstruct the order book from events, we need a baseline to start from.


### The Two-Phase Approach

#### Phase 1: Initialization

Establish the baseline. In this demonstration, the baseline is the first available snapshot. In production, a daily baseline is created by closing all orders at the end of each day and re-opening them at the start of the next day.

Process:

- Take the **first snapshot** and create one ORDER_OPENED event per order
- This establishes the baseline state
- Size: exactly one event per order in the market
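Phase 1 can be sketched as a simple mapping from orders to events. The event layout (a dict with `timestamp`, `event_type`, `order_id` and the full `order`) and the name `initialize_events` are assumptions for illustration; the notebook itself uses `PolarsEventDetector.initialize_from_snapshot`.

```python
from datetime import datetime
from typing import Any


def initialize_events(orders: list[dict[str, Any]], ts: datetime) -> list[dict[str, Any]]:
    """Phase 1 sketch: emit one ORDER_OPENED event per order in the baseline snapshot."""
    return [
        {"timestamp": ts, "event_type": "order_opened", "order_id": o["order_id"], "order": o}
        for o in orders
    ]


baseline = [
    {"order_id": 1, "price": 232700.0, "volume_remain": 3},
    {"order_id": 2, "price": 10.0, "volume_remain": 100},
]
events = initialize_events(baseline, datetime(2025, 10, 22, 7, 0))
print(len(events), [e["order_id"] for e in events])  # 2 [1, 2]
```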

#### Phase 2: Delta Extraction  

- Compare each **subsequent snapshot** to the previous one
- Only extract events for orders that **changed**
- Typical changes: trades (volume reduction), cancellations, new orders, price changes
- Size: typically fewer than 1% of orders change between snapshots

This is where the massive compression happens without information loss. 
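The delta step can be sketched in plain Python by comparing two snapshots keyed by `order_id`. This is a hypothetical illustration, not the logic of `PolarsEventDetector.detect_events`: it assumes `issued` has already been parsed into a `datetime` and that `duration` is in days (as in the ESI sample above). Disappeared orders are classified with a simple assumed heuristic: `volume_remain == 0` follows the ORDER_CLOSED comment literally, past the expiry time means expired, anything else counts as cancelled. In practice a fully filled order usually just vanishes, so separating fills from cancellations needs a heuristic not shown here.

```python
from datetime import datetime, timedelta
from typing import Any

Order = dict[str, Any]
Snapshot = dict[int, Order]  # orders keyed by order_id


def diff_snapshots(prev: Snapshot, curr: Snapshot, ts: datetime) -> list[dict[str, Any]]:
    """Phase 2 sketch: emit events only for orders that changed between two snapshots."""
    events: list[dict[str, Any]] = []

    def emit(kind: str, oid: int, **fields: Any) -> None:
        events.append({"timestamp": ts, "event_type": kind, "order_id": oid, **fields})

    # Orders present in the new snapshot: new, partially filled or repriced
    for oid, new in curr.items():
        old = prev.get(oid)
        if old is None:
            emit("order_opened", oid, order=new)
            continue
        if new["volume_remain"] < old["volume_remain"]:
            emit("trade", oid,
                 volume_traded=old["volume_remain"] - new["volume_remain"],
                 volume_remain=new["volume_remain"])
        if new["price"] != old["price"]:
            emit("price_changed", oid, old_price=old["price"], price=new["price"])

    # Orders that disappeared: classify with the assumed heuristic described above
    for oid in sorted(prev.keys() - curr.keys()):
        old = prev[oid]
        expires_at = old["issued"] + timedelta(days=old["duration"])
        if old["volume_remain"] == 0:
            emit("order_closed", oid)
        elif expires_at <= ts:
            emit("order_expired", oid)
        else:
            emit("order_cancelled", oid)
    return events


t0 = datetime(2025, 10, 22, 7, 0)
t1 = t0 + timedelta(minutes=5)
issued = datetime(2025, 10, 1)
prev = {
    1: {"price": 100.0, "volume_remain": 5, "issued": issued, "duration": 90},
    2: {"price": 50.0, "volume_remain": 10, "issued": issued, "duration": 90},
    3: {"price": 20.0, "volume_remain": 1, "issued": issued, "duration": 90},
    4: {"price": 20.0, "volume_remain": 1, "issued": t0 - timedelta(days=1), "duration": 1},
}
curr = {
    1: {**prev[1], "volume_remain": 3},  # partial fill
    2: {**prev[2], "price": 49.9},       # repriced
    5: {"price": 7.5, "volume_remain": 40, "issued": t1, "duration": 30},  # new order
}
for e in diff_snapshots(prev, curr, t1):
    print(e["event_type"], e["order_id"])
```

Unchanged orders produce no output at all, which is exactly why the delta log stays so small.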

In [5]:
# Load two consecutive snapshots for demonstration (5 minutes apart)
snapshot_t0 = Path("./data/snapshots/region_10000002_2025-10-22T07-00-00+00-00.parquet")
snapshot_t1 = Path("./data/snapshots/region_10000002_2025-10-22T07-05-00+00-00.parquet")

# Check if files exist
if not snapshot_t0.exists() or not snapshot_t1.exists():
    raise FileNotFoundError(
        "Snapshot files not found in ./data/snapshots/\n"
        "These should be included in the project repository."
    )

print(f"Loaded 2 consecutive snapshots:")
print(f"   T0: {snapshot_t0.name}")
print(f"   T1: {snapshot_t1.name}")

# Measure snapshot sizes
t0_size = snapshot_t0.stat().st_size / (1024 * 1024)
t1_size = snapshot_t1.stat().st_size / (1024 * 1024)

print(f"\nSnapshot sizes:")
print(f"   T0: {t0_size:.2f} MB")
print(f"   T1: {t1_size:.2f} MB")
print(f"   Total: {t0_size + t1_size:.2f} MB for 2 snapshots")

Loaded 2 consecutive snapshots:
   T0: region_10000002_2025-10-22T07-00-00+00-00.parquet
   T1: region_10000002_2025-10-22T07-05-00+00-00.parquet

Snapshot sizes:
   T0: 8.52 MB
   T1: 8.52 MB
   Total: 17.05 MB for 2 snapshots


### Phase 1: Initialization - Extract baseline events

In [6]:
# Initialize detector
detector = PolarsEventDetector()

# Phase 1: Initialize from first snapshot
# This creates 1 ORDER_OPENED event per order
timestamp_t0_str = snapshot_t0.stem.split('_')[-1].replace('+00-00', '')
timestamp_t0 = datetime.strptime(timestamp_t0_str, "%Y-%m-%dT%H-%M-%S")

print("Phase 1: Initialization")
print(f"   Processing first snapshot at {timestamp_t0}")

init_events_df = detector.initialize_from_snapshot(snapshot_t0, timestamp_t0)

print(f"\nInitialization Results:")
print(f"   Orders in T0:        {pl.read_parquet(snapshot_t0).height:,}")
print(f"   ORDER_OPENED events: {init_events_df.height:,}")
print(f"   Ratio:               1 event per order (baseline)")


Phase 1: Initialization
   Processing first snapshot at 2025-10-22 07:00:00
Initializing event log from snapshot: 375,338 orders

Initialization Results:
   Orders in T0:        375,338
   ORDER_OPENED events: 375,338
   Ratio:               1 event per order (baseline)


### Phase 2: Delta Extraction - Detect changes

In [7]:
# Phase 2: Extract delta events between T0 and T1
timestamp_t1_str = snapshot_t1.stem.split('_')[-1].replace('+00-00', '')
timestamp_t1 = datetime.strptime(timestamp_t1_str, "%Y-%m-%dT%H-%M-%S")

print("Phase 2: Delta Extraction")
print(f"   Comparing T0 → T1 (5 minute interval)")

delta_events_df = detector.detect_events(snapshot_t0, snapshot_t1, timestamp_t1)

print(f"\nDelta Extraction Results:")
print(f"   Orders in T1:        {pl.read_parquet(snapshot_t1).height:,}")
print(f"   Delta events:        {delta_events_df.height:,}")
print(f"   Change rate:         {(delta_events_df.height / pl.read_parquet(snapshot_t1).height) * 100:.1f}% of orders changed")

# Show event type breakdown
event_type_counts = delta_events_df.group_by('event_type').agg(
    pl.len().alias('count')
).sort('count', descending=True)

print(f"\nDelta event types:")
for row in event_type_counts.iter_rows(named=True):
    print(f"   {row['event_type']:20} {row['count']:,}")

Phase 2: Delta Extraction
   Comparing T0 → T1 (5 minute interval)

Delta Extraction Results:
   Orders in T1:        375,320
   Delta events:        1,023
   Change rate:         0.3% of orders changed

Delta event types:
   trade                514
   price_changed        221
   order_cancelled      149
   order_opened         135
   order_closed         4


### Combine events and measure compression

In [8]:
# Combine initialization + delta events
all_events_df = pl.concat([init_events_df, delta_events_df])

# Store events as Parquet
events_dir = Path("./data/demo")
events_dir.mkdir(parents=True, exist_ok=True)
events_path = events_dir / "demo_events.parquet"

all_events_df.write_parquet(events_path, compression="snappy")

# Measure compression
events_size_bytes = events_path.stat().st_size
events_size_mb = events_size_bytes / (1024 * 1024)
total_snapshot_size_mb = t0_size + t1_size

snapshot_to_events_ratio = (total_snapshot_size_mb * 1024 * 1024) / events_size_bytes

print(f"Stored {all_events_df.height:,} events to {events_path.name} ({events_size_mb:.2f} MB)")

print(f"\nCompression Results (Stage 2: Snapshots → Events)")
print(f"{'='*70}")
print(f"Two snapshots (T0 + T1):      {total_snapshot_size_mb:8.2f} MB")
print(f"Events (init + delta):         {events_size_mb:8.2f} MB")
print(f"   └─ Initialization events:   {init_events_df.height:,}")
print(f"   └─ Delta events:            {delta_events_df.height:,}")
print(f"   └─ Total events:            {all_events_df.height:,}")
print(f"\nCompression ratio:             {snapshot_to_events_ratio:8.1f}× reduction")
print(f"Bytes per event:               {events_size_bytes / all_events_df.height:8.1f} bytes")
print(f"{'='*70}")

print(f"\nKey Insight:")
print(f"   After initialization, only {delta_events_df.height:,} events needed")
print(f"   vs {pl.read_parquet(snapshot_t1).height:,} orders in full snapshot")
print(f"   = {(delta_events_df.height / pl.read_parquet(snapshot_t1).height) * 100:.1f}% data to store!")

Stored 376,361 events to demo_events.parquet (9.15 MB)

Compression Results (Stage 2: Snapshots → Events)
Two snapshots (T0 + T1):         17.05 MB
Events (init + delta):             9.15 MB
   └─ Initialization events:   375,338
   └─ Delta events:            1,023
   └─ Total events:            376,361

Compression ratio:                  1.9× reduction
Bytes per event:                   25.5 bytes

Key Insight:
   After initialization, only 1,023 events needed
   vs 375,320 orders in full snapshot
   = 0.3% data to store!


In the production setup we are interested in daily summaries: the end-of-day processing calculates all the market statistics we currently need.

However, the day's order book information (baseline + events) is kept in case the methodology changes or other analyses become necessary later.

The realistic compression ratio should therefore be evaluated over a full day, where the one-off baseline is amortized over 287 delta intervals instead of just one.
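The effect can be written as a simple ratio, which is the arithmetic the next cell performs. Here $S$ is the average snapshot size, $N_0$ the number of baseline events, $\Delta$ the delta events per 5-minute interval and $b$ the average bytes per stored event, all taken from the two-snapshot measurement above:

$$
R_{\text{day}} = \frac{288 \cdot S}{(N_0 + 287\,\Delta)\, b}
\approx \frac{288 \cdot 8.52\ \text{MB}}{(375{,}338 + 287 \cdot 1{,}023) \cdot 25.5\ \text{B}}
\approx \frac{2{,}455\ \text{MB}}{16.3\ \text{MB}} \approx 151
$$

With only one delta interval, $R_2 = \frac{2S}{(N_0 + \Delta)\, b} \approx \frac{17.05\ \text{MB}}{9.15\ \text{MB}} \approx 1.9$: the baseline term $N_0$ dominates the denominator, which is why the two-snapshot demo understates the benefit.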

---

## Daily Projection

Let's extrapolate our 2-snapshot demonstration to a full day (288 snapshots, every 5 minutes).

### Calculate daily storage requirements

In [9]:
# Based on our 2-snapshot sample
init_events = init_events_df.height
delta_events = delta_events_df.height

# Project for full day (288 snapshots every 5 minutes)
# Day 1: 1 initialization + 287 delta intervals
daily_init_events = init_events
daily_delta_events = delta_events * 287

total_daily_events = daily_init_events + daily_delta_events

print(f"Daily Event Volume:")
print(f"{'='*70}")
print(f"Initialization (once/day):                             {daily_init_events:>10,} events")
print(f"Delta events (287 intervals at approx. {delta_events} each):    {daily_delta_events:>10,} events")
print(f"Total daily events:                                    {total_daily_events:>10,} events")
print(f"{'='*70}")

# Calculate storage
bytes_per_event = events_size_bytes / all_events_df.height
daily_event_storage_mb = (total_daily_events * bytes_per_event) / (1024 * 1024)

# Compare with naive snapshot storage
naive_daily_snapshots_mb = (t0_size + t1_size) / 2 * 288  # 288 snapshots per day
compression_ratio_daily = naive_daily_snapshots_mb / daily_event_storage_mb

print(f"\nDaily Storage Comparison:")
print(f"{'='*70}")
print(f"Naive (keep all snapshots):    {naive_daily_snapshots_mb:>8.2f} MB/day")
print(f"Event sourcing (Parquet):      {daily_event_storage_mb:>8.2f} MB/day")
print(f"Compression ratio:             {compression_ratio_daily:>8.1f}×")
print(f"{'='*70}")

print(f"\nKey Insight:")
print(f"   After initialization, only {delta_events:,} events per 5-min interval")
print(f"   = {(delta_events / init_events) * 100:.2f}% of baseline data per snapshot")
print(f"   = Storing changes is {compression_ratio_daily:.1f}× more efficient!")

Daily Event Volume:
Initialization (once/day):                                375,338 events
Delta events (287 intervals at approx. 1023 each):       293,601 events
Total daily events:                                       668,939 events

Daily Storage Comparison:
Naive (keep all snapshots):     2454.57 MB/day
Event sourcing (Parquet):         16.26 MB/day
Compression ratio:                150.9×

Key Insight:
   After initialization, only 1,023 events per 5-min interval
   = 0.27% of baseline data per snapshot
   = Storing changes is 150.9× more efficient!


---

## Summary

Let's review what we demonstrated with real data.

In [12]:
print(f"\n{'='*70}")
print(f"COMPRESSION DEMONSTRATION SUMMARY")
print(f"{'='*70}")

print(f"\nStage 1: JSON → Parquet (Columnar Storage)")
print(f"   Raw JSON:                  {json_size_mb:.2f} MB")
print(f"   Parquet (Snappy):          {parquet_size_mb:.2f} MB")
print(f"   Compression:               {compression_ratio:.1f}×")

print(f"\nStage 2: Event Sourcing (2 Snapshots)")
print(f"   Two snapshots (T0 + T1):   {total_snapshot_size_mb:.2f} MB")
print(f"   Events (init + delta):     {events_size_mb:.2f} MB")
print(f"   Compression:               {snapshot_to_events_ratio:.1f}×")
print(f"")
print(f"   Events breakdown:")
print(f"      Initialization:         {init_events_df.height:,} events (baseline)")
print(f"      Delta (5 min):          {delta_events_df.height:,} events ({(delta_events_df.height/init_events_df.height)*100:.2f}% changed)")

print(f"\nDaily Projection (288 Snapshots)")
daily_json_mb = json_size_mb * 288
daily_parquet_mb = naive_daily_snapshots_mb
daily_events_mb = daily_event_storage_mb

print(f"   288 JSON files:            {daily_json_mb:,.0f} MB/day")
print(f"   288 Parquet files:         {daily_parquet_mb:,.0f} MB/day ({daily_json_mb/daily_parquet_mb:.1f}× vs JSON)")
print(f"   Event sourcing:            {daily_events_mb:,.0f} MB/day ({daily_parquet_mb/daily_events_mb:.1f}× vs Parquet)")
print(f"")
print(f"   Total compression:         {daily_json_mb/daily_events_mb:.1f}× (JSON {daily_json_mb:,.0f} MB → Events {daily_events_mb:,.0f} MB)")



COMPRESSION DEMONSTRATION SUMMARY

Stage 1: JSON → Parquet (Columnar Storage)
   Raw JSON:                  120.93 MB
   Parquet (Snappy):          9.52 MB
   Compression:               12.7×

Stage 2: Event Sourcing (2 Snapshots)
   Two snapshots (T0 + T1):   17.05 MB
   Events (init + delta):     9.15 MB
   Compression:               1.9×

   Events breakdown:
      Initialization:         375,338 events (baseline)
      Delta (5 min):          1,023 events (0.27% changed)

Daily Projection (288 Snapshots)
   288 JSON files:            34,827 MB/day
   288 Parquet files:         2,455 MB/day (14.2× vs JSON)
   Event sourcing:            16 MB/day (150.9× vs Parquet)

   Total compression:         2141.7× (JSON 34,827 MB → Events 16 MB)
