# In [1]:
import re
import pandas as pd

# 🛠 Define priority mapping for logs
# Log level -> triage priority. Levels not listed here are handled by the
# caller's own default when it looks a level up.
PRIORITY_MAP = dict(
    ERROR="High",
    WARNING="Medium",
    INFO="Low",
)

# 🛠 Function to parse a single log line
def parse_log_line(line):
    """Split a raw log line into ``(timestamp, level, message)``.

    The expected shape is ``[<timestamp>] <LEVEL>: <message>``, matched from
    the start of the line. When the line does not have that shape, the
    timestamp and level are ``None`` and the message is the whole line with
    surrounding whitespace stripped.
    """
    parsed = re.match(r"\[(.*?)\]\s+(\w+):\s+(.*)", line)
    if parsed is None:
        # Unstructured line: keep its text so the caller can still report it.
        return None, None, line.strip()
    return parsed.groups()

# 🛠 Function to check if an IP is invalid
def is_invalid_ip(ip):
    """Return True when *ip* is not a well-formed dotted-quad IPv4 address.

    A valid address consists of exactly four dot-separated groups of one to
    three digits, each in the range 0-255. Anything else (too few groups,
    an empty group, an octet above 255, an empty string) is reported as
    invalid.

    Args:
        ip: Candidate address text, e.g. ``"192.168.1.1"`` or ``"192.168."``.

    Returns:
        ``False`` for a valid address, ``True`` otherwise.
    """
    # Shape check first: four groups of 1-3 digits separated by dots.
    if not re.fullmatch(r'(?:\d{1,3}\.){3}\d{1,3}', ip):
        return True
    # The shape alone would accept e.g. 999.999.999.999, so range-check
    # every octet as well.
    return any(int(octet) > 255 for octet in ip.split('.'))

# 🛠 Function to check if an email is invalid
def is_invalid_email(email):
    """Return True when *email* fails the formatting checks below.

    An address is accepted only if it matches the ``local@domain.tld`` shape
    as a whole, contains no consecutive dots, and has exactly one ``@``.
    """
    well_formed = re.fullmatch(
        r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$', email
    ) is not None
    has_double_dot = '..' in email
    single_at = email.count('@') == 1
    return not (well_formed and single_at and not has_double_dot)

# 🛠 Function to mask emails and IPs inside the message
def mask_message(message, email_map, ip_map):
    """Swap every email and IPv4 address in *message* for a stable token.

    Emails become ``USERNAME-NN`` and IPs become ``IP-ADDRESS-NN``. The two
    mapping dicts (original text -> token) are updated in place, so the same
    value receives the same token across calls that share the dicts.
    Returns the masked message.
    """
    def tokenizer(mapping, token_prefix):
        """Build a ``re.sub`` callback that assigns tokens from *mapping*."""
        def substitute(found):
            original = found.group(0)
            # New values get the next sequence number, zero-padded to at
            # least two digits; known values reuse their existing token.
            return mapping.setdefault(
                original, f"{token_prefix}{len(mapping)+1:02d}"
            )
        return substitute

    # Emails are masked before IPs, on the original text.
    without_emails = re.sub(
        r'[\w\.-]+@[\w\.-]+', tokenizer(email_map, "USERNAME-"), message
    )
    return re.sub(
        r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        tokenizer(ip_map, "IP-ADDRESS-"),
        without_emails,
    )

# 🛠 Function to summarize logs
def summarize_logs(df):
    """Collapse identical masked log entries into one summary row each.

    Rows are grouped by ``level``, ``masked_message`` and ``priority``. Each
    group reports its earliest and latest ``timestamp``, the number of
    entries, and whether any entry in the group was flagged ``invalid_id``.
    The group keys are returned as ordinary columns.

    NOTE(review): with pandas' default ``groupby`` settings, rows whose
    group key is missing (e.g. a ``None`` level) are left out of the result.
    """
    group_keys = ['level', 'masked_message', 'priority']
    # Output column name -> (source column, aggregation).
    aggregations = {
        'first_timestamp': ('timestamp', 'min'),
        'last_timestamp': ('timestamp', 'max'),
        'count': ('masked_message', 'count'),
        'invalid_id': ('invalid_id', lambda flags: any(flags)),  # True if any invalid
    }
    grouped = df.groupby(group_keys)
    summary = grouped.agg(**aggregations)
    return summary.reset_index()

# 🛠 Function to read logs and process them
def read_log_file(file_path):
    """Read a log file and build a DataFrame with one row per log line.

    Each non-blank line is parsed with ``parse_log_line``, masked with
    ``mask_message``, assigned a priority from ``PRIORITY_MAP`` (unknown or
    missing levels fall back to ``"Low"``), and flagged when it contains an
    IP-like or email-like fragment that fails validation.

    Args:
        file_path: Path of the UTF-8 encoded log file to read.

    Returns:
        A tuple ``(df, email_map, ip_map)`` where ``df`` has the columns
        ``timestamp``, ``level``, ``message``, ``masked_message``,
        ``priority`` and ``invalid_id``, and the two dicts map each original
        email / IP to the token that replaced it.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist (from ``open``).
    """
    # Candidate detectors are deliberately looser than the validators so that
    # incomplete addresses are found and then reported as invalid. They are
    # compiled once here rather than rebuilt for every line.
    ip_candidate = re.compile(r'\b(?:\d{1,3}\.){1,3}\d*\b')
    email_candidate = re.compile(r'[\w\.-]+@[\w\.-]+')

    email_map, ip_map = {}, {}
    rows = []

    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # Blank lines carry no log record; skipping them avoids rows with
            # no timestamp, no level and an empty message.
            if not line.strip():
                continue

            ts, level, message = parse_log_line(line)
            masked_message = mask_message(message, email_map, ip_map)
            priority = PRIORITY_MAP.get(level, "Low")

            # Flag the line if any IP-like or email-like fragment is invalid.
            invalid_id = (
                any(is_invalid_ip(ip) for ip in ip_candidate.findall(message))
                or any(is_invalid_email(email) for email in email_candidate.findall(message))
            )

            rows.append((ts, level, message, masked_message, priority, invalid_id))

    columns = ["timestamp", "level", "message", "masked_message", "priority", "invalid_id"]
    df = pd.DataFrame(rows, columns=columns)
    # Unparseable or missing timestamps become NaT instead of raising.
    df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
    return df, email_map, ip_map

# 🛠 Main function to run everything
def main():
    """Run the log pipeline on the sample file and print four reports.

    Prints the first ten original log rows, the first ten masked rows, the
    masked summary, and the summary with tokens swapped back to the original
    emails and IPs.

    Raises:
        FileNotFoundError: If the log file at ``file_path`` does not exist.
    """
    file_path = "sample_log_file 1.txt"  # replace with the path to your log file
    df, email_map, ip_map = read_log_file(file_path)

    # Print Original Logs
    print("\nOriginal Logs:\n")
    print(df[['timestamp', 'level', 'message']].head(10).to_string(index=False))

    # Print Masked Logs
    print("\nMasked Logs:\n")
    print(df[['timestamp', 'level', 'masked_message', 'priority', 'invalid_id']].head(10).to_string(index=False))

    # Summarize logs
    summary_df = summarize_logs(df)
    print("\nSummary (Masked):\n")
    print(summary_df.head(10).to_string(index=False))

    # Recompose original messages: invert the maps (token -> original value).
    email_inverse = {token: original for original, token in email_map.items()}
    ip_inverse = {token: original for original, token in ip_map.items()}

    def unmask(msg):
        """Replace every known token in *msg* with the value it stands for."""
        # Token numbers are zero-padded to two digits but grow beyond that,
        # so match two OR MORE digits; a fixed \d{2} would cut
        # "USERNAME-100" down to "USERNAME-10" and restore the wrong value.
        # Unknown tokens are left untouched.
        restored = re.sub(
            r'IP-ADDRESS-\d{2,}',
            lambda m: ip_inverse.get(m.group(0), m.group(0)),
            msg,
        )
        return re.sub(
            r'USERNAME-\d{2,}',
            lambda m: email_inverse.get(m.group(0), m.group(0)),
            restored,
        )

    summary_df['recomposed_message'] = summary_df['masked_message'].apply(unmask)

    # Print Recomposed Summary
    print("\nSummary (Recomposed):\n")
    recomposed_columns = [
        'level', 'recomposed_message', 'priority', 'count',
        'first_timestamp', 'last_timestamp', 'invalid_id',
    ]
    print(summary_df[recomposed_columns].head(10).to_string(index=False))

# 🛠 Execute everything
# Entry-point guard: call main() only when this module is run as a script,
# not when it is imported.
if __name__ == "__main__":
    main()


# Output of the cell above when the sample log file is absent:
# FileNotFoundError: [Errno 2] No such file or directory: 'sample_log_file 1.txt'