# Importing

In [1]:
import pandas as pd
import numpy as np
import Funct_modelling
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer

  Referenced from: <FB2FD416-6C4D-3621-B677-61F07C02A3C5> /Users/fauzanghaza/Applications/miniconda3/envs/ML/lib/python3.9/site-packages/torchvision/image.so
  warn(


In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
# Hyper parameter
input_size = 15474
hidden_size = 100
num_epochs = 100
batch_size = 128
learning_rate = 0.001

# Preprocess & vectorizing

In [4]:
class ElectionDataset(Dataset):
    def __init__(self, features, labels, train=True, transform=None, test_size=0.2, random_state=42):
        """
        Args:
            features (numpy array): The feature array of shape (num_samples, num_features)
            labels (numpy array): The label array of shape (num_samples,)
            train (bool): If True, loads training data, if False loads test data
            transform (callable, optional): Optional transform to be applied on a sample
            test_size (float): Proportion of the dataset to include in the test split
            random_state (int): Seed for reproducibility of the split
        """
        self.transform = transform
        self.train = train
        X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=test_size, random_state=random_state)

        if self.train:
            self.x = X_train
            self.y = y_train
        else:
            self.x = X_test
            self.y = y_test

        self.n_samples = len(self.x)
        
    def __getitem__(self, index):
        if index >= self.n_samples:
            raise IndexError("Index out of range")

        sample = self.x[index], self.y[index]
        
        if self.transform:
            sample = self.transform(sample)
        
        return sample
    
    def __len__(self):
        return self.n_samples


In [5]:
class ToTensor:
    def __call__(self, sample):
        inputs, label = sample
        return torch.tensor(inputs, dtype=torch.float32), torch.tensor(label, dtype=torch.float32)
    
class DropNA:
    def __call__(self, sample):
        inputs, label = sample
        inputs = pd.DataFrame(inputs)
        inputs = inputs.dropna(axis=0, how='any') 
        label = label[inputs.index]
        print(label)
        print(inputs.index)
        
        return inputs.values, label
    
class Preprocessing:
    def __init__(self, df):
        self.df = df

    def undersampling(self):
        x_p = self.df[self.df['label'] == 'Positive']
        x_n = self.df[self.df['label'] == 'Negative']
        x_temp = x_p.sample(x_n.label.count(), random_state=42)
        x_under = pd.concat([x_temp, x_n], axis=0, ignore_index=True)
        return x_under

    def tfidf_vec(self, data):
        # Transform the text data using TF-IDF
        vectorizer = TfidfVectorizer()
        data = vectorizer.fit_transform(data)
        data = data.toarray()
        data = pd.DataFrame(data, columns=vectorizer.get_feature_names_out())
        return data


    def process(self, tag=False):
        self.df = self.undersampling()
        tag_data = []

        if tag:
            tag_data = self.df[self.df.columns[-20:]]

        X = self.df['no_stopwords']
        y = self.df['label']

        # Convert the text data using TF-IDF
        X = self.tfidf_vec(X)

        # Optionally concatenate tag data if required
        if tag:
            tag_data = tag_data.values  # Convert tag data to numpy if present
            X = np.hstack((X, tag_data))
        
        y = y.values

        # Train-test split
        return X, y

In [6]:
df = pd.read_csv('../../preprocessing/Training/data/clean/regular/gabungan.csv')
df = df.dropna()

In [7]:
preprocessor = Preprocessing(df)
X, y = preprocessor.process(tag=False)
X = X.to_numpy()
label_mapping = {
    'Positive': 1,
    'Negative': 0
}
# Apply the mapping to your labels
y = np.array([label_mapping[label] for label in y])


In [8]:
transformer = torchvision.transforms.Compose([ToTensor()])

In [9]:
# Assuming 'X' and 'y' are your features and labels
train_dataset = ElectionDataset(features=X, labels=y, train=True, transform=transformer, random_state=42)
test_dataset = ElectionDataset(features=X, labels=y, train=False, transform=transformer, random_state=42)

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


In [10]:
examples = iter(train_loader)
example_data, example_targets = next(examples)
print(example_data)
print(example_data.shape, example_targets.shape)

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])
torch.Size([128, 15474]) torch.Size([128])


# Modeling

In [17]:
class NeuralNet(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(NeuralNet, self).__init__()
        self.input_size = input_size
        
        self.l1 = nn.Linear(input_size, hidden_size)
        self.relu1 = nn.ReLU()
        
        self.l2 = nn.Linear(hidden_size, hidden_size * 2) 
        self.relu2 = nn.LeakyReLU(negative_slope=0.01)
        
        self.l3 = nn.Linear(hidden_size * 2, hidden_size * 2)
        self.relu3 = nn.LeakyReLU(negative_slope=0.01)
        
        self.l4 = nn.Linear(hidden_size * 2, hidden_size)
        self.relu4 = nn.ReLU()
        
        self.l5 = nn.Linear(hidden_size, hidden_size)
        self.relu5 = nn.LeakyReLU(negative_slope=0.01)
        
        self.l6 = nn.Linear(hidden_size, hidden_size // 2)
        self.relu6 = nn.ReLU()

        self.l7 = nn.Linear(hidden_size // 2, 1)
        
        self.dropout = nn.Dropout(p=0.5)
        self.batch_norm1 = nn.BatchNorm1d(hidden_size * 2)
        self.batch_norm2 = nn.BatchNorm1d(hidden_size)
    
    def forward(self, x):
        # First layer
        out = self.l1(x)
        out = self.relu1(out)
        
        # Second layer (with BatchNorm and Dropout)
        out = self.l2(out)
        out = self.relu2(out)
        out = self.batch_norm1(out)
        out = self.dropout(out)
        
        # Third layer
        out = self.l3(out)
        out = self.relu3(out)
        out = self.batch_norm1(out)
        out = self.dropout(out)
        
        # Fourth layer
        out = self.l4(out)
        out = self.relu4(out)
        
        # Fifth layer
        out = self.l5(out)
        out = self.relu5(out)
        
        # Sixth layer (bottleneck)
        out = self.l6(out)
        out = self.relu6(out)
        
        # Final output layer
        out = self.l7(out)
        out = torch.sigmoid(out)  # Sigmoid for binary classification
        
        return out

In [18]:
model = NeuralNet(input_size, hidden_size).to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

# Training

In [19]:
# training loop
n_total_steps = len(train_loader)
for epoch in range(num_epochs):
    for i, (features, labels) in enumerate(train_loader):
        features = features.to(device)
        labels = labels.to(device)
        labels = labels.view(-1, 1)
        # forward
        output = model(features)
        loss = criterion(output, labels)
        
        # backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if ((i+1) % 22 == 0):
            print(f'epoch {epoch+1}/{num_epochs}, step {i+1}/{n_total_steps}, loss = {loss.item():.4f}')

epoch 1/100, step 22/110, loss = 0.7045
epoch 1/100, step 44/110, loss = 0.6908
epoch 1/100, step 66/110, loss = 0.6994
epoch 1/100, step 88/110, loss = 0.6943
epoch 1/100, step 110/110, loss = 0.6961
epoch 2/100, step 22/110, loss = 0.6947
epoch 2/100, step 44/110, loss = 0.6895
epoch 2/100, step 66/110, loss = 0.6967
epoch 2/100, step 88/110, loss = 0.7000
epoch 2/100, step 110/110, loss = 0.7032
epoch 3/100, step 22/110, loss = 0.6926
epoch 3/100, step 44/110, loss = 0.6960
epoch 3/100, step 66/110, loss = 0.7014
epoch 3/100, step 88/110, loss = 0.6933
epoch 3/100, step 110/110, loss = 0.6751
epoch 4/100, step 22/110, loss = 0.6908
epoch 4/100, step 44/110, loss = 0.6924
epoch 4/100, step 66/110, loss = 0.6999
epoch 4/100, step 88/110, loss = 0.6877
epoch 4/100, step 110/110, loss = 0.6914
epoch 5/100, step 22/110, loss = 0.6890
epoch 5/100, step 44/110, loss = 0.7025
epoch 5/100, step 66/110, loss = 0.6951
epoch 5/100, step 88/110, loss = 0.6921
epoch 5/100, step 110/110, loss = 0.

# Score Naive Bayes untag

In [28]:
with torch.no_grad():
    n_correct = 0
    n_samples = 0
    for features, labels in test_loader:
        features = features.to(device)
        labels = labels.to(device)
        labels = labels.view(-1, 1)  # Make sure labels are in the same shape as output
        output = model(features)
        
        # Use sigmoid to get probabilities for binary classification
        predictions = (output >= 0.5).float()  # Round the output (sigmoid result) to 0 or 1
        
        n_samples += labels.shape[0]
        n_correct += (predictions == labels).sum().item()
        
    acc = 100.0 * n_correct / n_samples
    
    print(f'correct = {n_correct}/{n_samples}')
    print(f'Accuracy = {acc:.2f}%')


correct = 1984/3509
Accuracy = 56.54%


In [30]:
torch.save(model.state_dict(), 'model/pytorch.pth')

In [33]:
# Define the model again (must match the saved architecture)
model = NeuralNet(input_size=input_size, hidden_size=hidden_size)

# Load the state_dict into the model
model.load_state_dict(torch.load('model/pytorch.pth'))

# Set the model to evaluation mode
model.eval()

  model.load_state_dict(torch.load('model/pytorch.pth'))


NeuralNet(
  (l1): Linear(in_features=15474, out_features=100, bias=True)
  (relu1): ReLU()
  (l2): Linear(in_features=100, out_features=200, bias=True)
  (relu2): LeakyReLU(negative_slope=0.01)
  (l3): Linear(in_features=200, out_features=200, bias=True)
  (relu3): LeakyReLU(negative_slope=0.01)
  (l4): Linear(in_features=200, out_features=100, bias=True)
  (relu4): ReLU()
  (l5): Linear(in_features=100, out_features=100, bias=True)
  (relu5): LeakyReLU(negative_slope=0.01)
  (l6): Linear(in_features=100, out_features=50, bias=True)
  (relu6): ReLU()
  (l7): Linear(in_features=50, out_features=1, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (batch_norm1): BatchNorm1d(200, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batch_norm2): BatchNorm1d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)