In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
from urllib.parse import urlparse

class MITREExtractor:
    #necessary components, the init method, just captures the url and setting arguments for the upcoming methods
    def __init__(self, url, max_pages=20):
        self.driver = webdriver.Chrome() 
        self.driver.get(url)
        self.max_pages = max_pages
        self.page_count = 0
        self.base_domain = urlparse(url).netloc

    def extract_text_data(self):
        text_data = ""
        body_element = self.driver.find_element(By.TAG_NAME, 'body')
        text_data = body_element.text
        return text_data

    def find_next_visible_link(self):
        links = self.driver.find_elements(By.TAG_NAME, 'a')
        for link in links:
            if link.is_displayed():
                href = link.get_attribute('href')
                if href and urlparse(href).netloc == self.base_domain:
                    return link
        return None

    def extract_and_save(self, output_file):
        while self.page_count < self.max_pages:
            # Extract text data from the current page
            page_text_data = self.extract_text_data()
            
            # Append the text data to a file
            with open(output_file, 'a', encoding='utf-8') as file:
                file.write("%s\n\n" % page_text_data)
            
            # Find the next visible link within the same domain and click it
            next_link = self.find_next_visible_link()
            if next_link:
                next_link.click()
                self.page_count += 1
                time.sleep(2)  # Wait for the next page to load
            else:
                print("No more visible links within the domain to follow.")
                break

        # Close the WebDriver
        self.driver.quit()

# Usage
extractor = MITREExtractor("https://attack.mitre.org/")
extractor.extract_and_save('extracted_text_data.txt')

In [1]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

# Set up Chrome options
chrome_options = Options()
chrome_options.add_argument("--headless")  # Run in headless mode (optional)

# Set up the Chrome WebDriver (update the path to where you saved chromedriver)
service = Service('/path/to/chromedriver')
driver = webdriver.Chrome(service=service, options=chrome_options)

# Navigate to the MITRE ATT&CK website
url = "https://attack.mitre.org/"
driver.get(url)

# Wait for the page to load (adjust the time if needed)
time.sleep(5)

# Find all 'a' tags
links = driver.find_elements(By.TAG_NAME, 'a')

# Extract href and text from each link
extracted_links = []
for link in links:
    href = link.get_attribute('href')
    text = link.text
    if href and text:
        extracted_links.append(f"{text}: {href}")

# Close the browser
driver.quit()

# Save the extracted links to a text file
with open('mitre_attack_links.txt', 'w', encoding='utf-8') as f:
    for link in extracted_links:
        f.write(link + '\n')

print(f"Extracted {len(extracted_links)} links and saved to mitre_attack_links.txt")

NoSuchDriverException: Message: Unable to obtain driver for chrome; For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors/driver_location


In [None]:
from transformers import BertTokenizer, BertForSequenceClassification, BertForQuestionAnswering, Trainer, TrainingArguments
import torch
from torch.utils.data import Dataset
import json

# Load the JSON data
with open('dataset.json', 'r') as file:
    data = json.load(file)

# Extract texts and uniqueIds
texts = [entry['description'] for entry in data]
unique_ids = [entry['uniqueId'] for entry in data]

# Create a mapping from uniqueId to numerical labels
unique_id_to_label = {unique_id: idx for idx, unique_id in enumerate(set(unique_ids))}
label_to_unique_id = {idx: unique_id for unique_id, idx in unique_id_to_label.items()}
labels = [unique_id_to_label[unique_id] for unique_id in unique_ids]

class SecurityDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            'text': text,
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

# Load tokenizer and model for classification
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
classification_model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=len(unique_id_to_label))

# Create dataset and data loaders for classification
dataset = SecurityDataset(texts, labels, tokenizer, max_len=128)
train_loader = torch.utils.data.DataLoader(dataset, batch_size=2, shuffle=True)

# Training arguments for classification
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
)

# Trainer for classification
trainer = Trainer(
    model=classification_model,
    args=training_args,
    train_dataset=dataset,
    eval_dataset=dataset
)

# Train the classification model
trainer.train()

# Save the classification model and tokenizer
classification_model.save_pretrained('./fine_tuned_classification_model')
tokenizer.save_pretrained('./fine_tuned_classification_model')

# Load tokenizer and model for passage retrieval (question answering)
qa_model = BertForQuestionAnswering.from_pretrained('bert-large-uncased-whole-word-masking-finetuned-squad')

# Function to classify raw text content and retrieve supporting passage
def classify_and_retrieve(text, classification_model, qa_model, tokenizer, label_to_unique_id, document):
    # Classification
    encoding = tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=128,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    input_ids = encoding['input_ids']
    attention_mask = encoding['attention_mask']
    
    outputs = classification_model(input_ids, attention_mask=attention_mask)
    _, prediction = torch.max(outputs.logits, dim=1)
    predicted_id = label_to_unique_id[prediction.item()]
    
    # Passage retrieval
    qa_encoding = tokenizer.encode_plus(
        text,
        document,
        add_special_tokens=True,
        max_length=512,
        return_token_type_ids=True,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    input_ids = qa_encoding['input_ids']
    attention_mask = qa_encoding['attention_mask']
    token_type_ids = qa_encoding['token_type_ids']
    
    qa_outputs = qa_model(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
    start_scores, end_scores = qa_outputs.start_logits, qa_outputs.end_logits
    start_idx = torch.argmax(start_scores)
    end_idx = torch.argmax(end_scores) + 1
    
    answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[0][start_idx:end_idx]))
    
    return predicted_id, answer

# Example usage
raw_text = "Example raw text content to classify."
document = "Full document text from which to retrieve supporting passages."
predicted_id, supporting_passage = classify_and_retrieve(raw_text, classification_model, qa_model, tokenizer, label_to_unique_id, document)
print(f"Predicted uniqueId: {predicted_id}")
print(f"Supporting passage: {supporting_passage}")

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
 [1656/1656 06:43, Epoch 3/3]
Step	Training Loss
500	0.357000
1000	0.033300
1500	0.003600
config.json: 100%
 443/443 [00:00<00:00, 22.4kB/s]
model.safetensors: 100%
 1.34G/1.34G [00:11<00:00, 244MB/s]
Some weights of the model checkpoint at bert-large-uncased-whole-word-masking-finetuned-squad were not used when initializing BertForQuestionAnswering: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForQuestionAnswering from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForQuestionAnswering from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
An error occurred: Expected all tensors to be on the same device, but found at least two devices, cu