In [None]:
import json as _hex_json

In [None]:
# Run-context flag decoded from the JSON literal "false" (-> False); no later visible cell reads it.
# NOTE(review): presumably injected by the Hex runtime to mark scheduled runs — confirm.
hex_scheduled = _hex_json.loads("false")

In [None]:
# E-mail string decoded from a JSON literal; no later visible cell reads it.
# NOTE(review): presumably the Hex user who triggered the run — confirm before relying on it.
hex_user_email = _hex_json.loads('"example-user@example.com"')

In [None]:
# Decodes to an empty dict; no later visible cell reads it.
hex_user_attributes = _hex_json.loads("{}")

In [None]:
# Decodes to the string "logic"; no later visible cell reads it.
# NOTE(review): the meaning of "logic" is defined by Hex, not by this notebook — confirm in Hex docs.
hex_run_context = _hex_json.loads('"logic"')

In [None]:
# Decodes to the string "UTC"; no later visible cell reads it (timestamps below use naive datetime.now()).
hex_timezone = _hex_json.loads('"UTC"')

In [None]:
# Project identifier string decoded from a JSON literal; no later visible cell reads it.
hex_project_id = _hex_json.loads('"01993942-1389-7000-8cd5-5d74ea1c200b"')

In [None]:
# Project display name decoded from a JSON literal; no later visible cell reads it.
hex_project_name = _hex_json.loads('"SEC EDGAR Data Loader"')

In [None]:
# Decodes to an empty string; no later visible cell reads it.
hex_status = _hex_json.loads('""')

In [None]:
# Decodes to an empty list; no later visible cell reads it.
hex_categories = _hex_json.loads("[]")

In [None]:
# List of ten hex colour strings decoded from a JSON array; no later visible cell reads it
# (this notebook produces no plots in the visible cells).
hex_color_palette = _hex_json.loads(
    '["#4C78A8","#F58518","#E45756","#72B7B2","#54A24B","#EECA3B","#B279A2","#FF9DA6","#9D755D","#BAB0AC"]'
)

In [None]:
!pip install litellm>=1.77.3 numpy>=2.3.3 pandas>=2.3.2 python-dotenv>=1.1.1 requests>=2.32.5

[1mTry `uv pip` instead for faster package installs:
[32m!uv pip install litellm numpy pandas python-dotenv requests
[0mLearn more: https://learn.hex.tech/docs/explore-data/projects/environment-configuration/using-packages


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
import ast
import json
import logging
import os
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from litellm import embedding
from tqdm import tqdm

# Load variables from a .env file into the process environment; FMPService later reads its API key
# through os.getenv("FINANCIAL_MODELLING_GROUP_API_KEY").
load_dotenv()

# Configure the root logger at DEBUG and create this notebook's logger.
# NOTE(review): DEBUG on the root logger is very verbose for a 13k-company run; the LiteLLM DEBUG lines in
# the recorded output may or may not stem from this setting — confirm, and consider INFO for shared runs.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

[92m16:25:14 - LiteLLM:DEBUG[0m: http_handler.py:580 - Using AiohttpTransport...
[92m16:25:14 - LiteLLM:DEBUG[0m: http_handler.py:637 - Creating AiohttpTransport...
[92m16:25:15 - LiteLLM:DEBUG[0m: litellm_logging.py:180 - [Non-Blocking] Unable to import GenericAPILogger - LiteLLM Enterprise Feature - No module named 'litellm_enterprise'
[92m16:25:15 - LiteLLM:DEBUG[0m: http_handler.py:580 - Using AiohttpTransport...
[92m16:25:15 - LiteLLM:DEBUG[0m: http_handler.py:637 - Creating AiohttpTransport...
[92m16:25:15 - LiteLLM:DEBUG[0m: http_handler.py:580 - Using AiohttpTransport...
[92m16:25:15 - LiteLLM:DEBUG[0m: http_handler.py:637 - Creating AiohttpTransport...
[92m16:25:16 - LiteLLM:DEBUG[0m: http_handler.py:580 - Using AiohttpTransport...
[92m16:25:16 - LiteLLM:DEBUG[0m: http_handler.py:637 - Creating AiohttpTransport...


In [None]:
class CacheService:
    """CSV-backed cache of processed company records, keyed by CIK.

    Records live in memory as ``{str(cik): record_dict}``. Every call to
    :meth:`set` rewrites the whole cache to ``cache_file``.
    """

    def __init__(self, cache_file: str = "cache.csv"):
        """Create the service and load an existing cache file, if any.

        Args:
            cache_file: Path of the CSV file backing the cache. A bare file name
                (no directory part) refers to the current working directory.
        """
        self.cache_file = cache_file
        self.cache = {}
        self._load_cache()

    def _load_cache(self):
        """Populate ``self.cache`` from ``self.cache_file``.

        Any read or parse error is logged and leaves the cache empty.
        """
        if not os.path.exists(self.cache_file):
            self.cache = {}
            logger.info("Starting with empty cache")
            return

        try:
            # Identifier columns are read as strings so zero-padded values survive the round trip.
            df = pd.read_csv(self.cache_file, dtype={"cik": str, "padded_cik": str, "sic": str})

            if "embedded_description" in df.columns:
                # Embeddings are stored as the text of a Python list; literal_eval only parses literals.
                df["embedded_description"] = df["embedded_description"].apply(
                    lambda x: ast.literal_eval(x) if pd.notna(x) and x != "" else None
                )

            for _, row in df.iterrows():
                self.cache[str(row["cik"])] = row.to_dict()

            logger.info(f"Loaded cache with {len(self.cache)} entries")
        except Exception as e:
            logger.error(f"Error loading cache from CSV: {e}")
            self.cache = {}

    def get(self, cik: str) -> Optional[Dict[str, Any]]:
        """Get cached data for a CIK

        Args:
            cik: Company CIK (converted with ``str`` before lookup)

        Returns:
            A copy of the cached record with pandas NaN values replaced by None,
            or None if the CIK is not cached or the entry is not a dict with a "cik" key
        """
        cached_data = self.cache.get(str(cik))
        if not (isinstance(cached_data, dict) and "cik" in cached_data):
            return None

        cleaned_data = {}
        for key, value in cached_data.items():
            if isinstance(value, (list, np.ndarray)):
                # pd.isna() on a sequence returns an array, so embeddings are passed through untouched.
                cleaned_data[key] = value
            else:
                cleaned_data[key] = None if pd.isna(value) else value
        return cleaned_data

    def set(self, cik: str, data: Dict[str, Any]):
        """Set cached data for a CIK and persist the whole cache

        Args:
            cik: Company CIK
            data: Company data to cache
        """
        self.cache[str(cik)] = data
        self.save()

    @staticmethod
    def _serialize_embedding(value):
        """Turn an embedding into text that ``ast.literal_eval`` can parse back; None stays None."""
        if value is None:
            return None
        if isinstance(value, np.ndarray):
            # str(ndarray) has no commas (and may be elided with "..."), so it would not reload.
            value = value.tolist()
        return str(value)

    def save(self):
        """Save the cache to the CSV file.

        The data is written to ``<cache_file>.tmp`` and then moved over the real
        file with ``os.replace``. Failures are logged, not raised.
        """
        try:
            # os.path.dirname("cache.csv") is "", and os.makedirs("") raises; only create a
            # directory when the path actually has one.
            directory = os.path.dirname(self.cache_file)
            if directory:
                os.makedirs(directory, exist_ok=True)

            if self.cache:
                df = pd.DataFrame.from_dict(self.cache, orient="index")
                if "embedded_description" in df.columns:
                    df["embedded_description"] = df["embedded_description"].apply(self._serialize_embedding)
            else:
                df = pd.DataFrame()

            temp_file = self.cache_file + ".tmp"
            df.to_csv(temp_file, index=False)
            os.replace(temp_file, self.cache_file)

            logger.debug(f"Cache saved with {len(self.cache)} entries to {self.cache_file}")
        except Exception as e:
            logger.error(f"Failed to save cache to {self.cache_file}: {e}")

In [None]:
class EDGARExtractor:
    """Extracts individual fields from already-downloaded SEC EDGAR JSON payloads."""

    # us-gaap concepts that may hold top-line revenue, in order of preference on ties.
    REVENUE_TAGS = (
        "SalesRevenueNet",
        "Revenues",
        "RevenueFromContractWithCustomerExcludingAssessedTax",
        "TotalRevenues",
    )

    def __init__(self):
        """Set up request headers and an HTTP session.

        NOTE(review): no method of this class in the visible code issues a request;
        the attributes are kept so existing callers that read them keep working.
        """
        self.headers = {
            "User-Agent": "Company Similarity Analysis contact@research.edu",
            "Accept-Encoding": "gzip, deflate",
            "Host": "data.sec.gov",
        }
        self.base_url = "https://data.sec.gov/api/xbrl"
        self.rate_limit = 0.1  # 10 requests per second
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def get_total_revenue(self, company_facts: dict) -> float:
        """Get the most recent annual (10-K) revenue from company facts.

        For each tag in ``REVENUE_TAGS`` the last listed 10-K fact in USD is taken
        as that tag's candidate. The candidate with the latest ``end`` date wins;
        on a tie the tag listed first wins. Comparing across tags matters because a
        filer may stop reporting under one tag and continue under another, and the
        first tag alone would then return an old figure.

        Args:
            company_facts: Parsed companyfacts JSON; must contain a "facts" key.

        Returns:
            The ``val`` of the selected fact. A reported value of 0 is returned as-is.

        Raises:
            Exception: If ``company_facts`` is empty or has no "facts" key, or if no
                10-K USD fact exists under any of the revenue tags.
        """
        if not company_facts or "facts" not in company_facts:
            raise Exception("Invalid company facts")

        # Filers without a us-gaap section simply have no candidates.
        us_gaap = company_facts["facts"].get("us-gaap") or {}

        latest = None
        for tag in self.REVENUE_TAGS:
            usd_facts = (us_gaap.get(tag) or {}).get("units", {}).get("USD", [])
            annual_facts = [
                fact for fact in usd_facts if fact.get("form") == "10-K" and fact.get("val") is not None
            ]
            if not annual_facts:
                continue

            candidate = annual_facts[-1]
            # "end" is an ISO date string, so plain string comparison orders it chronologically.
            if latest is None or candidate.get("end", "") > latest.get("end", ""):
                latest = candidate

        if latest is None:
            raise Exception("No annual revenue found")

        return latest["val"]

    def get_company_sic(self, company_submissions: dict) -> str:
        """Get company SIC from company submissions.

        Args:
            company_submissions: Parsed submissions JSON.

        Returns:
            The value stored under the "sic" key.

        Raises:
            Exception: If ``company_submissions`` is empty or has no "sic" key.
        """
        if not company_submissions or "sic" not in company_submissions:
            raise Exception("Invalid company submissions")

        return company_submissions["sic"]

In [None]:
class EDGARService:
    """Thin client for the SEC EDGAR JSON endpoints on data.sec.gov."""

    def __init__(self):
        """Create a session carrying the User-Agent / Host headers sent with every request."""
        self.headers = {
            "User-Agent": "Company Similarity Analysis contact@research.edu",
            "Accept-Encoding": "gzip, deflate",
            "Host": "data.sec.gov",
        }
        self.base_url = "https://data.sec.gov/api/xbrl"
        self.rate_limit = 0.1  # 10 requests per second
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def _get_json(self, url: str, cik: str) -> Dict:
        """Sleep for ``rate_limit`` seconds, GET ``url`` and return the decoded JSON body.

        Raises:
            Exception: Wrapping any ``requests.RequestException`` (including HTTP error statuses).
        """
        try:
            time.sleep(self.rate_limit)
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            raise Exception(f"Failed to fetch CIK {cik}: {e}") from e

    def get_company_facts(self, cik: str) -> Dict:
        """Get company facts from EDGAR API

        Args:
            cik: Company CIK; it is left-padded with zeros to 10 characters for the URL.

        Returns:
            Parsed JSON of ``{base_url}/companyfacts/CIK<padded>.json``.

        Raises:
            Exception: If the request fails or returns an error status.
        """
        padded_cik = str(cik).zfill(10)
        return self._get_json(f"{self.base_url}/companyfacts/CIK{padded_cik}.json", cik)

    def get_company_submissions(self, cik: str) -> Dict:
        """Get the company submissions document from EDGAR API

        Args:
            cik: Company CIK; it is left-padded with zeros to 10 characters for the URL.

        Returns:
            Parsed JSON of ``https://data.sec.gov/submissions/CIK<padded>.json``
            (the whole document, not a description string).

        Raises:
            Exception: If the request fails or returns an error status.
        """
        padded_cik = str(cik).zfill(10)
        return self._get_json(f"https://data.sec.gov/submissions/CIK{padded_cik}.json", cik)

In [None]:
class ErrorService:
    """Service for managing error tracking in CSV format

    Errors are held in memory as a list of dicts and the full list is rewritten to
    ``error_file`` after every change. Persistence is best-effort: I/O failures are
    logged as warnings and never raised to the caller.
    """

    # Column order used when writing an empty error file.
    _COLUMNS = ["timestamp", "company_name", "cik", "error_type", "error_message"]

    def __init__(self, error_file: str = "errors.csv"):
        """Create the service, loading ``error_file`` or creating it with headers only.

        Args:
            error_file: Path of the CSV file that mirrors the error list.
        """
        self.error_file = error_file
        self.errors = []
        self._load_errors()

    def _load_errors(self):
        """Load existing errors from CSV if file exists, otherwise write an empty file"""
        if not os.path.exists(self.error_file):
            self.errors = []
            self._save_to_csv()
            return

        try:
            # All columns are text. keep_default_na=False keeps an empty error_message as ""
            # instead of NaN, and stops names such as "NA" from being read as missing.
            df = pd.read_csv(self.error_file, dtype=str, keep_default_na=False)
            self.errors = df.to_dict("records")
        except Exception as e:
            logger.warning(f"Could not load errors from {self.error_file}: {e}")
            self.errors = []

    def add_error(self, company_name: str, cik: str, error_type: str, error_message: Optional[str] = None):
        """Add an error and immediately save to CSV

        Args:
            company_name: Name of the company
            cik: Company CIK (stored as ``str(cik)``)
            error_type: Type of error (e.g., 'revenue_extraction', 'fmp_info_extraction')
            error_message: Optional detailed error message; stored as "" when omitted
        """
        self.errors.append(
            {
                "timestamp": datetime.now().isoformat(),
                "company_name": company_name,
                "cik": str(cik),
                "error_type": error_type,
                "error_message": error_message or "",
            }
        )
        self._save_to_csv()

    def _save_to_csv(self):
        """Save all errors to CSV via a temp file and ``os.replace``; failures are only logged"""
        try:
            if self.errors:
                df = pd.DataFrame(self.errors)
            else:
                df = pd.DataFrame(columns=self._COLUMNS)

            temp_file = self.error_file + ".tmp"
            df.to_csv(temp_file, index=False)
            os.replace(temp_file, self.error_file)
        except Exception as e:
            # Error tracking must not interrupt the run, but a silent failure hides data loss.
            logger.warning(f"Could not save errors to {self.error_file}: {e}")

    def remove_errors_by_cik(self, cik: str):
        """Remove all errors for a specific CIK and save to CSV

        Args:
            cik: Company CIK to remove errors for (compared as ``str(cik)``)
        """
        cik_str = str(cik)
        remaining = [e for e in self.errors if str(e.get("cik", "")) != cik_str]

        # Only rewrite the file when something was actually removed.
        if len(remaining) < len(self.errors):
            self.errors = remaining
            self._save_to_csv()

    def get_error_count(self) -> int:
        """Get the total number of errors"""
        return len(self.errors)

    def get_errors_by_type(self, error_type: str) -> List[dict]:
        """Get all errors of a specific type (entries without an "error_type" key never match)"""
        return [e for e in self.errors if e.get("error_type") == error_type]

In [None]:
class FMPService:
    """Financial Modeling Prep API Service"""

    def __init__(self, api_key: Optional[str] = None):
        """Initialize FMP Service with API key

        Args:
            api_key: FMP API key. If not provided, will look for FINANCIAL_MODELLING_GROUP_API_KEY env var

        Raises:
            ValueError: If no key is passed and the environment variable is unset or empty
        """
        self.api_key = api_key or os.getenv("FINANCIAL_MODELLING_GROUP_API_KEY")
        if not self.api_key:
            raise ValueError("FMP API key not provided and FINANCIAL_MODELLING_GROUP_API_KEY not found in environment")

        self.base_url = "https://financialmodelingprep.com"
        self.rate_limit = 0.2

    def _redact(self, text: str) -> str:
        """Return ``text`` with every occurrence of the API key replaced by ``***``."""
        return text.replace(self.api_key, "***") if self.api_key else text

    def get_company_info(self, cik: str) -> Dict:
        """Get company information from FMP API using CIK

        The API key travels as a query parameter, and HTTP error messages echo the
        full request URL. Every message raised here is therefore passed through
        ``_redact`` so the key does not reach logs or the error CSV.

        Args:
            cik: Company CIK (can be with or without leading zeros)

        Returns:
            dict: Company info with description, market_cap, and full_time_employees
            (each None when the profile lacks the field)

        Raises:
            ValueError: If ``cik`` cannot be converted with ``int``
            Exception: If the request fails, the body cannot be decoded, or the
                payload is not a non-empty list of profile objects
        """
        cik_no_zeros = str(int(cik))
        url = f"{self.base_url}/stable/profile-cik"
        params = {"cik": cik_no_zeros, "apikey": self.api_key}

        try:
            time.sleep(self.rate_limit)

            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
        except requests.RequestException as e:
            raise Exception(f"Failed to fetch FMP profile: {self._redact(str(e))}") from e
        except Exception as e:
            raise Exception(f"Error processing FMP response: {self._redact(str(e))}") from e

        # A usable payload is a non-empty list whose first element is the profile object;
        # anything else (empty list, error object, ...) is reported as "no data".
        company_profile = data[0] if isinstance(data, list) and data else None
        if not isinstance(company_profile, dict):
            raise Exception(
                f"Error processing FMP response: No data returned from FMP API for CIK {cik_no_zeros}"
            )

        return {
            "description": company_profile.get("description", None),
            "market_cap": company_profile.get("marketCap", None),
            "full_time_employees": company_profile.get("fullTimeEmployees", None),
        }

In [None]:
class NormalizeRevenue:
    """Assigns companies to equal-width revenue buckets.

    The range from the smallest to the largest revenue is divided into
    ``number_of_buckets`` intervals of equal width. ``self.buckets[i]`` holds the
    CIKs whose revenue falls in interval ``i``. The maximum revenue belongs to the
    last bucket.
    """

    def __init__(self, cik_to_revenue: dict[int, float], number_of_buckets: int = 10):
        """Compute the bucket boundaries and place every CIK in its bucket.

        Args:
            cik_to_revenue: Mapping of CIK to revenue. May be empty.
            number_of_buckets: Number of equal-width buckets; must be at least 1.

        Raises:
            ValueError: If ``number_of_buckets`` is smaller than 1.
        """
        if number_of_buckets < 1:
            raise ValueError("number_of_buckets must be at least 1")

        self.number_of_buckets = number_of_buckets
        self.cik_to_revenue = cik_to_revenue
        self.revenues = self._get_revenues()

        # Min, max and width are computed once here; recomputing them on every lookup
        # made building the buckets quadratic in the number of companies.
        self._min_revenue = min(self.revenues) if self.revenues else 0.0
        self._max_revenue = max(self.revenues) if self.revenues else 0.0
        self._bucket_size = (self._max_revenue - self._min_revenue) / self.number_of_buckets

        self.buckets = self._create_buckets()
        for cik, revenue in self.cik_to_revenue.items():
            self.buckets[self._get_bucket_index(revenue)].append(cik)

    def normalize_all(self) -> list[int]:
        """Return the bucket index of every CIK, in the mapping's iteration order."""
        return [self.normalize(cik) for cik in self.cik_to_revenue.keys()]

    def normalize(self, cik: int) -> int:
        """Return the bucket index for ``cik``; raises KeyError for an unknown CIK."""
        return self._get_bucket_index(self.cik_to_revenue[cik])

    def _create_buckets(self) -> list[list[int]]:
        """Return ``number_of_buckets`` independent empty lists."""
        return [[] for _ in range(self.number_of_buckets)]

    def _get_revenues(self) -> list[float]:
        """Return the revenue values as a list."""
        return list(self.cik_to_revenue.values())

    def _get_min_revenue(self) -> float:
        """Return the smallest revenue seen at construction (0.0 for an empty mapping)."""
        return self._min_revenue

    def _get_max_revenue(self) -> float:
        """Return the largest revenue seen at construction (0.0 for an empty mapping)."""
        return self._max_revenue

    def _get_bucket_size(self) -> float:
        """Return the width of one bucket; 0 when all revenues are equal or the mapping is empty."""
        return self._bucket_size

    def _get_bucket_index(self, revenue: float) -> int:
        """Map a revenue to a bucket index in ``[0, number_of_buckets - 1]``.

        Values outside the observed range are clamped to the first or last bucket.
        """
        size = self._bucket_size
        if size == 0:
            return 0
        last = self.number_of_buckets - 1
        if revenue >= self._max_revenue:
            # Without this the maximum would compute to index == number_of_buckets.
            return last
        idx = int((revenue - self._min_revenue) / size)
        return max(0, min(idx, last))

    def _get_bucket_range(self, bucket_index: int) -> tuple[float, float]:
        """Return ``(low, high)`` for a bucket; the last bucket's high is exactly the maximum revenue."""
        size = self._bucket_size
        low = self._min_revenue + bucket_index * size
        if bucket_index < self.number_of_buckets - 1:
            high = self._min_revenue + (bucket_index + 1) * size
        else:
            high = self._max_revenue
        return low, high

In [None]:
def save_buckets_json(normalized_revenue, filepath: str = "buckets.json"):
    """Write one JSON record per revenue bucket to ``filepath``.

    Each record has the bucket index ("bucket"), its ``[low, high]`` bounds as
    reported by ``_get_bucket_range`` ("range"), and the CIKs assigned to it
    ("cik_values").

    Args:
        normalized_revenue: NormalizeRevenue instance with bucket data
        filepath: Path to save the JSON file

    Returns:
        Number of buckets saved
    """

    def describe(index):
        low, high = normalized_revenue._get_bucket_range(index)
        members = list(normalized_revenue.buckets[index])
        return {"bucket": index, "range": [low, high], "cik_values": members}

    payload = [describe(index) for index in range(normalized_revenue.number_of_buckets)]

    with open(filepath, "w") as handle:
        json.dump(payload, handle, indent=2)

    return len(payload)


def create_cik_to_revenue_dict(companies_data: List[Dict[str, Any]]) -> Dict[int, float]:
    """Create a dictionary mapping CIK to revenue from companies data

    Records loaded from the CSV cache carry their CIK as a string, while freshly
    built records carry an int. Keys are converted to int so the result matches
    its declared type and one run never mixes "1750" with 1750.

    Args:
        companies_data: List of company dictionaries with "cik" and "total_revenue"

    Returns:
        Dictionary mapping CIK to revenue; companies whose revenue is None
        (or absent) are left out
    """
    cik_to_revenue = {}
    for company in companies_data:
        revenue = company.get("total_revenue")
        if revenue is None:
            continue

        cik = company["cik"]
        try:
            cik = int(cik)
        except (TypeError, ValueError):
            # Keep an identifier that is not numeric rather than dropping the company.
            pass
        cik_to_revenue[cik] = revenue
    return cik_to_revenue

In [None]:
# Instantiate the services used by the processing loop further down.
# - FMPService() raises ValueError when no key is passed and FINANCIAL_MODELLING_GROUP_API_KEY is unset.
# - CacheService() loads cache.csv from the working directory if it exists.
# - ErrorService() loads errors.csv if it exists, otherwise creates it with headers only.
edgar_service = EDGARService()
fmp_service = FMPService()
extractor = EDGARExtractor()
cache_service = CacheService()
error_service = ErrorService()

In [None]:
# Load the list of companies to process; the loop below reads the "cik" and "entityName" columns.
# NOTE(review): the preview shows an "Unnamed: 0" column, which suggests the CSV was written with its
# index — consider index_col=0 (confirm nothing relies on that column).
company_cik = pd.read_csv("companyfacts_unique.csv")
company_cik.head(10)

Unnamed: 0,cik,entityName
0,1750,AAR CORP
1,1800,ABBOTT LABORATORIES
2,1961,WORLDS INC.
3,2034,ACETO CORP
4,2098,ACME UNITED CORP
5,2178,"ADAMS RESOURCES & ENERGY, INC."
6,2186,BK TECHNOLOGIES CORPORATION
7,2488,"ADVANCED MICRO DEVICES, INC"
8,2491,"BALLY TECHNOLOGIES, INC."
9,2969,"AIR PRODUCTS AND CHEMICALS, INC."


In [None]:
# Build one record per company: EDGAR facts -> revenue and SIC, FMP profile -> description,
# market cap and employees, then an embedding of the description. A failure at any stage is
# logged, recorded in error_service, and the company is skipped (it is not cached).
companies_data = []


for cik, company_name in tqdm(
    zip(company_cik["cik"], company_cik["entityName"]),
    total=len(company_cik),
    desc="Processing companies",
):
    padded_cik = str(cik).zfill(10)

    # Companies processed by an earlier run are reused without any network call.
    cached_data = cache_service.get(cik)
    if cached_data:
        companies_data.append(cached_data)
        continue

    company_data = {
        "cik": cik,
        "padded_cik": padded_cik,
        "company_name": company_name,
        "description": None,
        "embedded_description": None,
        "total_revenue": None,
        "sic": None,
        "market_cap": None,
        "full_time_employees": None,
    }

    try:
        company_facts = edgar_service.get_company_facts(cik)
        company_submissions = edgar_service.get_company_submissions(cik)
    except Exception as e:
        logger.error(f"Error fetching data for {company_name}: {e}")
        error_service.add_error(company_name, cik, "data_fetch", str(e))
        continue

    try:
        company_data["total_revenue"] = extractor.get_total_revenue(company_facts)
    except Exception as e:
        logger.error(f"Error getting total revenue for {company_name}: {e}")
        error_service.add_error(company_name, cik, "revenue_extraction", str(e))
        continue

    try:
        company_data["sic"] = extractor.get_company_sic(company_submissions)
    except Exception as e:
        logger.error(f"Error getting SIC for {company_name}: {e}")
        error_service.add_error(company_name, cik, "sic_extraction", str(e))
        continue

    try:
        fmp_info = fmp_service.get_company_info(cik)
        company_data["description"] = fmp_info["description"]
        company_data["market_cap"] = fmp_info["market_cap"]
        company_data["full_time_employees"] = fmp_info["full_time_employees"]
    except Exception as e:
        logger.error(f"Error getting FMP info for {company_name}: {e}")
        error_service.add_error(company_name, cik, "fmp_info_extraction", str(e))
        continue

    if not company_data["description"]:
        # An FMP profile can come back without a description. Sending input=None to the
        # embedding endpoint only yields a 400 (visible in this cell's recorded output),
        # so record the error without spending the API call.
        message = "No description available to embed"
        logger.error(f"Error embedding description for {company_name}: {message}")
        error_service.add_error(company_name, cik, "embedding_description", message)
        continue

    try:
        company_data["embedded_description"] = embedding(
            model="openai/text-embedding-3-small", input=company_data["description"]
        )["data"][0]["embedding"]
    except Exception as e:
        logger.error(f"Error embedding description for {company_name}: {e}")
        error_service.add_error(company_name, cik, "embedding_description", str(e))
        continue

    # The embedding vector is left out of the debug dump to keep the log readable.
    log_data = {k: v for k, v in company_data.items() if k != "embedded_description"}
    logger.debug(f"Company information: {json.dumps(log_data, indent=4)}")

    cache_service.set(cik, company_data)
    companies_data.append(company_data)
    # The company succeeded, so errors recorded for it by earlier runs are stale.
    error_service.remove_errors_by_cik(cik)


cik_to_revenue = create_cik_to_revenue_dict(companies_data)

# Buckets are built in the NormalizeRevenue constructor, so no separate normalize call is
# needed before saving.
if cik_to_revenue:
    normalized_revenue = NormalizeRevenue(cik_to_revenue, number_of_buckets=10)
    save_buckets_json(normalized_revenue)

logger.info(f"Processing complete. Total errors: {error_service.get_error_count()}")

Processing companies:   0%|          | 34/13912 [00:11<1:58:01,  1.96it/s][92m16:26:18 - LiteLLM:DEBUG[0m: utils.py:359 - 

[92m16:26:18 - LiteLLM:DEBUG[0m: utils.py:359 - [92mRequest to litellm:[0m
[92m16:26:18 - LiteLLM:DEBUG[0m: utils.py:359 - [92mlitellm.embedding(model='openai/text-embedding-3-small', input=None)[0m
[92m16:26:18 - LiteLLM:DEBUG[0m: utils.py:359 - 

[92m16:26:18 - LiteLLM:DEBUG[0m: litellm_logging.py:476 - self.optional_params: {}
[92m16:26:18 - LiteLLM:DEBUG[0m: utils.py:359 - SYNC kwargs[caching]: False; litellm.cache: None; kwargs.get('cache')['no-cache']: False
[92m16:26:18 - LiteLLM:DEBUG[0m: litellm_logging.py:476 - self.optional_params: {}
[92m16:26:18 - LiteLLM:DEBUG[0m: litellm_logging.py:941 - [92m

POST Request Sent from LiteLLM:
curl -X POST \
https://api.openai.com/v1 \
-d '{'model': 'text-embedding-3-small', 'input': None}'
[0m

[92m16:26:19 - LiteLLM:DEBUG[0m: litellm_logging.py:1014 - RAW RESPONSE:
Error code: 400 - {'error':