<a href="https://colab.research.google.com/github/Veena24-hub/Intern-Phase-1/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### =========================================
##  DS TEAM: ALERT ENGINE
### =========================================
### Purpose: Demonstrate full alert pipeline (StockTwits → FinBERT → Alerts)
### Environment: Google Colab / Jupyter Notebook
### =========================================



In [84]:
# --- Step 0: Install dependencies ---
!pip install transformers torch requests emoji python-dotenv -q

In [85]:
# --- Step 1: Import libraries ---
import requests
import re
import emoji
import time
import json
from datetime import datetime
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch


In [86]:
{
  "stock_symbol": "AAPL",
  "alert_type": "Social Sentiment",
  "sentiment": "Negative",
  "confidence": 96,
  "summary": "Apple facing supply chain headwinds this quarter",
  "source": {
    "type": "Social Media",
    "name": "StockTwits - @marketwatcher",
    "url": "https://stocktwits.com/marketwatcher/message/999002",
    "timestamp": "2025-10-25T01:50:46Z"
  }
}


{'stock_symbol': 'AAPL',
 'alert_type': 'Social Sentiment',
 'sentiment': 'Negative',
 'confidence': 96,
 'summary': 'Apple facing supply chain headwinds this quarter',
 'source': {'type': 'Social Media',
  'name': 'StockTwits - @marketwatcher',
  'url': 'https://stocktwits.com/marketwatcher/message/999002',
  'timestamp': '2025-10-25T01:50:46Z'}}

In [87]:
# --- Step 2: Configurations ---
STOCK_SYMBOL = "AAPL"
STOCKTWITS_URL = f"https://api.stocktwits.com/api/2/streams/symbol/{STOCK_SYMBOL}.json"
CONFIDENCE_THRESHOLD = 75
MONITORING_INTERVAL = 15 * 60  # 15 minutes in seconds
MOCK_BACKEND_FILE = "mock_backend_alerts.json"

In [88]:
# --- Step 3: Initialize model (FinBERT) ---
print("[INIT] Loading FinBERT model... (this may take ~30s)")
tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
print("[INIT] ✅ FinBERT loaded successfully!\n")


[INIT] Loading FinBERT model... (this may take ~30s)
[INIT] ✅ FinBERT loaded successfully!



In [89]:
# --- Step 4: Initialize duplicate tracker ---
seen_post_ids = set()

In [90]:
# ============================================================
# Pipeline 1: Data Ingestion & Cleaning
# ============================================================
def fetch_stocktwits_posts():
    """Fetch latest 30 posts from StockTwits."""
    try:
        response = requests.get(STOCKTWITS_URL, timeout=10)
        data = response.json()
        return data.get("messages", [])
    except Exception as e:
        print(f"[Pipeline 1] ❌ Error fetching StockTwits data: {e}")
        return []


def validate_post(post):
    """Check if post has required fields and enough text."""
    try:
        required_fields = ["id", "body", "created_at", "user"]
        if not all(field in post for field in required_fields):
            return False
        if not post["body"] or len(post["body"]) < 10:
            return False
        if "username" not in post["user"]:
            return False
        return True
    except:
        return False


def clean_text(text):
    """Clean StockTwits text for FinBERT."""
    text = emoji.replace_emoji(text, replace='')  # remove emojis
    text = re.sub(r"http\S+|www.\S+", "", text)  # remove URLs
    text = re.sub(r"<.*?>", "", text)  # remove HTML tags
    text = re.sub(r"[^A-Za-z0-9$%\s\.\-\+]", "", text)  # remove special chars
    text = re.sub(r"\s+", " ", text).strip()  # clean spaces
    text = text.replace("%", " percent")  # expand %
    return text


def pipeline1():
    """Run full data ingestion and cleaning pipeline."""
    raw_posts = fetch_stocktwits_posts()
    print(f"[Pipeline 1] Received {len(raw_posts)} posts")

    processed = []
    for post in raw_posts:
        if not validate_post(post):
            continue
        post_id = post["id"]
        if post_id in seen_post_ids:
            continue  # skip duplicates

        seen_post_ids.add(post_id)
        cleaned = clean_text(post["body"])
        username = post["user"]["username"]
        timestamp = post["created_at"]
        url = f"https://stocktwits.com/{username}/message/{post_id}"

        processed.append({
            "post_id": str(post_id),
            "stock_symbol": STOCK_SYMBOL,
            "cleaned_text": cleaned,
            "original_text": post["body"],
            "metadata": {
                "author": username,
                "timestamp": timestamp,
                "url": url,
                "source_type": "Social Media",
                "source_name": f"StockTwits - @{username}"
            }
        })

    print(f"[Pipeline 1] ✅ Valid posts: {len(processed)} (new)")
    return processed

In [91]:
# ============================================================
# Pipeline 2: Sentiment Analysis (FinBERT)
# ============================================================
def analyze_sentiment(text):
    """Run FinBERT sentiment analysis on cleaned text."""
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    with torch.no_grad():
        outputs = model(**inputs)
        probs = torch.nn.functional.softmax(outputs.logits, dim=-1)[0]
    probs_dict = {
        "positive": float(probs[0]),
        "negative": float(probs[1]),
        "neutral": float(probs[2])
    }
    sentiment = max(probs_dict, key=probs_dict.get).capitalize()
    confidence = round(probs_dict[sentiment.lower()] * 100)
    return sentiment, confidence, probs_dict


def is_important(sentiment, confidence):
    """Filter only high-confidence Positive/Negative alerts."""
    if confidence <= CONFIDENCE_THRESHOLD:
        return False
    if sentiment in ["Positive", "Negative"]:
        return True
    return False


def pipeline2(posts):
    """Analyze each post and filter important alerts."""
    results = []
    for post in posts:
        text = post["cleaned_text"]
        sentiment, confidence, probs = analyze_sentiment(text)

        summary = text[:100]
        important = is_important(sentiment, confidence)

        result = {
            **post,
            "analysis": {
                "sentiment": sentiment,
                "confidence": confidence,
                "probabilities": probs
            },
            "summary": summary,
            "is_important": important
        }

        status = "✅ Important" if important else "🟡 Skipped"
        print(f"[Pipeline 2] {status} → {sentiment} ({confidence}%) | {summary[:50]}")
        if important:
            results.append(result)
    return results

In [92]:
# ============================================================
# Pipeline 3: Alert Packaging & Delivery (Mock Backend)
# ============================================================
def send_to_mock_backend(alert):
    """Simulate sending alert to backend (writes to JSON file)."""
    payload = {
        "stock_symbol": alert["stock_symbol"],
        "alert_type": "Social Sentiment",
        "sentiment": alert["analysis"]["sentiment"],
        "confidence": alert["analysis"]["confidence"],
        "summary": alert["summary"],
        "source": {
            "type": alert["metadata"]["source_type"],
            "name": alert["metadata"]["source_name"],
            "url": alert["metadata"]["url"],
            "timestamp": alert["metadata"]["timestamp"]
        }
    }

    print(f"[Pipeline 3] 🚀 Sending alert: {payload['sentiment']} ({payload['confidence']}%)")

    # Append to mock backend file
    try:
        with open(MOCK_BACKEND_FILE, "a") as f:
            f.write(json.dumps(payload) + "\n")
        print("[Pipeline 3] ✅ Alert logged successfully\n")
    except Exception as e:
        print(f"[Pipeline 3] ❌ Error logging alert: {e}")


def pipeline3(alerts):
    """Deliver alerts to mock backend."""
    for alert in alerts:
        send_to_mock_backend(alert)



In [93]:
def fetch_stocktwits_posts():
    """Fetch latest 30 posts from StockTwits with error handling and fallback."""
    try:
        response = requests.get(STOCKTWITS_URL, timeout=10)
        if response.status_code != 200:
            print(f"[Pipeline 1] ⚠️ StockTwits returned HTTP {response.status_code}")
            raise ValueError("Non-200 response")

        # Try to parse JSON safely
        try:
            data = response.json()
        except Exception:
            print("[Pipeline 1] ⚠️ Non-JSON response detected. Showing first 200 chars:")
            print(response.text[:200])
            raise ValueError("Invalid JSON response")

        return data.get("messages", [])

    except Exception as e:
        print(f"[Pipeline 1] ❌ Error fetching StockTwits data: {e}")
        print("[Pipeline 1] ↩️ Using mock fallback data for demonstration...")
        # ---- Mock fallback posts ----
        mock_posts = [
            {
                "id": 999001,
                "body": "AAPL just crushed earnings — huge revenue beat! 🚀",
                "created_at": str(datetime.utcnow()),
                "user": {"username": "finnews_bot"}
            },
            {
                "id": 999002,
                "body": "Apple facing supply chain headwinds this quarter 😕",
                "created_at": str(datetime.utcnow()),
                "user": {"username": "marketwatcher"}
            },
            {
                "id": 999003,
                "body": "Holding my $AAPL position for long term. Neutral outlook.",
                "created_at": str(datetime.utcnow()),
                "user": {"username": "investor_john"}
            }
        ]
        return mock_posts


In [94]:
# ============================================================
# ORCHESTRATOR: Controls the 3 pipelines
# ============================================================
def run_cycle():
    print("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
    print(f"[{datetime.utcnow()}] [Orchestrator] Starting new monitoring cycle")
    print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")

    # 1️⃣ Fetch & clean
    posts = pipeline1()
    if not posts:
        print("[Orchestrator] No new valid posts found. Skipping...")
        return

    # 2️⃣ Analyze sentiment
    important_alerts = pipeline2(posts)

    # 3️⃣ Send alerts
    if important_alerts:
        pipeline3(important_alerts)
    else:
        print("[Orchestrator] No important alerts to send.")

    print(f"[Orchestrator] ✅ Cycle complete — {len(important_alerts)} alert(s) sent.")




In [95]:
# ============================================================
# Run one cycle (manual demo)
# ============================================================
run_cycle()


━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[2025-10-25 01:53:09.977960] [Orchestrator] Starting new monitoring cycle
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[Pipeline 1] ⚠️ StockTwits returned HTTP 403
[Pipeline 1] ❌ Error fetching StockTwits data: Non-200 response
[Pipeline 1] ↩️ Using mock fallback data for demonstration...
[Pipeline 1] Received 3 posts
[Pipeline 1] ✅ Valid posts: 3 (new)
[Pipeline 2] 🟡 Skipped → Negative (51%) | AAPL just crushed earnings huge revenue beat


  print(f"[{datetime.utcnow()}] [Orchestrator] Starting new monitoring cycle")
  "created_at": str(datetime.utcnow()),
  "created_at": str(datetime.utcnow()),
  "created_at": str(datetime.utcnow()),


[Pipeline 2] ✅ Important → Negative (96%) | Apple facing supply chain headwinds this quarter
[Pipeline 2] 🟡 Skipped → Neutral (76%) | Holding my $AAPL position for long term. Neutral o
[Pipeline 3] 🚀 Sending alert: Negative (96%)
[Pipeline 3] ✅ Alert logged successfully

[Orchestrator] ✅ Cycle complete — 1 alert(s) sent.


In [25]:
!pip install transformers torch requests pandas




In [39]:
!pip install prefect

Collecting prefect
  Using cached prefect-3.4.25-py3-none-any.whl.metadata (13 kB)
Collecting aiosqlite<1.0.0,>=0.17.0 (from prefect)
  Using cached aiosqlite-0.21.0-py3-none-any.whl.metadata (4.3 kB)
Collecting apprise<2.0.0,>=1.1.0 (from prefect)
  Using cached apprise-1.9.5-py3-none-any.whl.metadata (56 kB)
Collecting asgi-lifespan<3.0,>=1.0 (from prefect)
  Using cached asgi_lifespan-2.1.0-py3-none-any.whl.metadata (10 kB)
Collecting asyncpg<1.0.0,>=0.23 (from prefect)
  Using cached asyncpg-0.30.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.0 kB)
Collecting coolname<3.0.0,>=1.0.4 (from prefect)
  Using cached coolname-2.2.0-py2.py3-none-any.whl.metadata (6.2 kB)
Collecting dateparser<2.0.0,>=1.1.1 (from prefect)
  Using cached dateparser-1.2.2-py3-none-any.whl.metadata (29 kB)
Collecting docker<8.0,>=4.0 (from prefect)
  Using cached docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting exceptiongroup>=1.0.0 (from prefect)
  Using cached exceptiongr

In [40]:
import requests
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from scipy.special import softmax
from prefect import flow, task
from datetime import datetime



In [41]:
# Load FinBERT (Pretrained Finance Model)
# This is a BERT model fine-tuned on financial text (headlines, filings, earnings calls). It predicts Positive / Negative / Neutral sentiment.

# Load FinBERT model and tokenizer
finbert_model = "ProsusAI/finbert"
tokenizer = AutoTokenizer.from_pretrained(finbert_model)
model = AutoModelForSequenceClassification.from_pretrained(finbert_model)



In [43]:
@task
def fetch_sec_data(cik: str):
    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
    headers = {"User-Agent": "VeenaChebrolu veena@example.com"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        data = response.json()
        recent = data["filings"]["recent"]
        df = pd.DataFrame({
            "accessionNumber": recent["accessionNumber"],
            "filingDate": recent["filingDate"],
            "form": recent["form"],
            "primaryDocDescription": recent["primaryDocDescription"]
        })
        return df
    else:
        raise Exception(f"Failed to fetch data: {response.status_code}")

In [44]:
# STEP 4 – Fetch Live SEC Filings


# Company: Apple
cik = "0000320193"
url = f"https://data.sec.gov/submissions/CIK{cik}.json"

headers = {"User-Agent": "VeenaChebrolu veenalucky727@gmail.com"}

response = requests.get(url, headers=headers)
data = response.json()

recent = data["filings"]["recent"]

df = pd.DataFrame({
    "accessionNumber": recent["accessionNumber"],
    "filingDate": recent["filingDate"],
    "form": recent["form"],
    "primaryDocDescription": recent["primaryDocDescription"]
})

df.head(5)

Unnamed: 0,accessionNumber,filingDate,form,primaryDocDescription
0,0002050912-25-000008,2025-10-17,4,FORM 4
1,0001631982-25-000009,2025-10-17,4,FORM 4
2,0001950047-25-008030,2025-10-16,144,
3,0001214156-25-000011,2025-10-03,4,FORM 4
4,0001767094-25-000009,2025-10-03,4,FORM 4


In [24]:
# WHAT THESE FORMS INDICATE

# Form Type	     Description	                                                                          Market Insight
# Form 4	      Filed by company insiders (CEOs, CFOs, Directors) when they buy or sell shares	      Often used to detect insider sentiment (bullish/bearish)

# Form 144	    Filed when an insider plans to sell restricted or control securities	                Indicates upcoming insider selling pressure

# 10-K / 10-Q	  Annual or quarterly reports (financial performance)	                                  Used for earnings and fundamental analysis

# 8-K	          Reports significant unscheduled corporate events (mergers, leadership change, etc.)	  Often market-moving news

In [45]:
@task
def filter_filings(df: pd.DataFrame):
    important_forms = ["4", "144", "8-K", "10-K", "10-Q"]
    return df[df["form"].isin(important_forms)].reset_index(drop=True)


In [46]:
# STEP 5 – Identify Key Filings (e.g., Insider Trades or Form 4). Now, let’s filter for the most market-sensitive forms.

# Filter for insider trading and key forms
filtered_df = df[df["form"].isin(["4", "144", "8-K", "10-K", "10-Q"])]
filtered_df.reset_index(drop=True, inplace=True)
filtered_df

Unnamed: 0,accessionNumber,filingDate,form,primaryDocDescription
0,0002050912-25-000008,2025-10-17,4,FORM 4
1,0001631982-25-000009,2025-10-17,4,FORM 4
2,0001950047-25-008030,2025-10-16,144,
3,0001214156-25-000011,2025-10-03,4,FORM 4
4,0001767094-25-000009,2025-10-03,4,FORM 4
...,...,...,...,...
772,0001181431-15-001898,2015-02-03,4,2015.02.01 WAGNER FORM 4 - RSU VESTING
773,0001193125-15-023732,2015-01-28,8-K,8-K
774,0001193125-15-023697,2015-01-28,10-Q,10-Q
775,0001181431-15-001513,2015-01-27,4,2015.01.23 RICCIO FORM 4 - 10B5.1 PLAN


In [47]:
@task
def analyze_sentiment_batch(df: pd.DataFrame):
    def analyze_text(text):
        inputs = tokenizer(text, return_tensors="pt", truncation=True)
        outputs = model(**inputs)
        scores = outputs[0][0].detach().numpy()
        scores = softmax(scores)
        labels = ['negative', 'neutral', 'positive']
        sentiment = labels[scores.argmax()]
        confidence = scores.max()
        return sentiment, confidence

    df["sentiment"], df["confidence"] = zip(*df["primaryDocDescription"].fillna("").map(analyze_text))
    return df


In [48]:
# STEP 6 – Apply FinBERT Sentiment to Descriptions. Now analyze how “positive” or “negative” the filing text appears (based on its description).

def analyze_sentiment(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True)
    outputs = model(**inputs)
    scores = outputs[0][0].detach().numpy()
    scores = softmax(scores)
    labels = ['negative', 'neutral', 'positive']
    sentiment = labels[scores.argmax()]
    confidence = scores.max()
    return sentiment, confidence

# Apply sentiment analysis
filtered_df["sentiment"], filtered_df["confidence"] = zip(
    *filtered_df["primaryDocDescription"].fillna("").map(analyze_sentiment)
)

filtered_df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  filtered_df["sentiment"], filtered_df["confidence"] = zip(
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  filtered_df["sentiment"], filtered_df["confidence"] = zip(


Unnamed: 0,accessionNumber,filingDate,form,primaryDocDescription,sentiment,confidence
0,0002050912-25-000008,2025-10-17,4,FORM 4,positive,0.934362
1,0001631982-25-000009,2025-10-17,4,FORM 4,positive,0.934362
2,0001950047-25-008030,2025-10-16,144,,positive,0.424185
3,0001214156-25-000011,2025-10-03,4,FORM 4,positive,0.934362
4,0001767094-25-000009,2025-10-03,4,FORM 4,positive,0.934362
...,...,...,...,...,...,...
772,0001181431-15-001898,2015-02-03,4,2015.02.01 WAGNER FORM 4 - RSU VESTING,positive,0.952088
773,0001193125-15-023732,2015-01-28,8-K,8-K,positive,0.906052
774,0001193125-15-023697,2015-01-28,10-Q,10-Q,positive,0.897784
775,0001181431-15-001513,2015-01-27,4,2015.01.23 RICCIO FORM 4 - 10B5.1 PLAN,positive,0.947499


In [49]:
@task
def trigger_alerts(df: pd.DataFrame, company_name: str):
    for _, row in df.iterrows():
        if row["form"] in ["4", "144"] and row["sentiment"] == "negative" and row["confidence"] > 0.7:
            print(f"\n🚨 ALERT: Negative insider activity detected for {company_name}")
            print(f"   Form: {row['form']}")
            print(f"   Description: {row['primaryDocDescription']}")
            print(f"   Filing Date: {row['filingDate']}")
            print(f"   Confidence: {row['confidence']:.2f}")
            print(f"   Timestamp: {datetime.now()}")
            print("-" * 100)


In [50]:
# STEP 7 – Trigger Agentic Alerts. Now we’ll simulate the “Agent” — the logic layer that decides what’s worth reacting to.

# Example: alert if: The form is 4 or 144 (insider activity)
# Sentiment is negative
# Confidence > 0.7

def trigger_alerts(df):
    for _, row in df.iterrows():
        if row["form"] in ["4", "144"] and row["sentiment"] == "negative" and row["confidence"] > 0.7:
            print(f"🚨 ALERT: {row['form']} filing shows negative sentiment.")
            print(f"   → Description: {row['primaryDocDescription']}")
            print(f"   → Filing Date: {row['filingDate']}")
            print(f"   → Confidence: {row['confidence']:.2f}")
            print("-" * 80)

# Run alert module
trigger_alerts(filtered_df)

In [51]:
## Step 8 - Set Up a Polling Loop (for Dynamic Updates) ##

# Step 5: Poll every few minutes for updates (simulate streaming)
def poll_sec_feed(interval=300):  # interval in seconds (e.g. 5 minutes)
    print("Starting live SEC data polling...")
    while True:
        try:
            response = requests.get(url, headers=headers)
            if response.status_code == 200:
                data = response.json()
                filings = data.get("filings", {}).get("recent", {})
                df = pd.DataFrame({
                    "accessionNumber": filings.get("accessionNumber", []),
                    "filingDate": filings.get("filingDate", []),
                    "form": filings.get("form", []),
                })
                latest = df.head(1)
                print(f"\n[{datetime.now()}] Latest filing:")
                print(latest)
            else:
                print(f"Error {response.status_code}")
        except Exception as e:
            print(f"Error: {e}")
        time.sleep(interval)


###  WHAT’S GOING ON BEHIND THE SCENES: FinBERT model encodes the description text into numerical vectors → predicts how the market might interpret it.

The agent then combines context (form type + sentiment) to decide if it’s a signal.


### These signals can be fed into downstream systems like:

LLM-based reasoning agent

Dashboard alerts

Risk scoring pipelines

In [54]:
@flow
def sec_agentic_flow(cik: str, company_name: str):
    """
    Prefect flow to fetch, filter, analyze, and alert on SEC filings.
    """
    sec_data_df = fetch_sec_data(cik)
    if sec_data_df is not None:
        filtered_data_df = filter_filings(sec_data_df)
        sentiment_analyzed_df = analyze_sentiment_batch(filtered_data_df)
        trigger_alerts(sentiment_analyzed_df, company_name)