<a href="https://colab.research.google.com/github/harikareddy114/HumanVSBot/blob/main/detect_bots.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount("/content/drive")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import numpy as np
import joblib


In [None]:
xgb_model = joblib.load(
    "/content/drive/MyDrive/Account_Remediation/bot_detector_xgb.pkl"
)
print("Model loaded")

Model loaded


In [None]:
CANONICAL_COLUMNS = {
    "timegenerated": ["timegenerated", "time generated"],
    "correlationid": ["correlationid", "correlation id"],
    "userprincipalname": ["userprincipalname", "email"],
    "ipaddress": ["ipaddress", "ip"],
    "remediation_status": ["status (truep / falsep)", "truep/falsep"],
    "email_count": ["email_count", "emailcount"],
    "ip_count": ["ip_count", "ipcount"]
}


In [None]:
def normalize(text):
    return (
        str(text)
        .lower()
        .strip()
        .replace(" ", "")
        .replace("_", "")
        .replace("/", "")
        .replace("(", "")
        .replace(")", "")
    )


In [None]:
def detect_header_row(df, max_rows=5):
    for i in range(max_rows):
        values = [normalize(v) for v in df.iloc[i].values]

        matches = 0
        for variants in CANONICAL_COLUMNS.values():
            if any(normalize(v) in values for v in variants):
                matches += 1

        if matches >= 2:
            return i

    return None


In [None]:
def standardize_columns(df):
    col_map = {}

    for col in df.columns:
        col_norm = normalize(col)
        for canonical, variants in CANONICAL_COLUMNS.items():
            if col_norm in [normalize(v) for v in variants]:
                col_map[col] = canonical
                break

    return df.rename(columns=col_map)


In [None]:
def load_sheet_safely(file_path, sheet):
    raw = pd.read_excel(file_path, sheet_name=sheet, header=None)

    header_row = detect_header_row(raw)

    if header_row is None:
        print(f"⚠️ Skipping {sheet}: header not found")
        return None

    df = pd.read_excel(file_path, sheet_name=sheet, header=header_row)
    df = standardize_columns(df)

    required = {"timegenerated", "correlationid"}
    if not required.issubset(df.columns):
        print(f"⚠️ Skipping {sheet}: missing required columns")
        return None

    return df


In [None]:
FEATURES = [
    "total_logins",
    "session_duration_sec",
    "avg_time_diff_sec",
    "min_time_diff_sec",
    "unique_emails",
    "email_count",
    "ip_count",
    "logins_per_min",
]


In [None]:
def build_features(df):
    df = df.sort_values("timegenerated")

    df["time_diff_sec"] = (
        df["timegenerated"]
        .diff()
        .dt.total_seconds()
    )

    total_logins = len(df)

    session_duration_sec = (
        df["timegenerated"].max() - df["timegenerated"].min()
    ).total_seconds() if total_logins > 1 else 0

    avg_time_diff_sec = df["time_diff_sec"].mean() or 0
    min_time_diff_sec = df["time_diff_sec"].min() or 0

    unique_emails = df["userprincipalname"].nunique()

    email_count = (
        df["email_count"].max()
        if "email_count" in df.columns
        else 0
    )

    ip_count = (
        df["ip_count"].max()
        if "ip_count" in df.columns
        else 0
    )

    logins_per_min = (
        total_logins / (session_duration_sec / 60 + 1)
    )

    feature_values = [
        total_logins,
        session_duration_sec,
        avg_time_diff_sec,
        min_time_diff_sec,
        unique_emails,
        email_count,
        ip_count,
        logins_per_min,
    ]

    return pd.DataFrame([feature_values], columns=FEATURES)


In [None]:
def classify_session(features, xgb_model):
    prob = xgb_model.predict_proba(features)[:, 1][0]

    if prob >= 0.95:
        return "True Positive", prob
    elif prob >= 0.80:
        return "SUSPICIOUS", prob
    else:
        return "False Positive", prob


In [None]:
file_path = "/content/drive/MyDrive/Account_Remediation/testing.xlsx"
xls = pd.ExcelFile(file_path)

results = []

for sheet in xls.sheet_names:
    print(f"\nProcessing sheet: {sheet}")

    df = load_sheet_safely(file_path, sheet)
    if df is None:
        continue

    df["timegenerated"] = pd.to_datetime(
        df["timegenerated"], utc=True, errors="coerce"
    )

    for cid, group in df.groupby("correlationid"):
        features = build_features(group)
        decision, prob = classify_session(features, xgb_model)

        results.append({
            "sheet": sheet,
            "correlationid": cid,
            "decision": decision,
            "bot_probability": round(prob, 3)
        })



Processing sheet: Sheet1

Processing sheet: Sheet2

Processing sheet: Sheet3

Processing sheet: Sheet4

Processing sheet: Sheet5

Processing sheet: Sheet6

Processing sheet: Sheet7

Processing sheet: Sheet9

Processing sheet: Sheet10

Processing sheet: Sheet16


In [None]:
final_df = pd.DataFrame(results)
final_df.head()


Unnamed: 0,sheet,correlationid,decision,bot_probability
0,Sheet1,000e1bbf-3ad3-433d-af8d-234bf9975bc1,False Positive,0.596
1,Sheet1,00c0c254-1f74-46b8-bef8-966f5c97f651,False Positive,0.132
2,Sheet1,00d3a6b8-f127-4273-8cf2-b0afb0b09b0f,True Positive,0.96
3,Sheet1,0117272b-06eb-4755-8e7b-e7acf92371ad,True Positive,0.985
4,Sheet1,01213c82-1379-48a3-b3af-5a41656b0bcf,False Positive,0.05


In [None]:
final_df.to_excel("bot_detection_results.xlsx", index=False)

