In [None]:
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as stats
from scipy.stats import norm, expon, uniform, pareto
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Task 1: Z-score based anomaly detection -- data preparation.
# StandardScaler and OneHotEncoder are provided by sklearn.preprocessing.

# load the dataset
original = pd.read_csv("C:\\Users\\mosta\\Downloads\\Train_data.csv")
data = original

# identify the target column and separate features
class_col = 'class'
X = data.drop(columns=[class_col])
y = data[class_col]

# split data into training (70%) and testing (30%) sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# separate numeric and categorical columns.
# Selecting by the generic 'number' kind (instead of exact float64/int64)
# keeps numeric columns stored with another width (e.g. int32/float32);
# every remaining column is treated as categorical so that no feature is
# silently dropped.
numeric_cols = X_train.select_dtypes(include='number').columns
categorical_cols = X_train.select_dtypes(exclude='number').columns

# preprocess numeric data: the scaler is fitted on the training split only
# and then applied unchanged to the test split
scaler = StandardScaler()
X_train_numeric = scaler.fit_transform(X_train[numeric_cols])
X_test_numeric = scaler.transform(X_test[numeric_cols])

# preprocess categorical data (one-hot encoding); categories that only
# appear in the test split are ignored rather than raising an error
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
X_train_categorical = encoder.fit_transform(X_train[categorical_cols])
X_test_categorical = encoder.transform(X_test[categorical_cols])

# combine processed numeric and categorical data column-wise
X_train_processed = np.hstack((X_train_numeric, X_train_categorical))
X_test_processed = np.hstack((X_test_numeric, X_test_categorical))

# convert the processed data into DataFrames for Z-score computation
X_train_processed_df = pd.DataFrame(X_train_processed)
X_test_processed_df = pd.DataFrame(X_test_processed)

# compute absolute Z-scores of the training rows from training statistics.
# A column that is constant in the training split has std 0, so its
# training Z-scores evaluate to NaN (0/0).
train_mean = X_train_processed_df.mean()
train_std = X_train_processed_df.std()
z_scores = np.abs((X_train_processed_df - train_mean) / train_std)

# experiment with different thresholds (np.arange excludes the stop value)
thresholds = np.arange(start=2, stop=3, step=.05)
results = []

# The quantities below do not depend on the threshold, so they are computed
# once instead of on every loop iteration.
# Test rows are scored against the *training* mean/std.
train_mean = X_train_processed_df.mean()
train_std = X_train_processed_df.std()
z_test = np.abs((X_test_processed_df - train_mean) / train_std)
# convert y_test to binary (assuming 'normal' = 0, 'anomaly' = 1)
y_test_binary = (y_test == 'anomaly').astype(int)

for threshold in thresholds:
    print(f"\nThreshold: {threshold}")

    # a row is predicted anomalous when any feature exceeds the threshold;
    # NaN Z-scores compare as False and therefore never trigger a flag
    anomaly_preds = (z_test > threshold).any(axis=1).astype(int)

    # Calculate metrics. zero_division=0 reports 0 (without a warning) when
    # a metric is undefined, e.g. precision with no predicted positives.
    accuracy = accuracy_score(y_test_binary, anomaly_preds)
    precision = precision_score(y_test_binary, anomaly_preds, zero_division=0)
    recall = recall_score(y_test_binary, anomaly_preds, zero_division=0)

    print(f"Accuracy: {accuracy:.2f}, Precision: {precision:.2f}, Recall: {recall:.2f}")
    anomalies_detected = anomaly_preds.sum()
    print(f"Anomalies Detected: {anomalies_detected}")
    results.append((threshold, accuracy, precision, recall))

# plot performance metrics for different thresholds
# (transpose the list of per-threshold tuples into one sequence per metric)
thresholds, accuracies, precisions, recalls = zip(*results)
plt.figure(figsize=(10, 6))
plt.plot(thresholds, accuracies, label='Accuracy', marker='o')
plt.plot(thresholds, precisions, label='Precision', marker='o')
plt.plot(thresholds, recalls, label='Recall', marker='o')
plt.xlabel('Z-Score Threshold')
plt.ylabel('Metric Value')
plt.title('Performance Metrics vs Z-Score Thresholds')
plt.legend()
plt.grid()
plt.show()

Data preparation is complete!


In [None]:
#task 2
# Read the CSV once; `original_data` stays untouched while `train_data` is
# the working copy that gets relabelled and cleaned below. The copy already
# carries the 'class' column, so no re-assignment from `original_data` is
# needed.
original_data = pd.read_csv("notebook/Train_data.csv")
train_data = original_data.copy()

# Convert 'class' column to binary values.
# Labels outside this mapping become NaN and are removed by dropna() below.
train_data['class'] = train_data['class'].map({'anomaly': 1, 'normal': 0})

# Exclude columns with zero variance
numerical_columns = train_data.select_dtypes(include=['float64', 'int64']).columns
numerical_columns = [col for col in numerical_columns if train_data[col].std() > 0]

# Treat +/-Inf as missing, then drop every row that has a missing value
train_data = train_data.replace([np.inf, -np.inf], np.nan).dropna()

# Function to calculate MSE
def calculate_mse(data, dist, params, bins=50):
    """Mean squared error between the data's empirical density and a fitted PDF.

    The empirical density is a normalised histogram of ``data``; the fitted
    PDF is evaluated at the histogram bin centres, so both sides of the
    comparison are densities (comparing raw sample values with PDF values
    would not measure goodness of fit).

    Args:
        data: 1-D array-like of numeric samples (e.g. a pandas Series).
        dist: A distribution object exposing ``pdf(x, *params)``, such as the
            continuous distributions in ``scipy.stats``.
        params: Parameter tuple for ``dist`` (as returned by ``dist.fit``).
        bins: Bin specification forwarded to ``numpy.histogram``.

    Returns:
        The MSE as a float, or NaN when ``data`` has no finite values.
    """
    values = np.asarray(data, dtype=float)
    # np.histogram cannot build bin edges from NaN/Inf samples
    values = values[np.isfinite(values)]
    if values.size == 0:
        return float("nan")

    density, edges = np.histogram(values, bins=bins, density=True)
    centers = (edges[:-1] + edges[1:]) / 2
    fitted = dist.pdf(centers, *params)
    return float(np.mean((density - fitted) ** 2))

# Fit a named scipy.stats distribution and score it
def fit_distribution(data, dist_name):
    """Fit the ``scipy.stats`` distribution called ``dist_name`` to ``data``.

    Returns a ``(mse, params)`` pair: the fitted parameters from the
    distribution's ``fit`` method and the score that ``calculate_mse``
    assigns to them.
    """
    distribution = getattr(stats, dist_name)
    fitted_params = distribution.fit(data)
    return calculate_mse(data, distribution, fitted_params), fitted_params

# Numerical columns. 'class' is the target label, not a feature whose
# distribution should be modelled, so it is left out.
numerical_columns = [
    col
    for col in train_data.select_dtypes(include=['float64', 'int64']).columns
    if col != 'class'
]

# Results dictionary: column name -> {distribution name -> {'mse', 'params'}}
pdf_results = {}

# Candidate distributions (names of scipy.stats objects); same for every column
distributions = ['norm', 'expon', 'uniform', 'pareto']

# Handle NaN values and skip columns with NaN data
for column in numerical_columns:
    print(f"Processing column: {column}")
    data = train_data[column]

    # Skip column if it contains NaN values
    if data.isna().any():
        print(f"Skipping column {column} due to NaN values.")
        continue

    # Skip column if it has constant or zero variance
    if data.nunique() <= 1 or data.std() == 0:
        print(f"Skipping column {column} due to insufficient variance or constant values.")
        continue

    column_results = {}
    for dist_name in distributions:
        mse_all, params_all = fit_distribution(data, dist_name)
        column_results[dist_name] = {'mse': mse_all, 'params': params_all}

    # Keep every fit so the results remain available after the loop
    pdf_results[column] = column_results

    # Choose the best distribution (lowest MSE)
    best_name, best_stats = min(column_results.items(), key=lambda item: item[1]['mse'])
    print(f"Best fit for {column}: {best_name} with MSE {best_stats['mse']}")

In [None]:
# Convert 'class' column to binary (0 for normal, 1 for anomaly).
# The mapping is applied only while the column still holds non-binary
# labels: calling .map() with string keys on a column that is already 0/1
# would turn every value into NaN and leave both conditional PMFs empty.
class_labels = {'normal': 0, 'anomaly': 1}
if not train_data['class'].isin(list(class_labels.values())).all():
    train_data['class'] = train_data['class'].map(class_labels)

# Identify categorical columns
categorical_columns = train_data.select_dtypes(include=['object']).columns

# Empirical PMF of a categorical column
def calculate_pmf(data):
    """Return the relative frequency of each distinct value in ``data``.

    Counts come from ``value_counts()`` and are divided by ``len(data)``,
    i.e. by the total number of entries in the series.
    """
    n_entries = len(data)
    return data.value_counts().div(n_entries)

# Dictionary to store the PMFs for all categorical columns:
# column name -> {'overall_pmf', 'anomaly_pmf', 'normal_pmf'}
pmf_results = {}

# The class-conditional subsets are the same for every column, so they are
# filtered once up front rather than on each loop iteration.
anomaly_data = train_data[train_data['class'] == 1]
normal_data = train_data[train_data['class'] == 0]

# Calculate PMF for each categorical column
for column in categorical_columns:
    print(f"Calculating PMF for column: {column}")

    pmf_results[column] = {
        # PMF over all rows
        'overall_pmf': calculate_pmf(train_data[column]),
        # conditional PMF for anomaly (class == 1)
        'anomaly_pmf': calculate_pmf(anomaly_data[column]),
        # conditional PMF for normal (class == 0)
        'normal_pmf': calculate_pmf(normal_data[column]),
    }

# Dump every stored PMF so the results can be checked by eye
for column, pmf_data in pmf_results.items():
    print(f"\nPMF for column: {column}")
    # (heading, key) pairs in the order they are reported
    for heading, key in (
        ("Overall PMF:", 'overall_pmf'),
        ("Conditional PMF for Anomalies (class=1):", 'anomaly_pmf'),
        ("Conditional PMF for Normal (class=0):", 'normal_pmf'),
    ):
        print(heading)
        print(pmf_data[key])