Training Model 

In [7]:
import os
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

def plot_confusion_matrix(labels_test, y_pred_round, model_name, fold):
    """Draw a confusion-matrix heatmap for one model/fold on a new figure.

    Args:
        labels_test: Ground-truth labels, passed to ``confusion_matrix`` as
            the true values.
        y_pred_round: Predicted labels, passed to ``confusion_matrix`` as the
            predicted values.
        model_name: Model name shown in the figure title.
        fold: Fold identifier shown in the figure title.

    The figure is created on the current pyplot state; it is neither shown
    nor saved here, and nothing is returned.
    """
    conf_matrix = confusion_matrix(labels_test, y_pred_round)

    # Plot the confusion matrix as an annotated heatmap with integer counts.
    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    # Keep a space between "fold" and its number (previously rendered "fold1").
    plt.title('Confusion Matrix ' + model_name + ' fold ' + str(fold))

In [8]:
# Step 1: Import necessary libraries
import xgboost as xgb
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd

# Module-level slot; xg_boost_test() overwrites it with its latest predictions.
y_pred = None

def xg_boost_test(X_train, y_train, X_test, y_test, model_name, fold):
    """Score a previously saved XGBoost fold model on a test split.

    The classifier is restored from ``final/best/{model_name}_fold_{fold}.json``,
    used to predict ``X_test``, and evaluated against ``y_test`` with binary
    averaging. The metrics are printed and a confusion matrix is plotted.

    Args:
        X_train: Accepted for signature compatibility; not used here.
        y_train: Accepted for signature compatibility; not used here.
        X_test: Features to predict on.
        y_test: Ground-truth labels for ``X_test``.
        model_name: Model name used in the checkpoint path and the report.
        fold: Fold identifier used in the checkpoint path and the report.

    Returns:
        Tuple ``(accuracy, f1, precision, recall)``.

    Side effects:
        Stores the predictions in the module-level ``y_pred``.
    """
    global y_pred

    checkpoint_path = f"final/best/{model_name}_fold_{fold}.json"
    classifier = xgb.XGBClassifier()
    classifier.load_model(checkpoint_path)

    # Predict on the held-out split (published through the global above).
    y_pred = classifier.predict(X_test)

    # Binary averaging; switch to 'macro' for multi-class problems.
    averaging = {'average': 'binary'}
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, **averaging)
    recall = recall_score(y_test, y_pred, **averaging)
    f1 = f1_score(y_test, y_pred, **averaging)

    # Report in the order: accuracy, F1, precision, recall.
    report = [
        'Model: ' + model_name + ' fold ' + str(fold),
        f"Accuracy of XGBoost: {accuracy:.4f}",
        f"F1 Score: {f1:.4f}",
        f"Precision: {precision:.4f}",
        f"Recall: {recall:.4f}",
    ]
    for line in report:
        print(line)
    print()

    plot_confusion_matrix(y_test, y_pred, model_name, fold)

    return accuracy, f1, precision, recall

In [9]:
import en_core_web_sm
from docx import Document
import re

# Load the en_core_web_sm pipeline once; the helpers below call it and read
# `.sents` from the result to split text into sentences.
nlp = en_core_web_sm.load()
# Number of neighbouring sentences taken on each side of the target sentence
# by sentence_hierarchy() (1 -> previous, target, next).
window_size = 1

def sentence_hierarchy(check_sentence, paragraph):
    """Return ``check_sentence`` together with its neighbouring sentences.

    ``paragraph`` is split into sentences with the module-level ``nlp``
    pipeline; line breaks are removed from each sentence and it is stripped.
    The target is located and joined with ``window_size`` sentences on each
    side, separated by ``[SEP]``. Positions that fall outside the paragraph
    are written as ``[NULL]``.

    Args:
        check_sentence: Sentence to locate (compared after stripping).
        paragraph: Text expected to contain the sentence.

    Returns:
        The context string. When no sentence of the paragraph contains
        ``check_sentence``, a diagnostic is printed and ``check_sentence`` is
        returned unchanged.
    """
    doc = nlp(paragraph)
    sentences = [
        sent.text.replace('\r\n', '').replace('\n', '').replace('\r', '').strip()
        for sent in doc.sents
    ]
    needle = check_sentence.strip()

    # Prefer an exact match: a plain substring test would stop at an earlier,
    # longer sentence that merely contains the target text and build the
    # context window around the wrong position.
    if needle in sentences:
        target_index = sentences.index(needle)
    else:
        target_index = next(
            (i for i, sentence in enumerate(sentences) if needle in sentence),
            None,
        )

    if target_index is None:
        # Diagnostic dump so unmatched sentences can be inspected.
        print('========================')
        print(check_sentence)
        print(sentences)
        return check_sentence

    window = []
    for offset in range(-window_size, window_size + 1):
        position = target_index + offset
        if 0 <= position < len(sentences):
            window.append(sentences[position])
        else:
            window.append("[NULL]")
    return "[SEP]".join(window)

def extract_sentences_labels(news_file_path, ex_file_path):
    """Split a news file into sentences and label those found in a second file.

    Both files are read as UTF-8 and stripped. The first line of the news
    file is discarded (presumably the headline - TODO confirm) before the
    remaining text is split into sentences with the module-level ``nlp``
    pipeline. Line breaks are removed from every sentence.

    Args:
        news_file_path: Path of the news article.
        ex_file_path: Path of the text whose content marks positive sentences.

    Returns:
        Tuple ``(news_paths, sentences, labels)`` of equal-length lists:
        ``news_paths`` repeats ``news_file_path`` once per sentence and
        ``labels`` holds ``'1'`` when the sentence occurs verbatim in the
        second file, otherwise ``'0'``.
    """
    with open(news_file_path, 'r', encoding='utf-8') as news_file:
        news_text = news_file.read().strip()
    with open(ex_file_path, 'r', encoding='utf-8') as ex_file:
        ex_text = ex_file.read().strip()

    # Drop the first line of the article before sentence splitting.
    news_text = "\n".join(news_text.splitlines()[1:])

    sentences = [
        sent.text.replace('\r\n', '').replace('\n', '').replace('\r', '')
        for sent in nlp(news_text).sents
    ]

    # `'' in ex_text` is always True, so a sentence that became empty after
    # newline removal must be excluded explicitly or it would be labelled '1'.
    labels = [
        '1' if sentence and sentence in ex_text else '0'
        for sentence in sentences
    ]
    news_paths = [news_file_path] * len(sentences)

    return news_paths, sentences, labels

def extract_sentences_paragraphs_labels(news_file_path):
    """Split a news file into sentences with paragraph and context columns.

    The file is read as UTF-8 and stripped; its first line is skipped and
    every remaining line is treated as one paragraph. Each paragraph is split
    into sentences with the module-level ``nlp`` pipeline.

    Args:
        news_file_path: Path of the news article.

    Returns:
        Tuple ``(paragraphs, hierarchies, sentences, labels)`` of
        equal-length lists, one entry per sentence: the paragraph text with
        line breaks removed, the ``sentence_hierarchy`` context string, the
        sentence with line breaks removed, and the placeholder label ``'0'``.
    """
    def _flatten(value):
        # Remove every kind of line break from a piece of text.
        return value.replace('\r\n', '').replace('\n', '').replace('\r', '')

    with open(news_file_path, 'r', encoding='utf-8') as handle:
        lines = handle.read().strip().splitlines()

    paragraphs, hierarchies, sentences = [], [], []

    # lines[0] is skipped; each remaining line is one paragraph.
    for par in lines[1:]:
        par_sentences = [_flatten(sent.text) for sent in nlp(par).sents]
        flat_par = _flatten(par)
        par_contexts = [sentence_hierarchy(item, par) for item in par_sentences]

        sentences.extend(par_sentences)
        paragraphs.extend(flat_par for _ in par_sentences)
        hierarchies.extend(par_contexts)

    # Every sentence receives the same string label.
    labels = ['0' for _ in sentences]

    return paragraphs, hierarchies, sentences, labels

def extract_dataset_from_files(news_file_path, annotated_file_path):
    """Build sentence- and paragraph-level labels for one annotated article.

    The annotated phrases come from ``read_docx(annotated_file_path)``. The
    news file is read as UTF-8 and stripped; its first line is skipped and
    every remaining line is treated as one paragraph, split into sentences
    with the module-level ``nlp`` pipeline.

    Args:
        news_file_path: Path of the news article.
        annotated_file_path: Path of the annotated ``.docx`` file.

    Returns:
        Tuple ``(paragraphs, hierarchies, sentences, labels,
        paragraph_labels)`` of equal-length lists, one entry per sentence.
        ``labels`` is 1 when ``check_phrases_in_sentence`` matches the
        sentence, else 0; ``paragraph_labels`` is 1 for every sentence of a
        paragraph that contains at least one matching sentence, else 0.
    """
    def _flatten(value):
        # Remove every kind of line break from a piece of text.
        return value.replace('\r\n', '').replace('\n', '').replace('\r', '')

    procedural_sentences = read_docx(annotated_file_path)

    with open(news_file_path, 'r', encoding='utf-8') as handle:
        lines = handle.read().strip().splitlines()

    paragraphs, hierarchies, sentences = [], [], []
    labels, paragraph_labels = [], []

    # lines[0] is skipped; each remaining line is one paragraph.
    for par in lines[1:]:
        par_sentences = [_flatten(sent.text) for sent in nlp(par).sents]
        flat_par = _flatten(par)
        par_contexts = [sentence_hierarchy(item, par) for item in par_sentences]

        sentence_flags = [
            1 if check_phrases_in_sentence(item, procedural_sentences) else 0
            for item in par_sentences
        ]
        # One positive sentence marks the whole paragraph as positive.
        paragraph_flag = 1 if 1 in sentence_flags else 0

        sentences.extend(par_sentences)
        labels.extend(sentence_flags)
        paragraphs.extend(flat_par for _ in par_sentences)
        hierarchies.extend(par_contexts)
        paragraph_labels.extend(paragraph_flag for _ in par_sentences)

    return paragraphs, hierarchies, sentences, labels, paragraph_labels

def read_docx(file_path):
    """Return the bracketed sentences of a ``.docx`` file.

    The text of every paragraph in the document is joined with newlines and
    handed to ``extract_text_inside_brackets``, whose result is returned.

    Args:
        file_path: Path of the ``.docx`` file to open.
    """
    document = Document(file_path)
    joined_text = '\n'.join(block.text for block in document.paragraphs)
    return extract_text_inside_brackets(joined_text)

def extract_text_inside_brackets(input_string):
    """Return the sentences found inside ``<...>`` markers of a string.

    Every non-greedy ``<...>`` match is split into sentences with the
    module-level ``nlp`` pipeline. Line breaks and parentheses are removed
    from each sentence.

    Args:
        input_string: Text containing ``<...>`` annotated spans.

    Returns:
        List of cleaned sentences in order of appearance. Sentences that are
        empty or whitespace-only after cleaning are omitted.
    """
    # Non-greedy so that each <...> pair is captured separately.
    pattern = r'<(.*?)>'
    matches = re.findall(pattern, input_string)

    sentences = []

    for par in matches:
        for sent in nlp(par).sents:
            cleaned = (
                sent.text.replace('\r\n', '').replace('\n', '').replace('\r', '')
                .replace('(', '').replace(')', '')
            )
            # An empty phrase is a substring of every string, so keeping it
            # would make any later `phrase in sentence` test succeed.
            if cleaned.strip():
                sentences.append(cleaned)

    return sentences

def check_phrases_in_sentence(sentence, phrases_array):
    """Tell whether any phrase occurs in ``sentence`` once parentheses are removed.

    Args:
        sentence: Text to search; ``(`` and ``)`` are removed before matching.
        phrases_array: Iterable of phrases to look for.

    Returns:
        True when at least one non-empty phrase is a substring of the
        cleaned sentence, otherwise False.
    """
    # Strip parentheses once instead of once per phrase.
    normalized = sentence.replace('(', '').replace(')', '')
    # Empty phrases are skipped: `'' in text` is always True and would mark
    # every sentence as a match.
    return any(bool(phrase) and phrase in normalized for phrase in phrases_array)



In [10]:
import tensorflow as tf
import torch
import numpy as np
from datetime import datetime

def text_embedding(texts, pretrained_model, tokenizer, batch_size = 32, pool_type = 1):
    """Embed ``texts`` batch by batch and return one pooled vector per text.

    Args:
        texts: Sequence of strings supporting ``len()`` and slicing.
        pretrained_model: Model that is moved to the selected device, called
            as ``pretrained_model(**inputs)`` and exposes
            ``config.hidden_size``.
        tokenizer: Callable producing PyTorch inputs (``return_tensors="pt"``)
            with padding and truncation to 512 tokens; its result must
            support ``.to(device)``.
        batch_size: Number of texts per forward pass.
        pool_type: Pooling strategy:
            1 - ``outputs.pooler_output``;
            2 - mean of ``outputs.last_hidden_state`` weighted by the
                attention mask (padding ignored);
            3 - plain mean over the token axis of the last entry of
                ``outputs.hidden_states`` (padding positions are included).

    Returns:
        NumPy array of shape ``(len(texts), hidden_size)``.

    Raises:
        ValueError: If ``pool_type`` is not 1, 2 or 3.
    """
    # Fail early: an unknown pool_type previously surfaced as a NameError in
    # the middle of the loop.
    if pool_type not in (1, 2, 3):
        raise ValueError(f"pool_type must be 1, 2 or 3, got {pool_type!r}")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Move the model to the selected device (GPU when available).
    pretrained_model = pretrained_model.to(device)

    n = len(texts)
    text = np.zeros((n, pretrained_model.config.hidden_size))

    start_time = datetime.now()

    for i in range(0, n, batch_size):
        end = min(i + batch_size, n)
        batch_texts = texts[i:end]

        # Tokenisation and the forward pass are shared by all pooling modes;
        # only pool_type 3 asks the model for all hidden states.
        inputs = tokenizer(batch_texts, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
        with torch.no_grad():  # Disable gradient calculation for inference
            if pool_type == 3:
                outputs = pretrained_model(**inputs, output_hidden_states=True)
            else:
                outputs = pretrained_model(**inputs)

            if pool_type == 1:
                pooled = outputs.pooler_output
            elif pool_type == 2:
                token_embeddings = outputs.last_hidden_state
                input_mask_expanded = inputs['attention_mask'].unsqueeze(-1).expand(token_embeddings.size()).float()
                sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
                # Clamp so a fully masked row cannot divide by zero.
                sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
                pooled = sum_embeddings / sum_mask
            else:
                pooled = outputs.hidden_states[-1].mean(dim=1)

        # Store the results in the preallocated array (moved back to CPU).
        text[i:end] = pooled.detach().cpu().numpy()

        # Estimate and print remaining time
        if i % (batch_size * 10) == 0 or end == n:  # Reduce the frequency of print statements
            step = end
            delta = datetime.now() - start_time
            # total_seconds() keeps whole days; timedelta.seconds would drop them.
            remaining = int(((delta / step) * (n - step)).total_seconds())
            hours, remainder = divmod(remaining, 3600)
            minutes, seconds = divmod(remainder, 60)
            print(f"\rLoading... {step}/{n} | {hours} hr, {minutes} minutes, {seconds} seconds remaining     ", end='')

    return text

In [11]:
# Build the article path with os.path.join only, so that no Windows-specific
# backslash is baked into the string (on Windows the result is unchanged).
news_path = os.path.join('bbc_news', 'news_articles')
news_category_path = os.path.join(news_path, 'tech')
news_file_path = os.path.join(news_category_path, '001.txt')

# Split the article into per-sentence columns; `labels` are placeholder '0's.
paragraphs, hierarchies, sentences, labels = extract_sentences_paragraphs_labels(news_file_path)

In [12]:
from transformers import AutoModel, AutoTokenizer

s_path = 'model_oversampled_xlnet'
h_path = 'model_oversampled_roberta_h'

# The hub ids are "<namespace>/<model name>"; derive them from the names above
# instead of repeating each literal for the tokenizer and the model.
hub_namespace = 'jvasdigital'
s_repo = f'{hub_namespace}/{s_path}'
h_repo = f'{hub_namespace}/{h_path}'

# Sentence-level encoder.
tokenizer_s = AutoTokenizer.from_pretrained(s_repo)
pretrained_model_s = AutoModel.from_pretrained(s_repo)

# Encoder for the [SEP]-joined context strings ("hierarchies").
tokenizer_h = AutoTokenizer.from_pretrained(h_repo)
pretrained_model_h = AutoModel.from_pretrained(h_repo)

text_test_s = text_embedding(sentences, pretrained_model_s, tokenizer_s, batch_size = 128, pool_type = 3)
text_test_h = text_embedding(hierarchies, pretrained_model_h, tokenizer_h, batch_size = 128, pool_type = 1)

# Feature layout is [hierarchy embedding | sentence embedding].
# NOTE(review): despite the name, this holds features for the article being
# predicted; the column order presumably has to match the one the saved
# classifier was fitted on - confirm.
combined_data_train = np.concatenate([text_test_h, text_test_s], axis=-1)



Loading... 32/32 | 0 hr, 0 minutes, 0 seconds remaining     

In [13]:
# Restore the saved classifier for the combined embedding features.
xgb_model = xgb.XGBClassifier()
xgb_model.load_model("final/best/xlnet + roberta oversampled_fold_1.json")

# Predict one label per row of the combined feature matrix.
y_pred = xgb_model.predict(combined_data_train)

# Keep the sentences whose predicted label is 1.
extracted = [text for idx, text in enumerate(sentences) if y_pred[idx] == 1]

print('Predicted procedural sentences:')
for i in range(len(extracted)):
    print(extracted[i])

Predicted procedural sentences:
The Kyrgyz Republic, a small, mountainous state of the former Soviet republic, is using invisible ink and ultraviolet readers in the country's elections as part of a drive to prevent multiple voting.
In an effort to live up to its reputation in the 1990s as "an island of democracy", the Kyrgyz President, Askar Akaev, pushed through the law requiring the use of ink during the upcoming Parliamentary and Presidential elections.
The US government agreed to fund all expenses associated with this decision.
However, the presence of ultraviolet light (of the kind used to verify money) causes the ink to glow with a neon yellow light.
If the ink shows under the UV light the voter will not be allowed to enter the polling station.
