In [1]:
from datasets import load_dataset,load_from_disk
from google.cloud import storage, bigquery
from google import genai
import os
import json
import re
import time
from tqdm.auto import tqdm
from datetime import datetime, timezone
from tenacity import retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type
from google.api_core.exceptions import ResourceExhausted, ServiceUnavailable
import json
from itertools import islice
from concurrent.futures import ThreadPoolExecutor, as_completed
from transformers import AutoTokenizer
import warnings

In [2]:
# Service-account key file exposed through the GOOGLE_APPLICATION_CREDENTIALS
# environment variable (set before any Google client is constructed).
os.environ.update(
    GOOGLE_APPLICATION_CREDENTIALS="/mnt/disks/data/diss_bucket_key.json"
)
# Suppress every Python warning for the remainder of the session.
warnings.filterwarnings(action="ignore")

In [3]:
# --- Run parameters -----------------------------------------------------------
PROJECT_ID = "bamboo-mercury-462915-f0"
BUCKET_NAME = "diss_market_data"
BQ_DATASET = "edgar_sentiment"
BQ_TABLE = "news_scores_overlap"
REGION = "europe-west2"
MAX_ARTICLES = 8_000  # cap on the number of articles processed (limit for testing)
BATCH_SIZE = 2

# --- Service clients, all bound to PROJECT_ID ---------------------------------
storage_client = storage.Client(project=PROJECT_ID)
bucket = storage_client.bucket(BUCKET_NAME)
bq_client = bigquery.Client(project=PROJECT_ID)
# NOTE: the Gemini client goes through Vertex AI in us-central1, not REGION.
genai_client = genai.Client(
    vertexai=True,
    project=PROJECT_ID,
    location="us-central1",
)

In [4]:
# Load the Bloomberg financial-news dataset by its repository identifier.
dataset = load_dataset(path="danidanou/Bloomberg_Financial_News")

In [5]:
# Shuffle the training split a single time. The seed is fixed so that the row
# order, and therefore the index-based `news_<i>` names built from it, stays
# the same from run to run.
ds = (
    dataset["train"]
    .shuffle(seed=42)
)

In [6]:
# Ensure the destination dataset exists. The location comes from the REGION
# constant of the configuration cell ("europe-west2", i.e. London) instead of
# being repeated here as a second literal.
dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{BQ_DATASET}")
dataset_ref.location = REGION
bq_client.create_dataset(dataset_ref, exists_ok=True)

# === Prepare BigQuery Table ===
# One row per scored chunk. `exists_ok=True` ignores "already exists" errors,
# so a table created by an earlier run keeps whatever schema it already has.
table_id = f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}"
schema = [
    bigquery.SchemaField("name", "STRING"),             # chunk identifier, e.g. news_<i>_chunk<j>
    bigquery.SchemaField("article", "STRING"),          # text of the chunk that was scored
    bigquery.SchemaField("sentiment_label", "STRING"),  # label parsed from the model reply
    bigquery.SchemaField("sentiment_score", "FLOAT"),   # score parsed from the model reply
    bigquery.SchemaField("explanation", "STRING"),      # model's short rationale (new field)
]
bq_client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)

Table(TableReference(DatasetReference('bamboo-mercury-462915-f0', 'edgar_sentiment'), 'news_scores_overlap'))

In [7]:
# === Names already stored in the results table ===
# Gathered into a set so the scoring loop can cheaply skip chunks that were
# written by a previous run.
query = f"SELECT DISTINCT name FROM `{table_id}`"
processed_names = set(
    record["name"] for record in bq_client.query(query).result()
)

In [8]:
# FinBERT tokenizer; it is passed to the chunking helper below to cut articles
# into token windows.
finbert_tokenizer = AutoTokenizer.from_pretrained(
    "yiyanghkust/finbert-tone",
)

def chunk_article_with_overlap(article, tokenizer, max_tokens=512, stride=384):
    """
    Split `article` into overlapping text chunks with a sliding token window.

    The article is tokenized once without truncation; windows of `max_tokens`
    token ids are taken every `stride` ids and decoded back to text, so
    consecutive chunks share `max_tokens - stride` tokens when
    `stride < max_tokens`.

    Args:
        article: Text to split. `None` or an empty string yields no chunks.
        tokenizer: Callable returning a mapping with an ``"input_ids"`` list,
            and exposing ``decode(ids, skip_special_tokens=...)``.
        max_tokens: Window length in token ids (must be > 0).
        stride: Distance between window starts in token ids (must be > 0).

    Returns:
        list[str]: Decoded chunk texts in document order. Chunks that decode
        to nothing but whitespace are omitted.

    Raises:
        ValueError: If `max_tokens` or `stride` is not positive.

    Note:
        Windows are measured on the ids exactly as the tokenizer returns them.
        Any special tokens it adds are presumably counted in the window and
        then dropped by `skip_special_tokens=True` when decoding.
    """
    if max_tokens <= 0 or stride <= 0:
        raise ValueError(
            f"max_tokens and stride must be positive (got max_tokens={max_tokens}, stride={stride})"
        )
    if not article:
        return []

    # Character offsets were requested here before but never used; leaving them
    # out avoids the extra work and the dependency on offset-capable tokenizers.
    input_ids = tokenizer(article, truncation=False)["input_ids"]
    total = len(input_ids)
    chunks = []

    for start in range(0, total, stride):
        end = start + max_tokens
        chunk_text = tokenizer.decode(input_ids[start:end], skip_special_tokens=True)
        # A window holding only special tokens decodes to "", which is not
        # worth scoring.
        if chunk_text.strip():
            chunks.append(chunk_text)
        # This window already reached the end of the article; a further window
        # would only repeat the tail.
        if end >= total:
            break

    return chunks

In [9]:
def analyze_sentiment_single_chunk(chunk, model="gemini-2.0-flash"):
    """
    Ask Gemini to classify the sentiment of one news chunk.

    Sends a fixed-format prompt through the module-level `genai_client` and
    returns the raw reply for later parsing.

    Args:
        chunk: Text of the news chunk to score.
        model: Name of the model to call; defaults to the model this notebook
            has always used.

    Returns:
        str: The reply text with surrounding whitespace removed. The prompt
        asks for ``Sentiment:``, ``Score:`` and ``Explanation:`` lines.

    Raises:
        ValueError: If the response carries no text at all, instead of the
            opaque ``AttributeError`` that ``None.strip()`` would raise.
        Exception: Whatever the client raises for API failures is propagated
            unchanged.
    """
    prompt = f"""
You're a financial sentiment analyst. For the following news chunk, classify the sentiment as **Positive**, **Neutral**, or **Negative**, give a **score** between -1.0 and +1.0, and explain your reasoning in one or two sentences.

Respond exactly in this format:

Sentiment: <label>  
Score: <score>  
Explanation: <brief rationale>

Chunk:
{chunk}
"""
    resp = genai_client.models.generate_content(
        model=model, contents=[prompt]
    )
    text = resp.text
    # `text` is None when the response has no text (for example, no candidate
    # was returned); fail with a message that says so.
    if text is None:
        raise ValueError("Model response contained no text (empty or blocked response).")
    return text.strip()

In [10]:
def parse_single_response_with_explanation(resp):
    """
    Extract (label, score, explanation) from a model reply.

    The reply is expected to contain ``Sentiment:``, ``Score:`` and
    ``Explanation:`` fields. Matching is case-insensitive and tolerates
    markdown emphasis around the field name or the value (for example
    ``**Sentiment:** Positive`` or ``Score: **0.4**``), which would otherwise
    fall through to the defaults.

    Args:
        resp: Reply text; ``None`` is treated like an empty string.

    Returns:
        tuple[str, float, str]:
            label        first word after ``Sentiment:``, capitalized, or
                         ``"Unknown"`` when the field is missing;
            score        number after ``Score:`` as a float, or ``0.0`` when
                         the field is missing;
            explanation  everything after ``Explanation:`` (may span several
                         lines), stripped, or ``"None"`` when missing.
    """
    text = resp or ""

    def _field(name, value_pattern, extra_flags=0):
        # Field name, optional bold markers, a colon, then any mix of
        # whitespace and asterisks before the captured value.
        pattern = rf"{name}\s*\**\s*:[\s*]*{value_pattern}"
        return re.search(pattern, text, re.IGNORECASE | extra_flags)

    label_match = _field("Sentiment", r"(\w+)")
    score_match = _field("Score", r"([-+]?\d*\.?\d+)")
    # DOTALL lets the explanation run across line breaks to the end of the reply.
    explanation_match = _field("Explanation", r"(.+)", re.DOTALL)

    label = label_match.group(1).capitalize() if label_match else "Unknown"
    score = float(score_match.group(1)) if score_match else 0.0
    explanation = explanation_match.group(1).strip() if explanation_match else "None"

    return label, score, explanation

In [11]:
BATCH_INSERT_SIZE = 20  # rows buffered before each streaming insert
MAX_RETRIES = 5         # attempts per chunk before the run is aborted
DELAY_SEC = 15          # wait after the first failed attempt; doubled on each retry
MAX_DELAY_SEC = 120     # ceiling for the exponential backoff

rows_to_insert = []


def _flush_rows(rows):
    """Stream `rows` into the results table and mark their names as processed.

    `insert_rows_json` reports per-row failures through its return value
    rather than by raising, so a non-empty result is turned into an error
    instead of being silently ignored.

    Raises:
        RuntimeError: If BigQuery reports insert errors for any row.
    """
    if not rows:
        return
    insert_errors = bq_client.insert_rows_json(table_id, rows)
    if insert_errors:
        raise RuntimeError(f"BigQuery insert into {table_id} failed: {insert_errors}")
    processed_names.update(row["name"] for row in rows)


try:
    for i in tqdm(range(0, min(len(ds), MAX_ARTICLES)), desc="Processing overlapping chunks"):
        article = ds[i]["Article"]
        # Nothing to score for a missing or blank article.
        if not article or not article.strip():
            continue
        name_base = f"news_{i}"
        chunks = chunk_article_with_overlap(article, tokenizer=finbert_tokenizer)

        for j, chunk in enumerate(chunks):
            chunk_name = f"{name_base}_chunk{j}"
            # Already stored by this run or an earlier one.
            if chunk_name in processed_names:
                continue

            for attempt in range(1, MAX_RETRIES + 1):
                try:
                    resp = analyze_sentiment_single_chunk(chunk)
                    label, score, explanation = parse_single_response_with_explanation(resp)
                    break
                except Exception as e:
                    if attempt >= MAX_RETRIES:
                        raise
                    print(f"[WARN] Retry {attempt} on chunk {chunk_name} failed: {e}")
                    # Exponential backoff (15s, 30s, 60s, 120s) gives a
                    # rate-limited API (429) progressively more time.
                    time.sleep(min(DELAY_SEC * 2 ** (attempt - 1), MAX_DELAY_SEC))

            rows_to_insert.append({
                "name": chunk_name,
                "article": chunk,
                "sentiment_label": label,
                "sentiment_score": score,
                "explanation": explanation,
            })

            if len(rows_to_insert) >= BATCH_INSERT_SIZE:
                _flush_rows(rows_to_insert)
                rows_to_insert = []
finally:
    # Final flush. Running it in `finally` means rows that were already scored
    # are still written when the loop stops early (retries exhausted, manual
    # interrupt) instead of being discarded with the buffer.
    if rows_to_insert:
        pending = len(rows_to_insert)
        _flush_rows(rows_to_insert)
        rows_to_insert = []
        print(f"Inserted final {pending} rows.")


Processing overlapping chunks:   0%|          | 0/8000 [00:00<?, ?it/s]

[WARN] Retry 1 on chunk news_5041_chunk0 failed: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.', 'status': 'RESOURCE_EXHAUSTED'}}
[WARN] Retry 2 on chunk news_5041_chunk0 failed: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.', 'status': 'RESOURCE_EXHAUSTED'}}
[WARN] Retry 1 on chunk news_5094_chunk1 failed: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.', 'status': 'RESOURCE_EXHAUSTED'}}
[WARN] Retry 2 on chunk news_5094_chunk1 failed: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'Resource exhausted