# K Nearest Neighbors

In [1]:
import numpy as np
from math import sqrt, exp
from csv import reader

In [2]:
# calculate the Euclidean distance between two vectors
def euclidean_distance(row1, row2):
    return sqrt(sum((row1[:-1]-row2[:-1])**2))

In [68]:
# Locate the most similar neighbors
def get_neighbors(train, test_row, num_neighbors):
    distances = np.array(list(map(lambda x: euclidean_distance(test_row, x), train[:, :-1])))
    enriched = np.column_stack((train, distances))
    return enriched[enriched[:, -1].argsort()][:num_neighbors, -2]

In [69]:
# Make a classification prediction with neighbors
def predict_classification(train, test_row, num_neighbors):
    neighbors = get_neighbors(train, test_row, num_neighbors)
    u, c = np.unique(neighbors, return_counts = True)
    return u[c == c.max()][0]

In [70]:
# Load a CSV file
def load_csv(filename):
    dataset = list()
    with open(filename, 'r') as file:
        csv_reader = reader(file)
        for row in csv_reader:
            if not row:
                continue
            dataset.append(row)
    arr = np.array(dataset)
    
    classes, labels = str_column_to_int(arr[:, -1])
    return np.hstack((arr[:, :-1], labels.reshape(-1, 1))).astype(np.float)

In [71]:
# Convert string column to integer (one-hot encoding)
def str_column_to_int(labels):
    return np.unique(labels, return_inverse=True)

In [72]:
# Find the min and max values for each column
def dataset_minmax(dataset):
    return np.min(dataset, axis=0), np.max(dataset, axis=0)

In [73]:
# Rescale dataset columns to the range 0-1
def normalize_dataset(dataset, min_vals, max_vals):
    denominator = max_vals - min_vals
    for i in range(dataset.shape[0]):
        dataset[i] = (dataset[i] - min_vals) / denominator
    return dataset

In [74]:
# Split a dataset into k folds
def cross_validation_split(dataset, n_folds):
    # random shuffle before sending 
    np.random.shuffle(dataset)
    
    return np.array_split(dataset, n_folds)

In [75]:
# Calculate accuracy percentage
def accuracy_metric(actual, predicted):
    return (actual == predicted).sum() / actual.shape[0] * 100.0

In [76]:
# Evaluate an algorithm using a cross validation split
def evaluate_algorithm(dataset, algorithm, n_folds, *args):
    folds = cross_validation_split(dataset, n_folds)
    scores = list()
    for i in range(n_folds):
        test = folds[i]
        train = np.concatenate([folds[j] for j in range(n_folds) if j != i])
        
        predicted = algorithm(train, test[:, :-1], *args)
        actual = test[:, -1]
        
        accuracy = accuracy_metric(actual, predicted)
        scores.append(accuracy)
    
    return scores

In [77]:
# kNN Algorithm
def k_nearest_neighbors(train, test, num_neighbors):
    predictions = list()
    for row in test:
        output = predict_classification(train, row, num_neighbors)
        predictions.append(output)
    return np.array(predictions)

In [78]:
# Test the kNN on the Iris Flowers dataset
filename = 'data/iris.csv'
dataset = load_csv(filename)

In [79]:
# Normalize the data
min_vals, max_vals = dataset_minmax(dataset[:, :-1])
dataset[:, :-1] = normalize_dataset(dataset[:, :-1], min_vals, max_vals)

In [80]:
# evaluate algorithm
n_folds = 5
num_neighbors = 5

scores = evaluate_algorithm(dataset, k_nearest_neighbors, n_folds, num_neighbors)
print('Scores: %s' % scores)
print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))

Scores: [93.33333333333333, 90.0, 96.66666666666667, 86.66666666666667, 96.66666666666667]
Mean Accuracy: 92.667%


# Statistical Linear Model

In [31]:
# Calculate the mean value of a list of numbers
def mean(values):
    return np.mean(values, axis=0)

In [32]:
# Calculate the variance of a list of numbers
def variance(values, mean):
    return sum((values - mean)**2)

In [33]:
# Calculate covariance between x and y
def covariance(x, mean_x, y, mean_y):
    return sum((x - mean_x) * (y - mean_y))

In [34]:
# Calculate coefficients
def coefficients(dataset):
    x_mean, y_mean = mean(dataset)
    x_variance = variance(dataset[:, 0], x_mean)
    
    b1 = covariance(dataset[:, 0], x_mean, dataset[:, 1], y_mean) / x_variance
    b0 = y_mean - b1 * x_mean
    
    return [b0, b1]

In [35]:
# Test simple linear regression
dataset = np.array([[1, 1], [2, 3], [4, 3], [3, 2], [5, 5]])
mean_ds = mean(dataset)
variance_ds = variance(dataset, mean_ds)
covariance_ds = covariance(dataset[:, 0], mean_ds[0], dataset[:, 1], mean_ds[1])
coeff = coefficients(dataset)

In [36]:
# Load a txt file
def load_txt(filename):
    dataset = list()
    with open(filename, 'r') as file:
        for line in file.readlines():
            dataset.append(line.split())
    return np.array(dataset, dtype=np.float)

In [37]:
def train_test_split(dataset, split):
    training_elements = int(split*dataset.shape[0])
    np.random.shuffle(dataset)
    return dataset[:training_elements], dataset[training_elements:]

In [38]:
def rmse_metric(actual, predicted):
    return np.sqrt(np.mean((predicted-actual)**2))

In [39]:
# Evaluate an algorithm using a train/test split
def evaluate_algorithm(dataset, algorithm, split, *args):
    train, test = train_test_split(dataset, split)
    
    predicted = algorithm(train, test[:, :-1], *args)
    actual = test[:, -1]
    
    rmse = rmse_metric(actual, predicted)
    
    return rmse

In [40]:
# Simple linear regression algorithm
def simple_linear_regression(train, test):
    b0, b1 = coefficients(train)
    return test*b1 + b0

In [42]:
# Simple linear regression on insurance dataset
# load and prepare data
filename = 'data/AutoInsurSweden.txt'
dataset = load_txt(filename)

In [43]:
# evaluate algorithm
split = 0.9
rmse = evaluate_algorithm(dataset, simple_linear_regression, split)
print('RMSE: %.3f' % (rmse))

RMSE: 42.319


# Linear Regression

In [131]:
# Make a prediction with coefficients
def predict(row, coefficients):
    return sum(row * coefficients[:-1]) + coefficients[-1]


In [132]:
# Estimate linear regression coefficients using stochastic gradient descent
def coefficients_sgd(train, l_rate, n_epoch):
    coef = np.zeros(train.shape[1])
    for epoch in range(n_epoch):
        sum_error = 0
        for row in train:
            
            yhat = predict(row[:-1], coef)
            
            error = yhat - row[-1]
            sum_error += error**2
            
            # update weights
            coef[:-1] -= (row[:-1] * (l_rate * error))
            
            # update bias
            coef[-1] -= l_rate * error
            
#         print('>epoch=%d, lrate=%.3f, error=%.3f' % (epoch, l_rate, sum_error))
    
    return coef

In [133]:
# Load a CSV file
def load_csv(filename):
    dataset = list()
    with open(filename, 'r') as file:
        for row in file.readlines():
            if not row:
                continue
            dataset.append(row.split(";"))
    return np.array(dataset[1:], dtype=np.float)

In [134]:
# Linear Regression Algorithm With Stochastic Gradient Descent
def linear_regression_sgd(train, test, l_rate, n_epoch):
    predictions = list()
    
    coef = coefficients_sgd(train, l_rate, n_epoch)
    
    for row in test:
        yhat = predict(row[:-1], coef)
        predictions.append(yhat)
    
    return np.array(predictions)

In [135]:
# load and prepare data
filename = 'data/winequality-white.csv'
dataset = load_csv(filename)

In [136]:
# Normalize the data
min_vals, max_vals = dataset_minmax(dataset[:, :-1])
dataset[:, :-1] = normalize_dataset(dataset[:, :-1], min_vals, max_vals)

In [178]:
# Evaluate an algorithm using a cross validation split
def evaluate_algorithm(dataset, algorithm, metric, n_folds, *args):
    folds = cross_validation_split(dataset, n_folds)
    scores = list()
    for i in range(n_folds):
        test = folds[i]
        train = np.concatenate([folds[j] for j in range(n_folds) if j != i])
        
        predicted = algorithm(train, test, *args)
        actual = test[:, -1]
        
        result = metric(actual, predicted)
        scores.append(result)
    
    return scores

In [138]:
# evaluate algorithm
n_folds = 5
l_rate = 0.01
n_epoch = 50
scores = evaluate_algorithm(dataset, linear_regression_sgd, rmse_metric, n_folds, l_rate, n_epoch)
print('Scores: %s' % scores)
print('Mean RMSE: %.3f' % (sum(scores)/float(len(scores))))

Scores: [0.7618192718728922, 0.7398903864832463, 0.7558842278594348, 0.7669434343985889, 0.7636128414004474]
Mean RMSE: 0.758


In [108]:
dataset = np.array([[1, 1], [2, 3], [4, 3], [3, 2], [5, 5]])
coef = np.array([0.8, 0.4])
l_rate = 0.001
n_epoch = 50
coef = coefficients_sgd(dataset, l_rate, n_epoch)
print(coef)

[0.80172203 0.22998235]


# Logistic regression

In [139]:
# Make a prediction with coefficients
def predict(row, coefficients):
    yhat = sum(row * coefficients[:-1]) + coefficients[-1]
    return 1.0 / (1.0 + exp(-yhat))

In [177]:
# Estimate logistic regression coefficients using stochastic gradient descent
def coefficients_sgd(train, l_rate, n_epoch):
    coef = np.zeros(train.shape[1])
    for epoch in range(n_epoch):
        sum_error = 0
        for row in train:
            yhat = predict(row[:-1], coef)
            
            error = row[-1] - yhat
            
            # update weights
            coef[:-1] += (row[:-1] * (l_rate * error * yhat * (1.0 - yhat)))
            
            # update bias
            coef[-1] += l_rate * error * yhat * (1.0 - yhat)

#         print('>epoch=%d, lrate=%.3f, error=%.3f' % (epoch, l_rate, sum_error))
    return coef

In [172]:
# Logistic Regression Algorithm With Stochastic Gradient Descent
def logistic_regression(train, test, l_rate, n_epoch):
    predictions = list()
    
    coef = coefficients_sgd(train, l_rate, n_epoch)
    
    for row in test:
        yhat = predict(row[:-1], coef)
        predictions.append(round(yhat))
    
    return np.array(predictions, dtype=np.float)

In [173]:
# Load a CSV file
def load_csv(filename):
    dataset = list()
    with open(filename, 'r') as file:
        csv_reader = reader(file)
        for row in csv_reader:
            if not row:
                continue
            dataset.append(row)
        return np.array(dataset, dtype=np.float)


In [179]:
# load and prepare data
filename = 'data/pima-indians-diabetes.csv'
dataset = load_csv(filename)

In [180]:
# Normalize the data
min_vals, max_vals = dataset_minmax(dataset[:, :-1])
dataset[:, :-1] = normalize_dataset(dataset[:, :-1], min_vals, max_vals)

In [181]:
# evaluate algorithm
n_folds = 5
l_rate = 0.1
n_epoch = 100
scores = evaluate_algorithm(dataset, logistic_regression, accuracy_metric, n_folds, l_rate, n_epoch)
print('Scores: %s' % scores)
print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))

Scores: [72.72727272727273, 81.81818181818183, 74.67532467532467, 73.20261437908496, 73.8562091503268]
Mean Accuracy: 75.256%


# Perceptron

In [195]:
# Make a prediction with weights
def predict(row, weights):
    activation = sum(row * weights[:-1]) + weights[-1]
    return 1.0 if activation >= 0.0 else 0.0

In [206]:
# Perceptron Algorithm With Stochastic Gradient Descent
def perceptron(train, test, l_rate, n_epoch):
    predictions = list()
    weights = train_weights(train, l_rate, n_epoch)
    
    for row in test:
        prediction = predict(row[:-1], weights)
        predictions.append(prediction)
    
    return np.array(predictions)

In [209]:
# Estimate Perceptron weights using stochastic gradient descent
def train_weights(train, l_rate, n_epoch):
    weights = np.zeros(train.shape[1])
    for epoch in range(n_epoch):
        sum_error = 0.0
        for row in train:
            prediction = predict(row[:-1], weights)
            error = row[-1] - prediction
            sum_error += error**2
            
            # update weights
            weights[:-1] += (row[:-1] * (l_rate * error))
            
            # update bias
            weights[-1] += l_rate * error
            
#         print('>epoch=%d, lrate=%.3f, error=%.3f' % (epoch, l_rate, sum_error))
    
    return weights

In [210]:
# Load a CSV file
def load_csv(filename):
    dataset = list()
    with open(filename, 'r') as file:
        csv_reader = reader(file)
        for row in csv_reader:
            if not row:
                continue
            dataset.append(row)
    arr = np.array(dataset)
    
    classes, labels = str_column_to_int(arr[:, -1])
    return np.hstack((arr[:, :-1], labels.reshape(-1, 1))).astype(np.float)

In [211]:
filename = 'data/sonar.csv'
dataset = load_csv(filename)

In [212]:
# evaluate algorithm
n_folds = 3
l_rate = 0.01
n_epoch = 500
scores = evaluate_algorithm(dataset, perceptron, accuracy_metric, n_folds, l_rate, n_epoch)
print('Scores: %s' % scores)
print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))

Scores: [64.28571428571429, 68.11594202898551, 78.26086956521739]
Mean Accuracy: 70.221%
