In [1]:
import hashlib
import csv
import os
import datetime

# --- Configuration ---
TARGET_DIR = "files_to_hash"
OUTPUT_FILE = "file_hashes_report.csv"
HASH_ALGORITHM = "sha256"

def generate_file_hash(filepath: str, block_size: int = 65536) -> str:

    # Create a new hash object for the specified algorithm
    hasher = hashlib.new(HASH_ALGORITHM)
    
    try:
        with open(filepath, 'rb') as f:
            while True:
                # Read file chunk by chunk
                data = f.read(block_size)
                if not data:
                    break
                hasher.update(data)
        
        # Return the final hexadecimal digest
        return hasher.hexdigest()
    
    except FileNotFoundError:
        return "FILE_NOT_FOUND"

def create_hash_report():

    
    # 1. Prepare data and timestamp
    report_data = []
    current_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    print(f"--- Starting Hash Generation for files in '{TARGET_DIR}' ---")
    
    # 2. Loop through files
    for filename in os.listdir(TARGET_DIR):
        filepath = os.path.join(TARGET_DIR, filename)
        
        # Skip directories and symbolic links
        if os.path.isfile(filepath):
            file_hash = generate_file_hash(filepath)
            
            # Store the result
            report_data.append([
                filename,
                file_hash,
                current_timestamp 
            ])
            print(f"Hashed: {filename} -> {file_hash[:10]}...")
            
    # 3. Write results to CSV
    try:
        with open(OUTPUT_FILE, 'w', newline='') as csvfile:
            csv_writer = csv.writer(csvfile)
            
            # Write header row
            csv_writer.writerow(['File Name', f'{HASH_ALGORITHM.upper()} Hash', 'Timestamp'])
            
            # Write data rows
            csv_writer.writerows(report_data)
            
        print(f"\n Success: Report saved to '{OUTPUT_FILE}' with {len(report_data)} entries.")

    except IOError as e:
        print(f" Error writing CSV file: {e}")

if __name__ == "__main__":
    if not os.path.isdir(TARGET_DIR):
        print(f"Error: Directory '{TARGET_DIR}' not found. Run the setup steps first.")
    else:
        create_hash_report()

--- Starting Hash Generation for files in 'files_to_hash' ---
Hashed: file_a.txt -> 97a02a9384...
Hashed: file_c.dat -> 896ba01813...
Hashed: file_d_copy.dat -> 896ba01813...
Hashed: malware_payload.exe -> 28fc48c88f...

 Success: Report saved to 'file_hashes_report.csv' with 4 entries.


Security systems rely on cryptographic file hashes for integrity checks as they create a unique digital fingerprint of file content,instantly exposing tampering that timestamps or file sizes cannot detect. Malware often evades checks using timestomping or rootkits that hide malicious files. Legitimate system updates are managed by creating a scheduled maintenance window and promoting the new, verified hashes to establish a new trusted baseline.

In [2]:
import hashlib
import os
import json
import time
import datetime
from typing import Dict, Any

# --- Configuration ---
TARGET_DIR = "files_to_hash"
BASELINE_FILE = "trusted_baseline.json"
HASH_ALGORITHM = "sha256"

# --- Hashing Utility (Adapted from previous code) ---

def generate_file_hash(filepath: str, block_size: int = 65536) -> str:
    """Calculates the SHA-256 hash of a file efficiently."""
    hasher = hashlib.new(HASH_ALGORITHM)
    try:
        with open(filepath, 'rb') as f:
            while True:
                data = f.read(block_size)
                if not data:
                    break
                hasher.update(data)
        return hasher.hexdigest()
    except FileNotFoundError:
        return "FILE_NOT_FOUND"

def create_current_hashes(directory: str) -> Dict[str, str]:
    """Generates a dictionary of current file hashes in the target directory."""
    current_hashes = {}
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if os.path.isfile(filepath):
            current_hashes[filename] = generate_file_hash(filepath)
    return current_hashes


def create_baseline(directory: str) -> None:

    print(f"--- Creating Trusted Baseline for '{directory}' ---")
    baseline_data = {
        'timestamp': datetime.datetime.now().isoformat(),
        'hashes': create_current_hashes(directory)
    }
    
    with open(BASELINE_FILE, 'w') as f:
        json.dump(baseline_data, f, indent=4)
        
    print(f" Baseline saved with {len(baseline_data['hashes'])} files.")

def load_baseline() -> Dict[str, str]:
    if not os.path.exists(BASELINE_FILE):
        print(f" Error: Baseline file '{BASELINE_FILE}' not found. Run create_baseline() first.")
        return {}
        
    with open(BASELINE_FILE, 'r') as f:
        data = json.load(f)
        return data['hashes']



def compare_and_detect_changes(baseline_hashes: Dict[str, str], current_hashes: Dict[str, str]):

    print("\n---  Running Integrity Check and Change Detection ---")
    
    all_files = set(baseline_hashes.keys()) | set(current_hashes.keys())
    changes_detected = 0
    
    for filename in sorted(list(all_files)):
        baseline_hash = baseline_hashes.get(filename)
        current_hash = current_hashes.get(filename)
        
        if baseline_hash is None and current_hash is not None:
            # File is present now, but wasn't in the baseline
            print(f" ADDED: File '{filename}' is new.")
            changes_detected += 1
            
        elif baseline_hash is not None and current_hash is None:
            # File was in the baseline, but is now missing
            print(f" DELETED: File '{filename}' is missing from the system.")
            changes_detected += 1

        elif baseline_hash != current_hash:
            # Hashes do not match
            print(f" MODIFIED: File '{filename}' hash changed!")
            print(f"  > Baseline: {baseline_hash[:10]}...")
            print(f"  > Current:  {current_hash[:10]}...")
            changes_detected += 1
            
        # else: Hashes match, file is unchanged (no output needed)

    print("\n" + "="*40)
    if changes_detected > 0:
        print(f" ALERT: {changes_detected} change(s) detected! Potential tampering.")
    else:
        print(" INTEGRITY CHECK PASSED: No unauthorized changes detected.")
    print("="*40)

# ---------------------------------------------------
# 3. Execution Simulation
# ---------------------------------------------------

if __name__ == "__main__":
    
    if not os.path.isdir(TARGET_DIR):
        print(f"Error: Directory '{TARGET_DIR}' not found. Please create the directory and files first.")
        sys.exit(1)

    # --- SIMULATION STEP 1: Establish Initial Trusted Baseline ---
    create_baseline(TARGET_DIR)

    # --- SIMULATION STEP 2: Wait and Simulate Malware/Tampering ---
    print("\n[Simulating Time passing... Attacker modifies files...]")
    time.sleep(1) 
    
    # 1. Simulate a file modification (content change)
    with open(os.path.join(TARGET_DIR, "file_a.txt"), "a") as f:
        f.write("\n*** MALWARE CODE INJECTED ***")
        
    # 2. Simulate a file deletion
    if os.path.exists(os.path.join(TARGET_DIR, "file_b.log")):
        os.remove(os.path.join(TARGET_DIR, "file_b.log"))
        
    # 3. Simulate a new file being added
    with open(os.path.join(TARGET_DIR, "malware_payload.exe"), "w") as f:
        f.write("I am a virus.")

    # --- SIMULATION STEP 3: Run the Detection Scan ---
    
    baseline = load_baseline()
    current_scan = create_current_hashes(TARGET_DIR)
    
    compare_and_detect_changes(baseline, current_scan)

--- Creating Trusted Baseline for 'files_to_hash' ---
 Baseline saved with 4 files.

[Simulating Time passing... Attacker modifies files...]

---  Running Integrity Check and Change Detection ---
 MODIFIED: File 'file_a.txt' hash changed!
  > Baseline: 97a02a9384...
  > Current:  97c5d38097...

 ALERT: 1 change(s) detected! Potential tampering.


Security systems rely on cryptographic file hashes for integrity checks, as they create a unique digital fingerprint of file content, instantly exposing tampering that timestamps or file sizes cannot detect. Malware often evades checks using timestomping or footkits that hide malicious files.  Legitimate system updates are managed by creating a shecduled maintanance and promoting the verifed hashes to establish a new trusted baseline.

In [3]:
import os
import re
import datetime
from typing import List, Dict

# --- Configuration ---
TARGET_DIR = "files_to_hash" # Use the folder created in the previous example
OUTPUT_REPORT = "suspicious_code_report.txt"

# --- Suspicious Signatures (Regular Expressions) ---
SIGNATURES: List[str] = [
    # Runtime Code Execution: Used to execute dynamically generated or downloaded code.
    r"eval\(", 
    r"exec\(", 
    r"os\.system\(",
    
    # Obfuscation/Decoding: Often used to hide the actual malicious payload.
    r"base64\.b64decode", 
    r"binascii\.unhexlify",
    
    # Network Communication: Used for command and control (C2) or data exfiltration.
    r"socket\.connect", 
    r"urllib\.request\.urlopen",
    r"requests\.(get|post)"
]

def scan_for_signatures(filepath: str) -> Dict[str, List[int]]:
    """
    Scans a single file for predefined suspicious signatures using regular expressions.
    Returns a dictionary mapping detected signature to the line numbers it was found on.
    """
    detections: Dict[str, List[int]] = {}
    
    try:
        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                for signature in SIGNATURES:
                    # Search for the signature pattern in the current line
                    if re.search(signature, line, re.IGNORECASE):
                        if signature not in detections:
                            detections[signature] = []
                        detections[signature].append(line_num)
                        
    except Exception as e:
        print(f"Error reading file {filepath}: {e}")
        
    return detections

def generate_suspicion_report():
    """Loops through all files and generates a final report of detected signatures."""
    
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    print(f"--- Starting Code Signature Scan in '{TARGET_DIR}' ---")
    
    report_lines = [
        "==================================================",
        f"CODE SIGNATURE SCAN REPORT - Run At: {timestamp}",
        f"Target Directory: {TARGET_DIR}",
        "=================================================="
    ]
    
    total_detections = 0
    
    # Iterate over files in the target directory
    for filename in os.listdir(TARGET_DIR):
        filepath = os.path.join(TARGET_DIR, filename)
        
        if os.path.isfile(filepath) and filename.endswith(('.py', '.txt', '.log', '.dat')):
            detections = scan_for_signatures(filepath)
            
            if detections:
                total_detections += 1
                # Use the emoji in the report line
                report_lines.append(f"\n SUSPICIOUS FILE DETECTED: {filename}")
                print(f" DETECTED: {filename}")
                
                for signature, lines in detections.items():
                    report_lines.append(f"  > Signature '{signature}' found on lines: {', '.join(map(str, lines))}")
            
    if total_detections == 0:
        # Use the emoji in the report line
        report_lines.append("\n SCAN COMPLETE: No suspicious signatures found.")
        print(" SCAN COMPLETE: No suspicious signatures found.")

    # Write the full report to a file, explicitly using UTF-8
    try:
        with open(OUTPUT_REPORT, 'w', encoding="utf-8") as f:
            f.write('\n'.join(report_lines))
            
        print(f"\nReport saved to '{OUTPUT_REPORT}'.")
    except Exception as e:
        print(f"Failed to write report file: {e}")
            
    if total_detections == 0:
        report_lines.append("\n SCAN COMPLETE: No suspicious signatures found.")
        print(" SCAN COMPLETE: No suspicious signatures found.")

    # Write the full report to a file
    with open(OUTPUT_REPORT, 'w') as f:
        f.write('\n'.join(report_lines))
        
    print(f"\nReport saved to '{OUTPUT_REPORT}'.")


def inject_test_code():
    """Injects a sample of suspicious code into a file for testing the scanner."""
    print("\n[Injecting simulated suspicious code into 'file_a.txt' for testing...]")
    injection_file = os.path.join(TARGET_DIR, "file_a.txt")
    if os.path.exists(injection_file):
        with open(injection_file, 'a') as f:
            f.write('\n# Normal comment\n')
            f.write('import base64; print(base64.b64decode("c29ja2V0LmNvbm5lY3Q=")) # This is suspicious\n')
            f.write('x = "print(1)"; eval(x) # Also suspicious\n')
            f.write('os.system("ls -la")\n')

if __name__ == "__main__":
    if not os.path.isdir(TARGET_DIR):
        print(f"Error: Directory '{TARGET_DIR}' not found. Please run the setup steps from previous examples.")
    else:
        # Step 1: Inject the test code to guarantee a detection
        inject_test_code()
        
        # Step 2: Run the scanner
        generate_suspicion_report()


[Injecting simulated suspicious code into 'file_a.txt' for testing...]
--- Starting Code Signature Scan in 'files_to_hash' ---
 DETECTED: file_a.txt

Report saved to 'suspicious_code_report.txt'.

Report saved to 'suspicious_code_report.txt'.


Signature-based detection fails against polymorphic and metamorphic viruses because they constantly change their code structure, eliminating the fixed byte sequences needed for matching. Attackers can intentionally use harmless lookin strings to trigger false alarms, forcing security vendors to weaken detection. heuristics and behaviour scans improve detection by monitoring suspicious file attributes and execution actions, regardless of the malware's signature. 

In [None]:
import random
import time
from typing import List, Dict

# --- Configuration ---
TOTAL_HOSTS = 500      # Total number of computers in the network
VULNERABILITY_RATE = 0.25  # Percentage of hosts that are initially vulnerable (25%)
SCAN_RATE_PER_WORM = 5     # Number of hosts one worm can attempt to infect per time step
TIME_STEPS = 20        # Number of simulation steps (e.g., 20 minutes)

# --- Host State Definitions ---
STATE_CLEAN = 0      # Not infected, not vulnerable
STATE_VULNERABLE = 1 # Not infected, but has the exploit target
STATE_INFECTED = 2   # Currently hosting and spreading the worm

class Host:
    def __init__(self, host_id: int, state: int):
        self.id = host_id
        self.state = state
        self.is_vulnerable = (state == STATE_VULNERABLE)
        self.last_scanned_by = None # For tracking purposes
        
    def __repr__(self):
        return f"Host({self.id}, State:{self.state})"

class WormSimulation:

    def __init__(self):
        self.hosts: List[Host] = self._initialize_network()
        self.stats: List[Dict[str, Any]] = []

    def _initialize_network(self) -> List[Host]:

        hosts = []
        num_vulnerable = int(TOTAL_HOSTS * VULNERABILITY_RATE)
        
        # Determine initial states
        initial_states = [STATE_VULNERABLE] * num_vulnerable + [STATE_CLEAN] * (TOTAL_HOSTS - num_vulnerable)
        random.shuffle(initial_states)
        
        # Infect the first vulnerable host to start the spread
        initial_infection_index = [i for i, s in enumerate(initial_states) if s == STATE_VULNERABLE][0]
        initial_states[initial_infection_index] = STATE_INFECTED

        for i, state in enumerate(initial_states):
            hosts.append(Host(i, state))
            
        return hosts

    def step(self, t: int):

        
        infected_hosts = [h for h in self.hosts if h.state == STATE_INFECTED]
        newly_infected_count = 0
        total_scans = 0

        # Create a list of all host IDs for random scanning
        host_ids = list(range(TOTAL_HOSTS))
        
        # Propagation phase
        for infecting_host in infected_hosts:
            # Simulate random scanning strategy
            scan_targets = random.sample(host_ids, min(TOTAL_HOSTS, SCAN_RATE_PER_WORM))
            total_scans += len(scan_targets)
            
            for target_id in scan_targets:
                target_host = self.hosts[target_id]
                
                # Check for successful infection
                if target_host.state == STATE_VULNERABLE:
                    target_host.state = STATE_INFECTED
                    target_host.is_vulnerable = False # No longer vulnerable, now infected
                    newly_infected_count += 1

        # Collect statistics for this step
        infected_count = len([h for h in self.hosts if h.state == STATE_INFECTED])
        vulnerable_count = len([h for h in self.hosts if h.state == STATE_VULNERABLE])
        clean_count = TOTAL_HOSTS - infected_count - vulnerable_count

        self.stats.append({
            'time': t,
            'infected': infected_count,
            'vulnerable': vulnerable_count,
            'clean': clean_count,
            'newly_infected': newly_infected_count,
            'scans': total_scans
        })

    def run(self):

        print(f"Starting Worm Simulation on {TOTAL_HOSTS} hosts...")
        
        for t in range(TIME_STEPS):
            self.step(t)
            infected = self.stats[-1]['infected']
            
            print(f"Time Step {t+1}: Infected Hosts = {infected}/{TOTAL_HOSTS}")
            
            # Stop early if the entire network is infected
            if infected == TOTAL_HOSTS:
                break
                
        self.plot_results()

    def plot_results(self):

        print("\n" + "="*70)
        print("Propagation Dynamics Summary:")
        print("="*70)
        print(f"{'Time':<5}{'Infected':<10}{'Vulnerable':<12}{'Clean':<7}{'New/Step':<10}{'Total Scans':<15}")
        print("-" * 70)
        
        # Print a subset of results for readability (e.g., every 2 steps)
        for i, s in enumerate(self.stats):
            if i % 2 == 0 or i == len(self.stats) - 1:
                print(f"{s['time']+1:<5}{s['infected']:<10}{s['vulnerable']:<12}{s['clean']:<7}{s['newly_infected']:<10}{s['scans']:<15}")

        print("\nKey Insight: The infection curve starts slow but accelerates rapidly (exponential growth) until the number of vulnerable hosts drops.")
        # Trigger an image to visualize the common S-curve of network propagation models
        print("")


if __name__ == "__main__":
    sim = WormSimulation()
    sim.run()

Starting Worm Simulation on 500 hosts...
Time Step 1: Infected Hosts = 2/500
Time Step 2: Infected Hosts = 2/500
Time Step 3: Infected Hosts = 3/500
Time Step 4: Infected Hosts = 6/500
Time Step 5: Infected Hosts = 13/500
Time Step 6: Infected Hosts = 28/500
Time Step 7: Infected Hosts = 48/500
Time Step 8: Infected Hosts = 78/500
Time Step 9: Infected Hosts = 104/500
Time Step 10: Infected Hosts = 115/500
Time Step 11: Infected Hosts = 123/500
Time Step 12: Infected Hosts = 125/500
Time Step 13: Infected Hosts = 125/500
Time Step 14: Infected Hosts = 125/500
Time Step 15: Infected Hosts = 125/500
Time Step 16: Infected Hosts = 125/500
Time Step 17: Infected Hosts = 125/500
Time Step 18: Infected Hosts = 125/500
Time Step 19: Infected Hosts = 125/500
Time Step 20: Infected Hosts = 125/500

Propagation Dynamics Summary:
Time Infected  Vulnerable  Clean  New/Step  Total Scans    
----------------------------------------------------------------------
1    2         123         375    1   

Doubling the scan attempts per host increases teh curve , making it exponentially grow by quickly exhausting the vulnerable population. A worm might choose a local subnet propogation for speed, low latency, and evasion of network perimeter defenses. Rate halting is probably most effective containment strategy here, as it directly limits the worm's key function, rapid scanning. 

In [None]:
import time
import random
import logging
import datetime
from typing import Dict, List, Tuple

# --- Configuration for Detection and Alerting ---
LOG_FILE = "security_alerts.log"
TIME_WINDOW_SECONDS = 5    # Time window for counting connections
RATE_THRESHOLD = 10        # Max allowed connections per TIME_WINDOW_SECONDS
ADMIN_CONTACT = "security@corp.com"

# Setup basic logging to a file
logging.basicConfig(filename=LOG_FILE, 
                    level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Mock data structure to hold connection timestamps
connection_log: List[float] = [] 

# --- Detection Layer: Anomaly Detection ---

def simulate_outbound_connection(is_suspicious: bool = False):

    
    # üö® FIX: The 'global' declaration MUST be the first reference to the variable.
    global connection_log 
    
    current_time = time.time()
    
    # Add connection timestamp
    connection_log.append(current_time)
    
    # Optionally simulate a burst of suspicious connections
    if is_suspicious:
        # Simulate 12 rapid, suspicious connection attempts
        for _ in range(12): 
            connection_log.append(current_time + random.random())
            
    # Clean up old timestamps from the log
    cutoff_time = current_time - TIME_WINDOW_SECONDS
    
    # Keep only connections within the defined time window
    # Now this list comprehension is safe because connection_log is known to be global
    connection_log = [t for t in connection_log if t >= cutoff_time]
    
def check_network_rate_anomaly() -> Tuple[bool, int]:

    current_count = len(connection_log)
    
    if current_count > RATE_THRESHOLD:
        return True, current_count
    else:
        return False, current_count

# --- Mitigation/Resilience Layer: Logging and Alerting ---

def alert_administrator(count: int):

    alert_message = (
        f"HIGH ANOMALY DETECTED: {count} outbound connections in {TIME_WINDOW_SECONDS}s. "
        f"Exceeds threshold of {RATE_THRESHOLD}. Potential worm scanning or C2 traffic."
    )
    
    # Log the event (Host-based integrity log)
    logging.error(alert_message)
    
    # Simulate an external notification (Network-based response)
    print(f"\n--- ALERT ---")
    print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] SECURITY ALERT: Anomaly on host!")
    print(f"Action: Notifying {ADMIN_CONTACT} for immediate host quarantine.")
    print(f"Log Details written to {LOG_FILE}")
    print("-------------\n")

# --- Integration and Simulation ---

def run_monitoring_cycle():
    """Runs the main monitoring loop."""
    print(f"--- üõ°Ô∏è Starting Host Network Monitor ---")
    print(f"Threshold: {RATE_THRESHOLD} connections per {TIME_WINDOW_SECONDS} seconds.")
    
    # 1. Normal Activity Simulation (Prevention successful)
    print("\n[Cycle 1: Simulating normal, low-rate user activity...]")
    for _ in range(5):
        simulate_outbound_connection() # Low rate connection
        time.sleep(1)
        
    anomaly, count = check_network_rate_anomaly()
    print(f"Current rate: {count} connections. Anomaly: {anomaly}")
    
    # 2. Suspicious Activity Simulation (Detection needed)
    print("\n[Cycle 2: Simulating malware launching a rapid scan...]")
    simulate_outbound_connection(is_suspicious=True) # High rate connection injected
    
    anomaly, count = check_network_rate_anomaly()
    print(f"Current rate: {count} connections. Anomaly: {anomaly}")
    
    # 3. Trigger Alert
    if anomaly:
        alert_administrator(count)

    # 4. Check status after alert (Resilience/Containment)
    # The rate drops immediately because the log cleans itself out based on the time window
    time.sleep(TIME_WINDOW_SECONDS) 
    anomaly, count = check_network_rate_anomaly()
    print(f"Current rate (After 5s reset): {count} connections.")
    
    print(f"\nMonitoring cycle complete. Check '{LOG_FILE}' for forensic details.")
    
if __name__ == "__main__":
    run_monitoring_cycle()

--- üõ°Ô∏è Starting Host Network Monitor ---
Threshold: 10 connections per 5 seconds.

[Cycle 1: Simulating normal, low-rate user activity...]
Current rate: 5 connections. Anomaly: False

[Cycle 2: Simulating malware launching a rapid scan...]
Current rate: 17 connections. Anomaly: True

--- ALERT ---
[21:12:58] SECURITY ALERT: Anomaly on host!
Action: Notifying security@corp.com for immediate host quarantine.
Log Details written to security_alerts.log
-------------

Current rate (After 5s reset): 17 connections.

Monitoring cycle complete. Check 'security_alerts.log' for forensic details.


The script demonstrates implementing a basic sliding time window detection mechanism to monitor the rate of outbound network connections and compares the count against a predefined threshold. When the simulated connection rate exceeds the threshold, the script triggers an alerting and logging mechanism, showcasing the security principles of Detect, Alert, and Respond to potential malicious activity like scanning or botnet C2 traffic.