1. Code to Process and Store 60 Days Historical Merged Data to DynamoDB

In [16]:
import pandas as pd
import boto3
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from torch.nn.functional import softmax
from tqdm import tqdm
from decimal import Decimal

# --- AWS Configuration ---
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("realtimedata")

# --- S3 Configuration ---
s3 = boto3.client("s3")
bucket = "stock-market-data-uofc"
stock_key = "historical-data/stock_60d_5m.csv"
news_key = "news-data/news_history_1yr.csv"

# --- Load Data ---
stock_df = pd.read_csv(s3.get_object(Bucket=bucket, Key=stock_key)["Body"])
news_df = pd.read_csv(s3.get_object(Bucket=bucket, Key=news_key)["Body"])

# --- Preprocessing ---
stock_df["Date"] = pd.to_datetime(stock_df["Date"], utc=True)
stock_df["Date"] = stock_df["Date"].dt.tz_convert("America/New_York").dt.tz_localize(None)
stock_df["Date"] = stock_df["Date"].dt.floor("min")

news_df["publishedDate"] = pd.to_datetime(news_df["publishedDate"], errors="coerce")
news_df["publishedDate"] = news_df["publishedDate"].dt.floor("min")
news_df.rename(columns={"symbol": "Stock"}, inplace=True)

# --- Filter news in stock date range ---
min_date = stock_df["Date"].min().date()
max_date = stock_df["Date"].max().date()
news_df = news_df[
    (news_df["publishedDate"].dt.date >= min_date) &
    (news_df["publishedDate"].dt.date <= max_date)
]

# --- Assign latest news to each stock row ---
stock_df.sort_values(["Stock", "Date"], inplace=True)
news_df.sort_values(["Stock", "publishedDate"], inplace=True)

assigned_news = []
for symbol, group in stock_df.groupby("Stock"):
    stock_times = group["Date"].values
    news_times = news_df[news_df["Stock"] == symbol][["publishedDate", "text"]].values
    latest_news = None
    news_index = 0
    for stock_time in stock_times:
        while news_index < len(news_times) and news_times[news_index][0] <= stock_time:
            latest_news = news_times[news_index][1]
            news_index += 1
        assigned_news.append(latest_news)

stock_df["latest_news"] = assigned_news

# --- Sentiment Analysis (FinBERT) ---
tokenizer = AutoTokenizer.from_pretrained("yiyanghkust/finbert-tone")
model = AutoModelForSequenceClassification.from_pretrained("yiyanghkust/finbert-tone")

def get_sentiment(text):
    try:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
            probs = softmax(outputs.logits, dim=1)
            scores = probs[0].tolist()
            return -1 * scores[0] + 0 * scores[1] + 1 * scores[2]
    except:
        return 0.0

unique_news = stock_df["latest_news"].dropna().unique()
news_sentiment_map = {news: get_sentiment(news) for news in tqdm(unique_news)}

# --- Sentiment Fading ---
faded_scores = []
fade_factor = 0.9
prev_news = None
prev_score = 0.0
repeat_count = 0

for news in stock_df["latest_news"]:
    if news != prev_news:
        sentiment = news_sentiment_map.get(news, 0.0)
        repeat_count = 0
    else:
        repeat_count += 1
        sentiment = prev_score * (fade_factor ** repeat_count)
    faded_scores.append(round(sentiment, 4))
    prev_news = news
    prev_score = news_sentiment_map.get(news, 0.0)

stock_df["faded_sentiment_score"] = faded_scores

# --- Upload to DynamoDB ---
print("Uploading 60-day data to DynamoDB...")
for _, row in stock_df.iterrows():
    try:
        table.put_item(Item={
            "Stock": row["Stock"],
            "Timestamp": row["Date"].strftime("%Y-%m-%dT%H:%M:%S"),
            "Open": Decimal(str(row["Open"])),
            "High": Decimal(str(row["High"])),
            "Low": Decimal(str(row["Low"])),
            "Close": Decimal(str(row["Close"])),
            "Volume": int(row["Volume"]),
            "faded_sentiment_score": Decimal(str(row["faded_sentiment_score"]))
        })
    except Exception as e:
        print(f"Error inserting row: {e}")

print("✅ Historical 60-day data uploaded.")


# --- Save Final Dataset to S3 for Dashboard ---
print("Uploading merged CSV for dashboarding...")

csv_buffer = stock_df.to_csv(index=False)
dashboard_key = "dashboard-data/stock_60d_faded.csv"

s3.put_object(Bucket=bucket, Key=dashboard_key, Body=csv_buffer)

print(f"✅ CSV uploaded to S3 at s3://{bucket}/{dashboard_key}")


2025-04-11 16:26:28.157270: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
100%|██████████| 2096/2096 [06:36<00:00,  5.29it/s]


Uploading 60-day data to DynamoDB...
✅ Historical 60-day data uploaded.
Uploading merged CSV for dashboarding...
✅ CSV uploaded to S3 at s3://stock-market-data-uofc/dashboard-data/stock_60d_faded.csv


Updated Real-Time Ingestion Script (every 5 min)

In [22]:
# =====================================
# ✅ Real-Time Fading Sentiment Integration
# =====================================

# --- STEP 1: Create sentiment_tracker table ---
# In your DynamoDB console or via CLI, create:
# Table name: sentiment_tracker
# Partition key: Stock (String)

# Each entry will store:
# - Stock
# - LastNews (string)
# - LastScore (float)
# - RepeatCount (int)

# --- STEP 2: Modify Real-Time Ingestion Script ---
# Final Real-Time Ingestion + Sentiment Fading + Prediction Script

import yfinance as yf
import pandas as pd
import boto3
import requests
import torch
import joblib
from datetime import datetime, timedelta
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from torch.nn.functional import softmax
from decimal import Decimal
import pytz
import os
from sklearn.preprocessing import StandardScaler

# AWS Setup
session = boto3.Session()
dynamodb = session.resource("dynamodb")
data_table = dynamodb.Table("realtimedata")
tracker_table = dynamodb.Table("news_tracker")
sentiment_table = dynamodb.Table("sentiment_tracker")
pred_table = dynamodb.Table("realtime_predictions")
s3 = boto3.client("s3")

# Constants
STOCKS = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "TSLA"]
API_KEY = "hDdU807PlusyraTBnNgkkm2gFuPtkZ9F"
S3_BUCKET = "stock-market-data-uofc"
MODEL_PREFIX = "models/sarimax"
LOCAL_MODEL_PATH = "/tmp"

# FinBERT Setup
tokenizer = AutoTokenizer.from_pretrained("yiyanghkust/finbert-tone")
model = AutoModelForSequenceClassification.from_pretrained("yiyanghkust/finbert-tone")

# Utility Functions
def is_market_open():
    est = pytz.timezone('US/Eastern')
    now = datetime.now(est)
    return now.weekday() < 5 and datetime.strptime("09:30", "%H:%M").time() <= now.time() <= datetime.strptime("16:00", "%H:%M").time()

def fetch_stock_data():
    if not is_market_open():
        return pd.DataFrame()
    all_data = []
    for stock in STOCKS:
        df = yf.Ticker(stock).history(period="1d", interval="5m")
        if df.empty: continue
        df.reset_index(inplace=True)
        df["Stock"] = stock
        df.rename(columns={"Datetime": "Date"}, inplace=True)
        all_data.append(df[["Stock", "Date", "Open", "High", "Low", "Close", "Volume"]])
    return pd.concat(all_data, ignore_index=True)

def fetch_news():
    all_news = []
    for stock in STOCKS:
        try:
            item = tracker_table.get_item(Key={"Stock": stock, "LastPublished": "latest"}).get("Item")
            last_date = pd.to_datetime(item["LastDate"])
            last_title = item["LastTitle"]
        except:
            last_date = datetime.utcnow() - timedelta(days=1)
            last_title = ""
        res = requests.get("https://financialmodelingprep.com/api/v3/stock_news", params={
            "tickers": stock,
            "from": last_date.strftime('%Y-%m-%d'),
            "to": datetime.utcnow().strftime('%Y-%m-%d'),
            "limit": 50,
            "apikey": API_KEY
        })
        if res.status_code == 200:
            articles = res.json()
            new_articles = [a for a in articles if pd.to_datetime(a["publishedDate"]) > last_date or a["title"] != last_title]
            if new_articles:
                latest = pd.to_datetime(new_articles[0]["publishedDate"])
                tracker_table.put_item(Item={
                    "Stock": stock,
                    "LastPublished": "latest",
                    "LastDate": latest.isoformat(),
                    "LastTitle": new_articles[0]["title"]
                })
                all_news.extend(new_articles)
    return pd.DataFrame(all_news)

def get_sentiment(text):
    try:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
            probs = softmax(outputs.logits, dim=1)
            return -1 * probs[0][0].item() + probs[0][2].item()
    except:
        return 0.0

def get_faded_score(stock, latest_news, score, fade_factor=0.9):
    try:
        entry = sentiment_table.get_item(Key={"Stock": stock}).get("Item")
        prev_news = entry.get("LastNews")
        prev_score = float(entry.get("LastScore"))
        repeat_count = int(entry.get("RepeatCount"))
        if latest_news == prev_news:
            repeat_count += 1
            faded_score = prev_score * (fade_factor ** repeat_count)
        else:
            repeat_count = 0
            faded_score = score
    except:
        faded_score = score
        repeat_count = 0
    sentiment_table.put_item(Item={
        "Stock": stock,
        "LastNews": latest_news,
        "LastScore": Decimal(str(round(score, 4))),
        "RepeatCount": repeat_count
    })
    return round(faded_score, 4)

def add_features(df):
    df["EMA_5"] = df["Close"].ewm(span=5).mean()
    df["EMA_12"] = df["Close"].ewm(span=12).mean()
    df["Volatility_30min"] = df["Close"].rolling(window=6).std()
    df["Price_Change_Pct"] = df["Close"].pct_change()
    df["Lag_Close"] = df["Close"].shift(1)
    df["Weekday"] = df["Date"].dt.weekday
    return df

def load_model_from_s3(stock):
    local_path = os.path.join(LOCAL_MODEL_PATH, f"{stock}.pkl")
    s3.download_file(S3_BUCKET, f"{MODEL_PREFIX}/{stock}.pkl", local_path)
    return joblib.load(local_path)

def make_prediction_row(stock, latest_row, model_obj):
    features = model_obj["features"]
    scaler = model_obj["scaler"]
    model = model_obj["model"]

    X = scaler.transform(latest_row[features])
    pred_15 = model.forecast(steps=3, exog=X.repeat(3, axis=0)).mean()
    pred_30 = model.forecast(steps=6, exog=X.repeat(6, axis=0)).mean()
    pred_60 = model.forecast(steps=12, exog=X.repeat(12, axis=0)).mean()

    return {
        "Stock": stock,
        "Timestamp": latest_row["Date"].iloc[0].isoformat(),
        "Current_Close": round(float(latest_row["Close"]), 4),
        "Pred_15min": round(float(pred_15), 4),
        "Pred_30min": round(float(pred_30), 4),
        "Pred_60min": round(float(pred_60), 4)
    }

def process_all(stock_df, news_df):
    news_df = news_df.sort_values("publishedDate")
    sentiment_scores = {}
    for stock in STOCKS:
        latest_news = news_df[news_df["symbol"] == stock].sort_values("publishedDate").tail(1)
        if latest_news.empty:
            sentiment_scores[stock] = Decimal("0.0")
            continue
        text = latest_news.iloc[0]["text"]
        raw_score = get_sentiment(text)
        faded = get_faded_score(stock, text, raw_score)
        sentiment_scores[stock] = Decimal(str(faded))

    prediction_rows = []
    for stock in STOCKS:
        sdf = stock_df[stock_df["Stock"] == stock].copy()
        if sdf.empty: continue
        score = sentiment_scores.get(stock, Decimal("0.0"))
        sdf["faded_sentiment_score"] = float(score)
        sdf = add_features(sdf)
        sdf.dropna(inplace=True)
        if sdf.empty: continue

        # Use last row for Dynamo + Prediction
        latest = sdf.tail(1).copy()
        item = {
            "Stock": stock,
            "Timestamp": latest["Date"].iloc[0].isoformat(),
            "Open": Decimal(str(latest["Open"].iloc[0])),
            "High": Decimal(str(latest["High"].iloc[0])),
            "Low": Decimal(str(latest["Low"].iloc[0])),
            "Close": Decimal(str(latest["Close"].iloc[0])),
            "Volume": int(latest["Volume"].iloc[0]),
            "faded_sentiment_score": Decimal(str(latest["faded_sentiment_score"].iloc[0]))
        }
        data_table.put_item(Item=item)

        try:
            model_obj = load_model_from_s3(stock)
            pred_row = make_prediction_row(stock, latest, model_obj)
            pred_table.put_item(Item={k: (Decimal(str(v)) if isinstance(v, float) else v) for k, v in pred_row.items()})
        except:
            continue

# Main
if __name__ == "__main__":
    print("🚀 Ingestion + Prediction job started")
    stock_data = fetch_stock_data()
    if not stock_data.empty:
        news_data = fetch_news()
        process_all(stock_data, news_data)
        print("✅ Data ingested & predictions made")
    else:
        print("⚠️ Market closed or no stock data")





🚀 Ingestion + Prediction job started
✅ Data ingested & predictions made


final model training script

In [4]:
import pandas as pd
import numpy as np
import boto3
import joblib
import os
from sklearn.preprocessing import StandardScaler
from statsmodels.tsa.statespace.sarimax import SARIMAX
from pmdarima import auto_arima
from decimal import Decimal
import warnings

warnings.filterwarnings("ignore")

# --- Config ---
LOCAL_MODEL_DIR = "/tmp/trained_models"
S3_BUCKET = "stock-market-data-uofc"
S3_PREFIX = "models/sarimax"
DYNAMO_TABLE = "realtimedata"

# Create local model directory
os.makedirs(LOCAL_MODEL_DIR, exist_ok=True)

# --- Load Data from DynamoDB ---
def load_data_from_dynamodb():
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table(DYNAMO_TABLE)

    response = table.scan()
    items = response["Items"]

    # Keep paginating if needed
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        items.extend(response["Items"])

    df = pd.DataFrame(items)

    # Convert to correct types
    df["Date"] = pd.to_datetime(df["Timestamp"])
    for col in ["Open", "High", "Low", "Close", "faded_sentiment_score"]:
        df[col] = df[col].astype(float)
    df["Volume"] = df["Volume"].astype(int)
    return df

# --- Feature Engineering ---
def add_intraday_features(df):
    df = df.copy()
    df["EMA_5"] = df["Close"].ewm(span=5, adjust=False).mean()
    df["EMA_12"] = df["Close"].ewm(span=12, adjust=False).mean()
    df["Volatility_30min"] = df["Close"].rolling(window=6).std()
    df["Price_Change_Pct"] = df["Close"].pct_change()
    df["Lag_Close"] = df["Close"].shift(1)
    df["Weekday"] = df["Date"].dt.weekday
    return df

# --- Train & Upload Model ---
def train_and_save_model(df, stock, s3_client):
    stock_df = df[df["Stock"] == stock].copy()
    stock_df = add_intraday_features(stock_df)
    stock_df.set_index("Date", inplace=True)
    stock_df.sort_index(inplace=True)

    feature_cols = [
        "Volume",
        "faded_sentiment_score",
        "EMA_5",
        "EMA_12",
        "Volatility_30min",
        "Price_Change_Pct",
        "Lag_Close",
        "Weekday"
    ]

    ts_df = stock_df[["Close"] + feature_cols].dropna()
    if len(ts_df) < 100:
        return

    endog = ts_df["Close"]
    exog = ts_df[feature_cols]

    scaler = StandardScaler()
    exog_scaled = pd.DataFrame(scaler.fit_transform(exog), columns=feature_cols, index=exog.index)

    try:
        arima_model = auto_arima(
            endog, exogenous=exog_scaled,
            seasonal=False, stepwise=True,
            suppress_warnings=True, max_order=6, maxiter=30
        )
        order = arima_model.order
    except:
        order = (2, 1, 2)

    try:
        model = SARIMAX(endog, exog=exog_scaled, order=order, enforce_stationarity=False)
        result = model.fit(disp=False, maxiter=50)

        local_path = os.path.join(LOCAL_MODEL_DIR, f"{stock}.pkl")
        joblib.dump({
            "model": result,
            "scaler": scaler,
            "features": feature_cols,
            "order": order
        }, local_path)

        s3_key = f"{S3_PREFIX}/{stock}.pkl"
        s3_client.upload_file(local_path, S3_BUCKET, s3_key)
    except:
        pass

# --- Main ---
if __name__ == "__main__":
    print("🚀 Loading data from DynamoDB...")
    df = load_data_from_dynamodb()
    s3 = boto3.client("s3")
    stocks = df["Stock"].unique()
    for stock in stocks:
        train_and_save_model(df, stock, s3)
    print("✅ Model training & upload complete.")


🚀 Loading data from DynamoDB...


ValueError: unconverted data remains when parsing with format "%Y-%m-%dT%H:%M:%S": "-04:00", at position 3307. You might want to try:
    - passing `format` if your strings have a consistent format;
    - passing `format='ISO8601'` if your strings are all ISO8601 but not necessarily in exactly the same format;
    - passing `format='mixed'`, and the format will be inferred for each element individually. You might want to use `dayfirst` alongside this.

In [2]:
pip install pmdarima

Collecting pmdarima
  Downloading pmdarima-2.0.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl.metadata (7.8 kB)
Collecting Cython!=0.29.18,!=0.29.31,>=0.29 (from pmdarima)
  Downloading Cython-3.0.12-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.3 kB)
Downloading pmdarima-2.0.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl (2.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m52.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Cython-3.0.12-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m141.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Cython, pmdarima
Successfully installed Cython-3.0.12 pmdarima-2.0.4
Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install yfinance

Collecting yfinance
  Using cached yfinance-0.2.55-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting multitasking>=0.0.7 (from yfinance)
  Using cached multitasking-0.0.11-py3-none-any.whl.metadata (5.5 kB)
Collecting peewee>=3.16.2 (from yfinance)
  Using cached peewee-3.17.9-cp311-cp311-linux_x86_64.whl
Using cached yfinance-0.2.55-py2.py3-none-any.whl (109 kB)
Using cached multitasking-0.0.11-py3-none-any.whl (8.5 kB)
Installing collected packages: peewee, multitasking, yfinance
Successfully installed multitasking-0.0.11 peewee-3.17.9 yfinance-0.2.55
Note: you may need to restart the kernel to use updated packages.


In [1]:
import pandas as pd
import numpy as np
import boto3
import joblib
import os
from sklearn.preprocessing import StandardScaler
from statsmodels.tsa.statespace.sarimax import SARIMAX
from pmdarima import auto_arima
from decimal import Decimal
import warnings

warnings.filterwarnings("ignore")

# --- Config ---
LOCAL_MODEL_DIR = "/tmp/trained_models"
S3_BUCKET = "stock-market-data-uofc"
S3_PREFIX = "models/sarimax"
DYNAMO_TABLE = "realtimedata"

# Create local model directory
os.makedirs(LOCAL_MODEL_DIR, exist_ok=True)

# --- Load Data from DynamoDB ---
def load_data_from_dynamodb():
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table(DYNAMO_TABLE)

    response = table.scan()
    items = response["Items"]

    # Keep paginating if needed
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        items.extend(response["Items"])

    df = pd.DataFrame(items)

    # ✅ Fixed datetime parsing with timezone and error handling
    df["Date"] = pd.to_datetime(df["Timestamp"], errors="coerce", utc=True)
    df = df[df["Date"].notna()]  # Drop rows with invalid datetime

    # Convert to correct types
    for col in ["Open", "High", "Low", "Close", "faded_sentiment_score"]:
        df[col] = df[col].astype(float)
    df["Volume"] = df["Volume"].astype(int)
    return df

# --- Feature Engineering ---
def add_intraday_features(df):
    df = df.copy()
    df["EMA_5"] = df["Close"].ewm(span=5, adjust=False).mean()
    df["EMA_12"] = df["Close"].ewm(span=12, adjust=False).mean()
    df["Volatility_30min"] = df["Close"].rolling(window=6).std()
    df["Price_Change_Pct"] = df["Close"].pct_change()
    df["Lag_Close"] = df["Close"].shift(1)
    df["Weekday"] = df["Date"].dt.weekday
    return df

# --- Train & Upload Model ---
def train_and_save_model(df, stock, s3_client):
    stock_df = df[df["Stock"] == stock].copy()
    stock_df = add_intraday_features(stock_df)
    stock_df.set_index("Date", inplace=True)
    stock_df.sort_index(inplace=True)

    feature_cols = [
        "Volume",
        "faded_sentiment_score",
        "EMA_5",
        "EMA_12",
        "Volatility_30min",
        "Price_Change_Pct",
        "Lag_Close",
        "Weekday"
    ]

    ts_df = stock_df[["Close"] + feature_cols].dropna()
    if len(ts_df) < 100:
        return

    endog = ts_df["Close"]
    exog = ts_df[feature_cols]

    scaler = StandardScaler()
    exog_scaled = pd.DataFrame(scaler.fit_transform(exog), columns=feature_cols, index=exog.index)

    try:
        arima_model = auto_arima(
            endog, exogenous=exog_scaled,
            seasonal=False, stepwise=True,
            suppress_warnings=True, max_order=6, maxiter=30
        )
        order = arima_model.order
    except:
        order = (2, 1, 2)

    try:
        model = SARIMAX(endog, exog=exog_scaled, order=order, enforce_stationarity=False)
        result = model.fit(disp=False, maxiter=50)

        local_path = os.path.join(LOCAL_MODEL_DIR, f"{stock}.pkl")
        joblib.dump({
            "model": result,
            "scaler": scaler,
            "features": feature_cols,
            "order": order
        }, local_path)

        s3_key = f"{S3_PREFIX}/{stock}.pkl"
        s3_client.upload_file(local_path, S3_BUCKET, s3_key)
    except:
        pass

# --- Main ---
if __name__ == "__main__":
    print("🚀 Loading data from DynamoDB...")
    df = load_data_from_dynamodb()
    s3 = boto3.client("s3")
    stocks = df["Stock"].unique()
    for stock in stocks:
        train_and_save_model(df, stock, s3)
    print("✅ Model training & upload complete.")


🚀 Loading data from DynamoDB...
✅ Model training & upload complete.


In [23]:
import boto3
import csv
import os

# AWS Config
dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")

# S3 Bucket Info
bucket_name = "stock-market-data-uofc"
s3_prefix = "dashboard-data/"  # Optional subfolder

# List of tables to export
tables_to_export = [
    "realtimedata",
    "realtime_predictions",
    "news_tracker",
    "sentiment_tracker"
]

# Function to export table
def export_table_to_csv(table_name):
    print(f"📦 Exporting {table_name}...")

    table = dynamodb.Table(table_name)
    response = table.scan()
    data = response['Items']

    if not data:
        print(f"⚠️ No data in {table_name}")
        return

    keys = list(data[0].keys())
    file_name = f"/tmp/{table_name}today.csv"

    # Write CSV locally
    with open(file_name, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=keys)
        writer.writeheader()
        writer.writerows(data)

    # Upload to S3
    s3_key = f"{s3_prefix}{table_name}.csv"
    s3.upload_file(file_name, bucket_name, s3_key)
    print(f"✅ Uploaded {s3_key} to S3.")

# Loop through all tables
for table in tables_to_export:
    export_table_to_csv(table)


📦 Exporting realtimedata...
✅ Uploaded dashboard-data/realtimedata.csv to S3.
📦 Exporting realtime_predictions...
✅ Uploaded dashboard-data/realtime_predictions.csv to S3.
📦 Exporting news_tracker...
✅ Uploaded dashboard-data/news_tracker.csv to S3.
📦 Exporting sentiment_tracker...
✅ Uploaded dashboard-data/sentiment_tracker.csv to S3.


In [5]:
import yfinance as yf
import pandas as pd
import boto3
from datetime import datetime

# AWS S3 Configuration
S3_BUCKET = "stock-market-data-uofc"
S3_KEY = "historical-data/stock_history_1y.csv"
#S3_BUCKET = "data608-2025-stock-market-data"
#S3_KEY = "historical-stock-data/stock_1y_1d.csv"
s3_client = boto3.client("s3")

# List of stocks to fetch
stocks = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "TSLA"]

# Fetch stock data from a given start date
def fetch_stock_data(start_date):
    all_stock_data = []
    for stock in stocks:
        print(f"Fetching {stock} from {start_date}...")
        ticker = yf.Ticker(stock)
        history = ticker.history(start=start_date, interval="1d")
        history.reset_index(inplace=True)
        history["Stock"] = stock
        history = history[["Stock", "Date", "Open", "High", "Low", "Close", "Volume"]]
        all_stock_data.append(history)
    return pd.concat(all_stock_data, ignore_index=True)

# Load data from S3
def load_existing_data():
    try:
        response = s3_client.get_object(Bucket=S3_BUCKET, Key=S3_KEY)
        existing_data = pd.read_csv(response['Body'], parse_dates=["Date"])
        print(f"Loaded existing data: {len(existing_data)} rows")
        return existing_data
    except s3_client.exceptions.NoSuchKey:
        print("No existing data found. Fetching fresh...")
        return None

# Save DataFrame to S3
def upload_to_s3(df):
    csv_buffer = df.to_csv(index=False)
    s3_client.put_object(Bucket=S3_BUCKET, Key=S3_KEY, Body=csv_buffer)
    print(f"Uploaded to S3: {S3_KEY}")

# Main logic
existing_df = load_existing_data()

if existing_df is not None:
    last_date = existing_df["Date"].max().date()
    next_date = last_date + pd.Timedelta(days=1)
    new_data = fetch_stock_data(start_date=next_date)
    combined_df = pd.concat([existing_df, new_data], ignore_index=True)
    combined_df.drop_duplicates(subset=["Stock", "Date"], inplace=True)
else:
    start_date = (datetime.now() - pd.Timedelta(days=365)).date()
    combined_df = fetch_stock_data(start_date=start_date)

upload_to_s3(combined_df)


No existing data found. Fetching fresh...
Fetching AAPL from 2024-04-11...
Fetching MSFT from 2024-04-11...
Fetching AMZN from 2024-04-11...
Fetching GOOGL from 2024-04-11...
Fetching META from 2024-04-11...
Fetching NVDA from 2024-04-11...
Fetching TSLA from 2024-04-11...
Uploaded to S3: historical-data/stock_history_1y.csv


In [9]:
import yfinance as yf
import pandas as pd
import boto3
from datetime import datetime, timedelta
from datetime import timezone
import pytz
start_time = datetime.now(pytz.timezone("America/New_York")).replace(tzinfo=None) - timedelta(days=60)


# AWS S3 Configuration
#S3_BUCKET = "data608-2025-stock-market-data"
#bucket = "stock-market-data-uofc"
#stock_key = "historical-data/stock_60d_5m.csv"
S3_BUCKET = "stock-market-data-uofc"
S3_KEY = "historical-data/stock_60d_5m.csv"
s3_client = boto3.client("s3")

# Stocks to fetch
stocks = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "TSLA"]

# Function to fetch 5-minute interval data for the last 60 days
def fetch_stock_data(start_datetime):
    all_data = []
    for stock in stocks:
        print(f"Fetching {stock} from {start_datetime}...")
        ticker = yf.Ticker(stock)
        history = ticker.history(period="60d", interval="5m")
        history.reset_index(inplace=True)
        history["Stock"] = stock
        history = history.rename(columns={"Datetime": "Date"})
        history["Date"] = history["Date"].dt.tz_localize(None)  # Remove timezone info
        history = history[["Stock", "Date", "Open", "High", "Low", "Close", "Volume"]]
        history = history[history["Date"] > start_datetime]
        all_data.append(history)
    return pd.concat(all_data, ignore_index=True)

# Load existing data from S3
def load_existing_data():
    try:
        response = s3_client.get_object(Bucket=S3_BUCKET, Key=S3_KEY)
        existing_df = pd.read_csv(response['Body'], parse_dates=["Date"])
        print(f"Loaded existing data: {len(existing_df)} rows")
        return existing_df
    except s3_client.exceptions.NoSuchKey:
        print("No existing data found in S3.")
        return None

# Upload DataFrame to S3
def upload_to_s3(df):
    csv_buffer = df.to_csv(index=False)
    s3_client.put_object(Bucket=S3_BUCKET, Key=S3_KEY, Body=csv_buffer)
    print(f"Uploaded updated dataset to S3: {S3_KEY}")

# Main logic
existing_df = load_existing_data()

if existing_df is not None:
    last_time = existing_df["Date"].max()
    new_data = fetch_stock_data(start_datetime=last_time)
    combined_df = pd.concat([existing_df, new_data], ignore_index=True)
    combined_df.drop_duplicates(subset=["Stock", "Date"], inplace=True)
else:
    print("First-time fetch for last 60 days (5m)...")
    start_time = datetime.now() - timedelta(days=60)
    combined_df = fetch_stock_data(start_datetime=start_time)

upload_to_s3(combined_df)


No existing data found in S3.
First-time fetch for last 60 days (5m)...
Fetching AAPL from 2025-02-10 16:03:48.371494...
Fetching MSFT from 2025-02-10 16:03:48.371494...
Fetching AMZN from 2025-02-10 16:03:48.371494...
Fetching GOOGL from 2025-02-10 16:03:48.371494...
Fetching META from 2025-02-10 16:03:48.371494...
Fetching NVDA from 2025-02-10 16:03:48.371494...
Fetching TSLA from 2025-02-10 16:03:48.371494...
Uploaded updated dataset to S3: historical-data/stock_60d_5m.csv


In [10]:
import requests
import pandas as pd
from datetime import datetime, timedelta
import time
import boto3
from io import StringIO

# AWS S3 Setup
#S3_BUCKET = "data608-2025-stock-market-data"
#S3_KEY = "news-data/historical_news_1yr.csv"
S3_BUCKET = "stock-market-data-uofc"
S3_KEY = "news-data/news_history_1yr.csv"
s3_client = boto3.client("s3")

# API Setup
API_KEY = 'hDdU807PlusyraTBnNgkkm2gFuPtkZ9F'
tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "TSLA"]
tickers_str = ','.join(tickers)

def load_existing_news():
    try:
        response = s3_client.get_object(Bucket=S3_BUCKET, Key=S3_KEY)
        existing_df = pd.read_csv(response['Body'], parse_dates=["publishedDate"])
        print(f"Loaded existing news data: {len(existing_df)} articles")
        return existing_df
    except s3_client.exceptions.NoSuchKey:
        print("No existing news file found in S3.")
        return None

def upload_to_s3(df):
    buffer = StringIO()
    df.to_csv(buffer, index=False)
    s3_client.put_object(Bucket=S3_BUCKET, Key=S3_KEY, Body=buffer.getvalue())
    print(f"Updated news data uploaded to S3: {S3_KEY}")

def fetch_news(from_date, to_date):
    page = 0
    limit = 100
    all_news = []

    print(f"Fetching news from {from_date} to {to_date}")
    while True:
        page += 1
        print(f"Fetching page {page}")
        url = "https://financialmodelingprep.com/api/v3/stock_news"
        params = {
            "tickers": tickers_str,
            "from": from_date,
            "to": to_date,
            "limit": limit,
            "page": page,
            "apikey": API_KEY
        }

        response = requests.get(url, params=params)
        if response.status_code == 200:
            news_batch = response.json()
            if not news_batch:
                break
            all_news.extend(news_batch)
            time.sleep(1)
        else:
            print(f"Failed with status {response.status_code}")
            break

    if all_news:
        df = pd.DataFrame(all_news)
        df["publishedDate"] = pd.to_datetime(df["publishedDate"])
        df = df.sort_values(by=["symbol", "publishedDate"])
        return df
    else:
        print("No news returned.")
        return pd.DataFrame()

# Main logic
existing_news = load_existing_news()

if existing_news is not None:
    last_date = existing_news["publishedDate"].max()
    start_date = last_date.strftime('%Y-%m-%d')
    print(f"Updating from last saved date: {start_date}")
else:
    start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
    print(f"First-time fetch from: {start_date}")
    
end_date = datetime.now().strftime('%Y-%m-%d')

# Fetch new data
new_news_df = fetch_news(start_date, end_date)

# Merge & upload
if not new_news_df.empty:
    if existing_news is not None:
        combined_df = pd.concat([existing_news, new_news_df], ignore_index=True)
        combined_df.drop_duplicates(subset=["symbol", "publishedDate", "title"], inplace=True)
    else:
        combined_df = new_news_df

    upload_to_s3(combined_df)
else:
    print("News already up to date. No new records.")

No existing news file found in S3.
First-time fetch from: 2024-04-11
Fetching news from 2024-04-11 to 2025-04-11
Fetching page 1
Fetching page 2
Fetching page 3
Fetching page 4
Fetching page 5
Fetching page 6
Fetching page 7
Fetching page 8
Fetching page 9
Fetching page 10
Fetching page 11
Fetching page 12
Fetching page 13
Fetching page 14
Fetching page 15
Fetching page 16
Fetching page 17
Fetching page 18
Fetching page 19
Fetching page 20
Fetching page 21
Fetching page 22
Fetching page 23
Fetching page 24
Fetching page 25
Fetching page 26
Fetching page 27
Fetching page 28
Fetching page 29
Fetching page 30
Fetching page 31
Fetching page 32
Fetching page 33
Fetching page 34
Fetching page 35
Fetching page 36
Fetching page 37
Fetching page 38
Fetching page 39
Fetching page 40
Fetching page 41
Fetching page 42
Fetching page 43
Fetching page 44
Fetching page 45
Fetching page 46
Fetching page 47
Fetching page 48
Fetching page 49
Fetching page 50
Fetching page 51
Fetching page 52
Fetching pag

In [15]:
import boto3

# List of DynamoDB table names
TABLE_NAMES = [
    "realtimedata",
    "news_tracker",
    "sentiment_tracker",
    "realtime_predictions"
]

# Initialize DynamoDB
dynamodb = boto3.resource("dynamodb")

def delete_all_items(table_name):
    table = dynamodb.Table(table_name)
    print(f"🧹 Deleting all items from: {table_name}...")

    # Scan all items
    response = table.scan()
    items = response.get("Items", [])

    # Loop and delete items one by one
    with table.batch_writer() as batch:
        for item in items:
            # Assume the first key is the primary key
            key_schema = table.key_schema
            key = {k['AttributeName']: item[k['AttributeName']] for k in key_schema}
            batch.delete_item(Key=key)

    print(f"✅ Deleted {len(items)} items from {table_name}.")

if __name__ == "__main__":
    for table in TABLE_NAMES:
        delete_all_items(table)


🧹 Deleting all items from: realtimedata...
✅ Deleted 0 items from realtimedata.
🧹 Deleting all items from: news_tracker...
✅ Deleted 0 items from news_tracker.
🧹 Deleting all items from: sentiment_tracker...
✅ Deleted 0 items from sentiment_tracker.
🧹 Deleting all items from: realtime_predictions...
✅ Deleted 0 items from realtime_predictions.


In [24]:
import boto3

# DynamoDB setup
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("realtimedata")

# Scan the table and collect stock symbols
def get_unique_stocks():
    unique_stocks = set()
    response = table.scan(ProjectionExpression="Stock")
    items = response.get("Items", [])
    
    for item in items:
        unique_stocks.add(item["Stock"])
    
    # Handle pagination
    while "LastEvaluatedKey" in response:
        response = table.scan(
            ProjectionExpression="Stock",
            ExclusiveStartKey=response["LastEvaluatedKey"]
        )
        items = response.get("Items", [])
        for item in items:
            unique_stocks.add(item["Stock"])

    return unique_stocks

# Run and print
stocks = get_unique_stocks()
print(f"✅ Total Unique Stocks in 'realtimedata': {len(stocks)}")
print(f"🧾 Stock Symbols: {sorted(stocks)}")


✅ Total Unique Stocks in 'realtimedata': 7
🧾 Stock Symbols: ['AAPL', 'AMZN', 'GOOGL', 'META', 'MSFT', 'NVDA', 'TSLA']


In [25]:
import boto3
import pandas as pd
from io import StringIO

# AWS Setup
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("realtimedata")

s3 = boto3.client("s3")
S3_BUCKET = "stock-market-data-uofc"
S3_KEY = "dashboard-data/realtimedata_export.csv"

# Fetch all rows from DynamoDB
def fetch_all_data():
    items = []
    response = table.scan()
    items.extend(response["Items"])

    # Handle pagination
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        items.extend(response["Items"])

    return items

# Convert and upload
def export_to_s3():
    print("📥 Fetching data from DynamoDB...")
    data = fetch_all_data()
    if not data:
        print("⚠️ No data found.")
        return

    df = pd.DataFrame(data)
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)

    print("📤 Uploading to S3...")
    s3.put_object(Bucket=S3_BUCKET, Key=S3_KEY, Body=csv_buffer.getvalue())
    print(f"✅ Data exported to s3://{S3_BUCKET}/{S3_KEY}")

# Run export
if __name__ == "__main__":
    export_to_s3()


📥 Fetching data from DynamoDB...
📤 Uploading to S3...
✅ Data exported to s3://stock-market-data-uofc/dashboard-data/realtimedata_export.csv
