# KNN ALGORITHM

In [1]:
#imports

import numpy as np
import pandas as pd
import math
from collections import Counter
from scipy.spatial.distance import cdist

from sklearn import datasets
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from imblearn.over_sampling import SMOTE

### Loading the dataset

In [2]:
def load_data_from_folder(folder_path):
    files = os.listdir(folder_path)
    
    df_list = []
    
    for file in files:
        file_path = os.path.join(folder_path, file)
        df = pd.read_csv(file_path)
        df_list.append(df)
    
    combined_df = df_list[0]
    
    for df in df_list[1:]:
        combined_df = pd.merge(combined_df, df, on='id', how='outer')
    
    return combined_df

test_folder = '../data/test/'
test_df = load_data_from_folder(test_folder)

train_folder = '../data/train/'
train_df = load_data_from_folder(train_folder)


In [3]:
def preprocess_data(train_df, test_size=0.3, random_state=42):
    X = train_df.drop(['attack_cat', 'label'], axis=1) 
    y = train_df['attack_cat'] 
    
    numeric_columns = X.select_dtypes(include=['number']).columns
    categorical_columns = X.select_dtypes(exclude=['number']).columns
    
    for col in numeric_columns:
        X[col] = X[col].fillna(X[col].median())
    
    for col in categorical_columns:
        X[col] = X[col].fillna(X[col].mode()[0])
    
    scaler = StandardScaler()
    X[numeric_columns] = scaler.fit_transform(X[numeric_columns])
    
    label_encoders = {}
    for col in categorical_columns:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col])
        label_encoders[col] = le
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=y, random_state=random_state
    )
    
    smote = SMOTE(random_state=random_state)
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

    return X_train_resampled, X_test, y_train_resampled, y_test, scaler, label_encoders

In [4]:
def preprocess_test_data(test_df, scaler, label_encoders):
    X_test = test_df.copy()
    
    numeric_columns = X_test.select_dtypes(include=['number']).columns
    categorical_columns = X_test.select_dtypes(exclude=['number']).columns
    
    for col in numeric_columns:
        X_test[col] = X_test[col].fillna(X_test[col].median())
    
    for col in categorical_columns:
        X_test[col] = X_test[col].fillna(X_test[col].mode()[0])

    X_test[numeric_columns] = scaler.transform(X_test[numeric_columns])

    for col in categorical_columns:
        if col in label_encoders:
            le = label_encoders[col]
            if 'unknown' not in le.classes_:
                le.classes_ = np.append(le.classes_, 'unknown')
            
            X_test[col] = X_test[col].apply(lambda x: x if x in le.classes_ else 'unknown')
            X_test[col] = le.transform(X_test[col])
    
    return X_test


## KNN IMPLEMENTATION

In [5]:
class KNNClassifier:
    def __init__(self, k=3, metric="euclidean", weights="uniform", batch_size=1000):
        """
        Initialize the Memory-Efficient KNN Classifier.
        
        Parameters:
        -----------
        k : int, default=3
            Number of neighbors to use
        metric : str, default="euclidean"
            Distance metric to use
        weights : str, default="uniform"
            Weight function used in prediction
        batch_size : int, default=1000
            Number of test points to process in each batch
        """
        self.k = k
        self.metric = metric
        self.weights = weights
        self.batch_size = batch_size
        self.train_data = None
        self.train_labels = None
    
    def fit(self, train_data, train_labels):
        """
        Store the training data and labels.
        """
        self.train_data = np.asarray(train_data, dtype=np.float32)
        self.train_labels = np.asarray(train_labels)
        return self
    
    def _compute_batch_distances(self, test_batch):
        """
        Compute distances for a batch of test points.
        """
        if self.metric == 'euclidean':
            distances = cdist(test_batch, self.train_data, metric='euclidean')
        elif self.metric == 'manhattan':
            distances = cdist(test_batch, self.train_data, metric='cityblock')
        elif self.metric == 'minkowski':
            distances = cdist(test_batch, self.train_data, metric='minkowski')
        else:
            raise ValueError(f"Unsupported metric: {self.metric}")
        return distances
    
    def predict(self, test_points):
        """
        Predict labels for test points using batched processing.
        """
        # Validate that model has been trained
        if self.train_data is None:
            raise ValueError("Model has not been trained. Call 'fit' first.")
        
        # Convert test points to numpy array
        test_points = np.asarray(test_points, dtype=np.float32)
        
        # Prepare for batched prediction
        predictions = []
        
        # Process test points in batches
        for i in range(0, len(test_points), self.batch_size):
            # Get current batch
            test_batch = test_points[i:i+self.batch_size]
            
            # Compute distances for current batch
            distances = self._compute_batch_distances(test_batch)
            
            # Predict for each point in the batch
            batch_predictions = []
            for point_distances in distances:
                # Get indices of k nearest neighbors
                k_indices = np.argpartition(point_distances, self.k)[:self.k]
                
                # Get labels of k nearest neighbors
                k_labels = self.train_labels[k_indices]
                k_dist = point_distances[k_indices]
                
                # Apply weighting if specified
                if self.weights == 'distance':
                    # Avoid division by zero
                    weights = 1.0 / (k_dist + 1e-8)
                    unique_labels, _ = np.unique(k_labels, return_counts=True)
                    weighted_votes = []
                    for label in unique_labels:
                        label_mask = (k_labels == label)
                        weighted_vote = np.sum(weights[label_mask])
                        weighted_votes.append((label, weighted_vote))
                    prediction = max(weighted_votes, key=lambda x: x[1])[0]
                else:
                    # Uniform weighting (most common label)
                    prediction = Counter(k_labels).most_common(1)[0][0]
                
                batch_predictions.append(prediction)
            
            # Add batch predictions to overall predictions
            predictions.extend(batch_predictions)
        
        return np.array(predictions)
    
    def score(self, X_test, y_test):
        """
        Compute the accuracy of the classifier.
        """
        predictions = self.predict(X_test)
        return np.mean(predictions == y_test)
    
    def __repr__(self):
        return (f"MemoryEfficientKNNClassifier(k={self.k}, "
                f"metric='{self.metric}', "
                f"weights='{self.weights}', "
                f"batch_size={self.batch_size})")
        
    def submit(self, X_test, filename='submission.csv'):
        """
        Make predictions on test data and save them to a CSV file.
        """
        predictions = self.predict(X_test)
        submission_df = pd.DataFrame({'id': test_df['id'], 'attack_cat': predictions})
        submission_df.to_csv(filename, index=False)
        return submission_df

In [None]:
from typing import Union, Callable, Literal

class KNNClassifier:
    def __init__(
        self, 
        n_neighbors: int = 3, 
        distance_metric: Literal['euclidean', 'manhattan', 'minkowski'] = 'euclidean',
        p: float = 2  
    ):
        """
        K-Nearest Neighbors Classifier implemented from scratch.
        
        Parameters:
        -----------
        n_neighbors : int, default=3
            Number of neighbors to use for classification
        
        distance_metric : str, default='euclidean'
            Distance metric to use. Options:
            - 'euclidean': Euclidean distance
            - 'manhattan': Manhattan (city block) distance
            - 'minkowski': Minkowski distance (generalization of Euclidean and Manhattan)
        
        p : float, default=2
            Power parameter for Minkowski distance
        """
        self.n_neighbors = n_neighbors
        self.distance_metric = distance_metric
        self.p = p
        self.X_train = None
        self.y_train = None
    
    def _distance(self, x1: np.ndarray, x2: np.ndarray) -> float:
        """
        Calculate distance between two points based on selected metric.
        
        Parameters:
        -----------
        x1 : numpy array
            First data point
        x2 : numpy array
            Second data point
        
        Returns:
        --------
        float
            Distance between x1 and x2
        """
        # Validate input shapes
        if x1.shape != x2.shape:
            raise ValueError("Input points must have the same shape")
        
        if self.distance_metric == 'euclidean':
            return np.sqrt(np.sum((x1 - x2)**2))
        
        elif self.distance_metric == 'manhattan':
            return np.sum(np.abs(x1 - x2))
        
        elif self.distance_metric == 'minkowski':
            return np.sum(np.abs(x1 - x2)**self.p)**(1/self.p)
        
        else:
            raise ValueError(f"Unsupported distance metric: {self.distance_metric}")
    
    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        Fit the KNN classifier by storing training data.
        
        Parameters:
        -----------
        X : numpy array
            Training feature data
        y : numpy array
            Training labels
        """
        # Validate inputs
        if X.ndim != 2:
            raise ValueError("X must be a 2D array")
        
        if len(X) != len(y):
            raise ValueError("X and y must have the same number of samples")
        
        self.X_train = X
        self.y_train = y
        
        return self
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict class labels for input data.
        
        Parameters:
        -----------
        X : numpy array
            Test samples
        
        Returns:
        --------
        numpy array
            Predicted class labels
        """
        if self.X_train is None or self.y_train is None:
            raise ValueError("Classifier not fitted. Call fit() first.")
        
        # Vectorized prediction for better performance
        predictions = np.array([self._predict_single(x) for x in X])
        return predictions
    
    def _predict_single(self, x: np.ndarray):
        """
        Predict class for a single sample.
        
        Parameters:
        -----------
        x : numpy array
            Single test sample
        
        Returns:
        --------
        Predicted class label
        """
        # Compute distances to all training points
        distances = np.array([self._distance(x, x_train) for x_train in self.X_train])
        
        print(distances)

        # Get indices of k nearest neighbors
        k_indices = np.argsort(distances)[:self.n_neighbors]
        
        # Get the labels of the k nearest neighbors
        k_nearest_labels = self.y_train[k_indices]
        
        # Use majority voting
        unique_labels, counts = np.unique(k_nearest_labels, return_counts=True)
        return unique_labels[np.argmax(counts)]
    
    def score(self, X_test: np.ndarray, y_test: np.ndarray) -> float:
        """
        Calculate the accuracy of the classifier.
        
        Parameters:
        -----------
        X_test : numpy array
            Test feature data
        y_test : numpy array
            True labels for test data
        
        Returns:
        --------
        float
            Accuracy score (0 to 1)
        """
        predictions = self.predict(X_test)
        return np.mean(predictions == y_test)

In [6]:
# # Train the model
# knn = KNNClassifier(k=3, metric="euclidean", weights="distance", batch_size=500)
# knn.fit(X_train, y_train)

# # Evaluate the model
# train_accuracy = knn.score(X_train, y_train)

X_train, X_test, y_train, y_test, scaler, label_encoders = preprocess_data(train_df)
X_test_processed = preprocess_test_data(test_df, scaler, label_encoders)

ValueError: The feature names should match those that were passed during fit.
Feature names must be in the same order as they were in fit.


In [22]:
knn = KNNClassifier(k=3, metric="euclidean", weights="distance", batch_size=500)
knn.fit(X_train, y_train)

knn.submit(X_test_processed, 'submission.csv')

ValueError: array length 52603 does not match index length 20583