In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
!pip install trafilatura
!pip install trafilatura dateparser
!pip install langdetect
import pandas as pd
import numpy as np

Collecting langdetect
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m38.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: langdetect
  Building wheel for langdetect (setup.py) ... [?25l[?25hdone
  Created wheel for langdetect: filename=langdetect-1.0.9-py3-none-any.whl size=993223 sha256=ed4f5355daad8583739fed10dcb4f67c3292c215d887e2fcc77ad310fbc1e3cf
  Stored in directory: /root/.cache/pip/wheels/0a/f2/b2/e5ca405801e05eb7c8ed5b3b4bcf1fcabcd6272c167640072e
Successfully built langdetect
Installing collected packages: langdetect
Successfully installed langdetect-1.0.9


In [27]:
import trafilatura
import json
import dateparser
from langdetect import detect  # langdetect for manual language filtering
from trafilatura.settings import DEFAULT_CONFIG
from copy import deepcopy
import re
import requests

# Function to load EasyList (ad-blocking) rules
def load_easylist_rules():
    """
    Loads EasyList rules from the provided URL and parses the rules into a list.
    """
    url = "https://easylist.to/easylist/easylist.txt"
    response = requests.get(url)

    if response.status_code == 200:
        return response.text.splitlines()  # Split the response into a list of lines
    else:
        print("Error: Could not retrieve EasyList.")
        return []

# Function to remove ad-related content based on EasyList rules
def remove_ad_content(content, easylist_rules):
    """
    Removes ad-related content from the extracted content using EasyList rules.
    """
    for rule in easylist_rules:
        if rule.startswith('||'):  # Domain-based ad blocking rule
            ad_domain = rule[2:]
            content = re.sub(r'https?://(?:[a-zA-Z0-9-]+\.)?' + re.escape(ad_domain), '', content)

    return content

def remove_unwanted_content(content, unwanted_keywords):
    """
    Removes the sentence containing the first occurrence of any unwanted keywords,
    removes everything after the ellipsis ("..."), and everything after the unwanted keyword.
    Also removes everything after "Related:".
    """

    # Remove unwanted backslashes
    content = content.replace("\\", "")

    # Create the regex pattern for unwanted keywords in the sentence
    # This matches the sentence that contains the unwanted keyword and everything after it
    pattern = r"([^.]*\b(?:{})\b[^.]*\.).*".format("|".join(re.escape(keyword) for keyword in unwanted_keywords))

    # Substitute the sentence containing the unwanted keyword and everything after it with an empty string
    content = re.sub(pattern, "", content, flags=re.IGNORECASE)

    # Remove everything after "Related:" or "related content:" (case-insensitive)
    content = re.sub(r"([^.]*\b(?:Related:|related content:)\b[^.]*\.).*|(\b(?:Related:|related content:)\b.*)", "", content, flags=re.IGNORECASE)



    return content.strip()


def extract_articles(url, unwanted_keywords):
    """
    Extracts article content and publication date from a given URL using Trafilatura.
    Cleans content by removing newline characters, ensures no redirects, no external URLs, and language is English.
    Removes content after any unwanted keywords.
    """
    try:
        # Modify the config settings directly before use
        my_config = deepcopy(DEFAULT_CONFIG)

        # Disable external URLs and no redirects
        my_config['DEFAULT']['EXTERNAL_URLS'] = 'off'  # Disable external URL extraction
        my_config['DEFAULT']['MAX_REDIRECTS'] = '0'    # Disable URL redirection

        # Set download timeout and sleep time
        my_config['DEFAULT']['DOWNLOAD_TIMEOUT'] = '120'  # Set timeout to 120 seconds
        my_config['DEFAULT']['SLEEP_TIME'] = '5'         # Set sleep time between requests to 5 seconds

        # Fetch the content from the original URL using the modified config
        downloaded_html = trafilatura.fetch_url(url, config=my_config)

        if not downloaded_html:
            raise ValueError("Failed to download content")

        # Extract content using Trafilatura with the custom config
        extracted = trafilatura.extract(downloaded_html, output_format="json", with_metadata=True, config=my_config)
        if not extracted:
            raise ValueError("Trafilatura extraction failed")

        data = json.loads(extracted)

        # Clean and format the content
        content = data.get("text", "").replace("\n", " ").strip()


        # Remove content after the unwanted keywords
        content = remove_unwanted_content(content, unwanted_keywords)

        # Remove ad-related content based on EasyList rules
        easylist_rules = load_easylist_rules()  # Load EasyList rules
        content = remove_ad_content(content, easylist_rules)


        if not content or len(content)<1000: #1000 characters roughly 200 words
            raise ValueError("Content too short")

        # Manually detect language (ensure it's English)
        try:
            detected_language = detect(content)  # Returns 'en' for English
            if detected_language != 'en':  # If not English, return empty content
                print(f"Warning: Non-English content detected. Skipping URL.")
                return {"content": "", "publish_date": None}
        except Exception as e:
            print(f"Error detecting language: {e}")
            return {"content": "", "publish_date": None}


        # Continue to extract publication date
        publish_date_str = data.get("date", None)
        publish_date = dateparser.parse(publish_date_str) if publish_date_str else None

        formatted_date = publish_date.strftime("%d-%m-%Y") if publish_date else None

        return {"content": content, "publish_date": formatted_date}

    except Exception as e:
        print(f"Error: {e}")
        return {"content": "", "publish_date": None}

unwanted_keywords = ["disclaimer", "terms and conditions", "for more information", "privacy policy", "cookies", "contact us", "cookie policy","FAQ", "unsubscribe", "user agreement","site policy", "sign up", "stay updated"]




In [8]:
# Transformers installation
! pip install transformers datasets
! pip install --upgrade transformers
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel
from datasets import Dataset
from sklearn.model_selection import train_test_split


Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m24.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.12.0-py3-none-any.

In [9]:
!pip install pycountry

Collecting pycountry
  Downloading pycountry-24.6.1-py3-none-any.whl.metadata (12 kB)
Downloading pycountry-24.6.1-py3-none-any.whl (6.3 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/6.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/6.3 MB[0m [31m76.4 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m6.3/6.3 MB[0m [31m100.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.3/6.3 MB[0m [31m63.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pycountry
Successfully installed pycountry-24.6.1


In [11]:
#Define and initialize regression model from base
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import (
    AutoModelForSequenceClassification,
    PreTrainedModel,
    AutoConfig
)
from transformers.modeling_outputs import SequenceClassifierOutput

class DebertaForRegression(PreTrainedModel):
    """
    Outputs a scalar tone score in [-1, 1] by taking the expected value of
    the 3‑class sentiment probabilities:  (-1, 0, +1) · softmax(logits).
    """
    def __init__(self, config):
        super().__init__(config)
        self.classifier = AutoModelForSequenceClassification.from_pretrained(
            config._name_or_path, config=config
        )
        self.register_buffer("value_map", torch.tensor([-1.0, 0.0, 1.0]))

        self.loss_fn = nn.MSELoss()

    def forward(self, input_ids, attention_mask=None, labels=None, num_items_in_batch: int | None = None,  **kwargs ):
        out = self.classifier(
            input_ids=input_ids,
            attention_mask=attention_mask,
            **kwargs
        )
        logits = out.logits                      # (B, 3)
        probs  = F.softmax(logits, dim=-1)       # (B, 3)

        # Expected value: (B, 3) · (3,)  → (B,)
        tone   = (probs * self.value_map).sum(dim=-1)

        loss = None
        if labels is not None:
            loss = self.loss_fn(tone, labels.float())

        return SequenceClassifierOutput(
            loss   = loss,          # scalar or None
            logits = tone,          # (B,) continuous in [-1, 1]
            hidden_states = out.hidden_states,
            attentions    = out.attentions,
        )


In [26]:
import torch
import pycountry
from transformers import AutoTokenizer, AutoConfig
from itertools import combinations
from collections import defaultdict
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Load fine-tuned model
model_name = "yangheng/deberta-v3-base-absa-v1.1"
checkpoint_path = "/content/drive/MyDrive/deberta-regression-checkpoints/checkpoint-1008"

tokenizer = AutoTokenizer.from_pretrained(model_name)
config = AutoConfig.from_pretrained(model_name)
model = DebertaForRegression.from_pretrained(checkpoint_path, config=config)
model = model.to("cuda" if torch.cuda.is_available() else "cpu")

# Country normalization setup
countries_to_keep = {
    'CHN', 'MYS', 'USA', 'HKG', 'IDN', 'KOR', 'JPN', 'THA', 'AUS', 'VNM',
    'IND', 'PHL', 'DEU', 'FRA', 'CHE', 'NLD', 'SGP'
}

# Build name-to-ISO3 mapping
name_to_iso3 = {}
for country in pycountry.countries:
    iso3 = country.alpha_3
    if iso3 not in countries_to_keep:
        continue
    for key in ["name", "official_name", "common_name"]:
        val = getattr(country, key, None)
        if val:
            name_to_iso3[val.lower()] = iso3

# Manual aliases for informal names
manual_aliases = {
    "u.s.": "USA",
    "u.s": "USA",
    "america": "USA",
    "s. korea": "KOR",
    "south korea": "KOR",
    "hong kong sar": "HKG",
    "hong kong special administrative region": "HKG",
}

name_to_iso3.update({k.lower(): v for k, v in manual_aliases.items()})

# Trading bloc aliases
trading_bloc_aliases = {
    "apec": ["CHN", "MYS", "USA", "HKG", "IDN", "KOR", "JPN", "THA", "AUS", "VNM", "PHL", "SGP"],
    "eu": ["DEU", "FRA", "NLD"],
    "brics": ["CHN", "IND"],
    "nafta": ["USA"],
    "usmca": ["USA"],
    "asean": ["MYS", "IDN", "SGP", "THA", "VNM", "PHL"],
    "saarc": ["IND"],
}

# UN M49 region mapping
un_region_mapping = {
    "eastern asia": ["CHN", "HKG", "JPN", "KOR"],
    "south-eastern asia": ["MYS", "IDN", "THA", "VNM", "PHL", "SGP"],
    "southern asia": ["IND"],
    "western europe": ["DEU", "FRA", "CHE", "NLD"],
    "oceania": ["AUS"],
    "northern america": ["USA"]
}

# Normalize country name to ISO3
def normalize_country(name):
    return name_to_iso3.get(name.strip().lower(), None)


# Function to extract relevant countries from the article text
def extract_countries_from_text(text):
    found = set()
    lowered = text.lower()

    # Direct country match
    for name in name_to_iso3:
        if name in lowered:
            iso = normalize_country(name)
            if iso:
                found.add(iso)

    # Match trading blocs
    for bloc, members in trading_bloc_aliases.items():
        if bloc in lowered:
            found.update(members)

    # Match regions
    for region, members in un_region_mapping.items():
        if region in lowered:
            found.update(members)

    return sorted(found)

# Generate all unique 2-country combinations
def generate_country_pairs(countries):
    return ['-'.join(sorted(pair)) for pair in combinations(countries, 2)]

# Predict sentiment for all detected country pairs
def predict_all_pairs(text, debug=False):
    """
    Predict sentiment scores for each country pair detected in the given text.
    This function processes external input (article) and detects country pairs.
    """
    detected = extract_countries_from_text(text)
    pairs = generate_country_pairs(detected)

    if debug:
        print(f"\n Detected countries: {detected}")
        print(f" Generated country pairs: {pairs}")

    results = {}

    # Loop through country pairs and run inference on each pair
    for pair in pairs:
        # Tokenize the text with overflow handling
        inputs = tokenizer(
            text,                # Article text
            pair,                # Country pair
            return_tensors="pt",
            truncation=True,
            padding="max_length",
            max_length=512,      # Adjust max length as needed
            stride=128,          # Overlap for chunks
            return_overflowing_tokens=True
        )

        # Remove overflow_to_sample_mapping from the inputs
        if "overflow_to_sample_mapping" in inputs:
            del inputs["overflow_to_sample_mapping"]

        # Initialize variable to accumulate sentiment scores for the chunks
        total_score = 0
        num_chunks = 0

        # Iterate through each chunk in overflowed tokens
        for i in range(len(inputs['input_ids'])):
            # Get the chunk for this index
            chunk = {key: value[i:i+1] for key, value in inputs.items()}

            chunk = {k: v.to(model.device) for k, v in chunk.items()}

            with torch.inference_mode():
                outputs = model(**chunk)
                score = outputs["logits"].item()
                # Accumulate the score for the chunk
                total_score += score
                num_chunks += 1

        # Compute the average score across all chunks for this country pair
        avg_score = total_score / num_chunks if num_chunks > 0 else 0  # Avoid division by zero

        results[pair] = round(avg_score, 4)  # Store the averaged score for the current pair

    if debug:
        print(f" Predicted sentiment scores: {results}")

    return results


# Function to extract content from URL and then perform prediction
def extract_and_predict_from_url(url):
    # Extract article content from URL using trafilatura
    content = extract_articles(url, unwanted_keywords)["content"]

    if not content:
        print("No content extracted from URL.")
        return {}

    # Get sentiment predictions for country pairs in the article
    return predict_all_pairs(content)

# Example usage
url="https://www.channelnewsasia.com/business/trump-spares-smartphones-computers-other-electronics-his-125-china-tariffs-5062011"
extract_and_predict_from_url(url)


Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at /content/drive/MyDrive/deberta-regression-checkpoints/checkpoint-1008 and are newly initialized: ['classifier.bias', 'classifier.weight', 'deberta.embeddings.LayerNorm.bias', 'deberta.embeddings.LayerNorm.weight', 'deberta.embeddings.word_embeddings.weight', 'deberta.encoder.LayerNorm.bias', 'deberta.encoder.LayerNorm.weight', 'deberta.encoder.layer.0.attention.output.LayerNorm.bias', 'deberta.encoder.layer.0.attention.output.LayerNorm.weight', 'deberta.encoder.layer.0.attention.output.dense.bias', 'deberta.encoder.layer.0.attention.output.dense.weight', 'deberta.encoder.layer.0.attention.self.key_proj.bias', 'deberta.encoder.layer.0.attention.self.key_proj.weight', 'deberta.encoder.layer.0.attention.self.query_proj.bias', 'deberta.encoder.layer.0.attention.self.query_proj.weight', 'deberta.encoder.layer.0.attention.self.value_proj.bias', 'deberta.encoder.layer.0.attention.self.value_pr

{'CHN-DEU': -0.0088,
 'CHN-FRA': -0.0091,
 'CHN-IND': -0.0091,
 'CHN-NLD': -0.0088,
 'CHN-SGP': -0.0088,
 'CHN-USA': -0.0091,
 'DEU-FRA': -0.0091,
 'DEU-IND': -0.0091,
 'DEU-NLD': -0.0088,
 'DEU-SGP': -0.0088,
 'DEU-USA': -0.009,
 'FRA-IND': -0.0096,
 'FRA-NLD': -0.0091,
 'FRA-SGP': -0.0091,
 'FRA-USA': -0.0096,
 'IND-NLD': -0.0091,
 'IND-SGP': -0.0091,
 'IND-USA': -0.0096,
 'NLD-SGP': -0.0091,
 'NLD-USA': -0.0096,
 'SGP-USA': -0.0096}