<a href="https://colab.research.google.com/github/ebamberg/research-projects-ml/blob/main/utilities/synthetic_data_generator/synthetic_logfiles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import random
import datetime
from typing import List, Dict

# Configuration
#
# Module-level lookup tables for the session generators defined below.

# Source addresses per traffic class. create_normal_session() draws from
# "normal"; every create_suspicious_session() scenario draws from "suspicious".
IP_POOLS = {
    "normal": [
        "192.168.1.10",
        "192.168.1.15",
        "192.168.1.20",
        "192.168.1.25",
        "10.0.0.100",
        "10.0.0.105",
        "10.0.0.110",
        "172.16.0.50",
        "203.0.113.45",
        "198.51.100.78",
        "172.16.1.200",
    ],
    "suspicious": [
        "185.220.101.42",
        "77.88.55.80",
        "94.102.49.190",
        "159.203.178.9",
        "134.195.196.26",
        "91.189.89.199",
    ],
}

# Account names per traffic class. The "suspicious" names are used by the
# brute-force and failed-login scenarios; the other attack scenarios use a
# "normal" account name together with a suspicious IP.
USERNAMES = {
    "normal": [
        "john.doe",
        "jane.smith",
        "mike.johnson",
        "sarah.wilson",
        "david.brown",
        "lisa.davis",
        "tom.anderson",
        "emma.taylor",
        "alex.martin",
        "anna.garcia",
        "chris.rodriguez",
        "maria.lopez",
        "kevin.lee",
        "amy.white",
        "robert.clark",
    ],
    "suspicious": [
        "admin",
        "root",
        "administrator",
        "test",
        "guest",
        "user123",
        "tempuser",
        "backup",
        "service",
        "default",
    ],
}

# Message texts picked at random for the middle entries of a normal session.
NORMAL_MESSAGES = [
    "User authentication successful",
    "Session started",
    "Loading user dashboard",
    "Accessing user profile",
    "Database query executed",
    "API call to user service",
    "File upload initiated",
    "Report generation started",
    "Email notification sent",
    "Cache refresh completed",
    "User preferences loaded",
    "Data validation successful",
    "Transaction processed",
    "Backup operation completed",
    "Session timeout warning",
    "User logout successful",
    "Connection established",
    "Resource access granted",
    "Form submission processed",
    "Security check passed",
]

# Catalogue of attack-related message texts.
# NOTE(review): create_suspicious_session() spells its messages out inline and
# does not read this list in the code visible here - confirm whether another
# consumer exists.
SUSPICIOUS_MESSAGES = [
    "Multiple failed login attempts detected",
    "SQL injection attempt blocked",
    "Unauthorized access attempt",
    "Privilege escalation detected",
    "Suspicious file access pattern",
    "Port scanning activity detected",
    "Brute force attack identified",
    "Abnormal data access volume",
    "Security policy violation",
    "Malformed request detected",
    "Directory traversal attempt",
    "Cross-site scripting attempt blocked",
    "Authentication bypass attempt",
    "Suspicious API call pattern",
    "Elevated permissions requested",
    "System file access attempt",
    "Network intrusion detected",
    "Data exfiltration pattern",
    "Session hijacking attempt",
    "Buffer overflow attempt detected",
]

# Severity labels that appear in the generated entries.
# NOTE(review): the generators in this section hard-code their levels instead
# of reading this list - confirm whether it is used elsewhere.
LOG_LEVELS = ["INFO", "DEBUG", "ERROR"]

def generate_session_id():
    """Return a session identifier of the form ``sess_NNNNNN``.

    The six-digit suffix comes from the module-level ``random`` generator,
    so identifiers repeat under the same seed and are not guaranteed to be
    unique across calls.
    """
    suffix = random.randint(100000, 999999)
    return "sess_" + str(suffix)

def generate_timestamp(base_time, offset_minutes=0):
    """Format ``base_time`` shifted by ``offset_minutes`` as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        base_time: ``datetime.datetime`` the offset is applied to.
        offset_minutes: Minutes to add to ``base_time`` (negative values
            move backwards).

    Returns:
        The shifted moment rendered with ``"%Y-%m-%d %H:%M:%S"``.
    """
    shifted = base_time + datetime.timedelta(minutes=offset_minutes)
    return shifted.strftime("%Y-%m-%d %H:%M:%S")

def create_normal_session():
    """Create a normal user session with 5-8 chronologically ordered log entries.

    The session uses one IP and one username from the ``'normal'`` pools and
    starts at the current wall-clock time shifted back by 0-30 whole days.
    It consists of a login entry, ``num_entries - 2`` random activity entries
    and a final logout entry.

    Returns:
        A list of dicts with the keys ``ip``, ``session_id``, ``username``,
        ``timestamp``, ``message`` and ``log_level``, in the order in which
        the events occur.
    """
    session_id = generate_session_id()
    ip = random.choice(IP_POOLS['normal'])
    username = random.choice(USERNAMES['normal'])
    base_time = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30))
    num_entries = random.randint(5, 8)

    def make_entry(offset_minutes, message, log_level):
        # All entries of the session share ip, session_id and username.
        return {
            'ip': ip,
            'session_id': session_id,
            'username': username,
            'timestamp': generate_timestamp(base_time, offset_minutes),
            'message': message,
            'log_level': log_level
        }

    # First entry - always login
    logs = [make_entry(0, "User authentication successful", 'INFO')]

    # Middle entries - normal activities. The offset is accumulated rather
    # than computed as ``i * randint(...)``: independent per-entry products
    # could place a later entry before an earlier one, and could place the
    # logout before the activities it is supposed to follow.
    elapsed_minutes = 0
    for _ in range(num_entries - 2):
        elapsed_minutes += random.randint(1, 5)
        logs.append(make_entry(
            elapsed_minutes,
            random.choice(NORMAL_MESSAGES),
            random.choice(['INFO', 'DEBUG'])
        ))

    # Last entry - logout, always strictly after the last activity
    elapsed_minutes += random.randint(2, 8)
    logs.append(make_entry(elapsed_minutes, "User logout successful", 'INFO'))

    return logs

def create_suspicious_session():
    """Build the log entries of one randomly selected attack scenario.

    One of five scenarios is chosen: ``brute_force``,
    ``privilege_escalation``, ``data_exfiltration``, ``sql_injection`` or
    ``multiple_failed_logins``. All entries share one session id and a start
    time equal to the current wall-clock time shifted back by 0-30 whole days.

    Returns:
        A list of dicts with the keys ``ip``, ``session_id``, ``username``,
        ``timestamp``, ``message`` and ``log_level``.
    """
    session_id = generate_session_id()
    base_time = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30))

    def record(ip, username, minutes, message, level):
        # Single place where an entry dict is assembled for this session.
        return {
            'ip': ip,
            'session_id': session_id,
            'username': username,
            'timestamp': generate_timestamp(base_time, minutes),
            'message': message,
            'log_level': level,
        }

    # Fixed (minute offset, message, level) sequences for the scenarios that
    # play out from a single suspicious IP under a regular account name.
    scripted = {
        'privilege_escalation': (
            (0, "User authentication successful", 'INFO'),
            (2, "Elevated permissions requested", 'ERROR'),
            (4, "System file access attempt", 'ERROR'),
            (6, "Privilege escalation detected", 'ERROR'),
            (8, "Security policy violation", 'ERROR'),
        ),
        'data_exfiltration': (
            (0, "User authentication successful", 'INFO'),
            (2, "Database query executed", 'INFO'),
            (4, "Abnormal data access volume", 'ERROR'),
            (6, "Data exfiltration pattern", 'ERROR'),
            (8, "Suspicious file access pattern", 'ERROR'),
            (10, "Network intrusion detected", 'ERROR'),
        ),
        'sql_injection': (
            (0, "User authentication successful", 'INFO'),
            (2, "Database query executed", 'INFO'),
            (4, "Malformed request detected", 'ERROR'),
            (6, "SQL injection attempt blocked", 'ERROR'),
            (8, "Security policy violation", 'ERROR'),
        ),
    }

    # Choose type of suspicious activity
    scenario = random.choice([
        'brute_force', 'privilege_escalation', 'data_exfiltration',
        'sql_injection', 'multiple_failed_logins'
    ])

    if scenario == 'brute_force':
        # Eight failed logins against one account, spread over two IPs,
        # followed by the detection entry.
        attacker_ips = random.sample(IP_POOLS['suspicious'], 2)
        target = random.choice(USERNAMES['suspicious'])
        entries = [
            record(random.choice(attacker_ips), target, attempt * 2,
                   f"Authentication failed for user {target}", 'ERROR')
            for attempt in range(8)
        ]
        entries.append(
            record(attacker_ips[0], target, 20, "Brute force attack identified", 'ERROR')
        )
        return entries

    if scenario == 'multiple_failed_logins':
        # Three failed logins for each of three accounts from one IP,
        # followed by a summary entry attributed to 'system'.
        source_ip = random.choice(IP_POOLS['suspicious'])
        targets = random.sample(USERNAMES['suspicious'], 3)
        entries = [
            record(source_ip, target, round_no * 10 + attempt * 2,
                   f"Authentication failed for user {target}", 'ERROR')
            for round_no, target in enumerate(targets)
            for attempt in range(3)
        ]
        entries.append(
            record(source_ip, 'system', 35, "Multiple failed login attempts detected", 'ERROR')
        )
        return entries

    # Remaining scenarios: replay the scripted sequence.
    source_ip = random.choice(IP_POOLS['suspicious'])
    account = random.choice(USERNAMES['normal'])
    return [
        record(source_ip, account, minutes, message, level)
        for minutes, message, level in scripted[scenario]
    ]

def generate_log_dataset(num_normal=40, num_suspicious=10):
    """Generate a log dataset of normal and suspicious sessions, sorted by time.

    Args:
        num_normal: Number of sessions produced by ``create_normal_session``
            (default 40).
        num_suspicious: Number of sessions produced by
            ``create_suspicious_session`` (default 10).

    Returns:
        A flat list of log-entry dicts from all sessions, ordered by their
        ``'timestamp'`` string. With the defaults this is the original
        50-session dataset.
    """
    all_logs = []

    print(f"Generating {num_normal} normal sessions...")
    for _ in range(num_normal):
        all_logs.extend(create_normal_session())

    print(f"Generating {num_suspicious} suspicious sessions...")
    for _ in range(num_suspicious):
        all_logs.extend(create_suspicious_session())

    # Timestamps are "%Y-%m-%d %H:%M:%S" strings, so lexicographic order is
    # chronological order. list.sort is stable, so entries with identical
    # timestamps keep their generation order.
    all_logs.sort(key=lambda entry: entry['timestamp'])

    return all_logs

def export_to_csv(logs, filename="logfile_testdata.csv"):
    """Write the log entries to ``filename`` as a UTF-8 CSV file.

    Args:
        logs: Iterable of dicts holding the keys ``ip``, ``session_id``,
            ``username``, ``timestamp``, ``message`` and ``log_level``.
        filename: Destination path; an existing file is overwritten.

    The file starts with a header row naming the six columns. A confirmation
    line is printed once the file has been written.
    """
    import csv

    columns = ['ip', 'session_id', 'username', 'timestamp', 'message', 'log_level']

    # newline='' lets the csv module control line endings itself.
    with open(filename, 'w', newline='', encoding='utf-8') as handle:
        table = csv.DictWriter(handle, fieldnames=columns)
        table.writeheader()
        table.writerows(logs)

    print(f"Dataset exported to {filename}")

def print_sample_logs(logs, num_samples=20):
    """Print the first ``num_samples`` entries of ``logs`` as a fixed-width table.

    Args:
        logs: Sequence of log-entry dicts with the keys ``ip``,
            ``session_id``, ``username``, ``timestamp``, ``log_level`` and
            ``message``.
        num_samples: Upper bound on the number of rows printed; it is also
            the number quoted in the heading.
    """
    # (entry key, column title, minimum width) in display order.
    layout = (
        ('ip', 'IP', 15),
        ('session_id', 'Session ID', 12),
        ('username', 'Username', 15),
        ('timestamp', 'Timestamp', 19),
        ('log_level', 'Level', 5),
        ('message', 'Message', 50),
    )
    rule = "-" * 120

    print(f"\nSample of {num_samples} log entries:")
    print(rule)
    print(" ".join(f"{title:<{width}}" for _, title, width in layout))
    print(rule)

    for entry in logs[:num_samples]:
        print(" ".join(f"{entry[key]:<{width}}" for key, _, width in layout))

def analyze_dataset(logs):
    """Print summary statistics for a list of log-entry dicts.

    Reports the number of entries, the number of distinct session ids,
    usernames and IPs, a per-level entry count, and how many messages
    contain one of a fixed set of keywords (case-insensitive substring
    match).

    Args:
        logs: Sequence of dicts with at least the keys ``session_id``,
            ``username``, ``ip``, ``log_level`` and ``message``.
    """
    session_ids = {entry['session_id'] for entry in logs}
    usernames = {entry['username'] for entry in logs}
    addresses = {entry['ip'] for entry in logs}

    # Count entries per level; keys appear in first-seen order.
    per_level = {}
    for entry in logs:
        per_level.setdefault(entry['log_level'], 0)
        per_level[entry['log_level']] += 1

    keywords = ('failed', 'attack', 'unauthorized', 'violation', 'injection', 'intrusion')
    flagged = 0
    for entry in logs:
        text = entry['message'].lower()
        if any(word in text for word in keywords):
            flagged += 1

    print("\nDataset Statistics:")
    print(f"Total log entries: {len(logs)}")
    print(f"Unique sessions: {len(session_ids)}")
    print(f"Unique users: {len(usernames)}")
    print(f"Unique IP addresses: {len(addresses)}")
    print(f"Log levels distribution: {per_level}")
    print(f"Entries with suspicious indicators: {flagged}")

# Generate the dataset
# Script entry point: build the default dataset, write it to CSV, then print
# a sample table, the statistics and a short summary.
if __name__ == "__main__":
    print("Generating log file test data...")
    logs = generate_log_dataset()

    # Export to CSV
    export_to_csv(logs)

    # Show sample and statistics
    print_sample_logs(logs)
    analyze_dataset(logs)

    print("\nDataset generation complete!")
    print("- 40 normal sessions with typical user behavior")
    print("- 10 suspicious sessions with various attack patterns")
    # Only normal sessions are limited to 5-8 entries; the attack scenarios
    # emit 5, 6, 9 or 10 entries depending on the scenario chosen.
    print("- Normal sessions contain 5-8 log entries; suspicious sessions contain 5-10")
    print(f"- Total entries: {len(logs)}")

Generating log file test data...
Generating 40 normal sessions...
Generating 10 suspicious sessions...
Dataset exported to logfile_testdata.csv

Sample of 20 log entries:
------------------------------------------------------------------------------------------------------------------------
IP              Session ID   Username        Timestamp           Level Message                                           
------------------------------------------------------------------------------------------------------------------------
192.168.1.15    sess_848345  alex.martin     2025-07-21 13:59:19 INFO  User authentication successful                    
192.168.1.15    sess_848345  alex.martin     2025-07-21 14:01:19 INFO  Database query executed                           
192.168.1.15    sess_848345  alex.martin     2025-07-21 14:03:19 DEBUG Cache refresh completed                           
192.168.1.15    sess_848345  alex.martin     2025-07-21 14:07:19 DEBUG Email notification sent     