In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.isri import ISRIStemmer
import pandas as pd
import random
import requests
from bs4 import BeautifulSoup


# Web Scraping with Error Handling
def scrape_arabic_texts(urls):
    texts = []
    for url in urls:
        try:
            response = requests.get(url)
            response.raise_for_status()  # Ensure the request was successful
            soup = BeautifulSoup(response.content, 'html.parser')
            # Adjust based on the site's structure
            articles = soup.find_all('p')
            for article in articles:
                text = article.get_text(strip=True)
                if text and len(text.split()) > 5:  # Filter very short texts
                    texts.append(text)
        except Exception as e:
            print(f"Error scraping {url}: {e}")
    return texts

# Example websites
urls = [
    "https://www.aljazeera.net/sport",
    "https://arabic.cnn.com/sport"
]

# Scrape Arabic texts
texts = scrape_arabic_texts(urls)

# Assign random relevance scores (replace with meaningful scoring if possible)
scores = [random.uniform(0, 10) for _ in texts]

# Create DataFrame
dataset = pd.DataFrame({'Text': texts, 'Score': scores})
print(dataset.head())

                                                Text     Score
0  وضع ناصر الخليفي رئيس نادي باريس سان جيرمان ال...  6.465954
1  خسر النجم البرتغالي كريستيانو رونالدو لاعب الن...  6.036766
2  تغطية مباشرة لمباراة الهلال السعودي ضد الغرافة...  6.521478
3  تجاوز فريق برشلونة كبوته، محققا فوزا عريضا خار...  3.712635
4  ردت اللجنة الأولمبية الدولية على احتمال تعارض ...  8.656187


In [6]:
import nltk
nltk.download('stopwords')
import nltk
nltk.download('punkt_tab')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [7]:

# Arabic NLP Preprocessing Pipeline
stemmer = ISRIStemmer()
stop_words = set(stopwords.words("arabic"))

def preprocess_text(text):
    # Tokenize
    tokens = word_tokenize(text)
    # Normalize (remove diacritics and special characters)
    tokens = [word for word in tokens if word.isalnum()]
    # Remove stopwords
    tokens = [word for word in tokens if word not in stop_words]
    # Stemming
    tokens = [stemmer.stem(word) for word in tokens]
    return " ".join(tokens)

# Apply preprocessing to the dataset
dataset['Processed_Text'] = dataset['Text'].apply(preprocess_text)

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(dataset['Processed_Text'], dataset['Score'], test_size=0.2, random_state=42)

# Convert text data to indices (simplified)
vocab = {}
def build_vocab(texts):
    index = 0
    for text in texts:
        tokens = word_tokenize(text)
        for token in tokens:
            if token not in vocab:
                vocab[token] = index
                index += 1
    vocab["<UNK>"] = len(vocab)  # Add unknown token
build_vocab(X_train)

# Convert the texts into indices
def text_to_indices(text, vocab):
    tokens = word_tokenize(text)
    return [vocab.get(token, vocab["<UNK>"]) for token in tokens]

X_train_indices = [text_to_indices(text, vocab) for text in X_train]
X_test_indices = [text_to_indices(text, vocab) for text in X_test]

# Pad sequences to have the same length (optional, here we use max length of training data)
max_len = max([len(seq) for seq in X_train_indices])
X_train_indices = [seq + [0] * (max_len - len(seq)) for seq in X_train_indices]
X_test_indices = [seq + [0] * (max_len - len(seq)) for seq in X_test_indices]

# Convert to tensors
X_train_tensor = torch.tensor(X_train_indices, dtype=torch.long)
X_test_tensor = torch.tensor(X_test_indices, dtype=torch.long)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32)

# Create DataLoader
train_data = TensorDataset(X_train_tensor, y_train_tensor)
test_data = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)


In [8]:

# Define different RNN architectures

class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(RNNModel, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        embedded = self.embedding(x)
        rnn_out, _ = self.rnn(embedded)
        out = self.fc(rnn_out[:, -1, :])
        return out

class BidirectionalRNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(BidirectionalRNNModel, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(2 * hidden_size, output_size)  # Multiply by 2 for bidirectional

    def forward(self, x):
        embedded = self.embedding(x)
        rnn_out, _ = self.rnn(embedded)
        out = self.fc(rnn_out[:, -1, :])
        return out

In [9]:
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(GRUModel, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        embedded = self.embedding(x)
        gru_out, _ = self.gru(embedded)
        out = self.fc(gru_out[:, -1, :])
        return out

In [10]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        out = self.fc(lstm_out[:, -1, :])
        return out


In [11]:

# Choose model type (You can change this to 'RNN', 'BidirectionalRNN', 'GRU', or 'LSTM')
model_type = 'LSTM'  # You can also try 'RNN', 'BidirectionalRNN', 'GRU'

# Instantiate the selected model
if model_type == 'RNN':
    model = RNNModel(input_size=len(vocab), hidden_size=128, output_size=1)
elif model_type == 'BidirectionalRNN':
    model = BidirectionalRNNModel(input_size=len(vocab), hidden_size=128, output_size=1)
elif model_type == 'GRU':
    model = GRUModel(input_size=len(vocab), hidden_size=128, output_size=1)
elif model_type == 'LSTM':
    model = LSTMModel(input_size=len(vocab), hidden_size=128, output_size=1)

# Define loss and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Function to compute accuracy (based on absolute error)
def compute_accuracy(y_pred, y_true, threshold=1.0):
    abs_error = torch.abs(y_pred - y_true)
    correct = (abs_error <= threshold).float()
    accuracy = correct.sum() / correct.size(0)
    return accuracy

# Train the model
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")

# Evaluate the model
model.eval()
with torch.no_grad():
    y_pred = []
    y_true = []
    accuracies = []
    for inputs, labels in test_loader:
        outputs = model(inputs)
        y_pred.extend(outputs.squeeze().tolist())
        y_true.extend(labels.tolist())

        # Compute accuracy for the batch
        batch_accuracy = compute_accuracy(outputs.squeeze(), labels)
        accuracies.append(batch_accuracy.item())

# Calculate MSE and accuracy on test data
mse = criterion(torch.tensor(y_pred), torch.tensor(y_true))
accuracy = sum(accuracies) / len(accuracies)

print(f"Test MSE: {mse.item()}")


Epoch 1/5, Loss: 35.75990295410156
Epoch 2/5, Loss: 31.750770568847656
Epoch 3/5, Loss: 28.2264347076416
Epoch 4/5, Loss: 25.14720916748047
Epoch 5/5, Loss: 22.46776008605957
Test MSE: 6.828156471252441
