In [16]:
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import sent_tokenize
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer , TfidfVectorizer
import re
import torch as t 
from torch.utils.data import TensorDataset , DataLoader 
import numpy as np
from pprint import pprint
from sklearn.metrics import classification_report , confusion_matrix
from sklearn.model_selection import train_test_split


Pipeline overview: Raw Text → Data Split → Cleaning → TF-IDF (fit on train only) → Transform val/test → Dataset & DataLoader → MLP (with Dropout) → Loss (BCE or CrossEntropy) → Optimizer (Adam) → Training Loop → Validation → Evaluation → Save model + vectorizer → Inference

In [17]:
class Pipeline:
    """Text-classification pipeline: CSV -> tokens -> BoW/TF-IDF -> MLP.

    The CSV referenced by ``data`` must contain a ``text`` column and a
    ``label`` column. Labels are fed to ``CrossEntropyLoss`` and must
    therefore be non-negative integer class indices.

    Args:
        split_data: When true, hold out 20% of the rows as a test set.
        feature_extraction: ``"tfidf"`` or ``"bow"``.
        data: Path (or anything ``pandas.read_csv`` accepts) of the CSV.
    """

    # Vectorizer names accepted for ``feature_extraction``.
    SUPPORTED_FEATURES = ("tfidf", "bow")

    def __init__(self, split_data: bool, feature_extraction: str, data):
        self.split_data = split_data
        self.feature_extraction = feature_extraction
        self.data = data
        self.model = None        # set by train()
        self.vectorizer = None   # set (and fitted) by set_vectorizer()

    @staticmethod
    def _tokenize(text, stop_words, lemmatizer):
        """Lower-case, whitespace-split, drop stop words and lemmatize *text*.

        Shared by clean_data() and predict() so that training and inference
        apply exactly the same preprocessing.
        """
        return [
            lemmatizer.lemmatize(token)
            for token in str(text).lower().split()
            if token not in stop_words
        ]

    @staticmethod
    def _join_tokens(docs):
        """Turn token lists back into space-separated strings for sklearn."""
        return [" ".join(doc) if isinstance(doc, list) else doc for doc in docs]

    @staticmethod
    def _to_dataset(X, Y):
        """Pair a feature matrix with its labels in a single TensorDataset.

        ``X`` may be a scipy sparse matrix (densified with ``toarray``) or
        any dense array-like.
        """
        dense = X.toarray() if hasattr(X, "toarray") else np.asarray(X)
        features = t.tensor(dense, dtype=t.float32)
        labels = t.as_tensor(np.asarray(Y), dtype=t.long)
        return TensorDataset(features, labels)

    def clean_data(self):
        """Read the CSV and tokenize each row of the ``text`` column.

        Each document yields exactly ONE token list, so the result stays
        aligned with the ``label`` column. (Splitting into sentences first
        and emitting one entry per sentence produces more texts than labels
        as soon as a row holds several sentences.)

        Returns:
            ``(X, Y)``: a Series of token lists and the label Series,
            sharing the DataFrame's index.
        """
        df = pd.read_csv(self.data)
        lemmatizer = WordNetLemmatizer()
        # A set gives O(1) membership tests inside the per-token filter.
        stop_words = set(stopwords.words("english"))

        tokens = [
            self._tokenize(doc, stop_words, lemmatizer)
            for doc in df["text"].fillna("")  # missing text -> empty document
        ]
        X = pd.Series(tokens, index=df.index, name="text")
        Y = df["label"]
        return X, Y

    def split_clean_data(self):
        """Clean the data and, if ``split_data`` is set, split it 80/20.

        Returns:
            ``(X_train, X_test, Y_train, Y_test)`` when ``split_data`` is
            true, otherwise ``(X, Y)``.

        Raises:
            ValueError: If no data source was supplied.
        """
        if self.data is None or (
            hasattr(self.data, "__len__") and len(self.data) == 0
        ):
            raise ValueError("No data source supplied to Pipeline.")

        X, Y = self.clean_data()
        if not self.split_data:
            return X, Y
        return tuple(train_test_split(X, Y, test_size=0.2, random_state=42))

    def set_vectorizer(self):
        """Fit the configured vectorizer and transform the documents.

        The vectorizer is fitted on the training portion only (or on all
        data when no split was requested) and stored on ``self.vectorizer``.

        Returns:
            ``(X_train_vec, X_test_vec, Y_train, Y_test)`` when
            ``split_data`` is true, otherwise ``(X_vec, Y)``. Feature
            matrices are scipy sparse matrices; labels are plain lists.

        Raises:
            ValueError: If ``feature_extraction`` is not supported.
        """
        # Validate before the (expensive) load so misconfiguration fails fast.
        if self.feature_extraction not in self.SUPPORTED_FEATURES:
            raise ValueError(
                f"Unsupported feature_extraction {self.feature_extraction!r}; "
                f"expected one of {self.SUPPORTED_FEATURES}."
            )

        # Load/clean exactly once; the tuple length tells split from no-split.
        parts = self.split_clean_data()

        if self.feature_extraction == "tfidf":
            self.vectorizer = TfidfVectorizer()
        else:
            self.vectorizer = CountVectorizer()

        if len(parts) == 4:
            X_train, X_test, Y_train, Y_test = parts
            X_train_vectorized = self.vectorizer.fit_transform(
                self._join_tokens(X_train)
            )
            X_test_vectorized = self.vectorizer.transform(
                self._join_tokens(X_test)
            )
            return (
                X_train_vectorized,
                X_test_vectorized,
                list(Y_train),
                list(Y_test),
            )

        X, Y = parts
        X_vectorized = self.vectorizer.fit_transform(self._join_tokens(X))
        return X_vectorized, list(Y)

    def utilities(self, batch_size: int = 64):
        """Vectorize the data and wrap it in DataLoaders.

        Args:
            batch_size: Mini-batch size for both loaders.

        Returns:
            ``(train_loader, test_loader)``. Without a split there is no
            held-out data, so ``test_loader`` is ``None`` and
            ``train_loader`` covers every row.
        """
        parts = self.set_vectorizer()

        if len(parts) == 4:
            X_train, X_test, Y_train, Y_test = parts
            train_dataset = self._to_dataset(X_train, Y_train)
            test_dataset = self._to_dataset(X_test, Y_test)
            return (
                DataLoader(train_dataset, batch_size, shuffle=True),
                DataLoader(test_dataset, batch_size, shuffle=False),
            )

        X, Y = parts
        # Features and labels must live in the SAME dataset so every batch
        # is an (x, y) pair.
        dataset = self._to_dataset(X, Y)
        return DataLoader(dataset, batch_size, shuffle=True), None

    def create_model(self, in_features, out_features):
        """Build a 3-layer MLP (in -> 256 -> 128 -> out) with ReLU and dropout."""

        class Model(t.nn.Module):
            def __init__(self, in_features, out_features):
                super().__init__()
                self.network = t.nn.Sequential(
                    t.nn.Linear(in_features, 256),
                    t.nn.ReLU(),
                    t.nn.Dropout(0.3),
                    t.nn.Linear(256, 128),
                    t.nn.ReLU(),
                    t.nn.Dropout(0.3),
                    t.nn.Linear(128, out_features),
                )

            def forward(self, x):
                return self.network(x)

        return Model(in_features, out_features)

    def _evaluate(self, loader, device):
        """Run the model in eval mode over *loader*; return (labels, preds)."""
        self.model.eval()
        labels, preds = [], []
        with t.no_grad():
            for batch_x, batch_y in loader:
                logits = self.model(batch_x.to(device))
                preds.append(logits.argmax(dim=1).cpu())
                labels.append(batch_y.cpu())
        if not labels:
            return None, None
        return t.cat(labels), t.cat(preds)

    def train(self, epochs, batch_size, lr=0.0001):
        """Train the MLP and print a classification report.

        Metrics come from the held-out test set when ``split_data`` is
        true, otherwise from the training data, and are always computed
        after training with dropout disabled.

        Args:
            epochs: Number of passes over the training data.
            batch_size: Mini-batch size.
            lr: Adam learning rate.

        Returns:
            The trained model (also stored on ``self.model``), left in
            eval mode.

        Raises:
            ValueError: If the training set is empty.
        """
        train_data_loader, test_data_loader = self.utilities(batch_size)
        device = "cuda" if t.cuda.is_available() else "cpu"

        features, labels = train_data_loader.dataset.tensors
        if labels.numel() == 0:
            raise ValueError("Training set is empty.")

        input_size = features.shape[1]
        # Size the output layer from ALL training labels. A single shuffled
        # batch may miss a class, and CrossEntropyLoss needs every target
        # index to be < number of outputs.
        num_classes = int(labels.max().item()) + 1

        self.model = self.create_model(input_size, num_classes).to(device)

        criterion = t.nn.CrossEntropyLoss()
        optim = t.optim.Adam(self.model.parameters(), lr=lr)

        for epoch in range(epochs):
            self.model.train()  # enable dropout for the optimisation pass
            for idx, (batch_x, batch_y) in enumerate(train_data_loader):
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)

                loss = criterion(self.model(batch_x), batch_y)

                optim.zero_grad()
                loss.backward()
                optim.step()

                if idx % 50 == 0:
                    print(f"Epoch {epoch}, Batch {idx}, Loss: {loss.item():.4f}")

        eval_loader = (
            test_data_loader if test_data_loader is not None else train_data_loader
        )
        all_labels, all_preds = self._evaluate(eval_loader, device)
        if all_labels is not None:
            print(classification_report(all_labels.numpy(), all_preds.numpy()))
            print(confusion_matrix(all_labels.numpy(), all_preds.numpy()))
        return self.model

    def predict(self, text):
        """Classify a single raw text.

        Args:
            text: The raw input string.

        Returns:
            ``(pred, probabilities)``: the predicted class index and a
            ``(1, num_classes)`` numpy array of softmax probabilities.

        Raises:
            RuntimeError: If called before train().
        """
        if self.model is None or self.vectorizer is None:
            raise RuntimeError(
                "Pipeline is not trained: call train() before predict()."
            )

        # Same preprocessing as training.
        stop_words = set(stopwords.words("english"))
        lemmatizer = WordNetLemmatizer()
        clean_text = " ".join(self._tokenize(text, stop_words, lemmatizer))

        # Reuse the vectorizer fitted during training.
        X = self.vectorizer.transform([clean_text]).toarray()

        # Put the input wherever the model's weights actually live.
        device = next(self.model.parameters()).device
        X_tensor = t.tensor(X, dtype=t.float32, device=device)

        self.model.eval()
        with t.no_grad():
            output = self.model(X_tensor)
            pred = output.argmax(dim=1).item()
            probabilities = t.nn.functional.softmax(output, dim=1)

        return pred, probabilities.cpu().numpy()
        
        
        
        
        
        
        
        

In [18]:
# Build a pipeline that splits the data and uses bag-of-words features,
# reading the labelled samples from training.csv.
pip = Pipeline(split_data=True, feature_extraction="bow", data="training.csv")
# Train for 15 epochs, requesting batch_size=64.
pip.train(epochs=15, batch_size=64)


sentence = "I feel tired"

# Last expression of the cell: its (class index, probabilities) is displayed.
pip.predict(sentence)

Epoch 0, Batch 0, Loss: 1.7808
Epoch 0, Batch 50, Loss: 1.7627
Epoch 0, Batch 100, Loss: 1.6749
Epoch 0, Batch 150, Loss: 1.6013
Epoch 1, Batch 0, Loss: 1.6382
Epoch 1, Batch 50, Loss: 1.4477
Epoch 1, Batch 100, Loss: 1.4465
Epoch 1, Batch 150, Loss: 1.3553
Epoch 2, Batch 0, Loss: 1.3182
Epoch 2, Batch 50, Loss: 1.1685
Epoch 2, Batch 100, Loss: 1.1184
Epoch 2, Batch 150, Loss: 0.9720
Epoch 3, Batch 0, Loss: 0.9371
Epoch 3, Batch 50, Loss: 0.7701
Epoch 3, Batch 100, Loss: 0.6983
Epoch 3, Batch 150, Loss: 0.7922
Epoch 4, Batch 0, Loss: 0.5517
Epoch 4, Batch 50, Loss: 0.4081
Epoch 4, Batch 100, Loss: 0.6807
Epoch 4, Batch 150, Loss: 0.4911
Epoch 5, Batch 0, Loss: 0.5453
Epoch 5, Batch 50, Loss: 0.5897
Epoch 5, Batch 100, Loss: 0.4313
Epoch 5, Batch 150, Loss: 0.3138
Epoch 6, Batch 0, Loss: 0.3395
Epoch 6, Batch 50, Loss: 0.3537
Epoch 6, Batch 100, Loss: 0.2584
Epoch 6, Batch 150, Loss: 0.3109
Epoch 7, Batch 0, Loss: 0.2437
Epoch 7, Batch 50, Loss: 0.2417
Epoch 7, Batch 100, Loss: 0.2269
E

(0,
 array([[0.5246639 , 0.40335864, 0.00270367, 0.05045561, 0.01544334,
         0.00337479]], dtype=float32))