In [None]:
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score
from sklearn import set_config #To display pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer

In [None]:
# Load the whole dataset; train and test columns live side by side in one CSV.
df = pd.read_csv('knnData.csv')

# Training split as NumPy arrays: the two feature columns and the label column.
X_train = df.loc[:, ['trainPoints_x1', 'trainPoints_x2']].to_numpy()
y_train = df.loc[:, 'trainLabel'].to_numpy()

# Test split, extracted the same way from the test columns.
X_test = df.loc[:, ['testPoints_x1', 'testPoints_x2']].to_numpy()
y_test = df.loc[:, 'testLabel'].to_numpy()

In [None]:
class DistanceWeightedKNN:
    """
    k-nearest-neighbour classifier whose neighbours vote with weight 1 / distance**2.

    The weighted vote is reduced with np.sign, so predictions are -1.0, 0.0
    (exact tie) or +1.0; labels are therefore expected to be encoded as -1 / +1.
    """

    def __init__(self, k=3, p=1):
        """
        Initialize the Distance-Weighted KNN model.
        :param k: Number of neighbors to consider.
        :param p: Distance metric (1 for Manhattan, 2 for Euclidean, np.inf for Chebyshev).
        """
        self.k = k
        self.p = p
        self.train_data = None
        self.train_labels = None

    def fit(self, train_data, train_labels):
        """
        Fit the model with the training data.

        The model is a lazy learner: fitting only stores the data.
        :param train_data: Array-like of training points (one row per point).
        :param train_labels: Array-like of training labels, aligned with train_data.
        :return: The model itself, so calls can be chained.
        """
        # np.asarray guarantees row-wise iteration even when a DataFrame is
        # passed (iterating a DataFrame directly would yield column names).
        self.train_data = np.asarray(train_data)
        self.train_labels = np.asarray(train_labels)
        return self

    def _distance(self, point1, point2):
        """
        Compute distance between two points based on the given metric.
        :param point1: First point (array-like).
        :param point2: Second point (array-like).
        :return: Distance under the metric selected by self.p.
        :raises ValueError: If self.p is not 1, 2 or np.inf.
        """
        abs_diff = np.abs(np.asarray(point1) - np.asarray(point2))
        if self.p == 1:
            return np.sum(abs_diff)
        elif self.p == 2:
            return np.sqrt(np.sum(abs_diff ** 2))
        elif self.p == np.inf:
            return np.max(abs_diff)
        else:
            raise ValueError("Unsupported distance metric.")

    def _predict_point(self, test_point):
        """
        Predict the label for a single test point.
        :param test_point: Test point for which to predict the label.
        :return: Sign of the weighted vote of the k nearest neighbours.
        """
        distances = [self._distance(test_point, train_point) for train_point in self.train_data]
        # Sort by distance; equal distances fall back to comparing labels.
        neighbors = sorted(zip(distances, self.train_labels))[:self.k]

        # A neighbour at (squared) distance zero would get an infinite weight,
        # and inf / inf evaluates to NaN. Such exact matches dominate any
        # finite-weight neighbour, so let them alone decide the vote.
        exact_labels = [label for dist, label in neighbors if dist ** 2 == 0]
        if exact_labels:
            return np.sign(float(np.sum(exact_labels)))

        numerator = 0
        denominator = 0

        for dist, label in neighbors:
            weight = 1 / (dist ** 2)
            numerator += weight * label
            denominator += weight

        return np.sign(numerator / denominator)

    def predict(self, test_data):
        """
        Predict labels for multiple test points.
        :param test_data: Array of test points.
        :return: List with one predicted label per test point.
        """
        return [self._predict_point(test_point) for test_point in test_data]

    def score(self, test_data, test_labels):
        """
        Compute the accuracy of the model on test data.

        Note the argument order: test features first, true labels second.
        Predictions are computed internally from test_data.
        :param test_data: Array of test points.
        :param test_labels: Array of true labels for the test points.
        :return: Accuracy score (fraction of matching predictions).
        """
        predictions = self.predict(test_data)
        correct = sum(p == t for p, t in zip(predictions, test_labels))
        return correct / len(test_labels)

# # Example Usage:
# # Define dataset (replace with actual data)
# train_data = np.array(df[['trainPoints_x1','trainPoints_x2']])
# test_data = np.array(df[['testPoints_x1','testPoints_x2']])
# train_labels = np.array(df['trainLabel'])
# test_labels = np.array(df['testLabel'])

# # Initialize and fit the model
# knn = DistanceWeightedKNN(k=3, p=np.inf)  # Using Chebyshev distance (p=np.inf)
# knn.fit(train_data, train_labels)

# # Predict and evaluate
# predictions = knn.predict(test_data)
# accuracy = knn.score(test_data, test_labels)

# print(f"Predictions: {predictions}")
# print(f"Accuracy: {accuracy:.2f}")


# Without Pipeline

## Manhattan distance

In [None]:
# Default configuration: k=3 neighbours with the Manhattan metric (p=1).
knn_model = DistanceWeightedKNN()

In [None]:
# Hand the training split to the model.
knn_model.fit(X_train,y_train)

In [None]:
# Predict a label for every test point and show the raw predictions.
y_pred = knn_model.predict(X_test)
print(f"Predictions: {y_pred}")

Predictions: [-1.0, 1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0]


In [None]:
# Calculate accuracy. score() expects (test features, true labels) and runs
# the prediction itself, so it must not be given labels and predictions.
accuracy = knn_model.score(X_test, y_test)
print("Accuracy:", accuracy)

Accuracy: 0.875


## Euclidean Distance

In [None]:
# Euclidean metric (p=2) with the default number of neighbours.
knn_model_1 = DistanceWeightedKNN(p=2)
knn_model_1.fit(X_train,y_train)
y_pred_1 = knn_model_1.predict(X_test)
print(f"Predictions: {y_pred_1}")

# score() expects (test features, true labels) and predicts internally.
accuracy_1 = knn_model_1.score(X_test, y_test)
print("Accuracy:", accuracy_1)

Predictions: [-1.0, 1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, 1.0]
Accuracy: 0.825


## Chebyshev Distance

In [None]:
# Chebyshev metric (p=np.inf) with the default number of neighbours.
knn_model_2 = DistanceWeightedKNN(p=np.inf)
knn_model_2.fit(X_train,y_train)
y_pred_2 = knn_model_2.predict(X_test)
print(f"Predictions: {y_pred_2}")

# score() expects (test features, true labels) and predicts internally.
accuracy_2 = knn_model_2.score(X_test, y_test)
print("Accuracy:", accuracy_2)

Predictions: [-1.0, 1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0]
Accuracy: 0.875


# With Pipeline

## Manhattan distance

In [None]:
# Impute missing values, standardize the features, then classify with the
# distance-weighted KNN using its default settings (k=3, Manhattan metric).
pipeline = Pipeline(
    steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('scaler', StandardScaler()),
        ("KNN", DistanceWeightedKNN()),
    ]
)

In [None]:
def has_scaler(pipeline):
  """Report whether any step of the pipeline is registered under the name 'scaler'.

  Only the step name is inspected, not the type of the step.

  Args:
    pipeline: Object exposing a ``named_steps`` mapping keyed by step name
      (e.g. a scikit-learn Pipeline).

  Returns:
    True if one of the step names equals 'scaler', False otherwise.
  """
  return any(step_name == 'scaler' for step_name in pipeline.named_steps)

# When the pipeline contains a 'scaler' step, rebuild the splits as pandas
# objects (DataFrame / Series) instead of the NumPy arrays used earlier.
if has_scaler(pipeline):
    # Training labels and features.
    y_train = df['trainLabel']
    X_train = df[['trainPoints_x1', 'trainPoints_x2']]

    # Test labels and features. The test feature columns are renamed to the
    # training column names -- presumably so that transformers fitted on the
    # training frame see matching feature names at predict time.
    y_test = df['testLabel']
    X_test = df[['testPoints_x1', 'testPoints_x2']].rename(
        columns={'testPoints_x1': 'trainPoints_x1', 'testPoints_x2': 'trainPoints_x2'}
    )

In [None]:
# Fit the imputer -> scaler -> KNN pipeline on the training split.
pipeline.fit(X_train,y_train)

In [None]:
# Run the test features through the fitted pipeline and show the predictions.
y_pred = pipeline.predict(X_test)
print(f"Predictions: {y_pred}")

Predictions: [-1.0, 1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0]




In [None]:
# Calculate accuracy: compare the true test labels with the pipeline's predictions.
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

Accuracy: 0.85


## Euclidean Distance

In [None]:
# Impute, standardize, then classify with the Euclidean metric (p=2).
pipeline_1 = Pipeline(
    steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('scaler', StandardScaler()),
        ("KNN", DistanceWeightedKNN(p=2)),
    ]
)

# Pipeline.fit returns the pipeline itself, so fitting and predicting chain.
y_pred_1 = pipeline_1.fit(X_train, y_train).predict(X_test)
print(f"Predictions: {y_pred_1}")

accuracy_1 = accuracy_score(y_true=y_test, y_pred=y_pred_1)
print("Accuracy:", accuracy_1)

Predictions: [-1.0, 1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, 1.0]
Accuracy: 0.825




## Chebyshev Distance

In [None]:
# Impute, standardize, then classify with the Chebyshev metric (p=np.inf).
pipeline_2 = Pipeline(
    steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('scaler', StandardScaler()),
        ("KNN", DistanceWeightedKNN(p=np.inf)),
    ]
)

# Pipeline.fit returns the pipeline itself, so fitting and predicting chain.
y_pred_2 = pipeline_2.fit(X_train, y_train).predict(X_test)
print(f"Predictions: {y_pred_2}")

accuracy_2 = accuracy_score(y_true=y_test, y_pred=y_pred_2)
print("Accuracy:", accuracy_2)

Predictions: [-1.0, 1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, 1.0, 1.0, -1.0, -1.0, -1.0, 1.0, -1.0, -1.0, -1.0, -1.0, 1.0]
Accuracy: 0.8


