<a href="https://colab.research.google.com/github/imcalledaditi/Dynamic-Threat-Assessment-System/blob/main/Dynamic_Threat_Assessment_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install newsapi-python

Collecting newsapi-python
  Downloading newsapi_python-0.2.7-py2.py3-none-any.whl.metadata (1.2 kB)
Downloading newsapi_python-0.2.7-py2.py3-none-any.whl (7.9 kB)
Installing collected packages: newsapi-python
Successfully installed newsapi-python-0.2.7


In [2]:
# Import NewsApiClient
import os

from newsapi import NewsApiClient

# Initialize the NewsAPI client. The key is read from the NEWSAPI_KEY environment
# variable (the variable name is an assumption) so it is not hardcoded in the notebook.
newsapi = NewsApiClient(api_key=os.environ["NEWSAPI_KEY"])

# Function to fetch news articles
def fetch_news_data(query, language="en", page_size=10):
    """
    Fetches news articles based on the given query using NewsAPI.

    Args:
        query (str): Search query for articles.
        language (str): Language of the articles (default is 'en').
        page_size (int): Number of articles to fetch (default is 10).

    Returns:
        list: A list of dictionaries containing news article details.
    """
    try:
        # Fetch articles from NewsAPI
        articles = newsapi.get_everything(q=query, language=language, page_size=page_size)
        # Process and return the articles
        return [
            {
                "title": article["title"],
                "description": article["description"],
                "url": article["url"],
                "source": article["source"]["name"]
            }
            for article in articles["articles"]
        ]
    except Exception as e:
        print(f"Error fetching NewsAPI data: {e}")
        return []

# Example query for news articles
query = "cybersecurity OR threats"
news_data = fetch_news_data(query)

# Display the results
print(f"Collected {len(news_data)} news articles:")
for i, article in enumerate(news_data, 1):
    print(f"\nArticle {i}:")
    print(f"Title: {article['title']}")
    print(f"Description: {article['description']}")
    print(f"Source: {article['source']}")
    print(f"URL: {article['url']}")


Collected 6 news articles:

Article 1:
Title: US Privacy Snags a Win as Judge Limits Warrantless FBI Searches
Description: Plus: A hacker finds an issue with Cloudflare’s systems that could reveal app users’ rough locations, and the Trump administration puts a wrench in a key cybersecurity investigation.
Source: Wired
URL: https://www.wired.com/story/section-702-fbi-searches-unconstitutional/

Article 2:
Title: US Cyber Trust Mark launches as the Energy Star of smart home security
Description: The US launched a new Cyber Trust Mark label for smart home devices to assure consumers that internet-connected products meet cybersecurity standards.
Source: The Verge
URL: https://www.theverge.com/2025/1/7/24338168/us-cyber-trust-mark-smart-home-security

Article 3:
Title: The FCC’s Jessica Rosenworcel Isn’t Leaving Without a Fight
Description: As the US faces “the worst telecommunications hack in our nation’s history,” by China’s Salt Typhoon hackers, the outgoing FCC chair is determined to bo
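
NewsAPI responses can contain empty titles or descriptions, and the same story may appear more than once. A minimal sketch of cleaning the list returned by `fetch_news_data` before further processing; `clean_articles` is a hypothetical helper that is not part of the notebook, and the sample data is hand-written.

```python
def clean_articles(articles):
    """Drop entries without a title or description and remove duplicate URLs.

    `articles` is assumed to be the list of dicts returned by fetch_news_data.
    """
    seen_urls = set()
    cleaned = []
    for article in articles:
        # Skip entries where the API returned None or an empty string
        if not article.get("title") or not article.get("description"):
            continue
        url = article.get("url")
        if url in seen_urls:
            continue
        seen_urls.add(url)
        cleaned.append(article)
    return cleaned


# Hand-written sample in the same shape as the fetch_news_data output
sample = [
    {"title": "A", "description": "first", "url": "https://example.com/a", "source": "X"},
    {"title": "A", "description": "first", "url": "https://example.com/a", "source": "X"},
    {"title": "B", "description": None, "url": "https://example.com/b", "source": "Y"},
]
print(len(clean_articles(sample)))  # 1
```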

**Data Preprocessing**

In [3]:
# Install NLP tools
!pip install spacy
!python -m spacy download en_core_web_sm


Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.8/12.8 MB 97.8 MB/s eta 0:00:00
✔ Download and installation successful
You can now load the package via spacy.load('en_core_web_sm')
⚠ Restart to reload dependencies
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


Tokenization, stop-word removal, and lemmatization:

In [5]:
import spacy

nlp = spacy.load("en_core_web_sm")

# Preprocess text function
def preprocess_text(text):
    doc = nlp(text)
    # Tokenize, remove stopwords, and lemmatize
    clean_tokens = [token.lemma_ for token in doc if not token.is_stop and not token.is_punct]
    return " ".join(clean_tokens)

# Example: Preprocessing news articles
# Assume `collected_articles` is a list of dictionaries with a key "content" containing article text
collected_articles = [
    {"content": "Breaking news! The stock market hit an all-time high today."},
    {"content": "Scientists discover a new species of spider in the Amazon rainforest."},
    {"content": "A massive storm causes power outages in several cities."},
]

# Preprocess all articles
processed_articles = [preprocess_text(article["content"]) for article in collected_articles]

# Example output
print(processed_articles)




['break news stock market hit time high today', 'scientist discover new specie spider Amazon rainforest', 'massive storm cause power outage city']
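
The example above runs on hand-written sentences. A hedged sketch of applying the same step to the articles returned by `fetch_news_data`, which carry `title` and `description` keys rather than `content`. `article_text` and `preprocess_articles` are hypothetical helpers; the preprocessing function is passed in as an argument so the sketch runs without a loaded spaCy model.

```python
def article_text(article):
    """Join title and description, treating missing values as empty strings."""
    parts = [article.get("title") or "", article.get("description") or ""]
    return " ".join(part.strip() for part in parts if part.strip())


def preprocess_articles(articles, preprocess):
    """Apply `preprocess` (e.g. preprocess_text) to each non-empty article text."""
    texts = (article_text(article) for article in articles)
    return [preprocess(text) for text in texts if text]


# str.lower is a stand-in for preprocess_text so the sketch runs without spaCy
demo = preprocess_articles(
    [{"title": "Storm Warning", "description": None}, {"title": None, "description": None}],
    preprocess=str.lower,
)
print(demo)  # ['storm warning']
```

In the notebook this would be called as `preprocess_articles(news_data, preprocess_text)`.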


**Threat Detection Model**

Fine-tuning BERT for Sentiment/Threat Analysis

In [11]:
# Install the Hugging Face libraries
!pip install transformers datasets requests



In [13]:
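import os

import requests

# Fetch news articles. The API key is read from the NEWSAPI_KEY environment
# variable (the variable name is an assumption) instead of being hardcoded.
api_key = os.environ["NEWSAPI_KEY"]
url = "https://newsapi.org/v2/everything"
params = {"q": "threat", "language": "en", "apiKey": api_key}
response = requests.get(url, params=params, timeout=30)

if response.status_code == 200:
    articles = response.json()["articles"]
    # Skip articles without a description; a missing title becomes an empty string
    processed_news = [
        ((article["title"] or "") + " " + article["description"]).strip()
        for article in articles
        if article["description"]
    ]
else:
    raise RuntimeError(f"Failed to fetch news data (HTTP {response.status_code})")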


In [15]:
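# Build the dataset
from datasets import Dataset

# Example labeling logic: label 1 if the text contains the substring "threat", else 0.
# This is a keyword heuristic, not a ground-truth annotation.
labels = [1 if "threat" in text.lower() else 0 for text in processed_news]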

# Create Dataset object
data = Dataset.from_dict({"text": processed_news, "label": labels})
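
Because the articles were fetched with the query `threat` and then labeled by the same keyword, the two classes may be unevenly represented. A minimal sketch for checking the label balance before training; `label_distribution` is a hypothetical helper and the labels shown are toy values.

```python
from collections import Counter


def label_distribution(labels):
    """Return the fraction of examples per label."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {label: count / total for label, count in sorted(counts.items())}


# Toy labels for illustration only; in the notebook, pass the `labels` list
example_labels = [1, 1, 1, 0]
print(label_distribution(example_labels))  # {0: 0.25, 1: 0.75}
```

If one class dominates, accuracy alone says little about how well the model separates the two classes.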

In [16]:
# Load the tokenizer and tokenize the dataset

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Tokenize the dataset
def tokenize(batch):
    return tokenizer(batch["text"], padding=True, truncation=True, max_length=512)

tokenized_data = data.map(tokenize, batched=True)


Map:   0%|          | 0/87 [00:00<?, ? examples/s]
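
The `tokenize` call truncates each text to at most `max_length` tokens and, with `padding=True`, pads every sequence in a batch to the length of the longest one. A simplified pure-Python sketch of that behavior on lists of token IDs; `pad_and_truncate` is a hypothetical helper for illustration only, it ignores special tokens, and it is not how the Hugging Face tokenizer is implemented.

```python
def pad_and_truncate(batch, max_length, pad_id=0):
    """Truncate each sequence to max_length, then pad to the longest in the batch."""
    truncated = [seq[:max_length] for seq in batch]
    longest = max((len(seq) for seq in truncated), default=0)
    input_ids = [seq + [pad_id] * (longest - len(seq)) for seq in truncated]
    # The attention mask marks real tokens with 1 and padding with 0
    attention_mask = [[1] * len(seq) + [0] * (longest - len(seq)) for seq in truncated]
    return {"input_ids": input_ids, "attention_mask": attention_mask}


out = pad_and_truncate([[5, 6, 7, 8, 9], [3, 4]], max_length=4)
print(out["input_ids"])       # [[5, 6, 7, 8], [3, 4, 0, 0]]
print(out["attention_mask"])  # [[1, 1, 1, 1], [1, 1, 0, 0]]
```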

In [17]:
# Split the data for training and evaluation
# 80-20 train-test split; a fixed seed makes the split reproducible
split_data = tokenized_data.train_test_split(test_size=0.2, seed=42)
train_dataset = split_data["train"]
eval_dataset = split_data["test"]
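
With only 87 examples, a purely random 80/20 split can leave the evaluation set with a different label balance from the training set. A minimal sketch of a stratified split over example indices using only the standard library; `stratified_split` is a hypothetical helper, not a `datasets` API, and the labels are toy values.

```python
import random
from collections import defaultdict


def stratified_split(labels, test_size=0.2, seed=42):
    """Return (train_indices, test_indices) with roughly equal label ratios."""
    rng = random.Random(seed)
    by_label = defaultdict(list)
    for index, label in enumerate(labels):
        by_label[label].append(index)

    train_idx, test_idx = [], []
    for indices in by_label.values():
        # Shuffle within each class, then take the same fraction from every class
        rng.shuffle(indices)
        n_test = round(len(indices) * test_size)
        test_idx.extend(indices[:n_test])
        train_idx.extend(indices[n_test:])
    return sorted(train_idx), sorted(test_idx)


toy_labels = [0] * 10 + [1] * 10
train_idx, test_idx = stratified_split(toy_labels)
print(len(train_idx), len(test_idx))  # 16 4
```

The resulting index lists could then be passed to `Dataset.select` to build the two subsets.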

**Fine-tune the BERT Model**

In [21]:
from transformers import AutoModelForSequenceClassification, Trainer, TrainingArguments
import numpy as np

# Load the pre-trained model
model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)

# Define training arguments
training_args = TrainingArguments(
    output_dir="./results",                # Directory for model checkpoints
    num_train_epochs=20,                    # Number of training epochs
    per_device_train_batch_size=8,         # Batch size per device
    save_strategy="epoch",                 # Save checkpoints at the end of every epoch
    evaluation_strategy="epoch",           # Evaluate at the end of every epoch
    load_best_model_at_end=True,           # Load the best model at the end of training
    save_total_limit=2,                    # Limit total checkpoints
    logging_dir="./logs",                  # Directory for logs
    logging_steps=100                      # Log every 100 steps
)

# Define evaluation metrics
def compute_metrics(pred):
    labels = pred.label_ids
    preds = np.argmax(pred.predictions, axis=1)
    accuracy = (preds == labels).mean()
    return {"accuracy": accuracy}

# Initialize Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

# Train the model
trainer.train()

# Save the fine-tuned model
model.save_pretrained("./threat_detection_model")
tokenizer.save_pretrained("./threat_detection_model")

print("Model fine-tuning complete and saved at './threat_detection_model'.")


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


| Epoch | Training Loss | Validation Loss | Accuracy |
|---|---|---|---|
| 1 | No log | 0.663272 | 0.555556 |
| 2 | No log | 0.561723 | 0.666667 |
| 3 | No log | 0.594566 | 0.777778 |
| 4 | No log | 0.645625 | 0.666667 |
| 5 | No log | 1.007931 | 0.611111 |
| 6 | No log | 1.742534 | 0.611111 |
| 7 | No log | 1.39587 | 0.611111 |
| 8 | No log | 1.504337 | 0.666667 |
| 9 | No log | 1.666077 | 0.611111 |
| 10 | No log | 1.831582 | 0.611111 |


Model fine-tuning complete and saved at './threat_detection_model'.
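
In the training log, validation loss is lowest at epoch 2 and climbs afterwards, while accuracy peaks at epoch 3, which suggests the model begins to overfit early. A small sketch that picks out those epochs from the logged values; the `history` list is copied by hand from the log and is not produced by the `Trainer`.

```python
# (epoch, validation_loss, accuracy) copied from the training log
history = [
    (1, 0.663272, 0.555556),
    (2, 0.561723, 0.666667),
    (3, 0.594566, 0.777778),
    (4, 0.645625, 0.666667),
    (5, 1.007931, 0.611111),
    (6, 1.742534, 0.611111),
    (7, 1.39587, 0.611111),
    (8, 1.504337, 0.666667),
    (9, 1.666077, 0.611111),
    (10, 1.831582, 0.611111),
]

best_by_loss = min(history, key=lambda row: row[1])
best_by_accuracy = max(history, key=lambda row: row[2])

print(f"Lowest validation loss: epoch {best_by_loss[0]} ({best_by_loss[1]:.4f})")
print(f"Highest accuracy: epoch {best_by_accuracy[0]} ({best_by_accuracy[2]:.4f})")
```

Which of the two checkpoints counts as "best" depends on the metric chosen for model selection, so the choice is worth making explicit in the training arguments.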


**Prediction**

In [22]:
from transformers import pipeline

# Load the saved model and tokenizer
classifier = pipeline("text-classification", model="./threat_detection_model", tokenizer="./threat_detection_model")

# Example predictions
test_texts = ["This is a threat to national security.", "Today's weather is sunny."]
predictions = classifier(test_texts)

print(predictions)


Device set to use cuda:0


[{'label': 'LABEL_0', 'score': 0.636592447757721}, {'label': 'LABEL_0', 'score': 0.7427060008049011}]
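
The pipeline reports the generic names `LABEL_0` and `LABEL_1` because no label names were set on the model; they correspond to classes 0 and 1 from the labeling cell. In the output above, both test sentences are assigned class 0. A small sketch that maps the generic names to readable ones; `LABEL_NAMES` and `readable` are hypothetical, and the name "threat" for class 1 is an assumption based on the keyword labeling rule.

```python
# Assumed mapping: class 1 was assigned to texts containing "threat"
LABEL_NAMES = {"LABEL_0": "no_threat", "LABEL_1": "threat"}


def readable(predictions):
    """Replace generic pipeline labels with descriptive names."""
    return [
        {"label": LABEL_NAMES.get(p["label"], p["label"]), "score": round(p["score"], 3)}
        for p in predictions
    ]


# Predictions copied from the output above
predictions = [
    {"label": "LABEL_0", "score": 0.636592447757721},
    {"label": "LABEL_0", "score": 0.7427060008049011},
]
print(readable(predictions))
# [{'label': 'no_threat', 'score': 0.637}, {'label': 'no_threat', 'score': 0.743}]
```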
