In [1]:
import os
# Print the kernel's current working directory.
print(os.getcwd())

c:\Users\farre\CYB333 Project\CYB-333-Project


In [5]:
import os

# Sanity check: confirm the kernel can read a file that sits in the project root.
#
# The root is taken from the kernel's working directory rather than a
# hard-coded, machine-specific absolute path, so the cell runs unchanged on any
# checkout and no local user name ends up in the notebook source.
project_root = os.getcwd()

test_file_path = os.path.join(project_root, 'test_access.txt')

print(f"Attempting to read file: {test_file_path}")

try:
    with open(test_file_path, 'r') as f:
        content = f.read()
    print("Successfully read content from test_access.txt:")
    print(content)
except (OSError, UnicodeDecodeError) as e:
    # Only the failures a file read can produce (missing file, permissions,
    # undecodable bytes) are reported here; anything else is a programming
    # error and should surface with its traceback.
    print(f"Error reading test_access.txt: {e}")

Attempting to read file: C:\Users\farre\CYB333 Project\CYB-333-Project\test_access.txt
Successfully read content from test_access.txt:
Hello


In [1]:
import pandas as pd
import re
from datetime import datetime

def load_and_parse_logs(file_path, encoding=None):
    """
    Read a log file and return one DataFrame row per line.

    Lines are expected to look like "YYYY-MM-DD HH:MM:SS LEVEL message"
    (no brackets around the level, no milliseconds). A line that does not
    match that shape, or whose timestamp is not a real calendar date/time,
    is kept as an 'UNKNOWN' row with a missing timestamp and the whole
    stripped line as its message, so nothing from the file is dropped.

    Args:
        file_path: Path of the log file to read.
        encoding: Text encoding passed to open(); None (the default) keeps
            the platform's default encoding.

    Returns:
        pandas.DataFrame with the columns 'timestamp', 'level', 'message'
        and 'original_log'. The columns are present even when the file
        is empty.

    Raises:
        OSError: If the file cannot be opened.
    """
    columns = ['timestamp', 'level', 'message', 'original_log']

    # Group 1: (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) for timestamp
    # Group 2: (INFO|DEBUG|ERROR|WARNING|CRITICAL|ALERT|UNKNOWN) for log level
    # Group 3: (.*) for message
    log_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+(INFO|DEBUG|ERROR|WARNING|CRITICAL|ALERT|UNKNOWN)\s+(.*)")

    parsed_data = []

    # errors='replace' keeps a single undecodable byte from aborting the whole
    # load; the affected character shows up as U+FFFD in the parsed text.
    with open(file_path, 'r', encoding=encoding, errors='replace') as file:
        for line in file:
            stripped_line = line.strip()

            # Start from the "unknown entry" shape and upgrade it only when the
            # line parses completely.
            record = {
                'timestamp': None,
                'level': 'UNKNOWN',
                'message': stripped_line,
                'original_log': stripped_line  # Keep original log line
            }

            match = log_pattern.match(line)
            if match:
                timestamp_str, level, message = match.groups()
                try:
                    timestamp_obj = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
                except ValueError:
                    # The regex only checks digit counts, so values such as
                    # month 13 get this far; leave the line as UNKNOWN.
                    pass
                else:
                    record['timestamp'] = timestamp_obj
                    record['level'] = level
                    record['message'] = message.strip()

            parsed_data.append(record)

    # Passing columns explicitly guarantees the schema for an empty file, so
    # downstream column lookups do not fail with KeyError.
    return pd.DataFrame(parsed_data, columns=columns)

def filter_logs(df_logs):
    """
    Keep only the log rows that look security-relevant.

    A row is kept when its 'message' contains "failed login" (matched
    case-insensitively, missing messages count as no match) or when its
    'level' is one of CRITICAL, ALERT or ERROR.

    Args:
        df_logs: DataFrame with at least 'message' and 'level' columns.

    Returns:
        A new DataFrame (a copy) holding the matching rows, original index
        labels preserved.
    """
    alert_levels = ['CRITICAL', 'ALERT', 'ERROR']

    # Build each condition as its own named mask, then combine them.
    mentions_failed_login = df_logs['message'].str.contains(
        r"failed login", case=False, na=False
    )
    has_alert_level = df_logs['level'].isin(alert_levels)

    # Copy so callers can modify the result without touching df_logs.
    return df_logs.loc[mentions_failed_login | has_alert_level].copy()

def prioritize_alerts(df_filtered_logs):
    """
    Order alerts from most to least severe.

    Severity order, highest first: CRITICAL, ALERT, ERROR, WARNING, INFO,
    DEBUG, UNKNOWN. Rows whose 'level' is not one of these are dropped.
    Rows that share a level keep the relative order they had in the input.

    Args:
        df_filtered_logs: DataFrame with at least a 'level' column.

    Returns:
        A new DataFrame sorted by severity; its 'level' column is an ordered
        categorical. The input frame is not modified.
    """
    # Single source of truth for both the membership filter and the sort order.
    # Listed highest priority first, so CRITICAL is the *smallest* category.
    priority_levels = ['CRITICAL', 'ALERT', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'UNKNOWN']

    prioritized_df = df_filtered_logs[
        df_filtered_logs['level'].isin(priority_levels)
    ].copy()

    level_order = pd.CategoricalDtype(priority_levels, ordered=True)
    prioritized_df['level'] = prioritized_df['level'].astype(level_order)

    # Ascending order over the categories above puts CRITICAL first; sorting
    # descending would list UNKNOWN/DEBUG first and CRITICAL last.
    # kind='stable' keeps the input order within each level.
    prioritized_df = prioritized_df.sort_values(by='level', ascending=True, kind='stable')
    return prioritized_df

if __name__ == "__main__":
    # Stage 1: parse the raw log file into a DataFrame and show a preview
    # plus the column/dtype summary.
    df_logs = load_and_parse_logs('sample_logs/example_log.txt')
    print("Parsed Logs DataFrame (Head):", df_logs.head(), sep="\n")
    print("\nDataFrame Info:")
    df_logs.info()

    # Stage 2: keep only failed logins and CRITICAL/ALERT/ERROR entries.
    df_filtered_logs = filter_logs(df_logs)
    print("\nFiltered Logs (DataFrame):", df_filtered_logs, sep="\n")

    # Stage 3: order the filtered entries by severity.
    df_prioritized_alerts = prioritize_alerts(df_filtered_logs)
    print("\nPrioritized Alerts (DataFrame):", df_prioritized_alerts, sep="\n")

Parsed Logs DataFrame (Head):
            timestamp     level  \
0 2025-06-22 10:00:01      INFO   
2 2025-06-22 10:00:10  CRITICAL   
3 2025-06-22 10:00:15      INFO   
4 2025-06-22 10:00:20     ERROR   

                                             message  \
0        User 'solemn' logged in from 192.168.1.100.   
1  Attempted login by 'unknown_user' from 203.0.1...   
2  Malware detected on host 'server-prod-01' path...   
3                        System health check passed.   
4                          Database connection lost.   

                                        original_log  
0  2025-06-22 10:00:01 INFO User 'solemn' logged ...  
2  2025-06-22 10:00:10 CRITICAL Malware detected ...  
3  2025-06-22 10:00:15 INFO System health check p...  
4  2025-06-22 10:00:20 ERROR Database connection ...  

DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype         
---  ------    