In [2]:
import os
import sys

In [None]:
# Colab only: run this cell to mount your Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Add the homework code directory to the module search path.
# Replace the entry below with your own directory under '/content/drive/My Drive/'.
### ========== TODO : START ========== ###
sys.path += ['/content/drive/My Drive/cm146-spring23/hw3/HW3-code']
### ========== TODO : END ========== ###

In [3]:
"""
Author      : Yi-Chieh Wu, Sriram Sankararman
Description : Twitter
"""

from string import punctuation

import numpy as np
import matplotlib.pyplot as plt
# !!! MAKE SURE TO USE LinearSVC.decision_function(X), NOT LinearSVC.predict(X) !!!
# (this makes ''continuous-valued'' predictions)
from sklearn.svm import LinearSVC
from sklearn.model_selection import StratifiedKFold
from sklearn import metrics

# Problem 3: Twitter Analysis Using SVM

In [22]:
######################################################################
# functions -- input/output
######################################################################

def read_vector_file(fname):
    """
    Reads and returns a vector from a file.

    Parameters
    --------------------
        fname  -- string, filename

    Returns
    --------------------
        labels -- numpy array of shape (n,)
                    n is the number of non-blank lines in the text file
                    (a file holding a single value yields shape (1,))
    """
    # np.genfromtxt squeezes a one-value file down to a 0-d array; promote it
    # so callers can always index/slice the result as a 1-d vector.
    return np.atleast_1d(np.genfromtxt(fname))


def write_label_answer(vec, outfile, expected_rows=70):
    """
    Writes your label vector to the given file.

    Nothing is written when the vector does not have the expected number of
    rows; an error message is printed instead.

    Parameters
    --------------------
        vec           -- numpy array (or array-like) of shape (n,) or (n,1),
                         predicted scores
        outfile       -- string, output filename
        expected_rows -- integer, required number of rows in vec
                         (default 70, the number of labels for this project)
    """

    vec = np.asarray(vec)

    # a 0-d input has no rows at all, so treat it like any other wrong size
    if vec.ndim == 0 or vec.shape[0] != expected_rows:
        print("Error - output vector should have %d rows." % expected_rows)
        print("Aborting write.")
        return

    np.savetxt(outfile, vec)
    

In [23]:
######################################################################
# functions -- feature extraction
######################################################################

def extract_words(input_string):
    """
    Tokenizes input_string into lowercase "words": text is split on
    whitespace, and every punctuation mark becomes a token of its own.

    Parameters
    --------------------
        input_string -- string of characters

    Returns
    --------------------
        words        -- list of lowercase "words"
    """

    # pad each punctuation character with spaces in a single pass, so the
    # whitespace split below isolates it as a separate token
    padded = {ord(mark): ' ' + mark + ' ' for mark in punctuation}
    return input_string.translate(padded).lower().split()


def extract_dictionary(infile):
    """
    Scans a text file and assigns an index to every distinct
    word/punctuation token, in order of first appearance.

    Parameters
    --------------------
        infile    -- string, filename

    Returns
    --------------------
        word_list -- dictionary, (key, value) pairs are (word, index)
    """

    vocab = {}
    with open(infile, 'r') as fid:
        for line in fid:
            for token in extract_words(line):
                # an unseen token takes the next free index, which is always
                # the current vocabulary size; seen tokens are left untouched
                vocab.setdefault(token, len(vocab))
    return vocab


def extract_feature_vectors(infile, word_list):
    """
    Produces a bag-of-words representation of a text file specified by the
    filename infile based on the dictionary word_list.

    Words that do not appear in word_list are ignored, so a dictionary built
    from one file can be used to featurize another.

    Parameters
    --------------------
        infile         -- string, filename
        word_list      -- dictionary, (key, value) pairs are (word, index)

    Returns
    --------------------
        feature_matrix -- numpy array of shape (n,d)
                          boolean (0,1) array indicating word presence in a string
                            n is the number of lines in the text file
                              (a blank line yields an all-zero row)
                            d is the number of entries in word_list
    """

    # read the file once, inside a context manager, so the handle is closed
    # and the row count always matches the rows that get filled in
    with open(infile, 'r') as fid:
        lines = fid.readlines()

    feature_matrix = np.zeros((len(lines), len(word_list)))

    for i, input_string in enumerate(lines):
        for word in extract_words(input_string):
            column = word_list.get(word)
            # out-of-vocabulary words have no column; skip them
            if column is not None:
                feature_matrix[i, column] = 1.0

    return feature_matrix

In [29]:
######################################################################
# functions -- evaluation
######################################################################

def performance(y_true, y_pred, metric="accuracy"):
    """
    Calculates the performance metric based on the agreement between the
    true labels and the predicted labels.

    Parameters
    --------------------
        y_true -- numpy array of shape (n,), known labels
        y_pred -- numpy array of shape (n,), (continuous-valued) predictions
        metric -- string, option used to select the performance measure
                  options: 'accuracy', 'f1-score', 'auroc', 'precision',
                           'sensitivity', 'specificity'

    Returns
    --------------------
        score  -- float, performance score

    Raises
    --------------------
        ValueError -- if metric is not one of the options listed above
    """
    # map continuous-valued predictions to binary labels
    # (a prediction of exactly 0 is counted as the positive class)
    y_label = np.sign(y_pred)
    y_label[y_label==0] = 1

    ### ========== TODO : START ========== ###
    # part 1a: compute classifier performance
    if metric == "accuracy":
        return metrics.accuracy_score(y_true=y_true, y_pred=y_label)
    if metric == "f1-score":
        return metrics.f1_score(y_true=y_true, y_pred=y_label)
    if metric == "auroc":
        # ranking metric: uses the raw scores, not the thresholded labels
        return metrics.roc_auc_score(y_true=y_true, y_score=y_pred)
    if metric == "precision":
        return metrics.precision_score(y_true=y_true, y_pred=y_label)
    if metric == "sensitivity":
        return metrics.recall_score(y_true=y_true, y_pred=y_label)
    if metric == "specificity":
        # fix the label order so the matrix is always 2x2 (and ravel() always
        # yields tn, fp, fn, tp) even when one class is missing from the data
        tn, fp, fn, tp = metrics.confusion_matrix(
            y_true=y_true, y_pred=y_label, labels=[-1, 1]).ravel()
        return tn / (tn + fp)

    # fail loudly instead of returning None, which would only surface later
    # as a confusing arithmetic error in the caller
    raise ValueError("Unknown metric: " + repr(metric))
    ### ========== TODO : END ========== ###


def cv_performance(clf, X, y, kf, metric="accuracy"):
    """
    Runs k-fold cross-validation of clf on (X, y) using the splits produced
    by kf: for every fold the classifier is fit on the training indices and
    scored on the held-out indices, and the per-fold scores are averaged.

    Parameters
    --------------------
        clf    -- classifier (instance of LinearSVC)
        X      -- numpy array of shape (n,d), feature vectors
                    n = number of examples
                    d = number of features
        y      -- numpy array of shape (n,), binary labels {1,-1}
        kf     -- model_selection.StratifiedKFold
        metric -- string, option used to select performance measure

    Returns
    --------------------
        score   -- float, average cross-validation performance across k folds
    """

    ### ========== TODO : START ========== ###
    # part 1b: compute average cross-validation performance
    n_folds = kf.get_n_splits(X, y)

    def _fold_score(train_idx, test_idx):
        # refit on this fold's training part, then score the continuous
        # decision values on its held-out part
        clf.fit(X[train_idx], y[train_idx])
        return performance(y[test_idx], clf.decision_function(X[test_idx]), metric=metric)

    total = sum(_fold_score(train_idx, test_idx) for train_idx, test_idx in kf.split(X, y))
    return total / n_folds
    ### ========== TODO : END ========== ###


def select_param_linear(X, y, kf, metric="accuracy"):
    """
    Picks the regularization strength C of a linear SVM by cross-validation:
    each candidate C is scored with k-fold CV under the given metric, and the
    candidate with the highest average score is printed and returned (the
    smallest such C wins ties).

    Parameters
    --------------------
        X      -- numpy array of shape (n,d), feature vectors
                    n = number of examples
                    d = number of features
        y      -- numpy array of shape (n,), binary labels {1,-1}
        kf     -- model_selection.StratifiedKFold
        metric -- string, option used to select performance measure

    Returns
    --------------------
        C -- float, optimal parameter value for linear SVM
    """

    print('Linear SVM Hyperparameter Selection based on ' + str(metric) + ':')
    C_range = 10.0 ** np.arange(-3, 3)

    ### ========== TODO : START ========== ###
    # part 1c: select optimal hyperparameter using cross-validation
    cv_scores = [
        cv_performance(LinearSVC(loss='hinge', random_state=0, C=candidate),
                       X, y, kf, metric=metric)
        for candidate in C_range
    ]

    # strict '>' keeps the first (smallest) C among equally good candidates
    best_score, best_C = -1, -1
    for candidate, score in zip(C_range, cv_scores):
        if score > best_score:
            best_score, best_C = score, candidate

    print(best_C)
    return best_C
    ### ========== TODO : END ========== ###


def performance_test(clf, X, y, metric="accuracy"):
    """
    Scores an already-fitted classifier on a test set under one metric.

    Parameters
    --------------------
        clf          -- classifier (instance of LinearSVC)
                          [already fit to data]
        X            -- numpy array of shape (n,d), feature vectors of test set
                          n = number of examples
                          d = number of features
        y            -- numpy array of shape (n,), binary labels {1,-1} of test set
        metric       -- string, option used to select performance measure

    Returns
    --------------------
        score        -- float, classifier performance
    """

    ### ========== TODO : START ========== ###
    # part 2b: return performance on test data under a metric.
    # continuous decision values are passed on; performance() thresholds them
    return performance(y_true=y, y_pred=clf.decision_function(X), metric=metric)
    ### ========== TODO : END ========== ###

In [30]:
######################################################################
# main
######################################################################

def main(file_path=None, label_path=None) :
    """
    Runs the Problem 3 experiment: builds bag-of-words features from the
    tweets, selects C for a linear SVM by 5-fold CV once per metric, retrains
    on the full training split with each selected C, and prints the test
    score of each classifier under the metric it was tuned for.

    Parameters
    --------------------
        file_path  -- string, path to the tweets text file
                      (None falls back to the path set in the TODO block)
        label_path -- string, path to the labels text file
                      (None falls back to the path set in the TODO block)
    """
    np.random.seed(1234)

    # read the tweets and its labels, change the following two lines to your own path.
    ### ========== TODO : START ========== ###
    if file_path is None:
        file_path = '/Users/divikchotani/github/ece-m146/HW3-code/data/tweets.txt'
    if label_path is None:
        label_path = '/Users/divikchotani/github/ece-m146/HW3-code/data/labels.txt'
    ### ========== TODO : END ========== ###
    dictionary = extract_dictionary(file_path)
    print(len(dictionary))
    X = extract_feature_vectors(file_path, dictionary)
    y = read_vector_file(label_path)
    # split data into training (training + cross-validation) and testing set
    n_train = 560
    X_train, X_test = X[:n_train], X[n_train:]
    y_train, y_test = y[:n_train], y[n_train:]

    metric_list = ["accuracy", "f1-score", "auroc", "precision", "sensitivity", "specificity"]

    ### ========== TODO : START ========== ###
    # part 1b: create stratified folds (5-fold CV)
    kf = StratifiedKFold(n_splits=5)

    # part 1c: for each metric, select optimal hyperparameter for linear SVM using CV
    cs = [select_param_linear(X=X_train, y=y_train, kf=kf, metric=metric)
          for metric in metric_list]

    # part 2a: train linear SVMs with selected hyperparameters
    # (one classifier per metric, in the same order as metric_list)
    clfs = [LinearSVC(loss='hinge', random_state=0, C=C).fit(X=X_train, y=y_train)
            for C in cs]

    # part 2b: test the performance of your classifiers.
    for metric, clf in zip(metric_list, clfs):
        print(metric, performance_test(clf=clf, X=X_test, y=y_test, metric=metric))
    ### ========== TODO : END ========== ###


# Run the Problem 3 experiment when this cell/script is executed directly.
if __name__ == "__main__" :
    main()

1811
Linear SVM Hyperparameter Selection based on accuracy:
1.0
Linear SVM Hyperparameter Selection based on f1-score:
1.0
Linear SVM Hyperparameter Selection based on auroc:
1.0
Linear SVM Hyperparameter Selection based on precision:
10.0
Linear SVM Hyperparameter Selection based on sensitivity:
0.001
Linear SVM Hyperparameter Selection based on specificity:
1.0
accuracy 0.7428571428571429
f1-score 0.47058823529411764
auroc 0.7424684159378038
precision 0.6363636363636364
sensitivity 1.0
specificity 0.8979591836734694


# Problem 4: Boosting vs. Decision Tree

In [31]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn.model_selection import cross_val_score, train_test_split

In [33]:
class Data:

    def __init__(self):
        """
        Container for a dataset loaded from a csv file.

        Attributes
        --------------------
            X      -- numpy array of shape (n,d), features
            y      -- numpy array of shape (n,), targets
            Xnames -- feature (column) names, or None
            yname  -- target (column) name, or None
        """

        # n = number of examples, d = dimensionality
        self.X = None
        self.y = None

        self.Xnames = None
        self.yname = None

    def load(self, filename, header=0, predict_col=-1):
        """
        Load csv file into X array of features and y array of labels.

        Parameters
        --------------------
            filename    -- string, path of the csv file
            header      -- integer, number of leading rows skipped when
                           parsing the numeric data; if non-zero, the first
                           line of the file is also read as column names
            predict_col -- integer column index of the target, or None to
                           treat every column as a feature
        """

        # numeric payload
        with open(filename, 'r') as fid:
            data = np.loadtxt(fid, delimiter=",", skiprows=header)

        # separate features and labels
        if predict_col is None:
            self.X, self.y = data[:, :], None
        elif data.ndim > 1:
            self.X = np.delete(data, predict_col, axis=1)
            self.y = data[:, predict_col]
        else:
            # 1-d data: everything is treated as the target vector
            self.X, self.y = None, data[:]

        # feature and label names
        if header != 0:
            with open(filename, 'r') as fid:
                columns = fid.readline().rstrip().split(",")

            if predict_col is None:
                xnames, yname = columns[:], None
            elif len(columns) > 1:
                xnames = np.delete(columns, predict_col)
                yname = columns[predict_col]
            else:
                xnames, yname = None, columns[0]
        else:
            xnames, yname = None, None

        self.Xnames = xnames
        self.yname = yname


# helper functions
def load_data(filename, header=0, predict_col=-1) :
    """Create a Data object, fill it from the csv file, and return it."""
    dataset = Data()
    dataset.load(filename, header=header, predict_col=predict_col)
    return dataset

In [34]:
# Load the Titanic training csv and expose its features/targets as notebook globals.
# header=1 skips the first row when parsing numbers and reads it as the column names;
# predict_col=0 makes the first column the target and the remaining columns the features.
# Change the path to your own data directory
### ========== TODO : START ========== ###
titanic = load_data("/Users/divikchotani/github/ece-m146/HW3-code/data/titanic_train.csv", header=1, predict_col=0)
### ========== TODO : END ========== ###
X = titanic.X; Xnames = titanic.Xnames
y = titanic.y; yname = titanic.yname
n,d = X.shape  # n = number of examples, d =  number of features

In [36]:
def error(clf, X, y, ntrials=100, test_size=0.2) :
    """
    Estimates the classifier's training and test error by repeatedly
    splitting the data, refitting, and averaging the misclassification
    rates over ntrials runs. Trial i uses random_state=i for its split.

    Parameters
    --------------------
        clf         -- classifier
        X           -- numpy array of shape (n,d), features values
        y           -- numpy array of shape (n,), target classes
        ntrials     -- integer, number of trials
        test_size   -- proportion of data used for evaluation

    Returns
    --------------------
        train_error -- float, training error
        test_error  -- float, test error
    """

    def _misclassification_rate(features, targets):
        # error = 1 - accuracy of the currently fitted classifier
        predictions = clf.predict(features)
        return 1 - metrics.accuracy_score(targets, predictions, normalize=True)

    train_errs, test_errs = [], []
    for seed in range(ntrials):
        X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=test_size, random_state=seed)
        clf.fit(X_tr, y_tr)
        train_errs.append(_misclassification_rate(X_tr, y_tr))
        test_errs.append(_misclassification_rate(X_te, y_te))

    return np.mean(train_errs), np.mean(test_errs)


In [38]:
### ========== TODO : START ========== ###
# Part 4(a): Implement the decision tree classifier and report the training error.
print('Classifying using Decision Tree...')
dtclf = DecisionTreeClassifier(criterion='entropy', random_state=0)
# error() returns a (train_error, test_error) pair, so both values are printed
print(error(dtclf, X=X, y=y))
### ========== TODO : END ========== ###

Classifying using Decision Tree...
(np.float64(0.011528998242530775), np.float64(0.24174825174825174))


In [None]:
### ========== TODO : START ========== ###
# Part 4(b): Implement the random forest classifier and adjust the number of samples used in bootstrap sampling.
print('Classifying using Random Forest...')
# track the lowest test error seen so far and the sample percentage that produced it
min_err = float('inf')
min_perc = 0
# sweep max_samples over 0.1, 0.2, ..., 0.8
for i in range(1,9):
    rfclf = RandomForestClassifier(criterion='entropy', random_state=0, max_samples=i/10)
    q = error(rfclf, X=X, y=y)
    # keep only the second element of the pair, i.e. the test error
    _, e = q
    if e < min_err:
        min_err = e
        min_perc = i*10
    print(e)

print(f"min error at {min_perc}% with an error of {min_err}")

### ========== TODO : END ========== ###

Classifying using Random Forest...
0.19874125874125873
0.19013986013986017
0.18734265734265737
0.18923076923076923
0.1920979020979021
0.19559440559440563
0.1995104895104895


KeyboardInterrupt: 

In [None]:
### ========== TODO : START ========== ###
# Part 4(c): Implement the random forest classifier and adjust the number of features for each decision tree.
# NOTE: this cell currently only prints the banner; the sweep itself is still to be written.
print('Classifying using Random Forest...')
### ========== TODO : END ========== ###