In [None]:
import math
import random

import numpy as np
import pandas as pd

import string
import re
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer
# from bs4 import BeautifulSoup as bs
# import lxml

nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import math
import enum


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [None]:
class Metric(enum.Enum):
    """Identifiers for the distance / similarity measures kNN_Texts can use.

    The first three members name distances; COSINE_SIMILARITY names a
    similarity measure.
    """
    EUCLIDEAN_DISTANCE = "euclidean"
    HAMMING_DISTANCE = "hamming"
    MANHATTAN_DISTANCE = "manhattan"
    COSINE_SIMILARITY = "cosine"

In [None]:
from nltk.util import clean_html
class kNN_Texts:
    """k-nearest-neighbours classifier for raw texts with binary (0/1) labels.

    Texts are normalised and tokenised by ``__preprocess_text``, turned into
    dense feature vectors by the vectorizer supplied at construction time, and
    classified by a vote among the ``K`` training points that are closest
    (distance metrics) or most similar (cosine similarity) to each query.

    Parameters
    ----------
    vectorizer_class : callable
        Called as ``vectorizer_class(analyzer=...)``; the resulting object must
        offer ``fit``, ``transform`` (whose result has ``toarray``) and expose
        ``vocabulary_``.
    K : int or None
        Number of neighbours that vote. ``None`` lets every training point vote.
    metric : Metric
        Measure used to rank training points.
    """

    ### DO NOT change anything in the constructor
    def __init__(self, vectorizer_class=None, 
                 K=None,
                 metric=Metric.EUCLIDEAN_DISTANCE):
        
        self.__vectorizer = vectorizer_class(analyzer=lambda text: text)
        # self.__vectorizer = vectorizer_class(analyzer=self.__preprocess_text)
        
        self.__K = K
        self.__metric = metric

        self.__train_vocabulary = None
        self.__train_feature_vectors = None
        self.__train_labels = None

    def __preprocess_text(self, text):
        """Normalise one raw text and return it as a list of stemmed tokens.

        Steps, in order: lowercase, strip (optionally signed) integers, strip
        hyperlinks / bare domains, strip punctuation, tokenise, drop English
        stop words, lemmatise, stem.
        """
        text = text.lower()
        # Remove numbers (with an optional leading sign).
        text = re.sub(r'[-+]?\d+', '', text)

        # Remove hyperlinks. One pass over 'http(s)://<non-space>*' removes
        # every such URL, so no further http patterns are needed afterwards.
        text = re.sub(r'https?:\/\/\S*', '', text)
        text = re.sub(r'www\.\S*', '', text)
        text = re.sub(r'\S*\.(com|info|net|org)', '', text)

        # Remove punctuation.
        text = text.translate(str.maketrans('', '', string.punctuation))

        tokens = word_tokenize(text)

        stop_words = set(stopwords.words('english'))
        tokens = [token for token in tokens if token not in stop_words]

        lemmatizer = WordNetLemmatizer()
        tokens = [lemmatizer.lemmatize(token) for token in tokens]

        stemmer = PorterStemmer()
        tokens = [stemmer.stem(token) for token in tokens]

        return tokens

    def __fit_vectorizer(self, texts):
        """Fit the vectorizer on pre-tokenised texts and store its vocabulary."""
        self.__vectorizer.fit(texts)
        self.__train_vocabulary = self.__vectorizer.vocabulary_

    def __vectorize_texts(self, texts):
        """Return the dense feature matrix for pre-tokenised ``texts``."""
        return self.__vectorizer.transform(texts).toarray()

    def __train(self, texts, labels):
        """Preprocess ``texts``, fit the vectorizer on them and vectorise them.

        Returns
        -------
        tuple
            ``(train_feature_vectors, train_labels)`` where the labels are a
            plain list in the same order as the feature rows.
        """
        preprocessed_texts = [self.__preprocess_text(text) for text in texts]
        self.__fit_vectorizer(preprocessed_texts)
        train_feature_vectors = self.__vectorize_texts(preprocessed_texts)
        train_labels = list(labels)

        return train_feature_vectors, train_labels

    def fit(self, texts, labels):
        """Store the vectorised training texts together with their labels."""
        self.__train_feature_vectors, self.__train_labels = self.__train(texts=texts,
                                                                         labels=labels)

    def __compute_metric_to_train_points(self, feature_vector):
        """Score one query vector against every stored training vector.

        Returns
        -------
        list of (value, label)
            One tuple per training point, in training order. ``value`` is a
            distance for the Euclidean / Hamming / Manhattan metrics and a
            similarity for ``Metric.COSINE_SIMILARITY``.

        Raises
        ------
        ValueError
            If the configured metric is not a supported ``Metric`` member.
        """
        if len(self.__train_feature_vectors) == 0:
            return []

        query = np.asarray(feature_vector, dtype=float)
        train = np.asarray(self.__train_feature_vectors, dtype=float)

        if self.__metric == Metric.EUCLIDEAN_DISTANCE:
            # sqrt(sum_j (a_j - b_j)^2), computed for every training row at once.
            values = np.sqrt(np.square(train - query).sum(axis=1))

        elif self.__metric == Metric.HAMMING_DISTANCE:
            # Number of positions where the two vectors differ.
            values = np.count_nonzero(train != query, axis=1)

        elif self.__metric == Metric.MANHATTAN_DISTANCE:
            # sum_j |a_j - b_j|
            values = np.abs(train - query).sum(axis=1)

        elif self.__metric == Metric.COSINE_SIMILARITY:
            # (a . b) / (||a|| * ||b||); an all-zero vector has no direction,
            # so its similarity is defined as 0 instead of dividing by zero.
            dot_products = train @ query
            norm_products = np.linalg.norm(train, axis=1) * np.linalg.norm(query)
            values = np.divide(dot_products, norm_products,
                               out=np.zeros_like(dot_products),
                               where=norm_products > 0)

        else:
            raise ValueError("Unsupported metric: {!r}".format(self.__metric))

        return list(zip(values.tolist(), self.__train_labels))

    def __vote(self, neighbours, weighted, is_similarity):
        """Return the winning label (0 or 1) among ``neighbours``.

        ``neighbours`` is a list of ``(value, label)`` tuples. Labels other
        than 0 and 1 are ignored. Label 1 wins only with a strictly larger
        score, so ties (and an empty neighbour list) yield 0.
        """
        epsilon = 1e-9  # keeps 1/distance finite for exact matches
        score_zero = 0.0
        score_one = 0.0
        for value, label in neighbours:
            if not weighted:
                weight = 1.0
            elif is_similarity:
                weight = value
            else:
                weight = 1.0 / (value + epsilon)

            if label == 1:
                score_one += weight
            elif label == 0:
                score_zero += weight

        return 1 if score_one > score_zero else 0

    def predict(self, texts, weighted=False):
        """Predict a 0/1 label for every text in ``texts``.

        Parameters
        ----------
        texts : iterable of str
            Raw texts to classify.
        weighted : bool, default False
            If False every neighbour casts one vote. If True a neighbour's vote
            is weighted by ``1 / distance`` (distance metrics) or by the
            similarity itself (cosine similarity).

        Returns
        -------
        list of int
            One prediction per input text, in input order.
        """
        preprocessed_texts = [self.__preprocess_text(text) for text in texts]
        test_feature_vectors = self.__vectorize_texts(preprocessed_texts)

        # Distances rank best-first ascending, similarities descending.
        is_similarity = self.__metric == Metric.COSINE_SIMILARITY

        predictions = []
        for feature_vector in test_feature_vectors:
            scored = self.__compute_metric_to_train_points(feature_vector)
            # Sort on the metric value only so that ties keep training order
            # instead of being broken by the label value.
            scored.sort(key=lambda pair: pair[0], reverse=is_similarity)
            neighbours = scored[:self.__K]
            predictions.append(self.__vote(neighbours, weighted, is_similarity))

        return predictions

**Load train.csv / test.csv and create the train–validation split**

In [None]:

# Load the labelled data; empty strings are read as missing values.
# drop()/dropna() return new frames, so their results are assigned back.
# reset_index(drop=True) renumbers the rows without adding an 'index' column.
train_set = (
    pd.read_csv("train.csv", na_values='')
    .drop(columns='id')
    .dropna()
    .reset_index(drop=True)
)

# Unlabelled data, cleaned the same way (rows with missing values removed
# first, then the 'id' column dropped).
test_set = (
    pd.read_csv("test.csv", na_values='')
    .dropna()
    .reset_index(drop=True)
    .drop(columns='id')
)

train_label = train_set["label"]
train_feature = train_set["tweet"]

# 80/20 hold-out split, stratified on the label so both parts keep the same
# class proportions; the fixed seed makes the split reproducible.
x_train, x_test, y_train, y_test = train_test_split(train_feature, train_label,
                                                    train_size=0.8,
                                                    stratify=train_label,
                                                    random_state=911)
                                                                                              

**CountVectorizer**

In [None]:
# kNN on raw term counts.
clf = kNN_Texts(vectorizer_class=CountVectorizer,
                metric=Metric.EUCLIDEAN_DISTANCE,
                K=9)

clf.fit(x_train, y_train)
predictions = clf.predict(x_test)
print(len(y_test))
print(len(predictions))

# confusion_matrix takes (y_true, y_pred) and returns a 2x2 array, so it is
# flattened before unpacking; labels=[0, 1] fixes the row/column order and
# guarantees a 2x2 result even if one class is absent.
tn, fp, fn, tp = confusion_matrix(y_test, predictions, labels=[0, 1]).ravel()

print("TP: {}\nTN: {}\nFP: {}\nFN: {}".format(tp, tn, fp, fn))

# Pass the text column, not the DataFrame: iterating a DataFrame yields its
# column names. Assumes test.csv has the same 'tweet' column as train.csv.
print(clf.predict(test_set["tweet"]))

**TfidfVectorizer**

In [None]:
# kNN on TF-IDF weighted term vectors.
clf = kNN_Texts(vectorizer_class=TfidfVectorizer,
                metric=Metric.EUCLIDEAN_DISTANCE,
                K=9)

clf.fit(x_train, y_train)
predictions = clf.predict(x_test)
print(len(y_test))
print(len(predictions))

# confusion_matrix takes (y_true, y_pred) and returns a 2x2 array, so it is
# flattened before unpacking; labels=[0, 1] fixes the row/column order and
# guarantees a 2x2 result even if one class is absent.
tn, fp, fn, tp = confusion_matrix(y_test, predictions, labels=[0, 1]).ravel()

print("TP: {}\nTN: {}\nFP: {}\nFN: {}".format(tp, tn, fp, fn))

# Pass the text column, not the DataFrame: iterating a DataFrame yields its
# column names. Assumes test.csv has the same 'tweet' column as train.csv.
print(clf.predict(test_set["tweet"]))

In [None]:
# Debug inspection: read the private `__metric` attribute through its
# name-mangled form to see which Metric the current classifier uses.
clf._kNN_Texts__metric

In [None]:
# Debug inspection: list the attributes of the classifier instance
# (the double-underscore members appear under their `_kNN_Texts__*` names).
dir(clf)