<a href="https://colab.research.google.com/github/bcnewman/AI-Driven-Network-Security/blob/main/AIDriven_Lab_Exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np

# Seed NumPy's legacy global RNG so every run regenerates the same synthetic data.
np.random.seed(42)

# Number of synthetic connection records to generate.
n_samples = 1000

# Every column is drawn independently. The order of the entries fixes the order
# in which the random stream is consumed, so reordering them changes the data.
data = {
    # Connection duration in seconds
    'duration': np.random.randint(0, 3600, n_samples),
    'protocol': np.random.choice(['TCP', 'UDP', 'ICMP'], n_samples),
    # Bytes sent by source
    'src_bytes': np.random.randint(0, 1000000, n_samples),
    # Bytes sent by destination
    'dst_bytes': np.random.randint(0, 1000000, n_samples),
    # Connection status flag
    'flag': np.random.choice(['S0', 'S1', 'SF', 'REJ'], n_samples),
    # 1 if connection is to/from the same host/port, 0 otherwise (drawn with p=0.01)
    'land': np.random.choice([0, 1], n_samples, p=[0.99, 0.01]),
    # Number of wrong fragments
    'wrong_fragment': np.random.randint(0, 3, n_samples),
    # Number of urgent packets
    'urgent': np.random.randint(0, 3, n_samples),
}

df = pd.DataFrame(data)

# Target label: 0 for normal, 1 for intrusion (drawn with p=0.2).
df['intrusion'] = np.random.choice([0, 1], n_samples, p=[0.8, 0.2])

# Shift duration and src_bytes upwards on the intrusion rows so that those two
# features correlate with the label. The mask is computed once and reused.
intrusion_mask = df['intrusion'] == 1
n_intrusions = int(intrusion_mask.sum())
df.loc[intrusion_mask, 'duration'] += np.random.randint(1800, 3600, n_intrusions)
df.loc[intrusion_mask, 'src_bytes'] += np.random.randint(500000, 1000000, n_intrusions)

# Persist the dataset (without the index column) for the following cells.
df.to_csv('simplified_network_data.csv', index=False)

print("Dataset created and saved as 'simplified_network_data.csv'")
print(df.head())
print("\nDataset shape:", df.shape)
print("\nIntrusion distribution:\n", df['intrusion'].value_counts(normalize=True))

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Load the data
print("Loading the dataset...")
df = pd.read_csv('simplified_network_data.csv')

# Display basic information about the dataset.
# DataFrame.info() writes its report to stdout itself and returns None, so it is
# called directly; wrapping it in print() would append a stray "None" line.
print("\nDataset Overview:")
df.info()

# Show the first few rows of the data
print("\nFirst few rows of the dataset:")
print(df.head())

# Describe the numerical features
print("\nSummary statistics of numerical features:")
print(df.describe())

# Display the distribution of the 'protocol' feature (as fractions of all rows)
print("\nDistribution of protocols:")
print(df['protocol'].value_counts(normalize=True))

# Visualize the distribution of normal vs. intrusion connections
plt.figure(figsize=(8, 6))
df['intrusion'].value_counts().plot(kind='bar')
plt.title('Distribution of Normal vs. Intrusion Connections')
plt.xlabel('Connection Type (0: Normal, 1: Intrusion)')
plt.ylabel('Count')
plt.show()

# Correlation between the selected numerical features and the target
correlation_matrix = df[['duration', 'src_bytes', 'dst_bytes', 'wrong_fragment', 'urgent', 'intrusion']].corr()
print("\nCorrelation matrix:")
print(correlation_matrix)

# Plain-text explanation of every column in the dataset
print("\nExplanation of key features:")
print("1. duration: The length of the connection in seconds.")
print("2. protocol: The type of protocol used (e.g., TCP, UDP, ICMP).")
print("3. src_bytes: The number of data bytes transferred from source to destination.")
print("4. dst_bytes: The number of data bytes transferred from destination to source.")
# S0 is an attempt that received no reply; REJ is the "rejected" state.
print("5. flag: Indicates the status of the connection (e.g., SF for normal connection, S0 for connection attempt with no reply, REJ for connection attempt rejected).")
print("6. land: Binary feature. 1 if connection is from/to the same host/port; 0 otherwise.")
print("7. wrong_fragment: Number of wrong fragments in the connection.")
print("8. urgent: Number of urgent packets.")
print("9. intrusion: Our target variable. 0 for normal connection, 1 for potential intrusion.")

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split

# Load the data
print("Loading the dataset...")
df = pd.read_csv('simplified_network_data.csv')

# Separate features and target variable
X = df.drop('intrusion', axis=1)
y = df['intrusion']

# Identify numerical and categorical columns
numerical_features = ['duration', 'src_bytes', 'dst_bytes', 'wrong_fragment', 'urgent']
categorical_features = ['protocol', 'flag', 'land']

# Split the raw rows BEFORE fitting any transformer. Fitting the scaler on the
# full dataset would let test-set means/variances leak into the training
# features and make the evaluation optimistic.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Pin the one-hot vocabulary to the levels present in the full column so the
# encoded column set does not depend on which rows landed in the training split
# (and a level that only occurs in the test rows cannot raise at transform time).
category_levels = [sorted(X[column].unique()) for column in categorical_features]

# Create preprocessing steps for numerical and categorical data
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_features),
        ('cat', OneHotEncoder(categories=category_levels, drop='first', sparse_output=False), categorical_features)
    ])

# Learn the scaling statistics from the training rows only, then apply the
# fitted transformer unchanged to the held-out rows.
X_train = preprocessor.fit_transform(X_train_raw)
X_test = preprocessor.transform(X_test_raw)

# Full dataset transformed with the training-fitted preprocessor
X_preprocessed = preprocessor.transform(X)

# Get feature names after preprocessing (numeric columns first, then one-hot columns)
feature_names = (numerical_features +
                 preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_features).tolist())

# Create a new dataframe with preprocessed data
X_preprocessed_df = pd.DataFrame(X_preprocessed, columns=feature_names)

# Save preprocessed data and target variable
print("Saving preprocessed data...")
X_preprocessed_df.to_csv('preprocessed_features.csv', index=False)
pd.DataFrame(y, columns=['intrusion']).to_csv('target_variable.csv', index=False)

# Save training and testing sets
pd.DataFrame(X_train, columns=feature_names).to_csv('X_train.csv', index=False)
pd.DataFrame(X_test, columns=feature_names).to_csv('X_test.csv', index=False)
pd.DataFrame(y_train, columns=['intrusion']).to_csv('y_train.csv', index=False)
pd.DataFrame(y_test, columns=['intrusion']).to_csv('y_test.csv', index=False)

print("\nShape of training set:", X_train.shape)
print("Shape of testing set:", X_test.shape)

# Display the first few rows of the preprocessed data
print("\nFirst few rows of preprocessed data:")
print(X_preprocessed_df.head())

# Display summary statistics of preprocessed numerical features.
# Because the scaler is fitted on the training rows only, the full-dataset
# means/standard deviations shown here are close to, but not exactly, 0 and 1.
print("\nSummary statistics of preprocessed numerical features:")
print(X_preprocessed_df[numerical_features].describe())

print("\nPreprocessed data and splits have been saved as CSV files.")

In [None]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Read the train/test feature matrices and the matching 'intrusion' label columns
# back from the CSV files.
print("Loading preprocessed data...")
X_train, X_test = (pd.read_csv(csv_path) for csv_path in ('X_train.csv', 'X_test.csv'))
y_train, y_test = (pd.read_csv(csv_path)['intrusion'] for csv_path in ('y_train.csv', 'y_test.csv'))

# 100-tree random forest with a fixed seed so repeated runs give the same model.
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)

print("Training the Random Forest Classifier...")
rf_classifier.fit(X_train, y_train)

# Hard class predictions for the held-out rows.
y_pred = rf_classifier.predict(X_test)

# Score the predictions with each metric in turn.
accuracy, precision, recall = (
    metric(y_test, y_pred) for metric in (accuracy_score, precision_score, recall_score)
)

print(f"\nModel Performance:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

# Confusion matrix rendered as an annotated heatmap (rows: actual, columns: predicted).
conf_matrix = confusion_matrix(y_test, y_pred)
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_title('Confusion Matrix')
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
plt.show()

# Pair every training column with the forest's importance score, highest first.
feature_importance = pd.DataFrame({
    'feature': X_train.columns,
    'importance': rf_classifier.feature_importances_
})
feature_importance = feature_importance.sort_values('importance', ascending=False)
top10_importance = feature_importance.head(10)

fig, ax = plt.subplots(figsize=(10, 7))
sns.barplot(x='importance', y='feature', data=top10_importance, ax=ax)
ax.set_title('Top 10 Most Important Features')
plt.show()

print("\nTop 10 Most Important Features:")
print(top10_importance)

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix, roc_curve, auc

# Read the train/test feature matrices and the matching 'intrusion' label columns
# back from the CSV files.
print("Loading preprocessed data...")
X_train, X_test = (pd.read_csv(csv_path) for csv_path in ('X_train.csv', 'X_test.csv'))
y_train, y_test = (pd.read_csv(csv_path)['intrusion'] for csv_path in ('y_train.csv', 'y_test.csv'))

# Build and fit a 100-tree random forest with a fixed seed.
print("Training the Random Forest Classifier...")
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
rf_classifier.fit(X_train, y_train)

# Hard class predictions plus the second predict_proba column, which the rest
# of this cell treats as the intrusion probability.
y_pred = rf_classifier.predict(X_test)
class_probabilities = rf_classifier.predict_proba(X_test)
y_pred_proba = class_probabilities[:, 1]

# Score the predictions with each metric in turn.
accuracy, precision, recall = (
    metric(y_test, y_pred) for metric in (accuracy_score, precision_score, recall_score)
)

print(f"\nModel Performance:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

# Confusion matrix rendered as an annotated heatmap (rows: actual, columns: predicted).
conf_matrix = confusion_matrix(y_test, y_pred)
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_title('Confusion Matrix')
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
plt.show()

# Pair every training column with the forest's importance score, highest first.
feature_importance = pd.DataFrame({
    'feature': X_train.columns,
    'importance': rf_classifier.feature_importances_
})
feature_importance = feature_importance.sort_values('importance', ascending=False)
top10_importance = feature_importance.head(10)

fig, ax = plt.subplots(figsize=(10, 7))
sns.barplot(x='importance', y='feature', data=top10_importance, ax=ax)
ax.set_title('Top 10 Most Important Features')
plt.show()

print("\nTop 10 Most Important Features:")
print(top10_importance)

# Function to get the predicted class and the class probabilities for a single instance
def predict_single_instance(instance):
    """Classify one feature vector with the trained ``rf_classifier``.

    Args:
        instance: 1-D NumPy array holding one row of feature values, in the
            same column order the classifier was trained on.

    Returns:
        A ``(prediction, probabilities)`` tuple: the predicted class label and
        the per-class probability array returned by ``predict_proba`` for
        this row.
    """
    # Reshape once into a single-row 2-D input and reuse it for both calls.
    row = instance.reshape(1, -1)

    # When the classifier was fitted on a DataFrame it records the column names
    # and warns if later inputs arrive without them. Re-attach the names so the
    # single-row input matches the training input.
    fitted_columns = getattr(rf_classifier, 'feature_names_in_', None)
    if fitted_columns is not None:
        row = pd.DataFrame(row, columns=fitted_columns)

    prediction = rf_classifier.predict(row)[0]
    probabilities = rf_classifier.predict_proba(row)[0]
    return prediction, probabilities

# Test the model on a few specific examples.
# These are the first rows of the test set (not a random draw), capped at the
# number of rows available so a short test set cannot raise an IndexError.
print("\nTesting the model on specific examples:")
n_examples = min(5, len(X_test))
for i in range(n_examples):
    instance = X_test.iloc[i].values
    prediction, probabilities = predict_single_instance(instance)
    print(f"\nInstance {i+1}:")
    print(f"Predicted class: {'Intrusion' if prediction == 1 else 'Normal'}")
    print(f"Probability of being an intrusion: {probabilities[1]:.4f}")

# Scatter the test rows over the two most important features, coloured by the
# predicted intrusion probability. This shows where the model's confidence
# changes across the test points; it is not a boundary computed on a grid.
top_features = feature_importance['feature'].head(2).tolist()
X_top2 = X_test[top_features]

plt.figure(figsize=(10, 8))
scatter = plt.scatter(X_top2.iloc[:, 0], X_top2.iloc[:, 1], c=y_pred_proba, cmap='coolwarm')
plt.colorbar(scatter, label='Predicted intrusion probability')
plt.xlabel(top_features[0])
plt.ylabel(top_features[1])
plt.title('Decision Boundary Visualization')
plt.show()

# ROC Curve: true-positive rate against false-positive rate over all thresholds
fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(10, 8))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
# Diagonal reference line for comparison with the model's curve
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()