In [None]:
import pandas as pd
from scipy.stats import chi2_contingency
from sklearn.model_selection import train_test_split, RandomizedSearchCV,cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
import seaborn as sns
import gc
# Ignore all warnings
import warnings
warnings.filterwarnings("ignore")

In [None]:
#Step 1: Data Exploration and Understanding
# Location of the training CSV. Kept in one named constant so the notebook can
# be pointed at another copy of the data by editing a single line.
TRAIN_CSV_PATH = r'D:\ranjiny\Guvi_python\capstone_microsoft\GUIDE_Train.csv'

# Load the dataset
df = pd.read_csv(TRAIN_CSV_PATH)

# Step 1 a: Initial inspection
# DataFrame.info() writes its report itself and returns None, so it is called
# bare; wrapping it in print() appended a stray "None" line to the output.
df.info()
print(df.describe(include='all'))
print(df.head())


# Distribution of the target variable
sns.countplot(x='IncidentGrade', data=df)
plt.title('Distribution of IncidentGrade')
plt.show()

In [None]:
#Step 1 b: Exploratory Data Analysis (EDA):
# Summary statistics using pandas' default column selection (no include= argument).
# As the last expression of the cell, the resulting table is shown as the cell output.
df.describe()

In [None]:
# Categorical features whose value counts are plotted, split by incident grade
columns = [
    'Category', 'ActionGrouped', 'ActionGranular', 'EntityType',
    'EvidenceRole', 'ResourceType', 'Roles', 'OSFamily', 'OSVersion',
    'AntispamDirection', 'SuspicionLevel', 'LastVerdict',
]

for feature in columns:
    # One figure per feature: bars are grouped by IncidentGrade
    fig, ax = plt.subplots(figsize=(12, 6))
    sns.countplot(data=df, x=feature, hue='IncidentGrade', ax=ax)
    ax.set_title(f"{feature} vs Incident grade count")
    # Slant and right-align the tick labels so long category names stay legible
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
    # Tighten the layout so rotated labels are not clipped
    fig.tight_layout()
    ax.legend(title='Incident Grade')
    plt.show()

gc.collect()

In [None]:
#crosstab and Chi2 test

# Null hypothesis      : the column is independent of IncidentGrade
# Alternate hypothesis : the column and IncidentGrade are associated
# A p-value >= 0.05 means we fail to reject the null hypothesis.

# Every column except the target is tested against the target
feature_columns = [name for name in df.columns if name != 'IncidentGrade']

# One summary record per tested column
summary_results = []

for col in feature_columns:
    # Contingency table of the column's values against the incident grade
    crosstab = pd.crosstab(df[col], df['IncidentGrade'])
    print(f"Crosstab of {col} and IncidentGrade:\n{crosstab}\n")

    # Chi-squared test of independence on the contingency table
    chi2, p, dof, expected = chi2_contingency(crosstab)
    print(f"Chi-squared: {chi2}, p-value: {p}\n")

    record = {
        'Column': col,
        'Chi2 Statistic': chi2,
        'p-value': p,
        'Degrees of Freedom': dof,
        'Significant': p < 0.05,
    }
    summary_results.append(record)

# Collect all records into a single table and show it
summary_df = pd.DataFrame(summary_results)

print(summary_df)
gc.collect()

In [None]:
#Step 2:Data Preprocessing
# Check for missing values: number of nulls in each column
print(df.isnull().sum())
# DataFrame.info() prints its own report and returns None; calling it bare
# avoids the extra "None" line that print(df.info()) produced.
df.info()

In [None]:
# Share of missing values in each column, as a percentage of the row count
null_percentage = df.isnull().sum() / len(df) * 100

# Two-column table (column name, percentage of nulls) for better readability
null_percentage_df = (
    null_percentage
    .rename_axis('Column')
    .reset_index(name='Percentage of Nulls')
)

print(null_percentage_df)
gc.collect()

In [None]:
#Step 2a:Handling Missing Data
# Columns removed entirely (selected as having more than 50% null values)
sparse_columns = [
    'MitreTechniques', 'ActionGrouped', 'ActionGranular', 'EmailClusterId',
    'ThreatFamily', 'ResourceType', 'Roles', 'SuspicionLevel',
    'AntispamDirection', 'LastVerdict',
]
df.drop(columns=sparse_columns, inplace=True)

# Discard every remaining row that still contains a null value
df.dropna(inplace=True)

In [None]:
#Step 2b:Feature Engineering

# Convert timestamps to datetime
df['Timestamp'] = pd.to_datetime(df['Timestamp'])

# Extract features from the timestamp and drop it
# NOTE(review): hour-of-day extraction is disabled, so no 'Hour' column exists
# after this cell; uncomment the next line if the hour feature is wanted.
#df['Hour'] = df['Timestamp'].dt.hour
# Day of week as a small integer (pandas numbers Monday as 0)
df['Day_of_week'] = df['Timestamp'].dt.dayofweek.astype('int8')

df.drop(columns=['Timestamp'], inplace=True)
# DataFrame.info() prints its own report and returns None; calling it bare
# avoids the extra "None" line that print(df.info()) produced.
df.info()

gc.collect()

In [None]:
# Time-derived features to analyse against the target. Only columns that are
# actually present in df are used: looking up 'Hour' unconditionally raised
# KeyError whenever the hour feature had not been created.
time_features = [name for name in ('Hour', 'Day_of_week') if name in df.columns]

# Count plot of each time feature, split by incident grade
for col in time_features:
    plt.figure(figsize=(12, 6))
    sns.countplot(data=df, x=col, hue='IncidentGrade')
    plt.title(f"{col} vs Incident grade count")
    plt.xticks(rotation=45, ha='right')  # Rotate and align ticks
    plt.tight_layout()  # Adjust layout to prevent clipping
    plt.legend(title='Incident Grade')
    plt.show()
gc.collect()

# Chi-squared test of independence between each time feature and the target
for col in time_features:
    crosstab = pd.crosstab(df[col], df['IncidentGrade'])
    print(f"Crosstab of {col} and IncidentGrade : {crosstab}" )
    # Perform Chi-Squared Test
    chi2, p, dof, expected = chi2_contingency(crosstab)
    print(f"Chi-squared: {chi2}, p-value: {p}")
gc.collect()

In [None]:
#Step 2c:Encoding Categorical Variables: 
gc.collect()
def Top_3_Cate(A, frame=None, n=3, other_label='Others'):
    """Collapse the rare values of column ``A`` into a single catch-all label.

    The ``n`` most frequent values of the column are kept as they are; every
    other value is replaced by ``other_label``. The column is overwritten in
    place on the data frame.

    Parameters
    ----------
    A : str
        Name of the column to reduce.
    frame : pandas.DataFrame, optional
        Data frame to modify. Defaults to the notebook-level ``df``.
    n : int, optional
        Number of most frequent values to keep (default 3).
    other_label : object, optional
        Replacement for every value outside the top ``n`` (default 'Others').

    Returns
    -------
    str
        The column name ``A``, unchanged.
    """
    target = df if frame is None else frame
    print(A)
    top_categories = target[A].value_counts().nlargest(n).index

    # Vectorised replacement: keep values that are among the most frequent
    # ones and substitute the catch-all label everywhere else. This avoids a
    # Python-level call per row.
    keep_mask = target[A].isin(top_categories)
    target[A] = target[A].where(keep_mask, other_label)
    return A

# Columns that are one-hot encoded after being reduced to their three most
# frequent values plus a catch-all. Defined once so the reduction and the
# encoding always operate on the same set of columns.
one_hot_columns = ['Category', 'EntityType', 'EvidenceRole', 'OSFamily', 'OSVersion', 'CountryCode']

for col in one_hot_columns:
    Top_3_Cate(col)

# One-Hot Encoding
df = pd.get_dummies(df, columns=one_hot_columns, dtype='int8', drop_first=True)

gc.collect()
# Label encoding
# Feature columns that are mapped to integer codes.
label_encoded_columns = [
    'Id', 'OrgId', 'IncidentId', 'AlertId', 'DetectorId', 'AlertTitle',
    'DeviceId', 'Sha256', 'IpAddress', 'Url', 'AccountSid', 'AccountUpn',
    'AccountObjectId', 'AccountName', 'DeviceName', 'NetworkMessageId',
    'RegistryKey', 'RegistryValueName', 'RegistryValueData',
    'OAuthApplicationId', 'ApplicationId', 'ApplicationName', 'FileName',
    'FolderPath', 'ResourceIdName', 'State', 'City',
]

for col in label_encoded_columns:
    # A throw-away encoder per column: fit_transform refits from scratch, and
    # the feature mappings are not needed afterwards.
    df[col] = LabelEncoder().fit_transform(df[col])

# The target gets its own, retained encoder so that the integer classes can be
# mapped back to the original grade names (target_encoder.classes_ /
# target_encoder.inverse_transform). Previously the encoder lived in a variable
# called `model`, which later cells rebind, losing the mapping.
target_encoder = LabelEncoder()
df['IncidentGrade'] = target_encoder.fit_transform(df['IncidentGrade'])

gc.collect()

In [None]:
#Step 3:Data Splitting 
gc.collect()
# Target column, and the feature matrix made of every other column
y = df['IncidentGrade']
X = df.drop(columns=['IncidentGrade'])

#Step 3a: Train-Validation Split & Step 3b: Stratification
# 70/30 split; stratifying on y keeps the class proportions equal in both parts
x_train, x_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=42,
    stratify=y,
)
gc.collect()

In [None]:
def model_metrics(y_input, y_pred):
    """Print classification metrics and plot the confusion matrix.

    Prints accuracy plus macro-averaged precision, recall and F1 score, then
    draws the confusion matrix of ``y_input`` against ``y_pred``.

    Parameters
    ----------
    y_input : array-like
        True class labels.
    y_pred : array-like
        Predicted class labels, aligned with ``y_input``.

    Returns
    -------
    None
    """
    print(f"Accuracy: {accuracy_score(y_input,y_pred)}")
    print(f"Precision: {precision_score(y_input,y_pred,average='macro')}")
    print(f"Recall : {recall_score(y_input,y_pred,average='macro')}")
    print(f"F1 Score: {f1_score(y_input,y_pred,average='macro')}")

    # Calculate the confusion matrix
    cm = confusion_matrix(y_input, y_pred)
    # One display label per row of the matrix, so the plot also works when the
    # data does not contain exactly three classes.
    class_labels = [f"Class {idx}" for idx in range(cm.shape[0])]
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
    disp.plot(cmap=plt.cm.Blues)
    # Neutral title: this function is called for both the train and the test
    # split, so the former "for train" wording mislabelled the test plot.
    plt.title("Confusion Matrix")
    plt.show()

In [None]:
# Baseline classifiers. The estimators with random components are seeded so
# that repeated runs of the notebook produce the same scores.
models = [
    LogisticRegression(),
    DecisionTreeClassifier(random_state=42),
    RandomForestClassifier(random_state=42),
    XGBClassifier(random_state=42),
    LGBMClassifier(random_state=42),
]

for clf in models:
    print(type(clf).__name__)
    clf.fit(x_train, y_train)

    # Cross-validate on the training split only. Scoring on the full X, y
    # pulled the held-out test rows into the cross-validation folds, so the
    # test set was no longer an independent check.
    cvs = cross_val_score(clf, x_train, y_train, cv=5)
    print(f"Cross validation score:{cvs}")

    train_pred = clf.predict(x_train)
    test_pred = clf.predict(x_test)
    print("*******Train******")
    model_metrics(y_train, train_pred)
    print("*******Test******")
    model_metrics(y_test, test_pred)
gc.collect()

In [None]:
#Neural network model
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Convert to PyTorch tensors.
# The tensors get their own names instead of overwriting x_train / x_test /
# y_train / y_test: rebinding the DataFrames made this cell fail when run a
# second time and broke every earlier cell that expects pandas objects.
x_train_t = torch.tensor(x_train.values, dtype=torch.float32)
x_test_t = torch.tensor(x_test.values, dtype=torch.float32)
y_train_t = torch.tensor(y_train.values, dtype=torch.long)
y_test_t = torch.tensor(y_test.values, dtype=torch.long)

# Standardise every feature with statistics of the training split only, so no
# information from the test split leaks into preprocessing. The features are
# integer codes on very different scales, which plain SGD handles poorly.
feature_mean = x_train_t.mean(dim=0, keepdim=True)
feature_std = x_train_t.std(dim=0, keepdim=True)
# Columns with zero (or undefined) spread are divided by 1 instead
feature_std = torch.where(feature_std > 0, feature_std, torch.ones_like(feature_std))
# In-place arithmetic avoids allocating extra full-size copies
x_train_t.sub_(feature_mean).div_(feature_std)
x_test_t.sub_(feature_mean).div_(feature_std)

# Create TensorDataset and DataLoader for batch processing
train_dataset = TensorDataset(x_train_t, y_train_t)
test_dataset = TensorDataset(x_test_t, y_test_t)

# Create DataLoader instances for batching
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)  # Use batch_size = 64 for training
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)   # Use batch_size = 64 for testing

# Define Neural Network
class CyberNN(nn.Module):
    """Feed-forward classifier with two hidden layers (175 and 50 units).

    ``size_nn`` is the number of input features and ``nclass`` the number of
    output classes. The forward pass returns one raw score per class; no
    softmax is applied.
    """

    def __init__(self, size_nn, nclass):
        super().__init__()
        # input -> 175 -> 50 -> nclass
        self.fc1 = nn.Linear(size_nn, 175)
        self.fc2 = nn.Linear(175, 50)
        self.fc3 = nn.Linear(50, nclass)

    def forward(self, X):
        """Return the class scores for a batch of feature rows ``X``."""
        hidden = self.fc1(X).relu()
        hidden = self.fc2(hidden).relu()
        return self.fc3(hidden)

# Initialize model, criterion, and optimizer
# Seed torch first so weight initialisation and batch shuffling are repeatable.
torch.manual_seed(42)
model = CyberNN(len(X.columns), len(y.unique()))
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

# Training Loop
epochs = 10
for epoch in range(1, epochs+1):
    model.train()  # Set the model to training mode
    running_loss = 0.0

    # Loop over the training data in batches
    for inputs, labels in train_loader:
        optimizer.zero_grad()  # Zero the gradients
        output = model(inputs)  # Forward pass
        loss = criterion(output, labels)  # Calculate loss
        loss.backward()  # Backpropagation
        optimizer.step()  # Update the weights

        running_loss += loss.item()

    # Average loss over the batches of this epoch
    print(f"{epoch}/{epochs} Loss: {running_loss / len(train_loader):.5f}")


def _collect_predictions(net, loader):
    """Run ``net`` over ``loader`` and return ``(labels, predictions)`` tensors.

    The network is put in evaluation mode and gradients are disabled. Labels
    and predictions are gathered batch by batch in the same order, so they
    stay aligned even when the loader shuffles.
    """
    net.eval()
    batch_preds = []
    batch_labels = []
    with torch.no_grad():
        for inputs, labels in loader:
            # Predicted class = index of the highest score in each row
            batch_preds.append(net(inputs).argmax(dim=1))
            batch_labels.append(labels)
    return torch.cat(batch_labels), torch.cat(batch_preds)


# Evaluate the Model on Training Data
# The metrics come from a fresh pass of the *final* model over the training
# data. Predictions gathered during the last training epoch were produced while
# the weights were still being updated, so they did not describe the final model.
all_train_labels, all_train_preds = _collect_predictions(model, train_loader)
print("*******Train******")
model_metrics(all_train_labels.numpy(), all_train_preds.numpy())

# Evaluate the Model on Testing Data
all_test_labels, all_test_preds = _collect_predictions(model, test_loader)
print("*******Test******")
model_metrics(all_test_labels.numpy(), all_test_preds.numpy())

