In [56]:
!pip install -q langchain
!pip install -q langchain-nvidia-ai-endpoints
!pip install -q langchain-community langchain-core
!pip install -q --upgrade langchain
!pip install -q pandas

# Test out your locally deployed NIM first

In [5]:
from langchain.schema import SystemMessage, HumanMessage
from langchain_nvidia_ai_endpoints import ChatNVIDIA

llm = ChatNVIDIA(
    model="nvidia/llama-3.3-nemotron-super-49b-v1",
    base_url="http://localhost:8000/v1",   # your local NIM /v1
)

messages = [
    # Change this to on if you want to use reasoning
    SystemMessage(content="detailed thinking off"),
    HumanMessage(content="Write a limerick about the wonders of GPU computing.")
]

response = llm(messages)
print(response)

  response = llm(messages)


content='Here is a limerick about the wonders of GPU computing:\n\nThere once was a GPU so bright,\nCompute tasks devoured with pure delight.\nIt processed with a flair,\nThrough threads beyond compare,\nAnd made even the hardest simulations take flight.' additional_kwargs={} response_metadata={'role': 'assistant', 'reasoning_content': None, 'content': 'Here is a limerick about the wonders of GPU computing:\n\nThere once was a GPU so bright,\nCompute tasks devoured with pure delight.\nIt processed with a flair,\nThrough threads beyond compare,\nAnd made even the hardest simulations take flight.', 'tool_calls': [], 'token_usage': {'prompt_tokens': 31, 'total_tokens': 80, 'completion_tokens': 49, 'prompt_tokens_details': None}, 'finish_reason': 'stop', 'model_name': 'nvidia/llama-3.3-nemotron-super-49b-v1'} id='run--7db3f500-d1b4-469a-b545-190d9b297bcb-0' usage_metadata={'input_tokens': 31, 'output_tokens': 49, 'total_tokens': 80} role='assistant'


# Classes and prompt definition for our financial domain

In [9]:
event_types_mapping = {"analyst rating": 0, "price targets": 1,
                       "earnings": 2, "labour related": 3, "mergers and acquisitions": 4, "dividends": 5,
                       "regulatory": 6, "stock price movement": 7, "credit ratings": 8, "products-services": 9,
                       "product approval": 10, "guidance": 11, "other": 12
                      }

In [36]:
prompt = """
You are a helpful AI assistant that analyses financial news headlines and identifies what event type is described.
You will classify event types into one of the following categories (in square brackets)

- [Analyst Rating]: An entity such as a bank, asset manager, etc. gives a classification/rating/downgrade/upgrade/opinion to an asset.
                    If there is no specified analyst and company given, it's not Analyst Rating and should be classified as OTHER.
- [Price Targets]: A mention of a price target (PT) is given by an entity such as bank, asset manager, etc.
                   This takes priority over any other class, so if a price target is present use this class!
- [Earnings]: Reports of quarterly, monthly, etc. concrete values of revenue, ESP, etc. Percentage fluctations.
              Expected values are not Earnings and should be Guidance instead!
- [Labour Issues]: Mentions of layoffs, union action, strikes, rising cost of labour, bonuses for execs, etc. Important personal change, e.g. CEO, CFO, VPs, etc.
- [Mergers and Acquisitions]: Whenever merging or acquisition of entities, not just companies, is mentioned.
                              Things like partnerships do not belong to this class! Takes priority over other classes.
- [Dividends]: Mentions of dividend performance, dividend per share, decisions not to issue dividends, etc.
- [Regulatory]: Mentions corporate position focused on environmental affairs, government regulation, international treaties, geopolitics, debt repayment, licenses, patents, etc.
                Any executive decisions taken by the government are Regulatory. Takes priority over other classes.
- [Stock price movement]: Includes pricing of public offerings, daily, monthly and yearly movements, highs and lows, options trade and alerts, etc.
                          Only relevant when a specific entity/industry or set of entities/industries mentioned.
                          Quantities such as sales, are not stock price movements. Stock splits do not count as stock movements.
                          Takes priority over Earnings, Stock Price Movement, Credit Ratings
- [Credit Ratings]: Adjustments of company's borrowing capacity, changes in debt values, changes in ratings, etc.
- [Products-Services]: Mentions of a company's particular product, forward-looking product directions, disruption, government and private contracts, etc.
                       Any sort of delay regarding a product roll-out, etc.
- [Product Approval]: Mentions FDA approvals, environmental approvals, acceptance for review. Any time an entity approves the rollout of a corporations product.
- [Guidance]: Forward looking statements issued by the company's themselves regarding Revenue, EPS, potential sales going up/down, number of contracts, etc.
              Similar talk to earnings but about projections rather than realised.
              
If the headline doesn't match any of the classes, classify it as OTHER.
ATTENTION:
  - Only assign a category if the headline meets all the criteria listed for that category. Otherwise use OTHER
  - Encourage precise matching rather than assigning categories based on partial or superficial similarities
  - OTHER is the default category when in doubt
  - If there are no specific companies mentioned, use OTHER

A few examples:

1. Cornerstone OnDemand Higher as Barclays Upgraded to Overweight -> [Analyst Rating]
2. HC Wainwright & Co. Maintains Buy on Balchem, Lowers Price Target to $104 -> [Price Targets]
3. Sina Reports Q4 Adj. EPS $0.24 vs $0.18 Est., Sales $211.1M vs $207.6M Est.; Sees FY15 Sales $800M-$900M vs $884.6M Est. -> [Earnings]
4. Delta To Buy Out Employees, Offer Early Retirement (DAL) -> [Labour Issues]
5. Constant Contact Announces Deal to Be Purchased by Endurance Int'l at $32/Share -> [Mergers and Acquisitions]
6. Ameriprise Financial Announces 12% Qtr. Dividend Increase to $0.58/Share -> [Dividends]
7. Ultratech Achieves ISO 9001 and 14001 Certification for Singapore Operations and Recertification for U.S. Facility  -> [Regulatory]
8. Mid-Afternoon Market Update: Dow Up Over 200 Points; Lakeland Industries Shares Spike Higher -> [Stock price movement]
9. Moody's Affirms Ratings on Nokia; Outlook Revised from Negative to Developing -> [Credit Ratings]
10. NOVAVAX Awarded HHS-BARDA Contract Valued at up to $179 Million  -> [Products-Services]
11. Sanofi's Genzyme Announces Lemtrada Resubmission Accepted for Review by FDA  -> [Product Approval]
12. Dot Hill Systems Announces Re-alignment of Software Development Initiatives; Sees Q4 EPS of $(0.02)-(0.03) vs $0.03-(0.03) Prior  -> [Guidance]
13. Kopin Chairman Fan Buys 116,400 Shares @$2.83/Share -Form 4 -> [OTHER]

Given the following headline:

### START HEADLINE ###

{headline}

### END HEADLINE ###

What event type best classifies it? Answer only with your predicted class and give it inside double square brackets, like [[class]]
"""

Let's look at some model predictions

In [37]:
import pickle
from collections import Counter


with open('manually_labelled_testset.pkl', 'rb') as f:
    test_labels = pickle.load(f)

Counter(list(test_labels.values()))

Counter({'Analyst Rating': 96,
         'OTHER': 93,
         'Mergers and Acquisitions': 88,
         'Labour Issues': 88,
         'Regulatory': 85,
         'Guidance': 83,
         'Stock price movement': 76,
         'Earnings': 75,
         'Product Approval': 72,
         'Dividends': 71,
         'Price Targets': 66,
         'Credit Ratings': 61,
         'Products-Services': 49,
         'Management changes': 22})

In [38]:
import nest_asyncio
nest_asyncio.apply()

In [39]:
import asyncio
import threading


completed_tasks = 0  # Variable to track completed tasks
texts = [text for text in test_labels]
total_tasks = len(texts)
lock = asyncio.Lock()  # To protect shared variables during concurrent access
predictions = {}


def run(text):
    return llm.invoke(prompt.format(headline=text)).content


def save_item(item_to_save, file_name):
    with open(file_name, 'wb') as f:
        pickle.dump(item_to_save, f)
        

async def run_with_timeout(text, semaphore):
    global completed_tasks
    attempts = 0
    while True:
        attempts += 1
        try:
            async with semaphore:
                # Run the synchronous `run` function in a thread with a timeout
                result = await asyncio.wait_for(asyncio.to_thread(run, text), timeout=30)
            predictions[text] = result
            async with lock:
                completed_tasks += 1
                if completed_tasks % 20 == 0:
                    print(f"Completed {completed_tasks}/{total_tasks} tasks.")
            break  # Exit loop on success
        except asyncio.TimeoutError:
            print(f"Task for '{text}' timed out. Retrying (attempt {attempts})...")
            continue
        except Exception as e:
            # Handle other exceptions if necessary
            print(f"Task for '{text}' failed with exception {e}. Retrying (attempt {attempts})...")
            continue

async def main():
    semaphore = asyncio.Semaphore(5)
    tasks = [asyncio.create_task(run_with_timeout(text, semaphore)) for text in texts]
    await asyncio.gather(*tasks)

asyncio.run(main())

Completed 20/1025 tasks.
Completed 40/1025 tasks.
Completed 60/1025 tasks.
Completed 80/1025 tasks.
Completed 100/1025 tasks.
Completed 120/1025 tasks.
Completed 140/1025 tasks.
Completed 160/1025 tasks.
Completed 180/1025 tasks.
Completed 200/1025 tasks.
Completed 220/1025 tasks.
Completed 240/1025 tasks.
Completed 260/1025 tasks.
Completed 280/1025 tasks.
Completed 300/1025 tasks.
Completed 320/1025 tasks.
Completed 340/1025 tasks.
Completed 360/1025 tasks.
Completed 380/1025 tasks.
Completed 400/1025 tasks.
Completed 420/1025 tasks.
Completed 440/1025 tasks.
Completed 460/1025 tasks.
Completed 480/1025 tasks.
Completed 500/1025 tasks.
Completed 520/1025 tasks.
Completed 540/1025 tasks.
Completed 560/1025 tasks.
Completed 580/1025 tasks.
Completed 600/1025 tasks.
Completed 620/1025 tasks.
Completed 640/1025 tasks.
Completed 660/1025 tasks.
Completed 680/1025 tasks.
Completed 700/1025 tasks.
Completed 720/1025 tasks.
Completed 740/1025 tasks.
Completed 760/1025 tasks.
Completed 780/10

In [40]:
predictions

{'Best Dividend Stocks In The Market(JNJ, MMM, MCD, PG, ABT)': '[[Dividends]]',
 'Mueller Water Increases Qtr. Dividend From $0.02 To $0.03/Share': '[[Dividends]]',
 'Oppenheimer Reiterates Outperform Rating On Chesapeake Energy': '[[Analyst Rating]]',
 'Varonis Sees FY19 Adj. EPS $0.04-$0.16 vs $0.32 Estimate, Sales $297M-$305M vs $318.5M Est.': '[[Guidance]]',
 'Allied Nevada Appoints Director, Corporate Environmental Affairs': '[[Regulatory]]',
 'InnerWorkings Postpones Annual Shareholder Meeting As Co. In In The Process OF Restating Its Financial Statements': '[[OTHER]]',
 'XL Insurance and Jorgensen & Company Announce Agreement': '[[Mergers and Acquisitions]]',
 'Trump Believes Russia Did Not Adhere To Open Skies Treaty': '[[Regulatory]]',
 'DaVita Q1 Adj. EPS $1.830 Beats $1.470 Estimate, Sales $2.841B Beat $2.800B Estimate': '[[Earnings]]',
 'Here comes the RBA announcement and statement … due at 0330GMT via ForexLive': '[[OTHER]]',
 'RTI Surgical Sees FY17 Adj. EPS $0.05-$0.07 

In [47]:
import re
from collections import Counter
import random


def clean_llm_prediction(pred):
    # try:
    matches = re.search(r'\[\[[^\]]+\]\]', pred)
    try:
        pred = matches[0]
        pred = pred.replace(']]', '')
    except:
        pred = "other"
    pred = pred.lower() 
    for k in event_types_mapping:
        if k.lower() in pred:
            pred = k.lower()
            return pred
    return pred


correct = 0
total = 0
y_pred = []
y_true = []
majority_counts = 0

relevant_classes = {
                   # "management changes",
                   # "labour issues",
                   # "regulatory",
                   # "stock price movement",
                   # "products-services",
                    "other"
                  }


for text in test_labels:
    pred = clean_llm_prediction(predictions[text])
    clean_label = test_labels[text].lower()
    if pred != clean_label:
        print(text)
        print(predictions[text])
        print(pred)
        print(test_labels[text].lower())
        print()
        print('-----')
    else:
        correct += 1
    total += 1

    y_true.append(event_types_mapping.get(test_labels[text].lower(), 13))
    y_pred.append(event_types_mapping.get(pred, 13))

# print(correct /total), majority_counts, total

Allied Nevada Appoints Director, Corporate Environmental Affairs
[[Regulatory]]
regulatory
management changes

-----
InnerWorkings Postpones Annual Shareholder Meeting As Co. In In The Process OF Restating Its Financial Statements
[[OTHER]]
other
regulatory

-----
Deutsche Bank May Have Hired BofA's Jeff Rose - WSJ
[[Labour Issues]]
[[labour issues
management changes

-----
Jefferies Says The 'Transition Continues' At Rackspace
[[Analyst Rating]]
analyst rating
other

-----
Mad Money Lightning Round: Cramer Likes Buffalo Wild Wings, Alcoa (BWLD, IDCC, IBKR, AA, PNRA, CMG, HCBK, FNFG)
[[Analyst Rating]]
analyst rating
other

-----
Lumos Networks Announces CFO Harold Covert Resigning for 'Family Reasons,' Johan Broekhuysen Named as Interim CFO
[[Labour Issues]]
[[labour issues
management changes

-----
Buffett: Corporate Taxes Not 'Strangling' US Competitiveness -CNBC
[[Regulatory]]
regulatory
other

-----
UPDATE: Mizuho On United Health Also Notes Co's 'Revenue Diversification at Optum 

In [52]:
import numpy as np

np.mean(np.array(y_true) == np.array(y_pred))

np.float64(0.8751219512195122)

In [53]:
from sklearn.metrics import f1_score

# Should be 0.914 for straight classification
f1 = f1_score(y_true, y_pred, average='macro')
f1

0.8796566600275575

In [54]:
f1 = f1_score(y_true, y_pred, average=None)

for idx, k in enumerate(event_types_mapping):
    print(k)
    print(f1[idx])

analyst rating
0.9082125603864735
price targets
0.9924812030075187
earnings
0.9166666666666666
labour related
0.8862275449101796
mergers and acquisitions
1.0
dividends
0.8108108108108109
regulatory
0.7375
stock price movement
0.9482758620689655
credit ratings
0.7962962962962963
products-services
0.9007633587786259
product approval
0.9440993788819876
guidance
0.6666666666666666
other
0.927536231884058


In [67]:
import pandas as pd

total_df = pd.read_csv("news_headlines_single_prediction_and_cot_legal_approved.csv")

total_df.head()

Unnamed: 0.2,Unnamed: 0.1,Unnamed: 0,headline,cot,cot_label,direct_label
0,0,0,Avago Technologies Announces $0.11 Dividend,The headline mentions a specific dividend amou...,dividends,dividends
1,1,1,Humana and Allscripts Form Alliance to Advan...,This headline mentions a partnership between t...,other,other
2,2,2,2011 Investor Day: UBS Updates its Strategy a...,The headline mentions UBS updating its strateg...,guidance,guidance
3,3,3,24/7 Wall St. CEOs Who Need to be Fired: Nine...,The headline mentions CEOs who need to be fire...,other,labour related
4,4,4,"3 ETFs Being Driven By Russia, Time Warner An...",The headline mentions specific entities (Russi...,other,other


In [68]:
texts = total_df['headline'].to_list()
predictions = {}

In [69]:
import nest_asyncio
nest_asyncio.apply()

In [None]:
import asyncio
import threading


completed_tasks = 0  # Variable to track completed tasks
sub_texts = [text for text in texts if text not in predictions]
total_tasks = len(sub_texts)
lock = asyncio.Lock()  # To protect shared variables during concurrent access
predictions_accumulator = []
print(f'Total tasks: {total_tasks}')

def run(text):
    return llm.invoke(prompt.format(headline=text)).content


def save_item(item_to_save, file_name):
    with open(file_name, 'wb') as f:
        pickle.dump(item_to_save, f)
        

async def run_with_timeout(text, semaphore):
    global completed_tasks
    attempts = 0
    while True:
        attempts += 1
        try:
            async with semaphore:
                # Run the synchronous `run` function in a thread with a timeout
                result = await asyncio.wait_for(asyncio.to_thread(run, text), timeout=20)
            predictions[text] = result
            async with lock:
                completed_tasks += 1
                if completed_tasks % 500 == 0:
                    print(f"Completed {completed_tasks}/{total_tasks} tasks.")
                    save_item(predictions, "nemotron_super_49b_predictions.pkl")
            break  # Exit loop on success
        except asyncio.TimeoutError:
            # Retry if the request times out
            print(f"Task for '{text}' timed out. Retrying (attempt {attempts})...")
            continue
        except Exception as e:
            # Handle other exceptions if necessary
            print(f"Task for '{text}' failed with exception {e}. Retrying (attempt {attempts})...")
            continue

async def main():
    semaphore = asyncio.Semaphore(10)  # Limit to 10 concurrent tasks
    # tasks = [asyncio.create_task(run_with_timeout(text, semaphore)) for text in texts]
    tasks = [asyncio.create_task(run_with_timeout(text, semaphore)) for text in sub_texts]
    await asyncio.gather(*tasks)

asyncio.run(main())

Total tasks: 153511
Completed 500/153511 tasks.
Completed 1000/153511 tasks.
Completed 1500/153511 tasks.
Completed 2000/153511 tasks.
Completed 2500/153511 tasks.
Completed 3000/153511 tasks.
Completed 3500/153511 tasks.
Completed 4000/153511 tasks.
Completed 4500/153511 tasks.
Completed 5000/153511 tasks.
Completed 5500/153511 tasks.
Completed 6000/153511 tasks.
Completed 6500/153511 tasks.
Completed 7000/153511 tasks.
Completed 7500/153511 tasks.
Completed 8000/153511 tasks.
Completed 8500/153511 tasks.
Completed 9000/153511 tasks.
Completed 9500/153511 tasks.
Completed 10000/153511 tasks.
Completed 10500/153511 tasks.
Completed 11000/153511 tasks.
Completed 11500/153511 tasks.
Completed 12000/153511 tasks.


In [None]:
save_item(predictions, "nemotron_super_49b_predictions.pkl")

In [None]:
headlines = []
labels = []
cots = []


def clean_llm_prediction(pred):
    # try:
    pattern = r"\[\[(.*?)\]\]"
    # Find matches
    matches = re.findall(pattern, pred)
    try:
        pred = matches[0]
        pred = pred.replace(']]', '')
    except:
        pred = "other"
    pred = pred.lower() 
    for k in event_types_mapping:
        if k.lower() in pred:
            pred = k.lower()
            return pred
    return pred


for idx, t in enumerate(predictions):
    if t in dedup_headlines:
        headlines.append(t)
        labels.append(clean_llm_prediction(predictions[t], idx, t))
        cots.append(predictions[t])

# Check all the labels available for sanity check
set(labels)

In [None]:
final_df = pd.DataFrame({'headline': headlines, 'label': labels, 'cot': cots})

In [None]:
final_df.head()

In [None]:
final_df.to_csv("nemotron_event_type_fuzzy_deduplicated_cot.csv")

In [None]:
# Merge two dataframes -> single_label, cot_label, cot, headline