In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from typing import List, Tuple, Union, Dict, Generic, TypeVar

In [30]:
dataTrain = pd.read_csv('data/HAI817_Projet_train.csv')
dataTest = pd.read_csv('data/HAI817_Projet_test.csv')
dataTrain

Unnamed: 0,public_id,text,title,our rating
0,5a228e0e,Distracted driving causes more deaths in Canad...,"You Can Be Fined $1,500 If Your Passenger Is U...",false
1,30c605a1,Missouri politicians have made statements afte...,Missouri lawmakers condemn Las Vegas shooting,mixture
2,c3dea290,Home Alone 2: Lost in New York is full of viol...,CBC Cuts Donald Trump's 'Home Alone 2' Cameo O...,mixture
3,f14e8eb6,But things took a turn for the worse when riot...,Obama’s Daughters Caught on Camera Burning US ...,false
4,faf024d6,It’s no secret that Epstein and Schiff share a...,Leaked Visitor Logs Reveal Schiff’s 78 Visits ...,false
...,...,...,...,...
1259,47423bb6,More than four million calls to the taxman are...,Taxman fails to answer four million calls a ye...,true
1260,097c142a,More under-18s are being taken to court for se...,Police catch 11‑year‑olds being used to sell d...,true
1261,08bc59f4,The Government’s much vaunted Help to Buy Isa ...,"Help to Buy Isa scandal: 500,000 first-time bu...",false
1262,af3393ce,The late Robin Williams once called cocaine “G...,A coke-snorting generation of hypocrites,true


In [11]:
# nltk download
import nltk
nltk.download('stopwords')
nltk.download('punkt')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Luna\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Luna\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping tokenizers\punkt.zip.


True

In [31]:
# text preprocessing
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import string

stop_words = set(stopwords.words('english'))
punctuations = set(string.punctuation)

def remove_stop_words(text):
    # Tokenize the text
    words = word_tokenize(text)
    
    # convert to lowercase
    words = [word.lower() for word in words if word.isalpha()]
    
    # remove stop words
    words = [word for word in words if word not in stop_words]
    
    # remove punctuation
    words = [word for word in words if word not in punctuations]

    return ' '.join(words)

XpreprocessTrain = dataTrain['text'].apply(remove_stop_words)
XpreprocessTest = dataTest['text'].apply(remove_stop_words)
ytxtTrain = dataTrain['our rating']
ytxtTest = dataTest['our rating']
XpreprocessTrain[0]


'distracted driving causes deaths canada impaired driving every province territory laws driving operating cell phone tell passengers stay phones driving measures necessary distracted driving claimed lives impaired driving provinces like british columbia ontario quebec alberta nova scotia manitoba newfoundland labrador mobile phones even held passenger dangerous distraction driver starting next week distracted screen held passenger attracts penalty three demerit points drivers screens mix matter holding device using facetime taking selfies driver showing driver funny cat video provinces mobile phone categorised visual display unit meaning considered akin television screen important practice safe driving sake fellow drivers canada cracking distracted driving problem rollout stricter laws impose harsher penalties heftier fines guilty offenders taking effect next week adds serious penalties convicted distracted driving'

In [67]:
# Tokenization
class Tokenizer:
    def __init__(self, dtype: type = np.int64):
        self.txt2token = {}
        self.token2txt = {}
        self.tokens = set()
        self.dtype = dtype

    def fit(self, text: np.ndarray):
        count = 0
        for txt in text:
            for word in txt.split():
                if word not in self.txt2token:
                    self.txt2token[word] = count
                    self.token2txt[count] = word
                    count += 1
                self.tokens.add(self.txt2token[word])

    def histogram(self, text):
        tokenized_text = np.zeros((len(text), len(self.tokens)), dtype=self.dtype)
        for i, txt in enumerate(text):
            for word in txt.split():
                tokenized_text[i][self.txt2token[word]] += 1

        return tokenized_text
    
    def transform(self, text):
        tokenized_text = []
        for txt in text:
            tokenized_text.append(np.array([self.txt2token[word] for word in txt.split() if word in self.txt2token], dtype=self.dtype))
        return tokenized_text


Xtokenizer = Tokenizer(dtype=np.int64)
Xtokenizer.fit(np.concatenate((np.array(XpreprocessTrain), np.array(XpreprocessTest))))
XtokenTrain = Xtokenizer.transform(XpreprocessTrain)
XhistTrain = Xtokenizer.histogram(XpreprocessTrain)
XtokenTest = Xtokenizer.transform(XpreprocessTest)
XhistTest = Xtokenizer.histogram(XpreprocessTest)

Ytokenizer = Tokenizer(dtype=np.uint8)
Ytokenizer.fit(np.concatenate((np.array(ytxtTrain), np.array(ytxtTest))))
YtokenTrain = Ytokenizer.transform(ytxtTrain)
YtokenTest = Ytokenizer.transform(ytxtTest)

XtokenTrain[0], YtokenTrain[0]
            

(array([ 0,  1,  2,  3,  4,  5,  1,  6,  7,  8,  9,  1, 10, 11, 12, 13, 14,
        15, 16,  1, 17, 18,  0,  1, 19, 20,  5,  1, 21, 22, 23, 24, 25, 26,
        27, 28, 29, 30, 31, 32, 33, 16, 34, 35, 36, 37, 38, 39, 40, 41, 42,
         0, 43, 35, 36, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
        57, 58, 39, 59, 39, 60, 61, 62, 21, 33, 12, 63, 64, 65, 66, 67, 68,
        69, 70, 43, 71, 72, 73,  1, 74, 75, 49,  4, 76,  0,  1, 77, 78, 79,
         9, 80, 81, 82, 83, 84, 85, 86, 57, 87, 41, 42, 88, 89, 82, 90,  0,
         1], dtype=int64),
 array([0], dtype=uint8))

In [61]:
# classification
# naive bayes
from collections import defaultdict, Counter

class NaiveBayesClassifier:
    def __init__(self):
        self.classes = None
        self.class_prior = {}
        self.word_likelihood = {}
        self.vocab = set()
        self.word_count = {}

    def fit(self, X, y):
        self.classes = np.unique(np.concatenate(y))
        total_samples = len(y)
        
        # Initialize word count and class count
        self.word_count = {c: Counter() for c in self.classes}
        class_count = Counter()

        for i in range(total_samples):
            label = y[i][0]
            class_count[label] += 1
            for word in X[i]:
                self.word_count[label][word] += 1
                self.vocab.add(word)
        
        # Compute class prior probabilities P(C)
        self.class_prior = {c: count / total_samples for c, count in class_count.items()}
        
        # Compute word likelihood P(W|C) with Laplace smoothing
        self.word_likelihood = {c: {} for c in self.classes}
        vocab_size = len(self.vocab)
        for c in self.classes:
            total_words = sum(self.word_count[c].values())
            for word in self.vocab:
                # Apply Laplace smoothing
                self.word_likelihood[c][word] = (self.word_count[c][word] + 1) / (total_words + vocab_size)

    def predict(self, X):
        predictions = []
        for sample in X:
            log_probs = {}
            for c in self.classes:
                log_probs[c] = np.log(self.class_prior[c])
                for word in sample:
                    if word in self.vocab:  # Only consider words seen in training
                        log_probs[c] += np.log(self.word_likelihood[c][word])
                    else:
                        # Apply Laplace smoothing for unseen words
                        log_probs[c] += np.log(1 / (sum(self.word_count[c].values()) + len(self.vocab)))
            
            predicted_class = max(log_probs, key=log_probs.get)
            predictions.append(predicted_class)
        
        return predictions
    
    
# Train the model
model = NaiveBayesClassifier()
model.fit(XtokenTrain, YtokenTrain)

# Predict the test set
Ypred = model.predict(XtokenTest)

accuracySum = 0
for i in range(len(Ypred)):
    accuracySum += Ypred[i] == YtokenTest[i]
accuracy = accuracySum / len(Ypred)
accuracy

array([0.54738562])

In [93]:
# classification
# neural network
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
class NeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out
    
def train_nn(X, y, model, criterion, optimizer, num_epochs=5):
    train_dataset = TextDataset(X, y)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    
    for epoch in range(num_epochs):
        for i, (texts, labels) in enumerate(train_loader):
            texts = texts.float()
            labels = labels.long()
            
            # Forward pass
            outputs = model(texts)
            loss = criterion(outputs, labels)
            
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            print (f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item()}')
                
    print('Finished Training')
    
def test_nn(X: torch.Tensor, y: torch.Tensor, model):
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for texts, labels in zip(X, y):
            outputs = model(texts)
            _, predicted = torch.max(outputs.data, 0)
            total += 1
            correct += (predicted == labels).sum().item()

        print(f'Accuracy: {100 * correct / total}%')
        
# Hyperparameters
input_size = len(Xtokenizer.tokens)
hidden_size = 500
num_classes = len(Ytokenizer.tokens)
num_epochs = 10
learning_rate = 0.001

# Initialize the model
model = NeuralNetwork(input_size, hidden_size, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
YtokenTrain1D = torch.tensor([y[0] / num_classes for y in YtokenTrain]).float()
YtokenTest1D = torch.tensor([y[0] / num_classes for y in YtokenTest]).float()
maxval = np.max(np.concatenate((XhistTrain, XhistTest)))
Xtrain = torch.tensor(XhistTrain / maxval).float()
Xtest = torch.tensor(XhistTest / maxval).float()



if torch.cuda.is_available():
    model = model.cuda()
    criterion = criterion.cuda()
    
    device = torch.device('cuda:0')
    Xtrain = Xtrain.to(device)
    Xtest = Xtest.to(device)
    YtokenTrain1D = YtokenTrain1D.to(device)
    YtokenTest1D = YtokenTest1D.to(device)
    

train_nn(Xtrain, YtokenTrain1D, model, criterion, optimizer, num_epochs)

# Test the model
test_nn(Xtest, YtokenTest1D, model)

Epoch [1/10], Step [1/40], Loss: 1.4017424583435059
Epoch [1/10], Step [2/40], Loss: 1.391738772392273
Epoch [1/10], Step [3/40], Loss: 1.3836076259613037
Epoch [1/10], Step [4/40], Loss: 1.3734073638916016
Epoch [1/10], Step [5/40], Loss: 1.3645902872085571
Epoch [1/10], Step [6/40], Loss: 1.3603006601333618
Epoch [1/10], Step [7/40], Loss: 1.3504338264465332
Epoch [1/10], Step [8/40], Loss: 1.3403682708740234
Epoch [1/10], Step [9/40], Loss: 1.3311219215393066
Epoch [1/10], Step [10/40], Loss: 1.3099629878997803
Epoch [1/10], Step [11/40], Loss: 1.294071078300476
Epoch [1/10], Step [12/40], Loss: 1.2838865518569946
Epoch [1/10], Step [13/40], Loss: 1.2620855569839478
Epoch [1/10], Step [14/40], Loss: 1.2543872594833374
Epoch [1/10], Step [15/40], Loss: 1.2485963106155396
Epoch [1/10], Step [16/40], Loss: 1.1748766899108887


KeyboardInterrupt: 