<a href="https://colab.research.google.com/github/Ananya-Ahuja/Anomalies/blob/main/mac_ip_mismatch_module.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
from datetime import datetime

In [None]:
def parse_datetime(dt_str):
    """Parse an ISO 8601 string into a ``datetime``.

    Args:
        dt_str: Value to parse, normally a string such as
            ``"2024-01-15T10:30:00"``.

    Returns:
        The parsed ``datetime``, or ``pd.NaT`` when the value is not a
        string or is not a valid ISO 8601 timestamp.
    """
    try:
        return datetime.fromisoformat(dt_str)
    except (TypeError, ValueError):
        # TypeError: non-string input (e.g. None or a float NaN).
        # ValueError: a string that fromisoformat cannot parse.
        return pd.NaT

### User Input

In [None]:
# Asking the user to input the firewall logs
def load_firewall_log():
    """Prompt for a firewall log CSV path and return it as a DataFrame.

    The 'Timestamp' column is converted value by value with
    ``parse_datetime``.
    """
    csv_path = input("Enter the path to your firewall log CSV (with mac address): ")
    firewall_log = pd.read_csv(csv_path)
    parsed_timestamps = firewall_log['Timestamp'].apply(parse_datetime)
    firewall_log['Timestamp'] = parsed_timestamps
    return firewall_log

In [None]:
# Prompt for the firewall log CSV path and load it; the 'Timestamp' column
# is parsed by load_firewall_log.
firewall_df = load_firewall_log()

Enter the path to your firewall log CSV (with mac address): /content/enhanced_firewall_log_with_mac.csv


In [None]:
# Asking the user to input the DHCP lease records
def load_dhcp_leases():
    """Prompt for a DHCP lease CSV path and return it as a DataFrame.

    The 'Lease Start' and 'Lease End' columns are converted value by value
    with ``parse_datetime``.
    """
    csv_path = input("Enter the path to your DHCP lease records CSV: ")
    leases = pd.read_csv(csv_path)
    # Both lease boundaries get the same parsing, start first, then end.
    for boundary_column in ('Lease Start', 'Lease End'):
        leases[boundary_column] = leases[boundary_column].apply(parse_datetime)
    return leases

In [None]:
# Prompt for the DHCP lease CSV path and load it; the 'Lease Start' and
# 'Lease End' columns are parsed by load_dhcp_leases.
dhcp_df = load_dhcp_leases()

Enter the path to your DHCP lease records CSV: /content/dhcp_lease_records.csv


## Finding the expected MAC address for each IP address

In [None]:
# The following function creates a lookup table (dictionary) from a DataFrame of DHCP lease records.
# This lookup will be used to efficiently determine which MAC address was assigned to a given IP address during specific time intervals.
def build_lease_lookup(dhcp_df):
    """Group DHCP lease records by IP address.

    Args:
        dhcp_df: DataFrame with 'IP Address', 'MAC Address', 'Lease Start'
            and 'Lease End' columns.

    Returns:
        A dict mapping each IP address to a list of
        ``(lease_start, lease_end, mac)`` tuples, in the order the rows
        appear in ``dhcp_df``.
    """
    lease_dict = {}
    # Iterate only the four needed columns in lockstep: this avoids building
    # a Series per row (as iterrows does) and keeps each value as stored in
    # its own column.
    lease_rows = zip(
        dhcp_df['IP Address'],
        dhcp_df['Lease Start'],
        dhcp_df['Lease End'],
        dhcp_df['MAC Address'],
    )
    for ip, lease_start, lease_end, mac in lease_rows:
        lease_dict.setdefault(ip, []).append((lease_start, lease_end, mac))
    return lease_dict

In [None]:
# This function is used to determine which MAC address is supposed to be using a given IP address at a specific point in time, based on DHCP lease records.
def find_expected_mac(ip, timestamp, lease_dict):
    """Return the MAC address leased to ``ip`` at ``timestamp``.

    Args:
        ip: IP address to look up in ``lease_dict``.
        timestamp: Point in time to test against each lease interval.
        lease_dict: Mapping of IP address to a list of
            ``(lease_start, lease_end, mac)`` tuples.

    Returns:
        The MAC of the first lease whose interval contains ``timestamp``
        (both ends inclusive), or ``None`` when the IP is unknown or no
        lease covers that time.
    """
    leases_for_ip = lease_dict.get(ip, ())
    covering_macs = (
        leased_mac
        for begins, ends, leased_mac in leases_for_ip
        if begins <= timestamp <= ends
    )
    return next(covering_macs, None)

In [None]:
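# This function compares the IP and MAC observed in each firewall log entry with the MAC that the DHCP lease records expect for that IP at that time, and collects the entries where they differ.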
def detect_mac_ip_mismatches(firewall_df, lease_dict):
    """Flag firewall rows whose source MAC differs from the DHCP-leased MAC.

    Args:
        firewall_df: DataFrame with 'Source IP', 'Source MAC' and
            'Timestamp' columns.
        lease_dict: Mapping of IP address to a list of
            ``(lease_start, lease_end, mac)`` tuples, as consumed by
            ``find_expected_mac``.

    Returns:
        A DataFrame with one row per mismatch, holding the original firewall
        columns plus an 'Expected MAC' column. It is empty when nothing is
        flagged.
    """
    anomalies = []
    for _, row in firewall_df.iterrows():
        mac = row['Source MAC']
        timestamp = row['Timestamp']
        # A missing MAC shows up as NaN/None; NaN is truthy, so `not mac`
        # alone would let it through and `.lower()` would then fail. The
        # timestamp is required because a lease is only valid for a period.
        if pd.isna(mac) or not mac or pd.isna(timestamp):
            continue
        expected_mac = find_expected_mac(row['Source IP'], timestamp, lease_dict)
        # No lease covers this IP/time, or the lease record has no MAC:
        # there is nothing to compare against.
        if not expected_mac or pd.isna(expected_mac):
            continue
        # MAC addresses are compared case-insensitively.
        if mac.lower() != expected_mac.lower():
            anomalies.append({**row.to_dict(), 'Expected MAC': expected_mac})
    return pd.DataFrame(anomalies)

In [None]:
# Build the per-IP lease lookup, then check every firewall row against it.
lease_dict = build_lease_lookup(dhcp_df)
anomalies_df = detect_mac_ip_mismatches(firewall_df, lease_dict)

if anomalies_df.empty:
    print("No MAC/IP mismatches detected.")
else:
    # Save the flagged rows before reporting how many there were.
    anomalies_df.to_csv('mac_ip_mismatch_anomalies.csv', index=False)
    print(f"Flagged {len(anomalies_df)} MAC/IP mismatch events. Results saved to 'mac_ip_mismatch_anomalies.csv'.")

Flagged 7 MAC/IP mismatch events. Results saved to 'mac_ip_mismatch_anomalies.csv'.
