<a href="https://colab.research.google.com/github/SerbanA01/Financial-LLM/blob/main/DataStorage.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install sec-edgar-downloader
!pip install investpy
!pip install PyPDF2

In [5]:
import pandas as pd
import yfinance as yf
import os
import time
import random
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from tenacity import retry, stop_after_attempt, wait_exponential
from pydantic import BaseModel, ValidationError
import pyarrow.parquet as pq
from tqdm import tqdm
from google.colab import drive
from bs4 import BeautifulSoup
import requests
from io import StringIO, BytesIO
import investpy
import json
from typing import List, Dict, Optional
import gzip
import sqlite3
from datetime import datetime, timedelta
import re
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter


In [None]:
drive.flush_and_unmount()
drive.mount('/content/drive')
#restart in case it does not pick the right file

Mounted at /content/drive


In [6]:
#maybe consider FTSE100
#don t forget about cryptos and forex

def get_sp500_tickers():
    """Get S&P 500 tickers with more reliable table parsing"""
    url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    table = soup.find('table', {'id': 'constituents'})
    df = pd.read_html(StringIO(str(table)))[0]
    return df['Symbol'].tolist()

def get_nasdaq_tickers():
    """Get NASDAQ-listed common stocks with proper column handling"""
    url = "https://www.nasdaqtrader.com/dynamic/SymDir/nasdaqlisted.txt"
    columns = [
        'Symbol', 'Security Name', 'Market Category', 'Test Issue',
        'Financial Status', 'Round Lot Size', 'ETF', 'NextShares'
    ]
    df = pd.read_csv(url, sep="|", names=columns)
    df = df[:-1]  # Remove summary row
    # Filter out ETFs and test issues
    df = df[(df['Test Issue'] == 'N') & (df['ETF'] == 'N')]
    return df['Symbol'].tolist()

def get_other_tickers():
    """Get NYSE/AMEX stocks with proper column handling"""
    url = "https://www.nasdaqtrader.com/dynamic/SymDir/otherlisted.txt"
    columns = [
        'ACT Symbol', 'Security Name', 'Exchange', 'CQS Symbol',
        'ETF', 'Round Lot Size', 'Test Issue', 'NASDAQ Symbol'
    ]
    df = pd.read_csv(url, sep="|", names=columns)
    df = df[:-1]  # Remove summary row
    # Filter out ETFs and test issues
    df = df[(df['Test Issue'] == 'N') & (df['ETF'] == 'N')]
    return df['ACT Symbol'].tolist()

def get_global_tickers():
    """Get global tickers with error handling"""
    try:
        stocks_df = investpy.stocks.get_stocks()
        # Filter for common stock types (adjust based on available data)
        if 'type' in stocks_df.columns:
            stocks_df = stocks_df[stocks_df['type'] == 'Stock']
        return stocks_df['symbol'].unique().tolist()
    except ImportError:
        print("Global tickers disabled: install investpy (pip install investpy)")
        return []
    except Exception as e:
        print(f"Error fetching global tickers: {str(e)}")
        return []

def get_all_tickers():
    """Combine all sources with error handling"""
    tickers = []

    try:
        tickers += get_sp500_tickers()
        print(f"Found {len(tickers)} S&P 500 tickers")
    except Exception as e:
        print(f"Error fetching S&P 500: {str(e)}")

    try:
        nasdaq = get_nasdaq_tickers()
        tickers += nasdaq
        print(f"Added {len(nasdaq)} NASDAQ tickers")
    except Exception as e:
        print(f"Error fetching NASDAQ: {str(e)}")

    try:
        other = get_other_tickers()
        tickers += other
        print(f"Added {len(other)} NYSE/AMEX tickers")
    except Exception as e:
        print(f"Error fetching NYSE/AMEX: {str(e)}")

    try:
        global_tickers = get_global_tickers()
        tickers += global_tickers
        print(f"Added {len(global_tickers)} global tickers")
    except Exception as e:
        print(f"Error fetching global tickers: {str(e)}")

    # Clean and deduplicate
    clean_tickers = list(set([t.strip().upper() for t in tickers if isinstance(t, str)]))
    return sorted(clean_tickers)



In [None]:
if __name__ == "__main__":
    all_tickers = get_all_tickers()
    print(f"\nTotal unique tickers: {len(all_tickers)}")



    #!!! separate the global tickers from the important markets so that the code for the filings are done on the important markets



    # Save to file
    # with open("all_tickers.txt", "w") as f:
    #     f.write("\n".join(all_tickers))

    #store_historical_data()



Found 503 S&P 500 tickers
Added 4002 NASDAQ tickers
Added 3158 NYSE/AMEX tickers
Global tickers disabled: install investpy (pip install investpy)
Added 0 global tickers

Total unique tickers: 7159


In [29]:
# --- Constants ---
SEC_EDGAR_SEARCH_URL = "https://www.sec.gov/cgi-bin/browse-edgar"
SEC_CIK_LOOKUP_URL = "https://www.sec.gov/files/company_tickers.json"
HEADERS = {
    "User-Agent": "University_Project_Financial_LLM (YOUR_EMAIL@EXAMPLE.COM)",
    "Accept-Encoding": "gzip, deflate",
    "Host": "www.sec.gov"
}
SEC_DELAY = 10

# --- Helper Functions ---
def get_cik(ticker: str) -> Optional[str]:
    """Convert stock ticker to SEC CIK number with better error handling."""
    try:
        response = requests.get(SEC_CIK_LOOKUP_URL, headers=HEADERS)
        response.raise_for_status()
        cik_data = response.json()

        # Handle updated SEC JSON structure
        for entry in cik_data.values():
            if isinstance(entry, dict) and entry.get("ticker") == ticker.upper():
                cik = str(entry["cik_str"]).zfill(10)
                print(f"Found CIK {cik} for {ticker}")
                return cik
        print(f"CIK not found for {ticker}")
        return None
    except Exception as e:
        print(f"CIK lookup failed: {str(e)}")
        return None



def extract_text_from_url(url: str) -> Optional[str]:
    """Extract cleaned text from SEC documents (supports HTML and PDF)."""
    try:
        time.sleep(SEC_DELAY)
        response = requests.get(url, headers=HEADERS)
        response.raise_for_status()

        # Handle PDF documents
        if url.lower().endswith(".pdf"):
            pdf_reader = PdfReader(BytesIO(response.content))
            text = "\n".join([page.extract_text() for page in pdf_reader.pages])
        else:
            # Handle HTML/text documents
            soup = BeautifulSoup(response.content, "html.parser")
            text = soup.get_text()

        # Clean SEC boilerplate
        text = re.sub(r'http[s]?://\S+', '', text)
        text = re.sub(r"UNITED STATES SECURITIES AND EXCHANGE COMMISSION.*?Washington, D\.C\.\s*\d+", "", text, flags=re.DOTALL)
        text = re.sub(r"Table of Contents.*?(?=Item\s+1)", "", text, flags=re.DOTALL|re.IGNORECASE)
        text = re.sub(r'[0-9]{10,}', '', text)
        text = re.sub(r'[a-fA-F0-9]{32,}', '', text)
        text = text.replace("\xa0", " ")
        text = re.sub(r'\n+', '\n', text).strip()
        return text
    except Exception as e:
        print(f"Text extraction failed for {url}: {str(e)}")
        return None

def fetch_filing_documents(filing_detail_url: str, filing_type: str) -> List[Dict]:
    """Return documents with text content instead of URLs."""
    try:
        time.sleep(SEC_DELAY)
        response = requests.get(filing_detail_url, headers=HEADERS)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, "html.parser")
        documents = []

        # Corrected: Look for documents table properly
        doc_table = soup.find("table", class_="tableFile", summary="Document Format Files")
        if doc_table:
            rows = doc_table.find_all("tr")[1:]  # Skip the header row
            for row in rows:
                cols = row.find_all("td")
                if len(cols) >= 4:
                    doc_type = cols[3].text.strip()
                    doc_href = cols[2].find("a")
                    if doc_href and doc_href.has_attr("href") and doc_type.lower() == filing_type.lower():
                        doc_url = f"https://www.sec.gov{doc_href['href']}"

                        # Avoid the inline XBRL viewer, go straight for HTML
                        if "ix?doc=" in doc_url:
                            doc_url = doc_url.replace("/ix?doc=", "")

                        text = extract_text_from_url(doc_url)
                        if text:
                            splitter = RecursiveCharacterTextSplitter(
                                chunk_size=1000,
                                chunk_overlap=200,
                                separators=["\n\n", "\n", ".", " ", ""]
                            )
                            chunks = splitter.split_text(text)
                            documents.append({
                                "url": doc_url,
                                "text": chunks,
                                "filing_type": filing_type,
                                "doc_type": doc_type
                            })
                        break  # Stop after finding the main document
        return documents
    except Exception as e:
        print(f"Document fetch failed: {str(e)}")
        return []


# --- Main Function ---
# --- Main Function ---
def fetch_sec_filings(
    ticker: str,
    filing_type: str = "10-K",
    max_results: int = 5,
    max_retries: int = 3,
    save_to_disk: bool = True  # New flag to control storage
) -> List[Dict]:
    """Fetch filings and return structured data with extracted text."""
    cik = get_cik(ticker)
    if not cik:
        return []

    filings = []
    params = {
        "action": "getcompany",
        "CIK": cik,
        "type": filing_type,
        "owner": "exclude",
        "count": "100"
    }

    try:
        for _ in range(max_retries):
            response = requests.get(SEC_EDGAR_SEARCH_URL, params=params, headers=HEADERS)
            if response.status_code == 200:
                soup = BeautifulSoup(response.content, "html.parser")
                table = soup.find("table", class_="tableFile2")

                if table:
                    for row in table.find_all("tr")[1:]:  # Skip header row
                        cols = row.find_all("td")
                        if len(cols) >= 5:
                            link = cols[1].find("a")
                            if link and link.has_attr("href"):
                                filing_detail_url = f"https://www.sec.gov{link['href']}"

                                # Modified: Get documents with extracted text
                                documents = fetch_filing_documents(filing_detail_url, filing_type)

                                filing_data = {
                                    "ticker": ticker,
                                    "filing_type": filing_type,
                                    "filing_date": cols[3].text.strip(),
                                    "documents": documents  # Now contains text chunks
                                }

                                # New: Save to disk/cloud
                                if save_to_disk:
                                    os.makedirs(f"data/{ticker}", exist_ok=True)
                                    filename = f"data/{ticker}/{filing_data['filing_date']}.json"
                                    with open(filename, "w") as f:
                                        json.dump(filing_data, f)

                                filings.append(filing_data)

                                if len(filings) >= max_results:
                                    return filings
                    break
            else:
                print(f"Retrying... (Status code {response.status_code})")
                time.sleep(30)

    except Exception as e:
        print(f"Fatal error: {str(e)}")

    return filings

In [33]:
# --- Test Execution ---
if __name__ == "__main__":
    filings = fetch_sec_filings("AAPL", filing_type='10-Q' ,max_results=1,save_to_disk=False)
    print(filings)
    print(filings[0]['documents'][0]['text'][2])



Found CIK 0000320193 for AAPL
[{'ticker': 'AAPL', 'filing_type': '10-Q', 'filing_date': '2025-01-31', 'documents': [{'url': 'https://www.sec.gov/Archives/edgar/data/320193/000032019325000008/aapl-20241228.htm', 'text': ['aapl-20241228false2025Q--09-27P1YP1YP1YP1YP421DP342Dxbrli:sharesiso4217:USDiso4217:USDxbrli:sharesxbrli:pureaapl:Customeraapl:Vendor-09-292024-12-us-gaap:CommonStockMember2024-09-292024-12-aapl:A0.000Notesdue2025Member2024-09-292024-12-aapl:A0.875NotesDue2025Member2024-09-292024-12-aapl:A1.625NotesDue2026Member2024-09-292024-12-aapl:A2.000NotesDue2027Member2024-09-292024-12-aapl:A1.375NotesDue2029Member2024-09-292024-12-aapl:A3.050NotesDue2029Member2024-09-292024-12-aapl:A0.500Notesdue2031Member2024-09-292024-12-aapl:A3', '.600NotesDue2042Member2024-09-292024-12--01-us-gaap:ProductMember2024-09-292024-12-us-gaap:ProductMember2023-10-012023-12-us-gaap:ServiceMember2024-09-292024-12-us-gaap:ServiceMember2023-10-012023-12--10-012023-12--12--09--09-us-gaap:CommonStockInclu

In [None]:

# --- Configuration ---
BASE_PATH = "content/drive/Mydrive/Licenta/SecFilings/"  # Using your D: drive
DATABASE_PATH = os.path.join(BASE_PATH, "filings.db")
HEADERS = {"User-Agent": "Financial Research Bot (your@email.com)"}  # Use real contact info

# --- Database Setup ---
def init_database():
    conn = sqlite3.connect(DATABASE_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS filings (
            id INTEGER PRIMARY KEY,
            ticker TEXT,
            type TEXT,
            date TEXT,
            year INTEGER,
            quarter INTEGER,
            file_path TEXT,
            compressed_size REAL,
            processed BOOLEAN DEFAULT 0
        )
    """)
    conn.commit()
    conn.close()

# --- Enhanced Filing Fetcher ---
def fetch_and_store_filings(ticker, years=6):
    """Store filings with directory structure: D:/filings/[ticker]/[type]/[year]/[file]"""
    conn = sqlite3.connect(DATABASE_PATH)
    cutoff = datetime.now() - timedelta(days=years*365)

    for filing_type in ["10-K", "10-Q"]:
        try:
            filings = fetch_sec_filings(ticker, filing_type)  # Use your existing function

            for filing in filings:
                filing_date = datetime.strptime(filing["filing_date"], "%Y-%m-%d")
                if filing_date < cutoff:
                    continue

                # Create directory structure
                dir_path = os.path.join(BASE_PATH, ticker, filing_type, str(filing_date.year))
                os.makedirs(dir_path, exist_ok=True)

                # Download and compress
                for doc_url in filing["documents"]:
                    file_name = f"{filing_date.date()}.html.gz"
                    file_path = os.path.join(dir_path, file_name)

                    if not os.path.exists(file_path):
                        response = requests.get(doc_url, headers=HEADERS)
                        with gzip.open(file_path, "wb") as f:
                            f.write(response.content)

                    # Store metadata
                    file_size = os.path.getsize(file_path) / (1024 * 1024)  # MB
                    quarter = (filing_date.month - 1) // 3 + 1 if filing_type == "10-Q" else None

                    conn.execute("""
                        INSERT INTO filings (ticker, type, date, year, quarter, file_path, compressed_size)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                    """, (ticker, filing_type, filing_date.date(), filing_date.year, quarter, file_path, file_size))
                    conn.commit()

        except Exception as e:
            print(f"Failed {ticker} {filing_type}: {str(e)}")
            continue

    conn.close()

# --- Storage Estimator ---
def calculate_storage():
    conn = sqlite3.connect(DATABASE_PATH)
    cursor = conn.execute("""
        SELECT
            COUNT(*) as total_files,
            SUM(compressed_size) as total_size_mb
        FROM filings
    """)
    result = cursor.fetchone()
    print(f"Total filings: {result[0]:,}")
    print(f"Estimated storage: {result[1]:.2f} MB ({result[1]/1024:.2f} GB)")
    conn.close()

# --- Main Execution ---
if __name__ == "__main__":
    init_database()

    # Example ticker list - replace with your 8k tickers
    tickers = ["AAPL", "MSFT", "GOOGL"]  # Start with 3 for testing

    for ticker in tqdm(tickers):
        fetch_and_store_filings(ticker)
        time.sleep(10)  # SEC rate limit compliance

    calculate_storage()

OperationalError: unable to open database file

In [None]:
import gzip

with gzip.open('/content/drive/MyDrive/Licenta/sec_filings/AAPL/10-K/2021/2021-10-29.txt.gz', 'rt', encoding='utf-8') as f:
    if f is None:
        print("File not found")
    else:
      print("printing")
      for line in f:
          print(line.strip())


printing


In [None]:
# Step 1: Iterate over tickers and fetch data
tickers = get_all_tickers()  # Your function that returns a list of tickers

for ticker in tickers:
    filings = fetch_sec_filings(ticker,filing_type="10-K")  # Function to fetch filings from EDGAR
    transcripts = fetch_earnings_call_transcripts(ticker)  # Optionally, fetch transcripts
    # Save or process the data as needed

# Step 2: Preprocess & chunk text
for document in all_documents:
    clean_text = clean_document(document)
    chunks = chunk_text(clean_text, chunk_size=300)  # Example function to chunk text

# Step 3: Generate embeddings
embeddings = []
for chunk in chunks:
    vector = embed_text(chunk)  # Use a pre-trained model like Sentence-BERT
    embeddings.append({'ticker': ticker, 'text': chunk, 'vector': vector})

# Step 4: Build an index with FAISS
index = build_faiss_index([item['vector'] for item in embeddings])
# Optionally, store metadata separately or alongside

# Step 5: Query and retrieval
def retrieve(query, index, embeddings, top_k=5):
    query_vector = embed_text(query)
    retrieved_ids = search_index(index, query_vector, top_k)
    retrieved_chunks = [embeddings[i]['text'] for i in retrieved_ids]
    return retrieved_chunks

# Step 6: Integrate with your LLM for answer generation
user_query = "How was the profit for ticker XYZ in Q2 2023?"
relevant_chunks = retrieve(user_query, index, embeddings)
answer = generate_answer(user_query, relevant_chunks)  # Use your LLM to generate the answer
