In [55]:
import hopsworks
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import os

In [56]:
import hsfs

# 1. Login
project = hopsworks.login()

# 2. Get the Feature Store (This triggers the metadata check)
try:
    fs = project.get_feature_store("A1ID2223")
    print(f"Successfully connected to Feature Store: {fs.name}")
except Exception as e:
    print(f"Feature Store Connection Error: {e}")

# 3. Check versions
print(f"HSFS Version: {hsfs.__version__}")

2025-12-29 12:10:56,655 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-12-29 12:10:56,658 INFO: Initializing external client
2025-12-29 12:10:56,658 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-12-29 12:10:58,468 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1267871
Successfully connected to Feature Store: a1id2223_featurestore
HSFS Version: 4.2.10


%6|1767006663.613|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/1]: ssl://51.161.81.188:9093/1: Disconnected: SSL connection closed by peer (after 50118ms in state UP, 1 identical error(s) suppressed)


In [57]:
# Get feature groups
sentiment_fg = fs.get_feature_group("sentiments", version=2)
opening_price_fg = fs.get_feature_group("opening_prices", version=2)

In [58]:
# Get yahoo ticker for news and stock price
ticker = yf.Ticker("AAPL")
sentiments = ticker.news
price = ticker.history(period="1d")

In [59]:
cleaned_sentiments = []
# Process sentiment -> [(title, summary)]
for sentiment in sentiments:
    content = sentiment["content"]
    title = content["title"]
    summary = content["summary"]
    cleaned_sentiments.append((title, summary))

In [60]:
import torch
from transformers import pipeline

# Quick check for device
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using device: {device}")

# Simple FinBERT sentiment pipeline
classifier = pipeline("text-classification", model="ProsusAI/finbert")

# Optional: alias for compatibility with other cells
sentiment_nlp = classifier

# Initialise the sentiment scores (if needed later)
sentiment_neg, sentiment_pos, sentiment_neu = 0, 0, 0

Using device: mps


Device set to use mps:0


In [61]:
# Load FinBERT sentiment pipeline
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from transformers import pipeline as hf_pipeline

model_name = "ProsusAI/finbert"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
sentiment_nlp = hf_pipeline(
    "sentiment-analysis",
    model=model,
    tokenizer=tokenizer,
    return_all_scores=True
)

# Try to move model to Apple Silicon MPS if available
if torch.backends.mps.is_available():
    try:
        model.to("mps")
        print("FinBERT model moved to MPS device")
    except Exception as e:
        print(f"Could not move model to mps: {e}")

Device set to use mps:0



FinBERT model moved to MPS device


In [62]:
import pandas as pd
from datetime import timezone

rows = []
# Score each article with FinBERT and collect per-article probabilities
for item in sentiments or []:
    content = item.get("content", {})
    title = content.get("title") or ""
    summary = content.get("summary") or ""
    text = f"{title}. {summary}".strip()
    print(title)
    if not text:
        continue

    # Derive publish date from providerPublishTime (unix seconds)
    ts = item.get("providerPublishTime")
    if ts is None:
        dt = pd.Timestamp.utcnow().normalize()
    else:
        # Convert to UTC, drop timezone, normalize to date
        dt = pd.to_datetime(ts, unit="s", utc=True).tz_convert(None).normalize()

    # Run FinBERT and get probabilities for all classes
    all_scores = sentiment_nlp(text)[0]  # [{label: 'positive'|'negative'|'neutral', score: float}, ...]
    score_map = {s["label"].lower(): s["score"] for s in all_scores}

    pos = score_map.get("positive", 0.0)
    neg = score_map.get("negative", 0.0)
    neu = score_map.get("neutral", 0.0)
    polarity = pos - neg

    rows.append({
        "date": dt,
        "sentiment_pos": pos,
        "sentiment_neg": neg,
        "sentiment_neu": neu,
        "sentiment_polarity": polarity,
    })

article_df = pd.DataFrame(rows)
print(f"Scored {len(article_df)} articles")

Investors know about the AI bubble. They're buying AI stock anyway.
Apple CEO Tim Cook Just Gave Nike Investors 3 Million Reasons to Cheer
Major shop closures across the UK, mapped
Financial resolutions for the New Year to help you make the most of your money
Should You Buy the Best-Performing "Magnificent Seven" Stock of 2025?
Dow Jones Futures Waver With Market At Highs; Tesla, Nvidia In Buy Areas
A guide to choosing the right Apple Watch
The Next Stock-Split Stock That Could Make You Rich
Globalstar (GSAT) Gains Strategic Value From Apple Direct-to-Device Partnership
Jim Cramer Repeats His “Own It, Don’t Trade It” Mantra on Apple
Scored 10 articles


In [64]:
# Aggregate to daily means to match backfill features
if not article_df.empty:
    article_df["date"] = pd.to_datetime(article_df["date"]).dt.normalize()
    sentiment_daily = (
        article_df.groupby("date").agg({
            "sentiment_polarity": "mean",
            "sentiment_neg": "mean",
            "sentiment_neu": "mean",
            "sentiment_pos": "mean",
        })
    )
    # Ensure timezone-naive index named 'date'
    sentiment_daily.index = sentiment_daily.index.tz_localize(None)
    sentiment_daily.index.name = "date"
    print(sentiment_daily.head())
else:
    sentiment_daily = pd.DataFrame(
        columns=["sentiment_polarity", "sentiment_neg", "sentiment_neu", "sentiment_pos"]
    )
    sentiment_daily.index.name = "date"
    print("No articles found for daily aggregation")

            sentiment_polarity  sentiment_neg  sentiment_neu  sentiment_pos
date                                                                       
2025-12-29            0.069065       0.125002       0.680932       0.194067


In [65]:
# Insert aggregated sentiments into Hopsworks Feature Store
if sentiment_daily is not None and not sentiment_daily.empty:
    df_to_insert = sentiment_daily.reset_index()
    df_to_insert.columns = df_to_insert.columns.str.lower()

    # Ensure feature group alignment with backfill
    fg = fs.get_or_create_feature_group(
        name="sentiments",
        description="AAPL stock sentiments",
        version=2,
        primary_key=["date"],
        event_time="date",
    )
    fg.insert(df_to_insert, wait=True)
    print("Inserted daily sentiments into feature store")
else:
    print("No sentiment data to insert today")

Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: sentiments_2_offline_fg_materialization


%6|1767006714.831|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.208:9093/2]: ssl://51.161.81.208:9093/2: Disconnected: SSL connection closed by peer (after 50122ms in state UP, 1 identical error(s) suppressed)


KeyboardInterrupt: 

In [29]:
# Read all data from sentiments feature group
sentiments_fg = fs.get_feature_group("sentiments", version=2)
sentiments_df = sentiments_fg.read()

# Sort by date and show the most recent entries
latest = sentiments_df.sort_values('date', ascending=False).head(5)
print("Latest sentiment entries:")
print(latest)

# Verify today's date is present
today = pd.Timestamp.utcnow().normalize()
today_data = sentiments_df[sentiments_df['date'] == today]
print(f"\nToday's sentiment ({today.date()}):")
print(today_data)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.29s) 
Latest sentiment entries:
                          date  sentiment_polarity  sentiment_neg  \
1    2025-12-29 00:00:00+00:00            0.069065       0.125002   
0    2025-12-23 00:00:00+00:00            0.298535       0.131626   
1513 2024-11-27 00:00:00+00:00            0.000000       0.000000   
574  2024-11-26 00:00:00+00:00            0.000000       0.000000   
1555 2024-11-25 00:00:00+00:00            0.000000       0.000000   

      sentiment_neu  sentiment_pos  
1          0.680932       0.194067  
0          0.438214       0.430160  
1513       1.000000       0.000000  
574        1.000000       0.000000  
1555       1.000000       0.000000  

Today's sentiment (2025-12-29):
                       date  sentiment_polarity  sentiment_neg  sentiment_neu  \
1 2025-12-29 00:00:00+00:00            0.069065       0.125002       0.680932   

   sentiment_pos  
1       0.194067  


%6|1767005165.591|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.80.189:9093/0]: ssl://51.161.80.189:9093/0: Disconnected: SSL connection closed by peer (after 50119ms in state UP, 1 identical error(s) suppressed)


In [None]:
# # Insert today's stock price into Hopsworks Feature Store
# if not price.empty:
#     # Prepare stock data to match backfill format
#     stock_today = price[['Open']].copy()
    
#     # Remove timezone and normalize to date only
#     stock_today.index = stock_today.index.tz_convert(None).normalize()
#     stock_today.index.name = 'date'
    
#     # Reset index to get date as column and lowercase column names
#     stock_insert = stock_today.reset_index()
#     stock_insert.columns = stock_insert.columns.str.lower()
    
#     print("Stock data to insert:")
#     print(stock_insert)
    
#     # Get or create the feature group (should already exist from backfill)
#     opening_fg = fs.get_or_create_feature_group(
#         name="opening_prices",
#         description="AAPL opening prices",
#         version=1,
#         primary_key=["date"],
#         event_time="date",
#     )
#     opening_fg.insert(stock_insert, wait=True)
#     print("Inserted today's opening price into feature store")
# else:
#     print("No stock price data available for today")

In [30]:
# Insert today's stock price into opening_prices v2 (with placeholder target)
if not price.empty:
    stock_today = price[["Open"]].copy()
    stock_today.index = stock_today.index.tz_convert(None).normalize()
    stock_today.index.name = 'date'

    stock_insert = stock_today.reset_index()
    stock_insert.columns = stock_insert.columns.str.lower()  # ['date','open']

    # Match FG schema: include target_open as unknown for today
    stock_insert['target_open'] = pd.NA

    opening_fg = fs.get_or_create_feature_group(
        name="opening_prices",
        description="AAPL opening prices with next-day target",
        version=2,
        primary_key=["date"],
        event_time="date",
    )
    opening_fg.insert(stock_insert, wait=True)
    print("Inserted today's opening price (with target_open NA) into feature store")
else:
    print("No stock price data available for today")

Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: opening_prices_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1267871/jobs/named/opening_prices_2_offline_fg_materialization/executions
2025-12-29 11:46:32,623 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-12-29 11:46:48,697 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED


%6|1767005216.807|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/1]: ssl://51.161.81.188:9093/1: Disconnected: SSL connection closed by peer (after 50159ms in state UP, 1 identical error(s) suppressed)
%6|1767005310.659|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/1]: ssl://51.161.81.188:9093/1: Disconnected: SSL connection closed by peer (after 92794ms in state UP, 1 identical error(s) suppressed)


2025-12-29 11:48:38,232 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-12-29 11:48:38,404 INFO: Waiting for log aggregation to finish.
2025-12-29 11:48:50,491 INFO: Execution finished successfully.
Inserted today's opening price (with target_open NA) into feature store


In [31]:
stock_today = price[["Open"]].copy()
print(stock_today.head(1))

                                 Open
Date                                 
2025-12-26 00:00:00-05:00  274.160004


%6|1767005361.334|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.80.189:9093/0]: ssl://51.161.80.189:9093/0: Disconnected: SSL connection closed by peer (after 50121ms in state UP, 1 identical error(s) suppressed)
%6|1767005412.525|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.80.189:9093/0]: ssl://51.161.80.189:9093/0: Disconnected: SSL connection closed by peer (after 50115ms in state UP, 1 identical error(s) suppressed)
%6|1767005463.795|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/1]: ssl://51.161.81.188:9093/1: Disconnected: SSL connection closed by peer (after 50173ms in state UP, 1 identical error(s) suppressed)
%6|1767005514.994|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.80.189:9093/0]: ssl://51.161.80.189:9093/0: Disconnected: SSL connection closed by peer (after 50165ms in state UP, 1 identical error(s) suppressed)
%6|1767005610.663|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.80.189:9093/0]: ssl://51.161.80.189:9093/0: Disconnected: SSL connection closed by peer (after

In [None]:
# Manual prediction since FS insert failed
import xgboost
import os

# Get model
mr = project.get_model_registry()
model = mr.get_model("sentiment_stock_price_model_AAPL", version=1)
model_dir = model.download()
model_path = os.path.join(model_dir, "model.json")

xgb_model = xgboost.XGBRegressor()
xgb_model.load_model(model_path)

# Prepare input
# We need: sentiment_polarity, sentiment_neg, sentiment_neu, sentiment_pos, opening_prices_open
# sentiment_daily has the sentiment columns.
# price has the open price.

# Get today's open price
today_open = price['Open'].iloc[-1]
print(f"Today's Open: {today_open}")

# Get today's sentiment
today_sent = sentiment_daily.iloc[-1]
print(f"Today's Sentiment: \n{today_sent}")

# Create feature vector
import pandas as pd
X_new = pd.DataFrame({
    'sentiment_polarity': [today_sent['sentiment_polarity']],
    'sentiment_neg': [today_sent['sentiment_neg']],
    'sentiment_neu': [today_sent['sentiment_neu']],
    'sentiment_pos': [today_sent['sentiment_pos']],
    'opening_prices_open': [today_open]
})

# Predict
pred = xgb_model.predict(X_new)
print(f"\nPrediction for next trading day (30th): {pred[0]}")

Downloading: 0.000%|          | 0/441617 elapsed<00:00 remaining<?

Today's Open: 274.1600036621094irs, 1 files)... DONE
Today's Sentiment: 
sentiment_polarity    0.069065
sentiment_neg         0.125002
sentiment_neu         0.680932
sentiment_pos         0.194067
Name: 2025-12-29 00:00:00, dtype: float64

Prediction for next trading day (30th): 191.22506713867188


%6|1767006064.208|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.208:9093/2]: ssl://51.161.81.208:9093/2: Disconnected: SSL connection closed by peer (after 50124ms in state UP, 1 identical error(s) suppressed)
%6|1767006115.401|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.80.189:9093/0]: ssl://51.161.80.189:9093/0: Disconnected: SSL connection closed by peer (after 50117ms in state UP, 1 identical error(s) suppressed)
