# Storage Benchmark

In [None]:
import os
import mmap
import time
import random
import shutil
from pathlib import Path
from abc import ABC, abstractmethod

# --- benchmark configuration ---

# Fixed seed so record sizes and the random-read sample repeat between runs.
random.seed(24)

# Everything the benchmark writes lives under this directory.
outputDir = Path("./benchmark_output")

# Workload shape: record count, inclusive size bounds in bytes, chunk
# granularity, and how many records the random-read phase fetches.
nRecords = 100_000
minRecordSize = 1024
maxRecordSize = 2 * 1024
recordsPerChunk = 1000
nRandomReads = 1000

# Buffer sizes (bytes) passed to open() by the strategies.
writeBufferSize = 4 << 20   # 4 MiB
chunkBufferSize = 1 << 20   # 1 MiB

In [None]:
class StorageStrategy(ABC):
    """Base class for an on-disk record layout under test.

    The constructor creates ``baseDir`` (including parents) if it is missing.
    Subclasses implement ``write``, ``readSequential`` and ``readRandom``;
    the remaining methods report on, or remove, whatever is stored under
    ``baseDir``.
    """

    def __init__(self, baseDir, name):
        self.baseDir = baseDir
        self.name = name
        self.baseDir.mkdir(parents=True, exist_ok=True)

    @abstractmethod
    def write(self, records):
        """Persist ``records`` under ``baseDir``."""

    @abstractmethod
    def readSequential(self):
        """Read back every stored record."""

    @abstractmethod
    def readRandom(self, indices):
        """Read back the records selected by ``indices``."""

    def cleanUp(self):
        """Delete ``baseDir`` and everything below it, if it exists."""
        if not self.baseDir.exists():
            return
        shutil.rmtree(self.baseDir)

    def _iterFiles(self):
        """Yield every regular file found recursively under ``baseDir``."""
        for entry in self.baseDir.rglob('*'):
            if entry.is_file():
                yield entry

    def getDiskSpace(self):
        """Return the summed size in bytes of all files under ``baseDir``."""
        return sum(path.stat().st_size for path in self._iterFiles())

    def getFileCount(self):
        """Return how many regular files exist under ``baseDir``."""
        return len(list(self._iterFiles()))


class SingleFileStrategy(StorageStrategy):
    """Store all records back-to-back in one file, located via an in-memory index.

    ``self.index[i]`` holds the ``(offset, length)`` of record ``i`` inside
    ``data.bin``. The index is kept only in memory, so the read methods
    return meaningful data only after ``write`` ran on the same instance.
    """

    def __init__(self, baseDir):
        super().__init__(baseDir, "SingleFile")
        self.dataFile = self.baseDir / "data.bin"
        self.index = []

    def write(self, records):
        """Write ``records`` sequentially to the data file and rebuild the index.

        Args:
            records: iterable of bytes-like objects, stored in iteration order.
        """
        index = []
        offset = 0
        with open(self.dataFile, 'wb', buffering=writeBufferSize) as f:
            for rec in records:
                size = len(rec)
                index.append((offset, size))
                f.write(rec)
                offset += size
        # Publish the index only once the file has been written and closed.
        self.index = index

    def _readSpans(self, spans):
        """Return the bytes of each ``(offset, length)`` span, in the given order."""
        spans = list(spans)
        # mmap raises ValueError on a zero-length file, which happens when
        # nothing (or only empty records) was written. Every span is then empty.
        if self.dataFile.stat().st_size == 0:
            return [b"" for _ in spans]
        # Map the file once and slice records out of it instead of issuing a
        # seek + read per record. Both context managers guarantee the mapping
        # and the file are released even if slicing raises.
        with open(self.dataFile, 'rb') as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                # Slicing an mmap already yields an independent bytes object.
                return [mm[off:off + sz] for off, sz in spans]

    def readSequential(self):
        """Return every record as ``bytes``, in the order it was written."""
        return self._readSpans(self.index)

    def readRandom(self, indices):
        """Return the records at ``indices`` as ``bytes``, in the order requested."""
        return self._readSpans(self.index[i] for i in indices)


class ChunkedFileStrategy(StorageStrategy):
    """Split records across chunk files, each holding up to ``chunkSize`` records.

    ``self.chunkIndex[cid]`` is the list of ``(offset, length)`` pairs for the
    records stored in chunk ``cid``, in write order. The index lives only in
    memory, so reads are valid only after ``write`` ran on the same instance.
    ``chunkSize`` is expected to be a positive integer: ``readRandom`` divides
    record indices by it to locate the owning chunk.
    """

    def __init__(self, baseDir, chunkSize=recordsPerChunk):
        super().__init__(baseDir, "ChunkedFile")
        self.chunkSize = chunkSize
        self.chunkIndex = {}
        self.numChunks = 0

    def _chunkPath(self, cid):
        """Return the path of the file backing chunk number ``cid``."""
        return self.baseDir / f"chunk_{cid:04d}.bin"

    def write(self, records):
        """Write ``records`` into consecutive chunk files and rebuild the index.

        Args:
            records: iterable of bytes-like objects, stored in iteration order.
        """
        self.chunkIndex.clear()
        self.numChunks = 0
        cid = -1
        f = None
        count = 0    # records already written to the current chunk
        offset = 0   # byte offset of the next record within the current chunk

        try:
            for rec in records:
                if count == 0:
                    # Roll over: close the finished chunk and start a new one.
                    if f is not None:
                        f.close()
                    cid += 1
                    offset = 0
                    self.chunkIndex[cid] = []
                    f = open(self._chunkPath(cid), 'wb', buffering=chunkBufferSize)

                self.chunkIndex[cid].append((offset, len(rec)))
                f.write(rec)
                offset += len(rec)

                count += 1
                if count >= self.chunkSize:
                    count = 0
        finally:
            # Always release the last open chunk, even when a write fails
            # part-way through. Closing an already-closed file is a no-op.
            if f is not None:
                f.close()

        self.numChunks = cid + 1

    def readSequential(self):
        """Return every record as ``bytes``, chunk by chunk in write order."""
        records = []
        for cid in range(self.numChunks):
            with open(self._chunkPath(cid), 'rb', buffering=chunkBufferSize) as f:
                # Records are stored contiguously from offset 0, so reading
                # each length in turn walks the file without seeking.
                records.extend(f.read(sz) for _, sz in self.chunkIndex[cid])
        return records

    def readRandom(self, indices):
        """Return the records at ``indices`` as ``bytes``, in the order requested."""
        # Group requests by chunk so each chunk file is opened at most once;
        # ``pos`` remembers where each result belongs in the output.
        byChunk = {}
        for pos, idx in enumerate(indices):
            cid, lid = divmod(idx, self.chunkSize)
            off, sz = self.chunkIndex[cid][lid]
            byChunk.setdefault(cid, []).append((off, pos, sz))

        results = [None] * len(indices)
        for cid in sorted(byChunk):
            path = self._chunkPath(cid)
            wanted = byChunk[cid]
            # mmap raises ValueError on a zero-length file, which happens when
            # a chunk holds only empty records. Every span is then empty.
            if path.stat().st_size == 0:
                for _, pos, _ in wanted:
                    results[pos] = b""
                continue
            with open(path, 'rb') as f:
                with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                    for off, pos, sz in wanted:
                        # Slicing an mmap already yields an independent bytes object.
                        results[pos] = mm[off:off + sz]
        return results


class IndividualFileStrategy(StorageStrategy):
    """Store each record in its own file named by its index.

    Simple, but every record costs a separate file open and close on both
    the write and the read path.
    """

    def __init__(self, baseDir):
        super().__init__(baseDir, "IndividualFile")
        self.count = 0

    def _recPath(self, rid):
        """Return the path of the file holding record number ``rid``."""
        return self.baseDir / f"rec_{rid:06d}.bin"

    def _readRecord(self, rid):
        """Read record ``rid`` and close its file before returning."""
        # An explicit ``with`` releases the descriptor immediately instead of
        # leaving it to the garbage collector.
        with open(self._recPath(rid), 'rb') as f:
            return f.read()

    def write(self, records):
        """Write each record to its own file and remember how many were stored.

        Args:
            records: sequence of bytes-like objects; record ``i`` goes to
                ``rec_{i:06d}.bin``.
        """
        self.count = len(records)
        for i, rec in enumerate(records):
            with open(self._recPath(i), 'wb') as f:
                f.write(rec)

    def readSequential(self):
        """Return records ``0 .. count-1`` as ``bytes``, in index order."""
        return [self._readRecord(i) for i in range(self.count)]

    def readRandom(self, indices):
        """Return the records at ``indices`` as ``bytes``, in the order requested."""
        return [self._readRecord(i) for i in indices]

In [None]:
def timed(fn):
    """Call ``fn()`` and return ``(its result, elapsed seconds)``.

    Elapsed time is measured with ``time.perf_counter``.
    """
    started = time.perf_counter()
    value = fn()
    elapsed = time.perf_counter() - started
    return value, elapsed

def runBenchmark(strategy, records, randIndices, dataSize):
    """Time write, sequential read and random read for one storage strategy.

    Progress is printed as each phase completes. The strategy's files are
    removed afterwards via ``strategy.cleanUp()``, including when a phase
    raises, so a failed run does not leave benchmark data on disk.

    Args:
        strategy: object providing ``name``, ``write``, ``readSequential``,
            ``readRandom``, ``getDiskSpace``, ``getFileCount`` and ``cleanUp``.
        records: sequence of records handed to ``strategy.write``.
        randIndices: record indices handed to ``strategy.readRandom``.
        dataSize: payload size, passed through unchanged into the result.

    Returns:
        dict with keys ``name``, ``write``, ``seq``, ``rand`` (timings in
        seconds), ``diskSpace``, ``fileCount`` and ``dataSize``.
    """
    print(f"\n{strategy.name}:")

    try:
        _, wt = timed(lambda: strategy.write(records))
        print(f"  Write:      {wt:.3f}s")

        # Measure the on-disk footprint before the files are removed.
        diskSpace = strategy.getDiskSpace()
        fileCount = strategy.getFileCount()

        seqRecs, st = timed(strategy.readSequential)
        print(f"  Seq Read:   {st:.3f}s")

        # quick sanity check
        if len(seqRecs) != len(records):
            print("  WARNING: record count mismatch!")

        _, rt = timed(lambda: strategy.readRandom(randIndices))
        print(f"  Rand Read:  {rt:.3f}s")
    finally:
        strategy.cleanUp()

    return {
        "name": strategy.name,
        "write": wt, "seq": st, "rand": rt,
        "diskSpace": diskSpace,
        "fileCount": fileCount,
        "dataSize": dataSize
    }

In [None]:
print(f"Generating {nRecords:,} records...")
# Draw all record lengths from the seeded RNG first, then fill each record
# with that many random bytes from the OS.
recordSizes = [random.randint(minRecordSize, maxRecordSize) for _ in range(nRecords)]
records = list(map(os.urandom, recordSizes))
# os.urandom(n) returns exactly n bytes, so the lengths sum to the payload size.
totalSize = sum(recordSizes)
totalMB = totalSize / (1024*1024)
print(f"Total size: {totalMB:.2f} MB")

# Distinct record indices for the random-read phase.
randIndices = random.sample(range(nRecords), nRandomReads)

# Start from an empty output directory: drop leftovers of earlier runs.
if outputDir.exists():
    shutil.rmtree(outputDir)

In [None]:
# Instantiate every strategy up front (each constructor creates its own
# sub-directory of outputDir), then benchmark them one after another.
strategies = [
    strategyCls(outputDir / subDir)
    for strategyCls, subDir in (
        (SingleFileStrategy, "single"),
        (ChunkedFileStrategy, "chunked"),
        (IndividualFileStrategy, "individual"),
    )
]

results = []
for strategy in strategies:
    results.append(runBenchmark(strategy, records, randIndices, totalSize))

In [None]:
MB = 1024 * 1024

# --- timing table: seconds per phase plus MB/s for write and sequential read ---
wideRule = "=" * 80
print("\n" + wideRule)
print("RESULTS")
print(wideRule)
print("{:<15}{:>10}{:>10}{:>12}{:>10}{:>12}".format(
    "Strategy", "Write(s)", "MB/s", "SeqRead(s)", "MB/s", "RandRead(s)"
))
print("-" * 80)
timingRow = "{:<15}{:>10.3f}{:>10.1f}{:>12.3f}{:>10.1f}{:>12.3f}"
for row in results:
    payloadMB = row["dataSize"] / MB
    writeRate = payloadMB / row["write"]
    seqRate = payloadMB / row["seq"]
    print(timingRow.format(
        row["name"], row["write"], writeRate, row["seq"], seqRate, row["rand"]
    ))

# --- storage table: disk footprint, file count and average bytes per record ---
narrowRule = "-" * 60
print("\n" + narrowRule)
print("STORAGE")
print(narrowRule)
print("{:<15}{:>12}{:>10}{:>14}".format("Strategy", "Disk (MB)", "Files", "Bytes/Rec"))
storageRow = "{:<15}{:>12.2f}{:>10}{:>14.1f}"
for row in results:
    usedMB = row["diskSpace"] / MB
    bytesPerRecord = row["diskSpace"] / nRecords
    print(storageRow.format(row["name"], usedMB, row["fileCount"], bytesPerRecord))