In [47]:
import requests
import json
import time
import re
from bs4 import BeautifulSoup
import os
from google import genai
from google.genai import types
from google.cloud import aiplatform 
from dotenv import load_dotenv
from google.cloud import firestore
import warnings

warnings.filterwarnings(
    "ignore",
    message = """Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. See the following page for troubleshooting: https://cloud.google.com/docs/authentication/adc-troubleshooting/user-creds. 
  warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)"""
)

# get and clean sec filings

In [29]:
HEADERS = {'User-Agent': 'YourName YourCompany your.email@example.com'}

TICKER = "SBET"
CIK = "0001981535"
url = f"https://data.sec.gov/submissions/CIK{CIK}.json"
response = requests.get(url, headers = HEADERS)
response.raise_for_status() 
data = response.json()

In [30]:
def parse_sec_data(data, target_forms = ['10-K', '10-Q']):

    filing_data = data.get("filings", {}).get("recent", {})
    if not filing_data:
        raise Exception("No filing data found.")

    extracted_data = []
    num_filings = len(filing_data.get("form", []))
    if num_filings:
        for i in range(num_filings):
            form_type = filing_data.get("form", [])[i]
            if form_type in target_forms:
                try:
                    accession_number = filing_data.get('accessionNumber', [])[i]
                    primary_document = filing_data.get('primaryDocument', [])[i]
                    filing_date = filing_data.get('filingDate', [])[i]
                    report_date = filing_data.get('reportDate', [])[i]

                    filing_details = {
                        'form_type': form_type,
                        'accession_number': accession_number,
                        'primary_document': primary_document,
                        'filing_date': filing_date,
                        'report_date': report_date
                    }
                    extracted_data.append(filing_details)

                except IndexError:
                    print(f"Warning: Data inconsistency at index {i}. Skipping this filing.")
                    continue
    return extracted_data

def construct_sec_url(filing_details):

    DOMAIN = "https://www.sec.gov/Archives/edgar/data"
    target_cik = CIK.lstrip("0")
    accession_number = filing_details.get("accession_number", "").replace("-", "")
    primary_document = filing_details.get("primary_document", "")
    
    if accession_number and primary_document:
        url = f"{DOMAIN}/{target_cik}/{accession_number}/{primary_document}"
    else:
        raise Exception("Info is missing.")

    return url

def get_html_text(url):

    res = requests.get(url, headers = HEADERS)
    res.raise_for_status()
    html_text = res.text

    return html_text

def html_table_to_markdown(table_tag):
    """
    Converts a BeautifulSoup table tag into a Markdown formatted string.
    This helps preserve the structure of financial data for the LLM.
    """

    markdown_lines = []
    # process table headers
    headers = [th.get_text(strip = True).replace("\n", "") for th in table_tag.find_all("th")]

    # calculate the space between cells?
    if headers:
        markdown_lines.append("| " + " | ".join(headers) + " |")
        markdown_lines.append("| " + " | ".join(["---"] * len(headers)) + " |")

    # process table rows
    for row in table_tag.find_all("tr"):
        cells = [td.get_text(strip = True).replace("\n", "") for td in row.find_all(["td", "th"])]
        # only add rows that have content and match the header count if headers exist
        if cells and (not headers or len(cells) == len(headers)):
            markdown_lines.append("| " + " | ".join(cells) + " |")

    return "\n\n" + "\n".join(markdown_lines) + "\n\n" 

def clean_html(html_text):

    if not html_text:
        return ""

    soup = BeautifulSoup(html_text, "lxml")

    # decompose (completely remove) all script, style, and other non-content tags
    for tag in soup(['script', 'style', 'header', 'footer', 'nav']):
        tag.decompose()

    # convert tables to Markdown and replace the original table tag
    for table in soup.find_all("table"):
        markdown_text_tag = soup.new_string(html_table_to_markdown(table))
        table.replace_with(markdown_text_tag)

    text = soup.get_text(separator = "\n", strip = True)
    # remove excessive blank lines to make it more readable
    cleaned_text = re.sub(r'\n\n+', '\n\n', text)

    return cleaned_text

def chunk_filing_by_section(cleaned_text, metadata):

    pattern = r'(?i)(item\s*\d+[a-z]?\.?)'
    parts = re.split(pattern, cleaned_text)

    chunks = []

    intro_content = parts[0].strip()
    if len(intro_content.split()) > 20:
        chunks.append({
            "content": intro_content,
            "metadata": {**metadata, "section": "Introduction"}
        })

    # The rest of the list is ['Item 1.', 'Content of Item 1...', 'Item 1A.', 'Content of 1A...']
    # We iterate through them in pairs.
    for i in range(1, len(parts), 2):
        header = parts[i].strip()
        content = parts[i + 1].strip() if (i + 1) < len(parts) else ""

        chunk_content = f"{header}\n\n{content}"

        if len(content.split()) > 20:
            chunk_obj = {
                "content": chunk_content,
                "metadata": {**metadata, "section": header}
            }
            chunks.append(chunk_obj)

    return chunks



In [31]:
extracted_data = parse_sec_data(data)

# test 1 filing first
if extracted_data:
    for filing in extracted_data:
        url = construct_sec_url(filing)
        filing["url"] = url
        html_text = get_html_text(url)
        cleaned_text = clean_html(html_text)
        chunks = chunk_filing_by_section(cleaned_text, filing)
        break



# google cloud and gemini configurations

In [38]:
load_dotenv()

PROJECT_ID = os.environ["PROJECT_ID"]
REGION = "europe-west2"
GEMINI_API_KEY = os.environ["GEMINI_API_KEY"]
GEMINI_EMBEDDING_MODEL = "models/embedding-001"
COLLECTION_NAME = "sec-filing"
INDEX_NAME = os.environ["INDEX_NAME"]
INDEX_ENDPOINT_NAME = os.environ["INDEX_ENDPOINT_NAME"]

client = genai.Client(api_key = GEMINI_API_KEY)
aiplatform.init(project = PROJECT_ID, location = REGION)

"https://ai.google.dev/gemini-api/docs/text-generation"

response = client.models.generate_content(
    model = "gemini-2.5-flash",
    contents = "Explain how AI works in a few words",
)

print(response.text)

In [53]:
def store_chunk_in_firestore(chunk_id, chunk_data, project_id = PROJECT_ID, collection_name = COLLECTION_NAME):

    try:
        db = firestore.Client(project_id)
        doc_ref = db.collection(collection_name).document(chunk_id)
        doc_ref.set(chunk_data)

        print(f"Successfully stored document with ID: {chunk_id} in collection '{collection_name}'")

        return True
    except Exception as e:
        print(f"Error storing document {chunk_id}: {e}")

        return False

def create_and_deploy_vector_index(index_name, endpoint_name, chunks_for_index):
    """
    Creates a Vertex AI Vector Search Index, an Index Endpoint, and deploys the index.
    This is a one-time setup process.
    """

    # Create the index
    try:
        # Gemini embedding-001 model has 768 dimensions
        vector_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
            display_name = index_name,
            dimensions = 768,
            approximate_neighbors_count = 150,
            distance_measure_type = "DOT_PRODUCT_DISTANCE"
        )

    except Exception as e:
        if "already exists" in str(e):
            print(f"Index '{index_name}' already exists. Reusing it.")
        else:
            raise Exception(e)
    
    # Create an Index Endpoint
    try:
        index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
            display_name = endpoint_name, public_endpoint_enabled = True
        )
    except Exception as e:
        if "already exists" in str(e):
            print(f"Endpoint '{endpoint_name}' already exists. Reusing it.")
            index_endpoint = aiplatform.MatchingEngineIndexEndpoint.list(filter = f'display_name="{endpoint_name}"')[0]
        else:
            raise Exception(e)

    # Deploy the Index to the Endpoint
    try:
        # A unique ID for this deployment
        deployed_index_id = f"gemini_deployed_{int(time.time())}" 
        index_endpoint.deploy_index(
            index = vector_index, deployed_index_id = deployed_index_id
        )
    except Exception as e:
        if "has been deployed" in str(e):
            print("Index is already deployed to this endpoint.")
        else:
            raise Exception(e)

    return vector_index, index_endpoint

def generate_embeddings_and_prepare_datapoints(chunks):
    """
    Takes a list of chunk dictionaries, generates an embedding for each using Gemini,
    and formats them for uploading to Vertex AI Vector Search.
    """

    datapoints = []
    for i, chunk in enumerate(chunks):
        try:
            response = client.models.embed_content(
                model = GEMINI_EMBEDDING_MODEL,
                contents = chunk["content"],
                config = types.EmbedContentConfig(task_type = "RETRIEVAL_DOCUMENT"),
                # title = f"SEC Filing Chunk: {chunk['metadata']['section']}"
            ).embeddings
            embedding_vector = response["embedding"]

            # Create the datapoint structure required by Vertex AI
            # store unique id and embedding only
            datapoint = {
                "id": f"{TICKER}-{chunk['metadata']['form_type']}-{chunk['metadata']['accession_number']}-{i}",
                "embedding": embedding_vector
            }
            datapoints.append(datapoint)
        except Exception as e:
            print(f"Error generating embedding for chunk {i}: {e}")
            continue

    return datapoints

def upload_datapoints_to_vertex_ai(index_resource_name, datapoints):

    try:
        index = aiplatform.MatchingEngineIndex(index_name = index_resource_name)
        index.upsert_datapoints(datapoints = datapoints)
    except Exception as e:
        raise Exception(e)

In [None]:
for i, chunk in enumerate(chunks):
    id = f"{TICKER}-{chunk['metadata']['form_type']}-{chunk['metadata']['accession_number']}-{i}"
    store_chunk_in_firestore(chunk_id = id, chunk_data = chunk)

In [None]:
# one-time setup
# create_and_deploy_vector_index(INDEX_NAME, INDEX_ENDPOINT_NAME, chunks)

datapoints = generate_embeddings_and_prepare_datapoints(chunks)

# if datapoints:
#     index_resource_name = f"projects/{PROJECT_ID}/locations/{REGION}/indexes/{INDEX_NAME}" 
#     upload_datapoints_to_vertex(index_resource_name, datapoints)

Error generating embedding for chunk 0: list indices must be integers or slices, not str
Error generating embedding for chunk 1: list indices must be integers or slices, not str
Error generating embedding for chunk 2: list indices must be integers or slices, not str
Error generating embedding for chunk 3: list indices must be integers or slices, not str
Error generating embedding for chunk 4: list indices must be integers or slices, not str
Error generating embedding for chunk 5: list indices must be integers or slices, not str
Error generating embedding for chunk 6: list indices must be integers or slices, not str
Error generating embedding for chunk 7: list indices must be integers or slices, not str
Error generating embedding for chunk 8: list indices must be integers or slices, not str
Error generating embedding for chunk 9: list indices must be integers or slices, not str
Error generating embedding for chunk 10: list indices must be integers or slices, not str
Error generating emb

In [57]:
for i, chunk in enumerate(chunks):
    print(type(chunk['content']))
    break

<class 'str'>
