<a href="https://colab.research.google.com/github/CyberMetrics/Prototypes/blob/main/Prototype_002.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

A Base Model That Can Do Three Things at a Time (Anomaly Detection, Classification, Time-Series Analysis)

In [None]:
import pandas as pd
import numpy as np
from collections import Counter

# ----------------- Logistic Regression from scratch -----------------
class SimpleLogisticRegression:
    """Binary logistic regression trained with full-batch gradient descent.

    Args:
        lr: Step size applied to each gradient update.
        epochs: Number of gradient steps performed by ``fit``.

    ``w`` is ``None`` until ``fit`` has run; ``b`` starts at 0.
    """

    def __init__(self, lr=0.01, epochs=1000):
        self.lr = lr
        self.epochs = epochs
        self.w = None
        self.b = 0

    def _sigmoid(self, z):
        """Return ``1 / (1 + exp(-z))`` element-wise without overflowing."""
        z = np.asarray(z, dtype=float)
        # exp() only ever receives non-positive values here, so it cannot
        # overflow; the two branches are algebraically the same function.
        e = np.exp(-np.abs(z))
        return np.where(z >= 0, 1 / (1 + e), e / (1 + e))

    def fit(self, X, y):
        """Learn ``w`` and ``b`` from samples ``X`` and 0/1 labels ``y``.

        Args:
            X: Array-like of shape (n_samples, n_features).
            y: Array-like with one label per sample; it is flattened so a
                column vector does not broadcast against the predictions.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float).ravel()
        n, d = X.shape
        self.w = np.zeros(d)
        self.b = 0

        for _ in range(self.epochs):
            # Gradient of the mean log-loss w.r.t. weights and bias.
            error = self._sigmoid(X.dot(self.w) + self.b) - y
            self.w -= self.lr * (X.T.dot(error) / n)
            self.b -= self.lr * (np.sum(error) / n)

    def predict_proba(self, X):
        """Return the predicted probability of class 1 as an ndarray.

        ``X`` may be any array-like (list, ndarray, DataFrame); it is
        converted the same way ``fit`` converts its input.
        """
        X = np.asarray(X, dtype=float)
        return self._sigmoid(X.dot(self.w) + self.b)

    def predict(self, X, threshold=0.5):
        """Return 0/1 labels: 1 where the probability is >= ``threshold``."""
        return (self.predict_proba(X) >= threshold).astype(int)

# ----------------- Hybrid Security Model -----------------
class HybridSecurityModel:
    """Run anomaly, classification and sequence checks on one batch.

    ``analyze_numeric`` handles numeric feature batches and
    ``analyze_logs`` handles tabular log records. Each returns a
    DataFrame with one row per input row and a combined ``FinalAlert``.
    """

    def __init__(self):
        self.classifier = SimpleLogisticRegression()
        self.is_trained = False

    # ---------- Numeric Data Pipeline ----------
    def fit_numeric(self, X, y):
        """Train the classifier on ``X``/``y`` and mark the model trained."""
        self.classifier.fit(X, y)
        self.is_trained = True

    def analyze_numeric(self, X_batch, z_threshold=2, window=3, spike_factor=1.5):
        """Flag anomalous rows, classify them and detect spikes.

        Args:
            X_batch: 2-D array-like of numeric samples (rows) by features.
            z_threshold: A row is anomalous when any feature's absolute
                z-score, computed within this batch, exceeds this value.
            window: Rolling-window length used for the spike check.
            spike_factor: A row raises ``SeqAlert`` when its feature mean
                exceeds ``spike_factor`` times the rolling mean.

        Returns:
            DataFrame with columns ``Sample``, ``AnomalyFlag``, ``Class``,
            ``SeqAlert`` and ``FinalAlert``. ``Class`` is ``"Unknown"``
            for every row until ``fit_numeric`` has been called.
        """
        # Convert once so lists, arrays and frames all reach the classifier
        # as an ndarray.
        X_arr = np.asarray(X_batch, dtype=float)
        X_df = pd.DataFrame(X_arr)
        n_rows = len(X_df)

        # 1. Anomaly detection (z-score); a zero std is replaced by 1 so
        #    constant columns do not divide by zero.
        std = X_df.std().replace(0, 1)
        zscores = ((X_df - X_df.mean()) / std).abs()
        anomalies = (zscores > z_threshold).any(axis=1).astype(int)

        # 2. Classification
        if self.is_trained:
            preds = self.classifier.predict(X_arr)
        else:
            preds = ["Unknown"] * n_rows

        # 3. Sequence analysis (rolling mean spike)
        row_mean = X_df.mean(axis=1)
        rolling_mean = row_mean.rolling(window=window, min_periods=1).mean()
        seq_alert = (row_mean > rolling_mean * spike_factor).astype(int)

        final_alert = ((anomalies == 1) | (seq_alert == 1)).astype(int)
        return pd.DataFrame({
            "Sample": np.arange(n_rows),
            "AnomalyFlag": anomalies.to_numpy(),
            "Class": preds,
            "SeqAlert": seq_alert.to_numpy(),
            "FinalAlert": final_alert.to_numpy(),
        })

    # ---------- Log Data Pipeline ----------
    @staticmethod
    def _lower_text(series):
        """Return ``series`` as lower-cased strings, with missing values as ''."""
        return series.where(series.notna(), "").astype(str).str.lower()

    def analyze_logs(self, df):
        """Flag rare events, level/content mismatches and line-number gaps.

        Args:
            df: DataFrame with at least the columns ``LineId``,
                ``EventId``, ``Level`` and ``Content``. It is not modified.

        Returns:
            DataFrame sorted by ``LineId`` with columns ``LineId``,
            ``EventId``, ``Level``, ``AnomalyFlag``, ``ClassCheck``,
            ``SeqAlert`` and ``FinalAlert``.
        """
        df = df.copy()

        # 1. Anomaly detection (rare events): an EventId seen exactly once.
        event_counts = Counter(df["EventId"])
        df["AnomalyFlag"] = [int(event_counts[e] == 1) for e in df["EventId"]]

        # 2. Classification (rule-based): content mentions a failure or
        #    error while the record is logged at Info level.
        content = self._lower_text(df["Content"])
        level = self._lower_text(df["Level"])
        has_problem = (
            content.str.contains("fail", regex=False)
            | content.str.contains("error", regex=False)
        )
        mismatch = has_problem & (level == "info")
        df["ClassCheck"] = np.where(mismatch, "Mismatch", "OK")

        # 3. Sequence analysis: a jump of more than one LineId between
        #    consecutive records (after sorting) is a gap.
        df = df.sort_values("LineId")
        df["LineGap"] = df["LineId"].diff().fillna(0).astype(int)
        df["SeqAlert"] = (df["LineGap"] > 1).astype(int)

        # Final alert: any of the three checks fired.
        df["FinalAlert"] = (
            (df["AnomalyFlag"] == 1)
            | (df["ClassCheck"] == "Mismatch")
            | (df["SeqAlert"] == 1)
        ).astype(int)
        return df[["LineId", "EventId", "Level", "AnomalyFlag", "ClassCheck", "SeqAlert", "FinalAlert"]]



Running a Sample Test with a Small Amount of Numeric Data

In [None]:
# ----------------- SAMPLE TEST -----------------
if __name__ == "__main__":
    # The log-analysis cell further down reuses this same `model` object.
    model = HybridSecurityModel()

    # ---------- Numeric Test ----------
    # Four low-valued samples labelled 0 and one high-valued sample labelled 1.
    X_train = np.array([[1, 2], [2, 1], [2, 2], [3, 3], [10, 10]])
    y_train = np.array([0, 0, 0, 0, 1])
    model.fit_numeric(X_train, y_train)

    # Score a fresh batch and print the per-sample report under a banner.
    X_batch = np.array([[2, 2], [3, 4], [9, 9], [11, 11], [2, 1]])
    numeric_report = model.analyze_numeric(X_batch)
    print("\n--- Numeric Data Analysis ---", numeric_report, sep="\n")



--- Numeric Data Analysis ---
   Sample  AnomalyFlag  Class  SeqAlert  FinalAlert
0       0            0      0         0           0
1       1            0      0         0           0
2       2            0      1         1           1
3       3            0      1         0           0
4       4            0      0         0           0


Feeding Our Model Real-World Data

In [None]:
    # ---------- Log Test ----------
    # Column names, in the same order as the fields of each record below.
    log_columns = ["LineId", "Date", "Time", "Level", "Component", "Content", "EventId"]
    log_data = [
        [1, "2016-09-28", "04:30:30", "Info", "CBS", "Loaded Servicing Stack v6.1.7601.23505...", "E23"],
        [2, "2016-09-28", "04:30:31", "Info", "CSI", "00000001@2016/9/27:20:30:31.455 WcpInitialize...", "E13"],
        [3, "2016-09-28", "04:30:31", "Info", "CSI", "00000002@2016/9/27:20:30:31.458 WcpInitialize...", "E13"],
        [5, "2016-09-28", "04:30:31", "Info", "CBS", "Ending TrustedInstaller initialization.", "E17"],
        [11, "2016-09-28", "04:30:31", "Info", "CBS", "SQM: Failed to start upload... HRESULT=0x80004005", "E39"],
        [14, "2016-09-28", "04:30:31", "Info", "CBS", "SQM: Warning: Failed to upload all unsent reports.", "E43"],
    ]
    df_logs = pd.DataFrame(log_data, columns=log_columns)

    # Analyze the records with the model built earlier and print the report.
    log_report = model.analyze_logs(df_logs)
    print("\n--- Log Data Analysis ---", log_report, sep="\n")



--- Log Data Analysis ---
   LineId EventId Level  AnomalyFlag ClassCheck  SeqAlert  FinalAlert
0       1     E23  Info            1         OK         0           1
1       2     E13  Info            0         OK         0           0
2       3     E13  Info            0         OK         0           0
3       5     E17  Info            1         OK         1           1
4      11     E39  Info            1   Mismatch         1           1
5      14     E43  Info            1   Mismatch         1           1
