# Tamper-Evident Forensic Logging System

Modern systems rely heavily on logs as primary forensic evidence in legal proceedings, audits, and incident investigations. But what happens if an adversary gains access to these logs and modifies them? Traditional logs:
- Are stored in plaintext
- Can be easily edited, deleted, and reordered
- Rarely provide cryptographic proof of integrity

Once an adversary gains access and escalates privileges, traditional logs are no longer trustworthy. This project designs and implements a tamper-evident logging system that ensures:
- Every log entry depends on all previous entries
- Altering one entry invalidates the chain from that entry onward
- Verification can be done later by an investigator

**This project is NOT about preventing attacks**. The design goal is _tamper evidence_, NOT _tamper prevention_.

### Potential Solution: Encryption

Encryption protects confidentiality, but on its own it does not guarantee append-only behavior, ordering integrity, tamper detection, or evidence continuity. An adversary can still delete encrypted logs entirely or replay old encrypted logs.

## Project Objectives

1. Logs are append-only
2. Tampering is easily detectable
3. Logs are cryptographically verified
4. Logs are exported to a remote server
5. Log order is preserved
6. The server becomes a source of validation

## Threat Model

This project assumes the following:
- Attacker has user or admin access
- Attacker can read log files
- Attacker can attempt to delete or modify logs

This system defends against:
- Local attackers modifying log files
- Attackers deleting entries
- Attackers inserting fake entries
- Attackers reordering logs
- Attackers replaying old logs
- Network eavesdroppers
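
The replay and reordering cases depend on the receiving side remembering where the chain currently ends. The sketch below is one way that check might look, assuming the server keeps only the hash of the last entry it accepted and compares it with the `previous_hash` field described under Approach. `ChainHead` and `accept` are hypothetical names, not part of this project's code, and the hash values are dummies. Recomputing `current_hash` is a separate check that is left out here.

```python
GENESIS_HASH = "0" * 64  # assumed previous_hash of the very first entry


class ChainHead:
    """Hypothetical server-side state: the hash of the last accepted entry."""

    def __init__(self) -> None:
        self.last_hash = GENESIS_HASH

    def accept(self, previous_hash: str, current_hash: str) -> bool:
        """Accept an entry only if it extends the chain seen so far."""
        if previous_hash != self.last_hash:
            return False  # replayed, reordered, or missing a predecessor
        self.last_hash = current_hash
        return True


head = ChainHead()
print(head.accept(GENESIS_HASH, "a" * 64))  # True: extends the genesis value
print(head.accept("a" * 64, "b" * 64))      # True: extends the first entry
print(head.accept(GENESIS_HASH, "a" * 64))  # False: replay of the first entry
```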

### Approach

Each log entry contains:
- `timestamp`
- `event_data`
- `previous_hash`
- `current_hash`

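Where `current_hash` = HASH(`timestamp` || `event_data` || `previous_hash`), `||` denotes concatenation, and the first entry uses a fixed genesis value as its `previous_hash`.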

This forms a hash chain:
```txt
    Entry 1 -> Hash 1
    Entry 2 -> Hash 2 (Dependent on Hash 1)
    Entry 3 -> Hash 3 (Dependent on Hash 2)
    ...
```

Modifying any entry changes its hash, so every later hash in the chain no longer matches.
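
To make the propagation concrete, here is a minimal in-memory sketch of the formula above. The `|` separator, the `GENESIS` placeholder, the helper names, and the sample events are assumptions chosen for illustration.

```python
import hashlib

GENESIS = "0" * 64  # assumed placeholder for the first entry's previous_hash


def entry_hash(timestamp: str, event_data: str, previous_hash: str) -> str:
    """HASH(timestamp || event_data || previous_hash), using '|' as separator."""
    material = f"{timestamp}|{event_data}|{previous_hash}"
    return hashlib.sha256(material.encode("utf-8")).hexdigest()


def chain_hashes(events: list[tuple[str, str]]) -> list[str]:
    """Return the current_hash of every entry, each one fed into the next."""
    hashes = []
    previous = GENESIS
    for timestamp, event_data in events:
        previous = entry_hash(timestamp, event_data, previous)
        hashes.append(previous)
    return hashes


events = [
    ("2026-02-11T14:32:08Z", "login alice"),
    ("2026-02-11T14:32:09Z", "file access /etc/passwd"),
    ("2026-02-11T14:32:10Z", "process execution /bin/sh"),
]
original = chain_hashes(events)

# Tamper with the second event only.
tampered_events = list(events)
tampered_events[1] = ("2026-02-11T14:32:09Z", "file access /tmp/notes")
tampered = chain_hashes(tampered_events)

# Entry 1 is untouched; entries 2 and 3 no longer match.
changed = [i + 1 for i, (a, b) in enumerate(zip(original, tampered)) if a != b]
print(changed)  # [2, 3]
```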

### Components

1. Log Generator
    - Produces events (real or simulated)
    - Examples:
        - Login
        - File access
        - Process execution

2. Hashing Engine
    - Computes chained hashes
    - Maintains previous hash state

3. Log Store
    - Append-only file
    - No in-place edits

4. Verification Engine
    - Recomputes hashes
    - Detects tampering
    - Reports first corrupted entry
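
For the Log Store component, one possible way to keep the writer itself from editing in place is to open the file with `O_APPEND`, so every write lands at the end of the file. This is only a sketch: `append_entry` is a hypothetical helper, and the flag constrains this writer only. It does not stop an attacker with file access from rewriting the file, which is what the hash chain is meant to reveal.

```python
import os
import tempfile


def append_entry(path: str, entry: str) -> None:
    """Append one line to the log file, creating it if needed."""
    fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
    try:
        os.write(fd, (entry + "\n").encode("utf-8"))
        os.fsync(fd)  # push the entry to disk before returning
    finally:
        os.close(fd)


# Demonstration in a temporary directory so no real log is touched.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.log")
    append_entry(demo_path, "first entry")
    append_entry(demo_path, "second entry")
    with open(demo_path, "r", encoding="utf-8") as f:
        stored = f.read().splitlines()

print(stored)  # ['first entry', 'second entry']
```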

## Log Generation



```py
import hashlib
import os
import random
from datetime import datetime, timezone

LOG_FILE = "secure_logs.log"
GENESIS_HASH = "0" * 64  # Indicates start of chain
ALLOWED_LEVELS = {"INFO", "WARNING", "ERROR", "SUCCESS"}


def sha256(data: str) -> str:
    """
    Compute SHA-256 hash of input string.
    """
    return hashlib.sha256(data.encode("utf-8")).hexdigest()


def get_utc_timestamp() -> str:
    """
    Return current UTC time in ISO 8601 format.
    Example: 2026-02-11T14:32:08Z
    """
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def get_last_hash() -> str:
    """
    Retrieve the last hash from the log file.
    If file does not exist or is empty, return GENESIS_HASH.
    """
    if not os.path.exists(LOG_FILE):
        return GENESIS_HASH

    with open(LOG_FILE, "r", encoding="utf-8") as f:
        # Ignore blank lines so a trailing newline cannot yield an empty hash
        lines = [line.strip() for line in f if line.strip()]

    if not lines:
        return GENESIS_HASH

    return lines[-1].split("|")[-1]  # current_hash is the last field


def generate_log(username: str = "admin", level: str = "INFO", message: str = "Test Log") -> str:
    """
    Generate a tamper-evident log entry, append it to disk, and return it.
    """

    # Validation
    if level not in ALLOWED_LEVELS:
        raise ValueError(f"Invalid log level: {level}")

    # Each entry occupies one line and "|" separates fields, so neither
    # character may appear inside a field.
    for field in (username, message):
        if "\n" in field or "|" in field:
            raise ValueError("Log fields must not contain newline or '|' characters")

    username = username.strip()
    message = message.strip()

    timestamp = get_utc_timestamp()
    prev_hash = get_last_hash()

    # Canonical serialization
    canonical_entry = f"{username}|{timestamp}|{level}|{message}|{prev_hash}"

    current_hash = sha256(canonical_entry)
    full_log_entry = f"{canonical_entry}|{current_hash}"

    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(full_log_entry + "\n")

    return full_log_entry


def random_log_entry() -> bytes:
    """
    Generate a random log entry for testing purposes.
    Returns the stored entry as UTF-8 bytes.
    """
    usernames = ["admin"] + [f"user{i}" for i in range(1, 11)]

    messages = {
        "INFO": [
            "User logged in",
            "User logged out",
            "Scheduled task executed",
            "Configuration file updated",
            "Service started",
            "Service stopped",
        ],
        "WARNING": [
            "Disk space low",
            "High memory usage",
            "CPU temperature high",
            "Unusual login attempt",
            "Failed login attempt",
            "Configuration change detected",
        ],
        "ERROR": [
            "Failed to access resource",
            "Database connection failed",
            "Authentication failed",
            "Authorization denied",
            "System crash detected",
        ],
        "SUCCESS": [
            "Backup completed successfully",
            "Data migration completed",
            "System update applied",
            "User account created",
            "User account deleted",
        ],
    }

    username = random.choice(usernames)
    level = random.choice(list(messages))
    message = random.choice(messages[level])

    # Reuse generate_log so the hashing logic lives in one place
    return generate_log(username, level, message).encode("utf-8")

# if __name__ == "__main__":
#     for _ in range(1000):
#         random_log_entry()
```

## Log Verification

```py
def verify_log_file(log_file: str = LOG_FILE) -> None:
    """
    Verify integrity of the tamper-evident log file.
    Prints verification result and first point of failure (if any).
    """

    if not os.path.exists(log_file):
        print("[ERROR] Log file does not exist.")
        return

    expected_prev_hash = GENESIS_HASH

    with open(log_file, "r", encoding="utf-8") as f:
        for line_number, line in enumerate(f, start=1):
            line = line.strip()

            if not line:
                continue  # skip empty lines

            parts = line.split("|")

            if len(parts) != 6:
                print(f"[TAMPER DETECTED] Line {line_number}: malformed entry")
                return

            username, timestamp, level, message, stored_prev_hash, stored_curr_hash = parts

            # Check previous hash consistency
            if stored_prev_hash != expected_prev_hash:
                print(f"[TAMPER DETECTED] Line {line_number}: previous hash mismatch")
                print(f"Expected: {expected_prev_hash}")
                print(f"Found:    {stored_prev_hash}")
                return

            # Recompute current hash
            canonical_entry = f"{username}|{timestamp}|{level}|{message}|{stored_prev_hash}"
            computed_hash = sha256(canonical_entry)

            if computed_hash != stored_curr_hash:
                print(f"[TAMPER DETECTED] Line {line_number}: hash verification failed")
                print(f"Expected: {computed_hash}")
                print(f"Found:    {stored_curr_hash}")
                return

            # Move chain forward
            expected_prev_hash = stored_curr_hash

    print("[OK] Log file integrity verified. No tampering detected.")

verify_log_file()
```

```txt
[OK] Log file integrity verified. No tampering detected.
```
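
The run above only exercises the clean case. The sketch below builds a small chain in a temporary file, edits one entry in place, and reports the first line that fails. It is self-contained and assumes the same `username|timestamp|level|message|previous_hash|current_hash` layout; `first_bad_line` and `write_chain` are hypothetical helpers, not part of the project code.

```python
import hashlib
import os
import tempfile

GENESIS_HASH = "0" * 64


def first_bad_line(path: str):
    """Return the 1-based number of the first entry that fails, or None."""
    expected_prev = GENESIS_HASH
    with open(path, "r", encoding="utf-8") as f:
        for number, line in enumerate(f, start=1):
            parts = line.rstrip("\n").split("|")
            if len(parts) != 6:
                return number
            *fields, stored_prev, stored_curr = parts
            canonical = "|".join(fields + [stored_prev])
            recomputed = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
            if stored_prev != expected_prev or recomputed != stored_curr:
                return number
            expected_prev = stored_curr
    return None


def write_chain(path: str, messages: list[str]) -> None:
    """Write a small valid chain with fixed sample usernames and timestamps."""
    prev = GENESIS_HASH
    with open(path, "w", encoding="utf-8") as f:
        for i, message in enumerate(messages):
            canonical = f"admin|2026-02-11T14:32:0{i}Z|INFO|{message}|{prev}"
            prev = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
            f.write(f"{canonical}|{prev}\n")


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.log")
    write_chain(path, ["User logged in", "Service started", "User logged out"])
    clean_result = first_bad_line(path)

    # Simulate an attacker editing the message of the second entry in place.
    with open(path, "r", encoding="utf-8") as f:
        lines = f.readlines()
    lines[1] = lines[1].replace("Service started", "Service stopped")
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(lines)
    tampered_result = first_bad_line(path)

print(clean_result, tampered_result)  # None 2
```

This check only proves that the file is internally consistent. An attacker who rewrites an entry and then recomputes every later hash would still pass it, which is why the copy exported to the server matters.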


## Key Generation

```py
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

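def generate_keys(name: str) -> None:
    """
    Generate a 2048-bit RSA key pair and write it to
    {name}_private_key.pem and {name}_public_key.pem.
    The private key is written unencrypted (NoEncryption).
    """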
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048
    )

    with open(f"{name}_private_key.pem", "wb") as f:
        f.write(
            private_key.private_bytes(
                serialization.Encoding.PEM,
                serialization.PrivateFormat.PKCS8,
                serialization.NoEncryption()
            )
        )

    with open(f"{name}_public_key.pem", "wb") as f:
        f.write(
            private_key.public_key().public_bytes(
                serialization.Encoding.PEM,
                serialization.PublicFormat.SubjectPublicKeyInfo
            )
        )

generate_keys("server")
generate_keys("client")
```

## Log Export

```py
import socket
import os
import struct
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

HOST = "127.0.0.1"
PORT = 5000

# Load server public key
with open("Server/server_public_key.pem", "rb") as f:
    server_public_key = serialization.load_pem_public_key(f.read())

# Load client private key (for signing)
with open("client_private_key.pem", "rb") as f:
    client_private_key = serialization.load_pem_private_key(
        f.read(), password=None
    )

# Generate AES session key
aes_key = AESGCM.generate_key(bit_length=256)
aesgcm = AESGCM(aes_key)

def sign(data: bytes) -> bytes:
    """Sign data with the client private key using RSA-PSS and SHA-256."""
    return client_private_key.sign(
        data,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))

    # Send AES key encrypted with RSA-OAEP under the server public key
    enc_key = server_public_key.encrypt(
        aes_key,
        padding.OAEP(
            mgf=padding.MGF1(hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )

    # Every message is framed as a 4-byte big-endian length plus the body
    s.sendall(struct.pack(">I", len(enc_key)) + enc_key)
    print("[+] AES key sent securely")

    # Send signed & encrypted logs
    # random_log_entry() is defined in the Log Generation cell above
    logs = [random_log_entry() for _ in range(10)]

    for log in logs:
        signature = sign(log)

        # Payload layout: 4-byte log length, log bytes, then the signature
        payload = (
            struct.pack(">I", len(log)) +
            log +
            signature
        )

        # Fresh 96-bit nonce for every message encrypted under this key
        nonce = os.urandom(12)
        ciphertext = aesgcm.encrypt(nonce, payload, None)

        packet = nonce + ciphertext
        s.sendall(struct.pack(">I", len(packet)) + packet)

        print("[+] Log sent securely")
```

```txt
[+] AES key sent securely
[+] Log sent securely
[+] Log sent securely
[+] Log sent securely
[+] Log sent securely
[+] Log sent securely
[+] Log sent securely
[+] Log sent securely
[+] Log sent securely
[+] Log sent securely
[+] Log sent securely
```
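
The receiving side is not shown here. As a rough sketch of how a server might read what this client sends, the helpers below mirror the client's framing (a 4-byte big-endian length before each message) and payload layout (4-byte log length, log bytes, signature). `recv_exact`, `recv_frame`, and `split_payload` are hypothetical names, and dummy bytes stand in for real ciphertext. AES-GCM decryption (the nonce is the first 12 bytes of each packet) and signature verification with the client public key are left out so the example needs only the standard library.

```python
import socket
import struct


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, since recv() may return fewer than requested."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed before the frame was complete")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_frame(sock: socket.socket) -> bytes:
    """Read one frame: a 4-byte big-endian length, then that many bytes."""
    (length,) = struct.unpack(">I", recv_exact(sock, 4))
    return recv_exact(sock, length)


def split_payload(payload: bytes) -> tuple[bytes, bytes]:
    """Split a decrypted payload into (log, signature) via its length prefix."""
    (log_len,) = struct.unpack(">I", payload[:4])
    return payload[4:4 + log_len], payload[4 + log_len:]


# Loopback demonstration over a connected socket pair.
left, right = socket.socketpair()
with left, right:
    frame = b"nonce-and-ciphertext"
    left.sendall(struct.pack(">I", len(frame)) + frame)
    received = recv_frame(right)

log, signature = split_payload(struct.pack(">I", 5) + b"entry" + b"SIG")
print(received, log, signature)  # b'nonce-and-ciphertext' b'entry' b'SIG'
```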
