In [1]:
import random
import re
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque

# Function to clean text
def clean_text(text):
    text = re.sub(r'<.*?>', '', text)  # remove HTML tags
    text = re.sub(r'[^\w\s]', '', text)  # remove special characters
    text = re.sub(r'\d+', '', text)  # remove numbers
    text = text.lower().strip()  # lowercase and strip whitespace
    return text

# Load and clean data
df = pd.read_csv("data.csv")


In [2]:
df.head()

Unnamed: 0,Sentence,Sentiment
0,The GeoSolutions technology will leverage Bene...,positive
1,"$ESI on lows, down $1.50 to $2.50 BK a real po...",negative
2,"For the last quarter of 2010 , Componenta 's n...",positive
3,According to the Finnish-Russian Chamber of Co...,neutral
4,The Swedish buyout firm has sold its remaining...,neutral


In [3]:
df.shape

(5842, 2)

In [4]:
df.size

11684

In [5]:
df = df.sample(1000, random_state=42, )

In [6]:
df.head()

Unnamed: 0,Sentence,Sentiment
4584,A few employees would remain at the Oulu plant...,neutral
177,Comparable net sales are expected to increase ...,positive
167,"Tesla is recalling 2,700 Model X cars: https:/...",negative
5585,Finnish software developer Done Solutions Oyj ...,positive
2339,Compagnie de Financement Foncier - Is to issue...,neutral


In [7]:
df.size

2000

In [8]:
df.dropna(inplace=True)  # Remove missing values
df.drop_duplicates(inplace=True)  # Remove duplicates
df = df[df['Sentiment'].isin(['positive', 'neutral', 'negative'])]  # Keep only known sentiments
df['Sentence'] = df['Sentence'].apply(clean_text)  # Clean sentences

# Preprocess text and labels
sentences = df['Sentence'].values
labels = df['Sentiment'].map({'negative': 0, 'neutral': 1, 'positive': 2}).values

# TF-IDF encoding
vectorizer = TfidfVectorizer(max_features=1000)
X = vectorizer.fit_transform(sentences).toarray()

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, labels, test_size=0.2, random_state=42)

# DQN Parameters
state_size = X.shape[1]
num_actions = 3
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
gamma = 0.95
lr = 0.001
batch_size = 64
memory = deque(maxlen=2000)

# DQN Model
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.out = nn.Linear(64, output_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.out(x)

model = DQN(state_size, num_actions)
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.MSELoss()

# Epsilon-greedy policy
def act(state):
    global epsilon
    if np.random.rand() <= epsilon:
        return random.randrange(num_actions)
    state = torch.FloatTensor(state).unsqueeze(0)
    with torch.no_grad():
        q_values = model(state)
    return torch.argmax(q_values).item()

# Replay function
def replay():
    global epsilon
    if len(memory) < batch_size:
        return
    minibatch = random.sample(memory, batch_size)
    for state, action, reward, next_state, done in minibatch:
        state = torch.FloatTensor(state)
        next_state = torch.FloatTensor(next_state)
        target = reward
        if not done:
            target += gamma * torch.max(model(next_state)).item()
        target_f = model(state)
        target_val = target_f.clone()
        target_val[action] = target
        optimizer.zero_grad()
        loss = loss_fn(target_f, target_val.detach())
        loss.backward()
        optimizer.step()
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

# Training loop
episodes = 10
for e in range(episodes):
    for i in range(len(X_train)):
        state = X_train[i]
        action = act(state)
        reward = 1 if action == y_train[i] else -1
        done = True  # One-step
        next_state = X_train[i]
        memory.append((state, action, reward, next_state, done))
        replay()
    print(f"Episode {e+1}/{episodes} completed. Epsilon: {epsilon:.2f}")

# Evaluation
correct = 0
total = len(X_test)
for i in range(total):
    state = torch.FloatTensor(X_test[i]).unsqueeze(0)
    with torch.no_grad():
        pred = torch.argmax(model(state)).item()
    if pred == y_test[i]:
        correct += 1
print(f"Test Accuracy: {100 * correct / total:.2f}%")

Episode 1/10 completed. Epsilon: 0.02
Episode 2/10 completed. Epsilon: 0.01
Episode 3/10 completed. Epsilon: 0.01
Episode 4/10 completed. Epsilon: 0.01
Episode 5/10 completed. Epsilon: 0.01
Episode 6/10 completed. Epsilon: 0.01
Episode 7/10 completed. Epsilon: 0.01
Episode 8/10 completed. Epsilon: 0.01
Episode 9/10 completed. Epsilon: 0.01
Episode 10/10 completed. Epsilon: 0.01
Test Accuracy: 43.50%
