In [None]:
import re
import pandas as pd
from datetime import datetime, timedelta
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import seaborn as sns

# -------- STEP 1: Parse multi-line Windows Security logs --------
def parse_windows_security_logs(file_path):
    """Parse a text export of Windows Security logon events into records.

    The file is split into event blocks at every occurrence of
    "Audit Success" / "Audit Failure". Only logon events are kept:
    event ID 4624 (successful logon) and 4625 (failed logon). Blocks
    without a ``DD-MM-YYYY HH:MM:SS`` timestamp or without one of those
    two event IDs are skipped.

    Args:
        file_path: Path to the exported log file (UTF-8, with or without
            a byte-order mark).

    Returns:
        A list of dicts with the keys ``timestamp`` (``datetime``),
        ``username`` (``'UNKNOWN'`` when not found), ``ip`` (``'-'`` when
        not found) and ``result`` (``'success'`` or ``'failure'``).
    """
    # 'utf-8-sig' reads plain UTF-8 and additionally strips a leading BOM.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        content = f.read()

    # Non-capturing group: a capturing group would make re.split() insert
    # the bare words "Success"/"Failure" into the result as extra pieces.
    events = re.split(r'(?=Audit (?:Success|Failure))', content)

    result_by_event_id = {'4624': 'success', '4625': 'failure'}

    # Patterns are compiled once instead of being re-looked-up per event.
    timestamp_re = re.compile(r'\d{2}-\d{2}-\d{4} \d{2}:\d{2}:\d{2}')
    labelled_id_re = re.compile(r'Event ID\s*:\s*(\d+)')
    header_id_re = re.compile(r'\t(4624|4625)\t')
    # The account that logged on (or failed to) is listed under
    # "New Logon:" (4624) / "Account For Which Logon Failed:" (4625); the
    # first "Account Name:" of an event belongs to the Subject section.
    target_account_re = re.compile(
        r'(?:New Logon|Account For Which Logon Failed):'
        r'.*?Account Name:[ \t]*(\S+)',
        re.DOTALL,
    )
    # [ \t]* (not \s*) so an empty value never captures the next line.
    any_account_re = re.compile(r'Account Name:[ \t]*(\S+)')
    # IPv6 (must contain a colon) or the dotted IPv4 form.
    ip_re = re.compile(
        r'Source Network Address:[ \t]*'
        r'((?:[0-9A-Fa-f]*:[0-9A-Fa-f:.]*)|[\d.]+)'
    )

    parsed_data = []

    for event in events:
        if not event.strip():
            continue

        # Header format: "Audit Success\t18-07-2025 10:24:12\t..."
        timestamp_match = timestamp_re.search(event)
        if not timestamp_match:
            continue
        try:
            timestamp = datetime.strptime(
                timestamp_match.group(), '%d-%m-%Y %H:%M:%S'
            )
        except ValueError:
            # Digits matched the shape but are not a real date/time.
            continue

        # Event ID is either labelled ("Event ID: 4624") or a bare
        # tab-delimited column on the header line.
        event_id_match = (
            labelled_id_re.search(event) or header_id_re.search(event)
        )
        if not event_id_match:
            continue
        result = result_by_event_id.get(event_id_match.group(1))
        if result is None:
            # Not a logon event; do not mislabel it as a failure.
            continue

        username_match = (
            target_account_re.search(event) or any_account_re.search(event)
        )
        username = username_match.group(1) if username_match else 'UNKNOWN'

        # The address may be missing or "-" (e.g. for local logons).
        ip_match = ip_re.search(event)
        ip = ip_match.group(1) if ip_match else '-'

        parsed_data.append({
            'timestamp': timestamp,
            'username': username,
            'ip': ip,
            'result': result
        })

    return parsed_data

# -------- STEP 2: Create DataFrame --------
def build_dataframe(data):
    """Turn parsed log records into a DataFrame with a datetime column.

    Args:
        data: Iterable of dicts, each holding at least a ``timestamp`` key
            (the parser in this file also supplies ``username``, ``ip``
            and ``result``).

    Returns:
        A DataFrame with rows lacking a timestamp removed and the
        ``timestamp`` column converted with ``pd.to_datetime``. For empty
        input an empty frame with the columns ``timestamp``, ``username``,
        ``ip`` and ``result`` is returned.
    """
    df = pd.DataFrame(data)
    if df.empty and 'timestamp' not in df.columns:
        # No records at all: dropna(subset=['timestamp']) would raise
        # KeyError, so hand back an empty frame with the expected schema.
        empty = pd.DataFrame(columns=['timestamp', 'username', 'ip', 'result'])
        empty['timestamp'] = pd.to_datetime(empty['timestamp'])
        return empty
    df = df.dropna(subset=['timestamp'])
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    return df

# -------- STEP 3: Feature Engineering --------
def engineer_features(df, window_minutes=5):
    """Add per-event behavioural features used for anomaly detection.

    The frame is sorted by ``timestamp`` and re-indexed, then four columns
    are added. "Earlier" always means a strictly smaller timestamp, so
    events sharing the exact same timestamp do not count for each other.

    * ``failed_by_ip`` (int): failed attempts from the same IP within the
      preceding time window.
    * ``attempts_by_user`` (int): attempts for the same username within
      the preceding time window.
    * ``is_new_ip_for_user`` (int, 0/1): 1 when this IP has not been seen
      for this username on any previously processed row.
    * ``success_rate_ip`` (float): share of successful attempts among all
      earlier attempts from the same IP; 0.0 when there are none.

    Args:
        df: DataFrame with ``timestamp``, ``username``, ``ip`` and
            ``result`` columns. It is not modified.
        window_minutes: Length of the look-back window in minutes.

    Returns:
        A new, time-sorted DataFrame including the feature columns.
    """
    df = df.sort_values(by='timestamp').reset_index(drop=True)
    window = timedelta(minutes=window_minutes)

    # Loop-invariant columns and masks, computed once.
    timestamps = df['timestamp']
    usernames = df['username']
    ips = df['ip']
    is_failure = df['result'] == 'failure'
    is_success = df['result'] == 'success'

    failed_by_ip = []
    attempts_by_user = []
    is_new_ip_for_user = []
    success_rate_ip = []
    seen_ips_by_user = {}

    for ts, user, ip in zip(timestamps, usernames, ips):
        earlier = timestamps < ts
        in_window = earlier & (timestamps >= ts - window)
        same_ip = ips == ip

        failed_by_ip.append(int((in_window & same_ip & is_failure).sum()))
        attempts_by_user.append(int((in_window & (usernames == user)).sum()))

        # Check membership before recording the IP, so the first sighting
        # of a (user, ip) pair is flagged as new.
        known_ips = seen_ips_by_user.setdefault(user, set())
        is_new_ip_for_user.append(int(ip not in known_ips))
        known_ips.add(ip)

        ip_history = earlier & same_ip
        total_count = int(ip_history.sum())
        success_count = int((ip_history & is_success).sum())
        success_rate_ip.append(
            success_count / total_count if total_count else 0.0
        )

    # Assign whole columns with explicit dtypes rather than writing cell
    # by cell into a column created with a different dtype (bool vs int).
    df['failed_by_ip'] = pd.Series(failed_by_ip, index=df.index, dtype='int64')
    df['attempts_by_user'] = pd.Series(
        attempts_by_user, index=df.index, dtype='int64'
    )
    df['is_new_ip_for_user'] = pd.Series(
        is_new_ip_for_user, index=df.index, dtype='int64'
    )
    df['success_rate_ip'] = pd.Series(
        success_rate_ip, index=df.index, dtype='float64'
    )

    return df

# -------- STEP 4: Anomaly Detection --------
def train_model(df):
    """Fit an Isolation Forest on the engineered features and label rows.

    The labels returned by ``fit_predict`` are stored in a new ``anomaly``
    column on *df* (scikit-learn's convention is -1 for outliers and 1 for
    inliers). The same frame is returned.
    """
    feature_columns = [
        'failed_by_ip',
        'attempts_by_user',
        'is_new_ip_for_user',
        'success_rate_ip',
    ]
    # Fixed seed keeps the labelling reproducible between runs.
    detector = IsolationForest(contamination=0.05, random_state=42)
    labels = detector.fit_predict(df[feature_columns])
    df['anomaly'] = labels
    return df

# -------- STEP 5: Flag Suspicious --------
def report_suspicious(df):
    """Print and return the rows whose ``anomaly`` label is -1.

    Prints the flagged rows, then how many of them fall on each IP and on
    each username (largest count first). Returns the flagged subset.
    """
    flagged_mask = df['anomaly'].eq(-1)
    suspicious = df.loc[flagged_mask]

    print("\nSuspicious login attempts:")
    print(suspicious[['timestamp', 'username', 'ip', 'result']])

    breakdowns = (
        ("\nGrouped by IP:", 'ip'),
        ("\nGrouped by Username:", 'username'),
    )
    for heading, column in breakdowns:
        print(heading)
        counts = suspicious.groupby(column).size()
        print(counts.sort_values(ascending=False))

    return suspicious

# -------- STEP 6: Visualize Results --------
def visualize(df):
    """Show three plots summarising the login records in *df*.

    1. Line plot of the number of attempts per minute.
    2. Bar chart of the ten IPs with the most failed attempts.
    3. Heatmap of attempt counts for every username/IP combination.
    """
    # --- Plot 1: attempts per minute ---
    per_minute = df['timestamp'].dt.floor('min').value_counts().sort_index()
    plt.figure(figsize=(12, 4))
    per_minute.plot()
    plt.title("Login Attempts Over Time")
    plt.ylabel("Count per Minute")
    plt.xlabel("Time")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    # --- Plot 2: IPs with the most failures ---
    failed_ips = df.loc[df['result'] == 'failure', 'ip']
    top_failed = failed_ips.value_counts().head(10)
    plt.figure(figsize=(10, 4))
    top_failed.plot(kind='bar')
    plt.title("Top Failed Login IPs")
    plt.ylabel("Failure Count")
    plt.xlabel("IP Address")
    plt.tight_layout()
    plt.show()

    # --- Plot 3: user-by-IP frequency table as a heatmap ---
    user_ip_counts = pd.crosstab(df['username'], df['ip'])
    plt.figure(figsize=(12, 6))
    sns.heatmap(user_ip_counts, cmap="YlGnBu", linewidths=.5)
    plt.title("Login Frequency Heatmap (User vs IP)")
    plt.xlabel("IP")
    plt.ylabel("User")
    plt.tight_layout()
    plt.show()

# -------- MAIN PIPELINE --------
def main(log_file_path):
    """Run the full pipeline on one log file: parse, featurise, detect, report, plot.

    Prints a message and returns early when the parser yields no records.
    """
    entries = parse_windows_security_logs(log_file_path)
    if not entries:
        print("❌ No valid log entries found. Exiting.")
        return

    frame = build_dataframe(entries)
    # Each stage takes the frame and returns the enriched frame.
    for stage in (engineer_features, train_model):
        frame = stage(frame)

    report_suspicious(frame)
    visualize(frame)

if __name__ == "__main__":
    # Runs the pipeline when executed directly; point this at your own
    # exported Security log file.
    main(log_file_path="log_file.txt")


Reading from: log_file.txt
Sample log lines:
Keywords	Date and Time	Source	Event ID	Task Category
Audit Success	18-07-2025 10:24:12	Microsoft-Windows-Security-Auditing	4624	Logon	"An account was successfully logged on.


❌ No valid log entries found. Exiting.
