# Redis Backup Analysis

This notebook lets you inspect JSON files generated by ``redis_backup``. It loads the payload, prints summary information, and attempts to decode string values so you can read them easily.


## Configure the backup path

Set the path to the JSON backup exported with ``save_backup_to_file``. The notebook never connects to Redis; it reads everything straight from the file on disk.


In [6]:
from __future__ import annotations

from pathlib import Path
from typing import Any, Iterable
import json
import pprint

from redis_backup import (
    decode_bytes,
    display_backup_summary,
    load_backup_from_file,
)


In [7]:
# Location of the JSON backup to analyse; edit this before running the cell.
BACKUP_PATH = Path('/path/to/your/backup.json')

# ``backup`` stays ``None`` when the file is missing; the cells below check
# for that and skip their work.
backup = None
if not BACKUP_PATH.exists():
    print("⚠️ Update BACKUP_PATH with the correct path to your backup file.")
else:
    backup = load_backup_from_file(BACKUP_PATH)
    display_backup_summary(backup)


⚠️ Update BACKUP_PATH with the correct path to your backup file.


In [8]:
from dataclasses import dataclass


class DumpDecodeError(RuntimeError):
    """Error raised when a Redis DUMP payload cannot be split or decoded.

    The decoding helpers in this notebook raise it for truncated buffers,
    unsupported encodings, malformed LZF data, and non-string object types.
    """


@dataclass
class DumpSections:
    """The three parts of a raw DUMP blob, as returned by ``split_dump_sections``."""

    # Serialized value: every byte that precedes the 10-byte trailer.
    payload: bytes
    # RDB version, read little-endian from the two bytes before the checksum.
    version: int
    # Final eight bytes of the blob, kept verbatim (not verified here).
    checksum: bytes


def split_dump_sections(raw: bytes) -> DumpSections:
    """Separate a Redis dump into its payload, RDB version, and checksum.

    The last ten bytes of ``raw`` are treated as a trailer: two bytes holding
    the little-endian RDB version followed by eight checksum bytes. Everything
    before the trailer is the payload.

    Raises:
        DumpDecodeError: if ``raw`` is shorter than the ten-byte trailer.
    """

    trailer_size = 10
    if len(raw) < trailer_size:
        raise DumpDecodeError("DUMP payload is too short to contain metadata")
    body, trailer = raw[:-trailer_size], raw[-trailer_size:]
    return DumpSections(
        payload=body,
        version=int.from_bytes(trailer[:2], "little", signed=False),
        checksum=trailer[2:],
    )


class _LengthEncoding:
    """Outcome of reading one length prefix.

    ``value`` holds the decoded length, or ``None`` when the prefix selects a
    special encoding; in that case ``encoding`` carries the 6-bit identifier
    of that encoding.
    """

    __slots__ = ("value", "encoding")

    def __init__(self, value: int | None, encoding: int | None = None) -> None:
        self.value, self.encoding = value, encoding


# Identifiers of the special string encodings: the low six bits of a length
# byte whose two high bits are both set (see ``_read_length_info``).
RDB_ENCODING_INT8 = 0  # value stored as a signed 1-byte integer
RDB_ENCODING_INT16 = 1  # value stored as a signed 2-byte little-endian integer
RDB_ENCODING_INT32 = 2  # value stored as a signed 4-byte little-endian integer
RDB_ENCODING_LZF = 3  # value stored LZF-compressed, preceded by both lengths


def _read_length_info(buffer: bytes, offset: int) -> tuple[_LengthEncoding, int]:
    """Read one RDB length prefix from ``buffer`` starting at ``offset``.

    The two high bits of the first byte select the layout:

    * ``00`` - the length is the low six bits of that byte;
    * ``01`` - the length is 14 bits: the low six bits plus the next byte;
    * ``10`` - the length follows as a big-endian integer, 8 bytes wide when
      the first byte is ``0x81`` and 4 bytes wide otherwise;
    * ``11`` - no length; the low six bits name a special encoding.

    Returns:
        A ``_LengthEncoding`` (``value`` set for a plain length, ``encoding``
        set for a special encoding) and the offset just past the prefix.

    Raises:
        DumpDecodeError: if ``offset`` is past the end of ``buffer`` or the
            prefix is truncated.
    """
    if offset >= len(buffer):
        raise DumpDecodeError("Offset out of range while reading length")
    first = buffer[offset]
    prefix = first >> 6
    if prefix == 0:
        return _LengthEncoding(first & 0x3F), offset + 1
    if prefix == 1:
        if offset + 1 >= len(buffer):
            raise DumpDecodeError("Truncated 14-bit encoded length")
        length = ((first & 0x3F) << 8) | buffer[offset + 1]
        return _LengthEncoding(length), offset + 2
    if prefix == 2:
        # Redis writes 0x80 before a 32-bit length and 0x81 before a 64-bit
        # one. Any other byte with this prefix keeps the lenient 32-bit
        # reading so previously accepted input is still accepted.
        width = 8 if first == 0x81 else 4
        end = offset + 1 + width
        if end > len(buffer):
            raise DumpDecodeError(f"Truncated {width * 8}-bit encoded length")
        length = int.from_bytes(buffer[offset + 1 : end], "big", signed=False)
        return _LengthEncoding(length), end
    return _LengthEncoding(None, first & 0x3F), offset + 1


def lzf_decompress(data: bytes, expected_length: int) -> bytes:
    """Decompress an LZF stream and check the size of the result.

    The stream is a sequence of control bytes. A control byte below 32 starts
    a literal run of ``control + 1`` bytes copied straight from the input.
    Any other control byte starts a back-reference that copies bytes already
    present in the output.

    Raises:
        DumpDecodeError: if the stream is truncated, a back-reference points
            outside the output, or the result is not ``expected_length``
            bytes long.
    """

    result = bytearray()
    pos = 0
    total = len(data)
    while pos < total:
        control = data[pos]
        pos += 1

        if control < 32:
            # Literal run: copy the next control + 1 input bytes verbatim.
            run_end = pos + control + 1
            if run_end > total:
                raise DumpDecodeError("Truncated literal LZF sequence")
            result.extend(data[pos:run_end])
            pos = run_end
            continue

        # Back-reference: the top three bits give the length, the low five
        # bits are the high part of the distance.
        copy_len = control >> 5
        source = len(result) - ((control & 0x1F) << 8) - 1
        if copy_len == 7:
            # A length field of 7 means one extra byte extends the length.
            if pos >= total:
                raise DumpDecodeError("Truncated LZF sequence while extending length")
            copy_len += data[pos]
            pos += 1
        if pos >= total:
            raise DumpDecodeError("Truncated LZF sequence while resolving reference")
        source -= data[pos]  # low byte of the distance
        pos += 1
        copy_len += 2
        if source < 0:
            raise DumpDecodeError("Negative LZF reference")
        # Copy one byte at a time: source and destination may overlap, so
        # bytes appended here can be read again later in the same copy.
        for cursor in range(source, source + copy_len):
            if cursor >= len(result):
                raise DumpDecodeError("LZF reference out of range")
            result.append(result[cursor])

    if len(result) != expected_length:
        raise DumpDecodeError(
            f"Unexpected decompressed length: expected {expected_length}, got {len(result)}"
        )
    return bytes(result)


def _decode_special_encoding(buffer: bytes, offset: int, encoding: int) -> tuple[bytes, int]:
    """Decode a specially-encoded string that starts at ``offset``.

    Integer encodings are read as signed little-endian values and returned as
    their ASCII decimal text. The LZF encoding stores the compressed length,
    the original length, and then the compressed bytes.

    Returns:
        The decoded bytes and the offset just past the consumed data.

    Raises:
        DumpDecodeError: if the data is truncated, a length is missing, or
            ``encoding`` is not one of the supported identifiers.
    """
    # Byte width and truncation message for each fixed-size integer encoding.
    integer_layouts = {
        RDB_ENCODING_INT8: (1, "Truncated 8-bit encoded integer"),
        RDB_ENCODING_INT16: (2, "Truncated 16-bit encoded integer"),
        RDB_ENCODING_INT32: (4, "Truncated 32-bit encoded integer"),
    }
    if encoding in integer_layouts:
        width, truncated_message = integer_layouts[encoding]
        stop = offset + width
        if stop > len(buffer):
            raise DumpDecodeError(truncated_message)
        number = int.from_bytes(buffer[offset:stop], "little", signed=True)
        return str(number).encode("ascii"), stop

    if encoding != RDB_ENCODING_LZF:
        raise DumpDecodeError(f"Unsupported special encoding: {encoding}")

    compressed, offset = _read_length_info(buffer, offset)
    if compressed.value is None:
        raise DumpDecodeError("Invalid compressed length")
    original, offset = _read_length_info(buffer, offset)
    if original.value is None:
        raise DumpDecodeError("Invalid original length")
    stop = offset + compressed.value
    if stop > len(buffer):
        raise DumpDecodeError("Truncated compressed LZF data")
    return lzf_decompress(buffer[offset:stop], original.value), stop


def _read_encoded_string(buffer: bytes, offset: int) -> tuple[bytes, int]:
    """Read one encoded string from ``buffer`` starting at ``offset``.

    A plain length prefix is followed by that many raw bytes; a special
    encoding is handed to ``_decode_special_encoding``.

    Returns:
        The string bytes and the offset just past them.

    Raises:
        DumpDecodeError: if the string is truncated or its encoding is unknown.
    """
    info, cursor = _read_length_info(buffer, offset)
    if info.value is None:
        if info.encoding is None:
            raise DumpDecodeError("Unknown string encoding")
        return _decode_special_encoding(buffer, cursor, info.encoding)
    stop = cursor + info.value
    if stop > len(buffer):
        raise DumpDecodeError("Truncated encoded string")
    return buffer[cursor:stop], stop


def decode_string_from_dump(raw: bytes) -> bytes:
    """Extract the value of a string key from a raw DUMP blob.

    The trailer is stripped with ``split_dump_sections``; the first payload
    byte must be ``0`` (the string object type) and the encoded string
    follows it.

    Raises:
        DumpDecodeError: if the blob is too short, the payload is empty, the
            object type is not a string, or the string cannot be decoded.
    """
    payload = split_dump_sections(raw).payload
    if not payload:
        raise DumpDecodeError("Empty payload")
    object_type = payload[0]
    if object_type != 0:
        raise DumpDecodeError(f"Non-string object type: {object_type}")
    # Bytes left over after the string are tolerated on purpose, so the end
    # offset is not checked.
    value, _ = _read_encoded_string(payload, 1)
    return value


def decode_key(entry: dict[str, Any]) -> bytes:
    """Return the key of a backup entry as bytes.

    The stored ``entry["key"]`` is converted with ``decode_bytes``; an entry
    without a ``"key"`` field raises ``KeyError``.
    """
    encoded_key = entry["key"]
    return decode_bytes(encoded_key)


def text_preview(value: bytes, *, limit: int = 120) -> str:
    """Return ``value`` as text, shortened to at most ``limit`` characters.

    Bytes are decoded as UTF-8 with undecodable sequences replaced. Text
    longer than ``limit`` is cut to ``limit - 1`` characters followed by an
    ellipsis. A non-positive ``limit`` yields an empty string.
    """
    if limit <= 0:
        # Without this guard the slice below would use a negative end index
        # and return almost the whole text instead of shortening it.
        return ""
    text = value.decode("utf-8", errors="replace")
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"


def try_decode_value(entry: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Build a short preview and a details dict for the value of ``entry``.

    Returns:
        A ``(preview, details)`` pair. Without value data the preview is
        ``"<no value>"`` and the details are a copy of the entry's value
        mapping. Otherwise the details hold the dump size, the RDB version
        and checksum when the dump can be split, the decoded bytes for string
        entries, and the error text when a step fails.
    """
    value_info = dict(entry.get("value") or {})
    encoded = value_info.get("data")
    if not encoded:
        return "<no value>", value_info

    raw = decode_bytes(encoded)
    details: dict[str, Any] = {"dump_size": len(raw)}

    try:
        sections = split_dump_sections(raw)
    except DumpDecodeError as exc:
        details["dump_error"] = str(exc)
        return "<invalid dump>", details
    details["rdb_version"] = sections.version
    details["checksum"] = sections.checksum.hex()

    entry_type = entry.get("type")
    if entry_type != "string":
        # Only string values are decoded; other types get a size placeholder.
        return f"<{entry_type} - {len(sections.payload)} bytes>", details

    try:
        decoded = decode_string_from_dump(raw)
    except DumpDecodeError as exc:
        details["decode_error"] = str(exc)
        return "<string not decoded>", details
    details["decoded_bytes"] = decoded
    return text_preview(decoded), details


def summarise_entries(entries: Iterable[dict[str, Any]], limit: int = 20) -> list[dict[str, Any]]:
    """Return one summary row for each of the first ``limit`` entries.

    Every row contains the key preview, the entry type, the TTL in
    milliseconds (taken from ``"pttl"``), and a preview of the value.
    """
    rows: list[dict[str, Any]] = []
    for position, item in enumerate(entries):
        if position >= limit:
            break
        raw_key = decode_key(item)
        preview = try_decode_value(item)[0]
        row = {
            "key": text_preview(raw_key),
            "type": item.get("type"),
            "ttl_ms": item.get("pttl"),
            "value_preview": preview,
        }
        rows.append(row)
    return rows


def find_entry(entries: Iterable[dict[str, Any]], key: bytes | str) -> dict[str, Any]:
    """Return the first entry whose decoded key equals ``key``.

    A ``str`` key is encoded as UTF-8 before the comparison.

    Raises:
        KeyError: if no entry matches; the original ``key`` is the argument.
    """
    wanted = key.encode("utf-8") if isinstance(key, str) else key
    for candidate in entries:
        if decode_key(candidate) != wanted:
            continue
        return candidate
    raise KeyError(key)


def inspect_entry(entry: dict[str, Any]) -> None:
    """Print the key, type, TTL, value preview, and details of ``entry``.

    When the value decodes to bytes, its text is printed as well. Text that
    looks like a JSON object (wrapped in braces) is also parsed and
    pretty-printed; text that fails to parse is silently left as is.
    """
    print(f"Key: {decode_key(entry)!r}")
    print(f"Redis type: {entry.get('type')}")
    ttl = entry.get("pttl")
    ttl_label = 'persistent' if ttl is None else ttl
    print(f"TTL (ms): {ttl_label}")

    preview, details = try_decode_value(entry)
    print(f"Value preview: {preview}")
    print("Details:")
    pprint.pprint(details)

    decoded = details.get("decoded_bytes")
    if not isinstance(decoded, (bytes, bytearray)):
        return
    text = decoded.decode("utf-8", errors="replace")
    print("\nPlaintext content:")
    print(text)

    candidate = text.strip()
    if not (candidate.startswith("{") and candidate.endswith("}")):
        return
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Not valid JSON after all; the plaintext above is all we show.
        return
    print("\nParsed JSON:")
    pprint.pprint(parsed)


In [9]:
if backup is not None:
    entries = list(backup.get('entries', []))
    print(f'Total keys in backup: {len(entries)}')
    preview_rows = summarise_entries(entries, limit=20)
    # pandas is optional: it only makes the preview table nicer to read.
    try:
        import pandas as pd  # type: ignore
    except ModuleNotFoundError:
        pd = None
    if not preview_rows:
        print('No keys found in the selected backup.')
    elif pd is not None:
        display(pd.DataFrame(preview_rows))
    else:
        # Plain-text fallback when pandas is not installed.
        for row in preview_rows:
            print(row)


## Inspect a specific key

Use ``find_entry`` to retrieve a key from the backup and ``inspect_entry`` to display its metadata and decoded value when possible.


In [10]:
if backup is not None:
    # Example lookup: replace 'namespace:your:key' with the key you want to
    # inspect. ``entries`` is the list built by the preview cell above.
    try:
        entry = find_entry(entries, 'namespace:your:key')
    except KeyError:
        print('Key not found: update the name to inspect its contents.')
    else:
        inspect_entry(entry)
