In [1]:
# pyspark packages
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType

#other needed packages
import re
import os

# Point PySpark at the JDK it should launch.
# NOTE(review): this is a machine-specific Homebrew path — adjust when running elsewhere.
os.environ["JAVA_HOME"] = '/opt/homebrew/opt/openjdk@17'

# Create the Spark session (or reuse one that already exists), with the
# driver host pinned to the loopback address.
spark = (
    SparkSession.builder
    .appName("stock market preds")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

# Keep Spark's console logging down to errors only.
spark.sparkContext.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/14 03:33:12 WARN Utils: Your hostname, Jeffreys-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.17 instead (on interface en0)
26/01/14 03:33:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/14 03:33:12 WARN Utils: Your hostname, Jeffreys-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.17 instead (on interface en0)
26/01/14 03:33:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust loggi

In [2]:
# functions to import data, run SQL from file and save back to file
# function to import and clean columns
def import_csv_to_table(table_name, file, format_cols):
    """Read a CSV into a Spark DataFrame and register it as a temp view.

    Args:
        table_name: Name of the temporary SQL view to create or replace.
        file: Path of the CSV file to read.
        format_cols: When truthy, rename every column by removing characters
            that are neither ASCII alphanumerics nor whitespace, lower-casing
            the result and turning each space into an underscore.

    Returns:
        The DataFrame that was registered under ``table_name``.
    """
    # The first row is the header; a double quote is both the quote and the
    # escape character, quoted fields may span lines, and types are inferred.
    reader_options = dict(
        header=True,
        quote="\"",
        escape="\"",
        multiLine=True,
        inferSchema=True,
    )
    df = spark.read.csv(file, **reader_options)

    if format_cols:
        disallowed = re.compile(r"[^a-zA-Z0-9\s]")
        renamed = []
        for original_name in df.columns:
            stripped = disallowed.sub("", original_name)
            renamed.append(stripped.lower().replace(" ", "_"))
        df = df.toDF(*renamed)

    # Expose the frame to spark.sql() queries.
    df.createOrReplaceTempView(f"{table_name}")
    return df

#run a SQL step
def sql_step(file):
    """Execute the SQL text stored in ``file`` and return the result.

    Args:
        file: Path of a UTF-8 encoded file containing the SQL to run.

    Returns:
        Whatever ``spark.sql`` returns for that SQL text.
    """
    with open(file, 'r', encoding='utf-8') as sql_file:
        query_text = sql_file.read()
    return spark.sql(query_text)

#run SQL and view output inline
def run_sql(file, rowstoshow, print_sql):
    """Run the SQL stored in ``file`` and display the result inline.

    Args:
        file: Path of a UTF-8 encoded file containing the SQL to run.
        rowstoshow: Number of result rows to display.
        print_sql: When truthy, echo the SQL text before the result table.

    Returns:
        None. The result is displayed untruncated, not returned.
    """
    # A separate name for the handle keeps the ``file`` path argument intact.
    with open(file, 'r', encoding='utf-8') as sql_file:
        sql_text = sql_file.read()

    # Echo before executing so the statement is visible even when spark.sql()
    # raises on it; on success the output order (SQL, then table) is unchanged.
    if print_sql:
        print(sql_text)

    results = spark.sql(sql_text)
    results.show(rowstoshow, truncate=False)

# export data frame to csv
def export_csv(df, output_dir, final_file_name):
    """Write ``df`` as a single CSV file with a chosen name.

    The frame is coalesced to one partition and written to ``output_dir``
    (overwriting it) with a header row. The resulting ``part-*.csv`` file is
    then renamed to ``final_file_name`` inside the same directory.

    Args:
        df: DataFrame to export; must support ``coalesce(1).write.csv(...)``.
        output_dir: Directory the CSV is written into.
        final_file_name: File name to give the single part file.

    Returns:
        None. A status line is printed for both the success and the
        part-file-missing outcome.
    """
    df.coalesce(1).write.csv(output_dir, header=True, mode="overwrite")

    # Take the first matching part file. The explicit None default means a
    # missing part file reaches the error branch below instead of raising
    # UnboundLocalError on an unassigned variable.
    part_file_path = next(
        (
            os.path.join(output_dir, entry)
            for entry in os.listdir(output_dir)
            if entry.startswith("part-") and entry.endswith(".csv")
        ),
        None,
    )

    if part_file_path:
        os.rename(part_file_path, os.path.join(output_dir, final_file_name))
        print(f"CSV saved as: {final_file_name}")
    else:
        print("Error: Part file not found.")

In [3]:
# Load both raw CSVs and register them as the "news" and "stocks" temp views,
# keeping the original column names (format_cols=False).
news = import_csv_to_table(
    table_name="news",
    file="raw_data/news_data.csv",
    format_cols=False,
)
stocks = import_csv_to_table(
    table_name="stocks",
    file="raw_data/stock_data.csv",
    format_cols=False,
)

In [19]:
# Run the SQL prep script and keep its result as `feature_set`,
# then preview up to 300 rows without truncating cell contents.
feature_set = sql_step(file="sql/sentiment_data_prep_v2.sql")
feature_set.show(n=300, truncate=False)

+---------------+------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+
|news_article_id|symbol|news_date |article_text                                                                                                                                                                                                                                                                                                                                                                                                                          

In [16]:
# Preview the raw news table using show()'s default row count and truncation.
news.show(n=20, truncate=True)

+--------+--------------------+--------------------+-----------------+-------------------+-------------------+--------------------+--------------------+--------+
|      id|            headline|             summary|           author|         created_at|         updated_at|                 url|             symbols|  source|
+--------+--------------------+--------------------+-----------------+-------------------+-------------------+--------------------+--------------------+--------+
|49701666|Evercore ISI Grou...|                NULL|Benzinga Newsdesk|2026-01-05 10:50:55|2026-01-05 10:50:56|https://www.benzi...|                   A|benzinga|
|49391324|Barclays Upgrades...|                NULL|Benzinga Newsdesk|2025-12-15 06:49:28|2025-12-15 06:49:29|https://www.benzi...|                   A|benzinga|
|49342760|What's Driving th...|                    |Benzinga Insights|2025-12-11 11:00:38|2025-12-11 11:00:39|https://www.benzi...|                   A|benzinga|
|49276887|Goldman Sachs Ini.

In [20]:
# Convert to pandas and create 80/20 train/test split (no article_id leakage)
import pandas as pd

df = feature_set.toPandas()

# Sample 80% of the distinct article ids (fixed seed) rather than 80% of rows,
# so every row belonging to one article lands on the same side of the split.
unique_ids = df['news_article_id'].unique()
train_ids = pd.Series(unique_ids).sample(frac=0.8, random_state=42).values
is_train = df['news_article_id'].isin(train_ids)

# .copy() makes each split an independent frame, so columns can be added to
# train_df / test_df later without pandas raising SettingWithCopyWarning.
train_df, test_df = df[is_train].copy(), df[~is_train].copy()
print(f"Train: {len(train_df)}, Test: {len(test_df)}")

Train: 28264, Test: 7166


In [21]:
# Show the first five rows of the training split.
train_df.head(n=5)

Unnamed: 0,news_article_id,symbol,news_date,article_text,percent_daily_price_change
0,48346820,LII,2025-10-22,"Headline: Earnings Scheduled For October 22, 2...",-0.013263
1,48346820,SF,2025-10-22,"Headline: Earnings Scheduled For October 22, 2...",-0.000508
2,48931230,VRT,2025-11-18,Headline: 10 Industrials Stocks With Whale Ale...,-0.097841
3,47070626,AAON,2025-08-12,Headline: Mercury Systems Posts Better-Than-Ex...,-0.04196
4,49699047,TSLA,2026-01-05,Headline: EXCLUSIVE: Top 20 Most-Searched Tick...,-0.010301


In [None]:
# Setup PyTorch with MPS
import torch

# Use the MPS backend when PyTorch reports it as available; otherwise use the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

  from .autonotebook import tqdm as notebook_tqdm


Using device: mps


In [None]:
# load finBERT and tokenizer
from transformers import AutoTokenizer, AutoModel
from tqdm import tqdm

# The tokenizer and the encoder are loaded from the same pretrained checkpoint.
FINBERT_CHECKPOINT = "ProsusAI/finbert"

tokenizer = AutoTokenizer.from_pretrained(FINBERT_CHECKPOINT)
finbert = AutoModel.from_pretrained(FINBERT_CHECKPOINT)
# Move the encoder onto the device selected earlier in the notebook.
finbert = finbert.to(device)


In [None]:
# Fine-tuned finBERT for classification (up/down/neutral)
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

# Convert price change to classes: 0=down (< -threshold), 1=up (> threshold), 2=neutral (otherwise)
def price_to_class(pct_change, threshold=0.005):
    """Bucket a price change into one of three class ids.

    Args:
        pct_change: Price change to classify, compared directly against
            ``threshold`` (the default 0.005 corresponds to 0.5% when the
            change is expressed as a fraction).
        threshold: Half-width of the neutral band around zero.

    Returns:
        0 if ``pct_change < -threshold`` (down), 1 if
        ``pct_change > threshold`` (up), otherwise 2 (neutral). Values exactly
        at ``+/-threshold`` and NaN fall into the neutral class because both
        comparisons are false for them. Note the encoding is not ordinal:
        1 means "up" and 2 means "neutral".
    """
    if pct_change < -threshold:
        return 0  # down
    if pct_change > threshold:
        return 1  # up
    return 2  # neutral

# Label both splits with the class id derived from the daily price change.
# assign() builds new frames instead of inserting a column into the existing
# ones in place, so pandas raises no SettingWithCopyWarning when those frames
# are filtered slices, and re-running this cell is safe.
train_df = train_df.assign(
    price_class=train_df['percent_daily_price_change'].apply(price_to_class)
)
test_df = test_df.assign(
    price_class=test_df['percent_daily_price_change'].apply(price_to_class)
)
print(f"Class distribution (train): {train_df['price_class'].value_counts().to_dict()}")

class FinBERTClassifier(nn.Module):
    """Encoder plus an MLP head that classifies from the first token position.

    All encoder parameters are frozen except those of the last two blocks in
    ``finbert_model.encoder.layer``. The head is a stack of
    ``Linear -> ReLU -> Dropout`` groups (one per entry of ``hidden_dims``)
    followed by a final ``Linear`` producing ``num_classes`` logits.

    Args:
        finbert_model: Encoder module. It must expose ``encoder.layer`` and
            return an object with ``last_hidden_state`` when called.
        num_classes: Number of output logits.
        hidden_dims: Widths of the hidden layers of the head.
        dropout: Dropout probability used after every hidden layer.
    """

    def __init__(self, finbert_model, num_classes=3, hidden_dims=(256, 64), dropout=0.3):
        super().__init__()
        self.finbert = finbert_model

        # Freeze the whole encoder, then re-enable gradients for its last two blocks.
        for param in self.finbert.parameters():
            param.requires_grad = False
        for param in self.finbert.encoder.layer[-2:].parameters():
            param.requires_grad = True

        # Size the head from the encoder's own config when it has one, so an
        # encoder with a different hidden width works; 768 remains the fallback.
        in_dim = getattr(getattr(finbert_model, "config", None), "hidden_size", 768)

        layers = []
        for h_dim in hidden_dims:
            layers.extend([nn.Linear(in_dim, h_dim), nn.ReLU(), nn.Dropout(dropout)])
            in_dim = h_dim
        layers.append(nn.Linear(in_dim, num_classes))
        self.classifier = nn.Sequential(*layers)

    def forward(self, input_ids, attention_mask, token_type_ids=None):
        """Return class logits of shape ``(batch, num_classes)``."""
        encoder_kwargs = {"input_ids": input_ids, "attention_mask": attention_mask}
        # Forward segment ids only when the caller supplied them, so encoders
        # that do not accept this argument keep working.
        if token_type_ids is not None:
            encoder_kwargs["token_type_ids"] = token_type_ids
        outputs = self.finbert(**encoder_kwargs)
        # Position 0 of the sequence (the [CLS] token for BERT-style tokenizers).
        return self.classifier(outputs.last_hidden_state[:, 0, :])

class TextClassDataset(Dataset):
    """Dataset pairing tokenized texts with integer class labels.

    Every text is tokenized once, at construction time, in a single tokenizer
    call with padding and truncation to ``max_len`` and PyTorch tensors as
    output. ``texts`` must provide ``.tolist()`` and ``labels`` must provide
    ``.values``.
    """

    def __init__(self, texts, labels, tokenizer, max_len=512):
        raw_texts = texts.tolist()
        self.encodings = tokenizer(
            raw_texts,
            padding=True,
            truncation=True,
            max_length=max_len,
            return_tensors="pt",
        )
        self.labels = torch.tensor(labels.values, dtype=torch.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Slice row ``idx`` out of every tensor the tokenizer produced.
        features = {}
        for name, tensor in self.encodings.items():
            features[name] = tensor[idx]
        return features, self.labels[idx]

# Create datasets and model
# The whole training split is tokenized up front by TextClassDataset, then
# served in shuffled mini-batches of 16.
train_dataset = TextClassDataset(train_df['article_text'], train_df['price_class'], tokenizer)
loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Handle class imbalance with weighted loss
# Weights are inverse class frequencies, ordered by class id (sort_index) and
# rescaled so that they sum to 3.
# NOTE(review): weight i lines up with class i only if every class id occurs
# in train_df; a missing class would shorten this vector — confirm.
class_counts = train_df['price_class'].value_counts().sort_index()
class_weights = torch.tensor([1.0 / c for c in class_counts], dtype=torch.float32).to(device)
class_weights = class_weights / class_weights.sum() * 3

model = FinBERTClassifier(finbert).to(device)
# Two parameter groups: a small learning rate for the encoder's last two
# blocks and a larger one for the classification head.
optimizer = torch.optim.AdamW([
    {'params': model.finbert.encoder.layer[-2:].parameters(), 'lr': 2e-5},
    {'params': model.classifier.parameters(), 'lr': 1e-3}
], weight_decay=0.01)
criterion = nn.CrossEntropyLoss(weight=class_weights)

# Early-stopping state: stop after `patience` consecutive epochs without a new
# best average loss.
# NOTE(review): the monitored loss is the training loss; no validation split
# is used in this cell.
best_loss, patience, patience_counter = float('inf'), 10, 0
loss_history, acc_history = [], []

# Train for at most 30 epochs.
pbar = tqdm(range(30), desc="Fine-tuning")
for epoch in pbar:
    model.train()
    epoch_loss, correct, total = 0, 0, 0
    for batch, labels in loader:
        # Move the tokenizer outputs and the labels to the training device.
        batch = {k: v.to(device) for k, v in batch.items()}
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(**batch)
        loss = criterion(outputs, labels)
        loss.backward()
        # Clip the global gradient norm to 1.0 before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        epoch_loss += loss.item()
        correct += (outputs.argmax(1) == labels).sum().item()
        total += labels.size(0)
    # Per-epoch averages: mean batch loss and accuracy over the samples seen
    # (both measured while the model is in train mode).
    avg_loss, acc = epoch_loss / len(loader), correct / total
    loss_history.append(avg_loss)
    acc_history.append(acc)
    pbar.set_postfix({'loss': f'{avg_loss:.4f}', 'acc': f'{acc:.3f}'})
    if avg_loss < best_loss:
        # New best epoch: reset the patience counter and checkpoint the weights.
        best_loss, patience_counter = avg_loss, 0
        torch.save(model.state_dict(), 'best_classifier.pt')
    else:
        patience_counter += 1
        if patience_counter >= patience:
            break

# Restore the checkpoint saved at the best epoch.
# NOTE(review): "Final acc" below is the accuracy of the last epoch that ran,
# which is not necessarily the epoch whose weights were just restored.
model.load_state_dict(torch.load('best_classifier.pt'))
print(f"Training complete. Best loss: {best_loss:.4f}, Final acc: {acc:.3f}")

In [None]:
# Visualize training loss and print key metrics
import matplotlib.pyplot as plt

# Plot the per-epoch training loss with a dashed reference line at the best value.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(loss_history, label='Training Loss')
ax.axhline(y=best_loss, color='r', linestyle='--', label=f'Best Loss: {best_loss:.6f}')
ax.set_xlabel('Epoch')
# The recorded loss comes from nn.CrossEntropyLoss, so the axis is labelled as
# cross-entropy rather than MSE.
ax.set_ylabel('Cross-Entropy Loss')
ax.set_title('Training Loss Over Time')
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

# Text summary of the same loss history.
print(f"{'='*50}\nTraining Metrics Summary\n{'='*50}")
print(f"Total epochs: {len(loss_history)}")
print(f"Initial loss: {loss_history[0]:.6f}")
print(f"Final loss: {loss_history[-1]:.6f}")
print(f"Best loss: {best_loss:.6f}")
print(f"Loss reduction: {((loss_history[0] - best_loss) / loss_history[0] * 100):.2f}%")