# Week 04 – Malicious Software  
## Networks and Systems Security




---

##  Section 1 – File Integrity Checker

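**Goal:** Build a simple file integrity checker that hashes every file in a watched directory with SHA-256 and saves the hashes to a CSV baseline, so that later scans can detect modified, added, or deleted files.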


In [1]:
import hashlib
import csv
from pathlib import Path
from datetime import datetime

def sha256_file(path: Path) -> str:
    """Return the SHA-256 hex digest of the file at *path*.

    The file is opened in binary mode and consumed in 8 KiB chunks, so the
    whole file never has to be held in memory at once.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                # An empty read means end of file.
                break
            digest.update(block)
    return digest.hexdigest()

def create_baseline(target_dir: str = "watched_files", baseline_csv: str = "baseline_hashes.csv"):
    """Hash every file under *target_dir* and save the result as a CSV baseline.

    The CSV has the columns ``path`` (relative to *target_dir*), ``hash``
    (SHA-256 hex digest) and ``timestamp`` (one timezone-aware UTC ISO-8601
    value shared by all rows of the run). Rows are written in sorted path
    order so repeated runs over unchanged files produce the same row order.

    Args:
        target_dir: Directory that is walked recursively.
        baseline_csv: Destination CSV file; overwritten if it already exists.

    Returns:
        None. If *target_dir* is not an existing directory a warning is
        printed and no file is written.
    """
    # Imported locally because the file only imports ``datetime`` itself.
    from datetime import timezone

    target = Path(target_dir)
    # is_dir() also rejects a regular file, which exists() would accept and
    # which would otherwise produce a silently empty baseline.
    if not target.is_dir():
        print(f"[!] Directory {target_dir} does not exist. Create it and add some test files first.")
        return

    # datetime.utcnow() is deprecated and returns a naive value; use an
    # explicit UTC-aware timestamp instead.
    now = datetime.now(timezone.utc).isoformat()

    rows = [
        {
            "path": str(p.relative_to(target)),
            "hash": sha256_file(p),
            "timestamp": now,
        }
        for p in sorted(target.rglob("*"))
        if p.is_file()
    ]

    with open(baseline_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["path", "hash", "timestamp"])
        writer.writeheader()
        writer.writerows(rows)

    print(f"✅ Baseline saved with {len(rows)} entries to {baseline_csv}")

# Example:
# create_baseline()


---

## Section 2 – Detecting Suspicious File Changes


In [2]:
def load_baseline(baseline_csv: str = "baseline_hashes.csv"):
    """Read a baseline CSV and return it as a ``{path: hash}`` dictionary.

    Only the ``path`` and ``hash`` columns are used. If the file does not
    exist, a warning is printed and an empty dictionary is returned.
    """
    try:
        with open(baseline_csv, newline="", encoding="utf-8") as handle:
            return {
                record["path"]: record["hash"]
                for record in csv.DictReader(handle)
            }
    except FileNotFoundError:
        print(f"[!] Baseline file {baseline_csv} not found.")
        return {}

def scan_for_changes(target_dir: str = "watched_files", baseline_csv: str = "baseline_hashes.csv"):
    """Compare the files under *target_dir* with a saved baseline and print a report.

    Files are classified as:
      * modified - present in both, but the SHA-256 hash differs;
      * added    - present on disk but not in the baseline;
      * deleted  - present in the baseline but not on disk.

    Args:
        target_dir: Directory that is walked recursively.
        baseline_csv: CSV file previously written by ``create_baseline``.

    Returns:
        None. The report (or a warning if the directory or baseline is
        unavailable) is printed to stdout.
    """
    target = Path(target_dir)
    # Without this guard a missing or mistyped directory yields no files, and
    # every baseline entry would be misreported as deleted.
    if not target.is_dir():
        print(f"[!] Directory {target_dir} does not exist.")
        return

    baseline = load_baseline(baseline_csv)
    if not baseline:
        print("[!] No baseline loaded; run create_baseline() first.")
        return

    # Keys are paths relative to the watched directory, matching the baseline.
    current_hashes = {
        str(p.relative_to(target)): sha256_file(p)
        for p in target.rglob("*")
        if p.is_file()
    }

    modified = []
    deleted = []
    for path, old_hash in baseline.items():
        new_hash = current_hashes.get(path)
        if new_hash is None:
            deleted.append(path)
        elif new_hash != old_hash:
            modified.append(path)

    added = [path for path in current_hashes if path not in baseline]

    print("=== Change Report ===")
    print("Modified files:", modified if modified else "None")
    print("Added files   :", added if added else "None")
    print("Deleted files :", deleted if deleted else "None")

# Example:
# scan_for_changes()


---

##  Section 3 – Signature-Based Malware Detection


In [3]:
import re

# Regular-expression signatures that are searched for in each scanned file's
# text; a file is flagged when at least one of them matches.
# NOTE(review): these are simple textual patterns, so ordinary code (for
# example any file containing "import os") also matches - presumably
# intentional for the exercise; confirm before relying on the results.
SIGNATURES = [
    r"eval\(",
    r"base64\.b64decode",
    r"socket\.connect",
    r"exec\(",
    r"import os"
]

# Patterns compiled once, in the same order as SIGNATURES, so the two lists
# can be paired up element by element.
compiled_sigs = [re.compile(sig) for sig in SIGNATURES]

def scan_file_for_signatures(path: Path):
    """Return the signature patterns from ``SIGNATURES`` that occur in *path*.

    Args:
        path: File to scan. It is decoded as UTF-8 and undecodable bytes are
            dropped, so binary files can be scanned without raising.

    Returns:
        A list of the matching pattern strings, in ``SIGNATURES`` order.
        If the file cannot be read (missing, a directory, permission denied,
        ...) a warning is printed and an empty list is returned, so one bad
        file does not abort a directory scan.
    """
    try:
        # errors="ignore" means decoding itself can never fail; the failures
        # that can actually happen here are OS-level read errors. A fixed
        # encoding keeps results independent of the machine's locale.
        text = path.read_text(encoding="utf-8", errors="ignore")
    except OSError as exc:
        print(f"[!] Could not read {path}: {exc}")
        return []

    return [
        sig
        for sig, pattern in zip(SIGNATURES, compiled_sigs)
        if pattern.search(text)
    ]

def scan_directory_for_signatures(target_dir: str = "watched_files"):
    """Scan every file under *target_dir* for known signatures and print a report.

    Prints a warning and returns if *target_dir* does not exist. Otherwise
    each file found recursively is checked with ``scan_file_for_signatures``
    and the files with at least one hit are listed with their matches.
    """
    root = Path(target_dir)
    if not root.exists():
        print(f"[!] Directory {target_dir} does not exist.")
        return

    # Map each flagged file (as a string path) to the signatures it matched.
    flagged = {}
    for candidate in filter(Path.is_file, root.rglob("*")):
        found = scan_file_for_signatures(candidate)
        if found:
            flagged[str(candidate)] = found

    if flagged:
        print("⚠️ Potentially suspicious files:")
        for name, found in flagged.items():
            print(f" - {name}: {found}")
    else:
        print("✅ No suspicious signatures found.")

# Example:
# scan_directory_for_signatures()


---

## Section 4 – Worm Propagation Simulation


In [4]:
import random

def simulate_worm_spread(
    num_hosts: int = 100,
    attempts_per_infected: int = 3,
    infection_probability: float = 0.4,
    steps: int = 15,
    seed: int = 42
):
    """Simulate random-scanning worm propagation and return the infection curve.

    Host 0 starts infected. In every step each currently infected host makes
    ``attempts_per_infected`` attempts; each attempt picks a host uniformly at
    random and, if that host is not yet infected, infects it with probability
    ``infection_probability``. Hosts infected during a step only start
    spreading in the following step.

    Args:
        num_hosts: Total number of hosts (must be at least 1).
        attempts_per_infected: Infection attempts per infected host per step.
        infection_probability: Chance that an attempt on a clean host succeeds.
        steps: Number of simulation steps.
        seed: Seed for the simulation's private random generator.

    Returns:
        A list of ``steps + 1`` infected-host counts, starting with the
        initial count of 1.

    Raises:
        ValueError: If ``num_hosts`` is smaller than 1.
    """
    if num_hosts < 1:
        raise ValueError("num_hosts must be at least 1")

    # A private generator gives the same sequence as seeding the module-level
    # one, but does not clobber the global random state for other code.
    rng = random.Random(seed)
    infected = {0}
    history = [len(infected)]

    for _ in range(steps):
        new_infected = set()
        for _host in infected:
            for _attempt in range(attempts_per_infected):
                target = rng.randint(0, num_hosts - 1)
                if target not in infected and rng.random() < infection_probability:
                    new_infected.add(target)
        # Merge after the step so new victims do not spread within the step.
        infected |= new_infected
        history.append(len(infected))

    return history

# Keep the host count in one place so the summary line always matches the
# population that was actually simulated.
TOTAL_HOSTS = 100

history = simulate_worm_spread(num_hosts=TOTAL_HOSTS)
print("Infected hosts over time:", history)
print("Final infected:", history[-1], "out of", TOTAL_HOSTS)


Infected hosts over time: [1, 4, 10, 19, 36, 59, 80, 89, 97, 100, 100, 100, 100, 100, 100, 100]
Final infected: 100 out of 100


In [None]:
import matplotlib.pyplot as plt

# Run the simulation twice, changing only the number of infection attempts
# each infected host makes per step; all other parameters keep their defaults.
history1 = simulate_worm_spread(attempts_per_infected=2)
history2 = simulate_worm_spread(attempts_per_infected=6)

# Draw both infection curves on one figure so they can be compared directly.
plt.figure()
plt.plot(history1, label="Attempts per host = 2")
plt.plot(history2, label="Attempts per host = 6")
plt.xlabel("Time step")
plt.ylabel("Number of infected hosts")
plt.title("Worm Propagation Simulation")
plt.legend()
plt.show()


---

##  Section 5 – Simple Network Anomaly Monitor (Countermeasure Sketch)


In [5]:
from collections import defaultdict

def simulate_network_activity(num_hosts: int = 20, steps: int = 20, seed: int = 1):
    """Generate synthetic per-host connection counts for a quiet network.

    Args:
        num_hosts: Number of hosts; host ids are ``0 .. num_hosts - 1``.
        steps: Number of time steps; step ids are ``0 .. steps - 1``.
        seed: Seed for the simulation's private random generator.

    Returns:
        A list of ``(time_step, host, connections)`` tuples ordered by time
        step and then host, where ``connections`` is a random integer from
        0 to 3 inclusive.
    """
    # A private generator gives the same sequence as seeding the module-level
    # one, but leaves the global random state untouched.
    rng = random.Random(seed)
    return [
        (t, host, rng.randint(0, 3))
        for t in range(steps)
        for host in range(num_hosts)
    ]

def inject_worm_like_host(logs, malicious_host: int = 5, extra_connections: int = 20):
    """Return a copy of *logs* in which one host shows worm-like traffic.

    Every ``(time_step, host, connections)`` record belonging to
    ``malicious_host`` has ``extra_connections`` added to its connection
    count; all other records are copied unchanged. The input list is not
    modified.
    """
    return [
        (t, host, (conns + extra_connections) if host == malicious_host else conns)
        for t, host, conns in logs
    ]

def detect_anomalous_hosts(logs, threshold: int = 10):
    """Print a report of hosts whose connection count exceeds *threshold*.

    *logs* is an iterable of ``(time_step, host, connections)`` records. For
    each host, the number of records with ``connections > threshold`` is
    counted, and hosts with at least one such record are listed in the order
    they were first seen. Nothing is returned.
    """
    exceed_counts = defaultdict(int)
    for _step, host_id, n_conns in logs:
        if n_conns > threshold:
            exceed_counts[host_id] += 1

    print("=== Anomaly Detection Report ===")
    if exceed_counts:
        for host_id, hits in exceed_counts.items():
            print(f"Host {host_id} exceeded threshold on {hits} time steps.")
        return
    print("No suspicious hosts detected.")

# Demo: generate the synthetic baseline traffic, add 20 extra connections to
# every record of host 7, then report each host whose connection count
# exceeds the threshold of 10 and on how many time steps it did so.
base_logs = simulate_network_activity()
logs_with_worm = inject_worm_like_host(base_logs, malicious_host=7, extra_connections=20)
detect_anomalous_hosts(logs_with_worm, threshold=10)


=== Anomaly Detection Report ===
Host 7 exceeded threshold on 20 time steps.


---

## Key Takeaways & Reflection
The exercise that helped me the most was the worm propagation simulation, because it showed how quickly malware can spread and why network segmentation and monitoring are essential.

To improve these prototypes for a real system, I’d add proper logging, scheduled scans, and stronger behavioural checks instead of relying on simple patterns.

Balancing detection and false positives means using multiple indicators rather than reacting to single events. A layered approach—file integrity, behaviour monitoring, and network analysis—keeps detection accurate without overwhelming the system with noise.
