This notebook is an experiment in performing TABSA (Targeted Aspect-Based Sentiment Analysis) on news data. It uses Named Entity Recognition (NER) from spaCy to automatically locate the entities that serve as the targets for TABSA.

Necessary Imports

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import news_signals
import matplotlib.pyplot as plt 

In this step, we generate the financial dataset using yfinance. We slide a window of *window_size* trading days over the daily prices and label each window +1 or -1 if the price moved up or down by more than 3% (measured from the first open to the last close of the window, and also exceeding the window's volatility); otherwise the window is labelled 0.

In [None]:
# Experiment configuration for the price-trend labels.
ticker = "TSLA"            # Change ticker if needed
start_date = "2023-01-01"    # Start date for historical data
end_date = "2023-12-31"      # End date for historical data
window_size = 3            # 3-day rolling window

# Download daily stock data
# NOTE(review): depending on the installed yfinance version the returned columns may be a
# (field, ticker) MultiIndex rather than flat names — confirm, since classify_window
# indexes the frame with 'Open' and 'Close'.
data = yf.download(ticker, start=start_date, end=end_date)
# Coerce the index to datetimes; the window dates built from it are compared with
# news timestamps when the sentiment features are attached.
data.index = pd.to_datetime(data.index)

def classify_window(window, threshold=0.03):
    """
    Classify the price trend over one window of daily bars.

        +1 if cumulative return >  threshold and >  volatility  (upward trend)
        -1 if cumulative return < -threshold and < -volatility  (downward trend)
         0 otherwise (neutral)

    The cumulative return is measured from the first bar's open to the last
    bar's close. Volatility is the sample standard deviation of the per-bar
    open-to-close returns inside the window.

    Args:
        window: DataFrame with 'Open' and 'Close' columns, ordered oldest to
            newest. A one-column DataFrame under each name (as produced by a
            two-level column index) is accepted as well.
        threshold: Minimum absolute cumulative return for a non-neutral label.
            Defaults to 0.03 (3%).

    Returns:
        int: 1, -1 or 0. An empty window is neutral (0).
    """
    def _column(name):
        # With two-level columns, window[name] is a one-column DataFrame;
        # reduce it to a Series so scalars can be extracted without relying
        # on the deprecated float(<single-element Series>) conversion.
        col = window[name]
        return col.iloc[:, 0] if isinstance(col, pd.DataFrame) else col

    opens = _column('Open')
    closes = _column('Close')
    if len(opens) == 0:
        return 0

    first_open = float(opens.iloc[0])
    last_close = float(closes.iloc[-1])
    cumulative_return = (last_close - first_open) / first_open

    daily_returns = (closes - opens) / opens
    volatility = float(daily_returns.std())
    # The sample std is NaN when fewer than two returns are available; every
    # comparison against NaN is False, which would force the label to 0.
    # Fall back to the threshold-only rule in that case.
    if pd.isna(volatility):
        volatility = 0.0

    if cumulative_return > threshold and cumulative_return > volatility:
        return 1
    if cumulative_return < -threshold and cumulative_return < -volatility:
        return -1
    return 0


# Classify every full window; a window is identified by the position of its last row.
window_ends = range(window_size - 1, len(data))
trend_results = [
    classify_window(data.iloc[end - window_size + 1 : end + 1])
    for end in window_ends
]
dates = [data.index[end] for end in window_ends]

# One row per window, indexed by the window's last trading day.
rolling_trend_df = pd.DataFrame({'Trend': trend_results}, index=dates)
print(rolling_trend_df)

Here we use spaCy in the `extract_entities` function to extract the named entities (organizations, people, geopolitical entities and products) from our news text data.

In [None]:
import spacy
import pandas as pd
from tqdm import tqdm

# Load spaCy's small English model
# (loaded once here; extract_entities calls this pipeline for every article)
nlp = spacy.load("en_core_web_sm")

def extract_entities(text):
    """
    Extracts unique named entities using spaCy.

    Only entities labelled ORG, PERSON, GPE or PRODUCT are kept. Duplicates
    are removed while preserving the order of first appearance, so the same
    text always yields the same list (a plain set would give a
    hash-dependent order that changes between runs).

    Args:
        text: The article text to run through the module-level `nlp` pipeline.

    Returns:
        A list of unique entity strings in order of first appearance.
    """
    relevant_labels = {"ORG", "PERSON", "GPE", "PRODUCT"}
    doc = nlp(text)
    # dict.fromkeys de-duplicates and keeps insertion order.
    return list(dict.fromkeys(ent.text for ent in doc.ents if ent.label_ in relevant_labels))

# Load CSV
news_df = pd.read_csv("entity_news_processed_azure_reduced.csv") # adjust path to your news text data
# Missing articles become empty strings so every value handed to extract_entities is a str
news_df["Processed_Article"] = news_df["Processed_Article"].fillna("").astype(str)

# Enable tqdm progress bar
# (called before progress_apply so the apply below reports progress with this description)
tqdm.pandas(desc="Extracting Named Entities")

# Apply NER to extract entities
news_df["Entities"] = news_df["Processed_Article"].progress_apply(extract_entities)

# NOTE(review): later cells read "ner_output.csv", but nothing visible here writes the
# extracted entities to that file — confirm where the export happens.
# Display first few rows to verify entity extraction
#print(news_df[["published_at", "Entities"]].head())




Here is some utility code to parse the NER entities and sort them.

In [None]:
import pandas as pd
import ast

def robust_parse_entities(entities_str):
    """
    Parse a string representation of a list (e.g., "['A', 'B']") into an actual list
    and return its string items sorted alphabetically (case-insensitive).

    The raw text is parsed first. Only if that fails is a repaired copy tried, in
    which doubled double-quotes are replaced by a single quote
    (e.g. [""Tesla""] -> ['Tesla']). Repairing only on failure keeps valid input
    that legitimately contains "" (such as an empty string item) intact.

    Args:
        entities_str: The CSV cell. Normally a str; a list/tuple/set is accepted
            and sorted directly; NaN/None/any other type yields [].

    Returns:
        A sorted list of the string items. Returns [] for blank input, for text
        that cannot be parsed (an error line is printed), and for text that parses
        to something other than a list/tuple/set.
    """
    collection_types = (list, tuple, set)

    def _sorted_strings(items):
        # Keep only strings; the sort ignores case.
        return sorted((item for item in items if isinstance(item, str)), key=str.lower)

    if isinstance(entities_str, collection_types):
        return _sorted_strings(entities_str)
    if not isinstance(entities_str, str) or not entities_str.strip():
        return []

    stripped = entities_str.strip()
    candidates = [stripped]
    repaired = stripped.replace('""', "'")
    if repaired != stripped:
        candidates.append(repaired)

    error = None
    for candidate in candidates:
        try:
            parsed = ast.literal_eval(candidate)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
            error = e
            continue
        # A successful parse that is not a collection is not an entity list.
        return _sorted_strings(parsed) if isinstance(parsed, collection_types) else []

    print("Error parsing:", entities_str, error)
    return []

# Load your NER output CSV
# The file is rewritten in place: the TABSA cell reads the same path afterwards.
NER_OUTPUT_CSV = "ner_output.csv"
df = pd.read_csv(NER_OUTPUT_CSV)

# Process the Entities column
df["Entities"] = df["Entities"].apply(robust_parse_entities)

# Remove the published_at column if present
#if "published_at" in df.columns:
#    df = df.drop(columns=["published_at"])

# Save the sorted results back to the same CSV file for manual pruning
df.to_csv(NER_OUTPUT_CSV, index=False)
# Report the path that was actually written (the message used to name a file that is never created).
print(f"Sorted entities CSV saved to '{NER_OUTPUT_CSV}'.")




Here we finally perform TABSA on our data. We use the pyabsa library (run *pip install pyabsa* if the import fails). Feel free to change the parameters used to create *apc_model*.

In [None]:
import pandas as pd
from pyabsa import APCCheckpointManager
from tqdm import tqdm
import logging

# Suppress verbose logging
# (only CRITICAL records from the "pyabsa" and "transformers" loggers get through)
logging.getLogger("pyabsa").setLevel(logging.CRITICAL)
logging.getLogger("transformers").setLevel(logging.CRITICAL)

# Load the ABSA model (with print_result and verbose turned off)
# NOTE(review): dataset is passed as the string "None", not the None object —
# confirm against the pyabsa API that this is intended.
apc_model = APCCheckpointManager.get_sentiment_classifier(
    checkpoint="English",
    dataset="None",
    print_result=False,
    verbose=False
)

def targeted_aspect_sentiment(text, entities):
    """
    Runs targeted ABSA on a given text using pruned entities as aspects.

    The module-level `apc_model` is asked for predictions on `text`. Each
    (aspect, sentiment) pair in the result is scored positive=+1, negative=-1,
    neutral/unknown=0 and credited to every entity whose lower-cased name is a
    substring of the lower-cased aspect text.

    Note: This function averages the sentiment for each entity **within the given text**.
    That is, if an entity is matched several times in the article, its scores are
    summed and divided by the number of matches.

    Args:
        text: The article text. Non-string or blank text yields all-zero scores.
        entities: Iterable of entity names. Duplicates collapse to one key and are
            averaged once.

    Returns:
        dict mapping each entity to its average score in [-1, 1]. Entities that
        were never matched stay at 0.0. All-zero scores are also returned when
        the model raises RuntimeError or returns neither a dict nor a list.
    """
    # NOTE(review): the raw text is passed to predict() without marking the aspect
    # terms — confirm against the pyabsa docs that this is the expected input format.
    aspect_scores = {entity: 0.0 for entity in entities}

    if not entities or not isinstance(text, str) or not text.strip():
        return aspect_scores

    # Keep the try narrow: only the model call is expected to raise RuntimeError.
    try:
        results = apc_model.predict(text, print_result=False)
    except RuntimeError:
        return aspect_scores

    if isinstance(results, dict):
        results = [results]
    if not isinstance(results, list):
        return aspect_scores

    sentiment_values = {"positive": 1, "negative": -1, "neutral": 0}
    # Keyed by unique entity so a name listed twice is counted and averaged once.
    counts = dict.fromkeys(aspect_scores, 0)
    needles = {
        entity: entity.lower()
        for entity in aspect_scores
        if isinstance(entity, str)
    }

    for result in results:
        if not isinstance(result, dict):
            continue

        aspect_texts = result.get("aspect", [])
        sentiments = result.get("sentiment", [])
        if not isinstance(aspect_texts, list):
            aspect_texts = [aspect_texts]
        if not isinstance(sentiments, list):
            sentiments = [sentiments]

        for aspect_text, sentiment in zip(aspect_texts, sentiments):
            # Skip malformed pairs (e.g. None) instead of failing on .lower().
            if not isinstance(aspect_text, str) or not isinstance(sentiment, str):
                continue
            aspect_lower = aspect_text.lower()
            score = sentiment_values.get(sentiment.lower(), 0)

            for entity, needle in needles.items():
                if needle in aspect_lower:
                    aspect_scores[entity] += score
                    counts[entity] += 1

    # Turn the per-entity sums into averages (exactly one division per entity).
    for entity, count in counts.items():
        if count:
            aspect_scores[entity] /= count

    return aspect_scores

import ast  # imported here so this cell does not depend on the entity-parsing cell having run


def _parse_entity_list(cell):
    """Safely turn a CSV cell into a list of entities; anything else becomes []."""
    if isinstance(cell, list):
        return cell
    if not isinstance(cell, str) or not cell.strip():
        # NaN / None / blank cell: no entities.
        return []
    try:
        # literal_eval only accepts Python literals, so file contents are never executed
        # (unlike eval, which would run arbitrary code found in the CSV).
        parsed = ast.literal_eval(cell)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return []
    return parsed if isinstance(parsed, list) else []


# Enable progress bar for DataFrame apply
tqdm.pandas(desc="Running TABSA on Pruned Entities")

# Load your pruned entities CSV (assumed to have columns: published_at, Entities)
pruned_df = pd.read_csv("ner_output.csv")
# Convert the pruned entity string (a list saved as its string representation) to a Python list.
pruned_df["Pruned_Entities"] = pruned_df["Entities"].apply(_parse_entity_list)

# Merge the pruned entities back into your main news_df
# Here, we assume that published_at is unique or that you can merge on it.
# Otherwise, you might want to merge on an article ID.
# errors="ignore" and dropping a previous "Pruned_Entities" keep the cell re-runnable:
# a second run would otherwise fail on the missing "Entities" column or produce
# suffixed duplicate columns from the merge.
news_df = news_df.drop(columns=["Entities", "Pruned_Entities"], errors="ignore")
news_df = news_df.merge(pruned_df[["published_at", "Pruned_Entities"]], on="published_at", how="left")

# If no pruned entities found for an article, default to an empty list
news_df["Pruned_Entities"] = news_df["Pruned_Entities"].apply(lambda x: x if isinstance(x, list) else [])

# Run TABSA on each article using the pruned entity list
news_df["Entity_Sentiments_TABSA"] = news_df.progress_apply(
    lambda row: targeted_aspect_sentiment(row["Processed_Article"], row["Pruned_Entities"]), axis=1
)

print("Completed running TABSA")
#print(news_df[["published_at", "Pruned_Entities", "Entity_Sentiments_TABSA"]].head())



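Finally, after getting our results from TABSA, we align them with our financial data: for each rolling-window date we average the entity sentiment scores of the news published since the previous window date and attach them to that row as feature columns.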

In [None]:
import pandas as pd

# Ensure `published_at` is a datetime column and remove timezone info
news_df["published_at"] = pd.to_datetime(news_df["published_at"]).dt.tz_localize(None)

# Reset index and sort financial data
rolling_trend_df_reset = rolling_trend_df.reset_index().rename(columns={'index': 'Date'})
rolling_trend_df_reset["Date"] = pd.to_datetime(rolling_trend_df_reset["Date"]).dt.tz_localize(None)  # Ensure it's timezone-naive
rolling_trend_df_reset = rolling_trend_df_reset.sort_values("Date")

FINANCE_START_DATE = rolling_trend_df_reset["Date"].min()
ATTACHED_ASPECT_FEATURES = []

# Lower bound of the current news window. It starts at the first window date, so the
# first row's window [start, start) is empty and that row gets all-zero features.
prev_date = FINANCE_START_DATE

# Collect every aspect that appears in any article so each output row has the same columns
all_unique_aspects = set()
for entity_dict in news_df["Entity_Sentiments_TABSA"].dropna():
    if isinstance(entity_dict, dict):
        all_unique_aspects.update(entity_dict)

# Convert to a sorted list for consistent column ordering
all_unique_aspects = sorted(all_unique_aspects)

# Process each time window
for current_date in rolling_trend_df_reset["Date"]:
    # Select news articles published in [prev_date, current_date)
    mask = (news_df["published_at"] >= prev_date) & (news_df["published_at"] < current_date)
    window_news = news_df[mask]

    window_dicts = [
        scores for scores in window_news["Entity_Sentiments_TABSA"]
        if isinstance(scores, dict)
    ]

    if window_dicts:
        # Build the table in one step (one row per article, one column per aspect).
        # This avoids creating a Series per article via apply(pd.Series), which is
        # slow and breaks when the selection is empty.
        aspect_sentiment_columns = pd.DataFrame(window_dicts)

        # Mean score per aspect; articles that lack an aspect are NaN and are skipped
        avg_scores = aspect_sentiment_columns.mean().to_dict()

        # Ensure all aspects exist in the output (even if missing in this window)
        avg_scores = {aspect: avg_scores.get(aspect, 0.0) for aspect in all_unique_aspects}
    else:
        # If no news articles exist in the window, fill with zeros for all aspects
        avg_scores = {aspect: 0.0 for aspect in all_unique_aspects}

    ATTACHED_ASPECT_FEATURES.append(avg_scores)
    prev_date = current_date

# Convert the aspect sentiment scores into a DataFrame (row order matches the sorted dates)
aspect_features_df = pd.DataFrame(ATTACHED_ASPECT_FEATURES)

# Merge with financial data; both sides get a fresh 0..n-1 index so rows align by position
final_financial_df = pd.concat([rolling_trend_df_reset.reset_index(drop=True), aspect_features_df], axis=1)

# Save to CSV
OUTPUT_FILENAME = "financial_data_with_tabsa_sentiments.csv"
final_financial_df.to_csv(OUTPUT_FILENAME, index=False)

print(f"Saved updated financial data with aspect sentiment features to {OUTPUT_FILENAME}")


