In [48]:
import os, requests
from pathlib import Path
from dotenv import load_dotenv

import pandas as pd

import threading
import time

from typing import Union, List, Dict, Tuple, Optional
from pydantic import BaseModel

import asyncio
from tqdm import tqdm

In [49]:
class RateLimiter:
    """Sliding-window limiter: at most ``max_calls`` recorded calls per ``per_seconds``.

    Call :meth:`wait` before the rate-limited operation and :meth:`add_call`
    to record it. The bookkeeping is guarded by a lock so one limiter can be
    shared between threads.

    Args:
        max_calls: Maximum number of recorded calls inside one window (>= 1).
        per_seconds: Length of the window in seconds.

    Raises:
        ValueError: If ``max_calls`` is smaller than 1 (``wait`` could never return).
    """

    def __init__(self, max_calls, per_seconds):
        if max_calls < 1:
            raise ValueError("max_calls must be at least 1")
        # Timestamps of recorded calls, oldest first (time.monotonic() values).
        self.calls = []
        self.max_calls = max_calls
        self.per_seconds = per_seconds
        self._lock = threading.Lock()

    def _drop_expired(self, now):
        """Remove timestamps older than the window. Caller must hold the lock."""
        expired = 0
        for stamp in self.calls:
            if now - stamp > self.per_seconds:
                expired += 1
            else:
                break  # timestamps are appended in order, the rest is newer
        if expired:
            del self.calls[:expired]

    def wait(self):
        """Block until fewer than ``max_calls`` calls remain inside the window."""
        while True:
            with self._lock:
                now = time.monotonic()
                self._drop_expired(now)
                if len(self.calls) < self.max_calls:
                    return
                # Time until the oldest recorded call leaves the window.
                remaining = self.per_seconds - (now - self.calls[0])
            # Sleep outside the lock; poll at most every 0.1 s so a reset()
            # from another thread is noticed quickly.
            time.sleep(min(max(remaining, 0.001), 0.1))

    def add_call(self):
        """Record one call at the current time."""
        with self._lock:
            # Monotonic clock: unaffected by system clock adjustments.
            self.calls.append(time.monotonic())

    def reset(self):
        """Forget all recorded calls."""
        with self._lock:
            self.calls = []

In [2]:
# Load variables from a .env file (python-dotenv), then read the FMP API key.
# API_KEY is None when FMP_KEY is not set.
load_dotenv();
API_KEY = os.getenv('FMP_KEY')

In [3]:
# Output file formats accepted when saving downloaded data.
FORMATS = [
    "csv",
    "json",
    "parquet",
    "feather",
    "pickle",
]

In [4]:
# UTILS
def create_dir(fpath: str) -> None:
    """Create directory ``fpath`` including missing parents.

    Does nothing when the directory already exists. ``exist_ok=True`` avoids
    the race between a separate existence check and the creation when several
    threads or processes create the same directory at once.

    Raises:
        FileExistsError: If ``fpath`` exists but is not a directory.
    """
    os.makedirs(fpath, exist_ok=True)

In [5]:
class Downloader:
    """Base class: fetch one JSON payload per ticker over HTTP and save it to disk.

    Subclasses are expected to set these instance attributes:

    * ``api_key``   - key inserted into the request URL,
    * ``base_url``  - ``str.format`` template filled with ``(ticker, api_key)``,
    * ``base_name`` - label used in the output file name.

    Tickers whose download failed are collected in ``self.failures``.
    """

    # Seconds before an HTTP request is abandoned; without a timeout a stalled
    # connection would block the calling thread indefinitely.
    timeout = 30

    def __init__(self, root_path: str = "data", override: bool = False):
        """
        Args:
            root_path: Directory under which one folder per ticker is created.
                ``None`` falls back to ``"data"`` instead of producing a
                directory literally named ``None``.
            override: When True, existing files are overwritten; otherwise skipped.
        """
        self.root_path = "data" if root_path is None else root_path
        self.override = override
        # Optional suffix for the file name (e.g. a reporting period).
        self.meta_name = None
        # Keep a list a subclass may already have created before calling us.
        if not hasattr(self, "failures"):
            self.failures = []

    def _request(self, ticker: str) -> "requests.Response":
        """Issue the GET request for ``ticker`` and return the raw response."""
        url = self.base_url.format(ticker, self.api_key)
        return requests.get(url, timeout=self.timeout)

    def _save(self, data, ticker: str, format: str = 'csv') -> None:
        """Write ``data`` to ``<root_path>/<ticker>/<ticker>_<base_name>[_<meta_name>].<format>``.

        Args:
            data: A DataFrame, or anything ``pd.DataFrame`` accepts.
            ticker: Ticker symbol; used for the folder and file name.
            format: One of ``FORMATS``.

        Raises:
            ValueError: If ``format`` is not one of ``FORMATS``.
        """
        if format not in FORMATS: raise ValueError(f"format must be one of {FORMATS}")
        folder = f"{self.root_path}/{ticker}"
        create_dir(folder)
        suffix = f"_{self.meta_name}" if self.meta_name else ""
        fpath = f"{folder}/{ticker}_{self.base_name}{suffix}.{format}"
        if os.path.exists(fpath) and not self.override:
            print(f"File {fpath} already exists. Skipping...")
            return
        if not isinstance(data, pd.DataFrame): data = pd.DataFrame(data)
        if format == "csv": data.to_csv(fpath, index=False)
        elif format == "json": data.to_json(fpath, orient="records")
        elif format == "parquet": data.to_parquet(fpath, index=False)
        elif format == "feather": data.to_feather(fpath)
        elif format == "pickle": data.to_pickle(fpath)

    def _download(self, ticker: str) -> Optional[Union[list, dict]]:
        """Fetch and decode the JSON payload for ``ticker``.

        Returns:
            The decoded JSON, or ``None`` when the request raised, the status
            code was not 200, or the payload was empty. In all of those cases
            ``ticker`` is appended to ``self.failures``.
        """
        try:
            response = self._request(ticker)
            payload = response.json() if response.status_code == 200 else None
        except Exception:
            # Best effort: a failing ticker is recorded, not raised.
            payload = None
        if not payload:
            self.failures.append(ticker)
            return None
        return payload

    def get(self, ticker, format: str = "csv", **kwargs) -> None:
        """Download the data for ``ticker`` and save it in ``format``.

        Extra keyword arguments are forwarded to ``_download``. Nothing is
        saved when the download yields no data.
        """
        data = self._download(ticker, **kwargs)
        if not data: return
        self._save(data, ticker=ticker, format=format)

class Financials(Downloader):
    """Downloader for the ``/api/v3/key-metrics/{ticker}`` endpoint.

    The endpoint takes a ``period`` query parameter; the period is also used
    as the file-name suffix (``meta_name``). Files use the
    ``financials_metrics`` base name.
    """

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        self.api_key = api_key
        self.base_name = "financials_metrics"
        self.base_url = "https://financialmodelingprep.com/api/v3/key-metrics/{}?period={}&apikey={}"
        self.failures = []
        super().__init__(root_path=root_path, override=override)

    def _request(self, ticker: str, period: str = 'quarter') -> "requests.Response":
        """Issue the GET request for ``ticker`` and ``period``."""
        url = self.base_url.format(ticker, period, self.api_key)
        # Always bound the request: without a timeout a stalled connection
        # blocks the calling thread forever. An optional ``timeout`` attribute
        # on the instance/class is honoured; the fallback is 30 seconds.
        return requests.get(url, timeout=getattr(self, "timeout", 30))

    def _download(self, ticker, period: str = 'quarter') -> Optional[Union[list, dict]]:
        """Fetch and decode the JSON payload for ``ticker`` and ``period``.

        Returns:
            The decoded JSON, or ``None`` when the request raised, the status
            code was not 200, or the payload was empty. In all of those cases
            ``ticker`` is appended to ``self.failures``.
        """
        self.meta_name = period  # for saving - not needed for all downloads
        try:
            response = self._request(ticker, period)
            payload = response.json() if response.status_code == 200 else None
        except Exception:
            # Best effort: a failing ticker is recorded, not raised.
            payload = None
        if not payload:
            self.failures.append(ticker)
            return None
        return payload
        
class StockPrice(Downloader):
    """Downloader for the ``/api/v3/quote/{ticker}`` endpoint (``quote`` files)."""

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        # Shared state (root path, override flag, meta name) comes from the base class.
        super().__init__(root_path=root_path, override=override)
        # Endpoint-specific configuration.
        self.base_url = "https://financialmodelingprep.com/api/v3/quote/{}?apikey={}"
        self.base_name = "quote"
        self.api_key = api_key
        # Tickers whose download failed.
        self.failures = []

class CompanyProfile(Downloader):
    """Downloader for the ``/api/v3/profile/{ticker}`` endpoint (``company_profile`` files)."""

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        # Shared state (root path, override flag, meta name) comes from the base class.
        super().__init__(root_path=root_path, override=override)
        # Endpoint-specific configuration.
        self.base_url = "https://financialmodelingprep.com/api/v3/profile/{}?apikey={}"
        self.base_name = "company_profile"
        self.api_key = api_key
        # Tickers whose download failed.
        self.failures = []

class ExecutiveHistory(Downloader):
    """Downloader for the ``/api/v4/governance/executive_compensation`` endpoint (``executive_history`` files)."""

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        # Shared state (root path, override flag, meta name) comes from the base class.
        super().__init__(root_path=root_path, override=override)
        # Endpoint-specific configuration.
        self.base_url = "https://financialmodelingprep.com/api/v4/governance/executive_compensation?symbol={}&apikey={}"
        self.base_name = "executive_history"
        self.api_key = api_key
        # Tickers whose download failed.
        self.failures = []

class Mergers(Downloader):
    """Downloader for the ``/api/v4/mergers-acquisitions/search`` endpoint (``mergers_acquisitions`` files).

    The ticker passed to ``get`` is inserted as the ``name`` query parameter.
    """

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        # Shared state (root path, override flag, meta name) comes from the base class.
        super().__init__(root_path=root_path, override=override)
        # Endpoint-specific configuration.
        self.base_url = "https://financialmodelingprep.com/api/v4/mergers-acquisitions/search?name={}&apikey={}"
        self.base_name = "mergers_acquisitions"
        self.api_key = api_key
        # Tickers whose download failed.
        self.failures = []

class Employee(Downloader):
    """Downloader for the ``/api/v4/historical/employee_count`` endpoint (``employee`` files)."""

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        # Shared state (root path, override flag, meta name) comes from the base class.
        super().__init__(root_path=root_path, override=override)
        # Endpoint-specific configuration.
        self.base_url = "https://financialmodelingprep.com/api/v4/historical/employee_count?symbol={}&apikey={}"
        self.base_name = "employee"
        self.api_key = api_key
        # Tickers whose download failed.
        self.failures = []

class KeyExecutives(Downloader):
    """Downloader for the ``/api/v3/key-executives/{ticker}`` endpoint (``key_executives`` files)."""

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        # Shared state (root path, override flag, meta name) comes from the base class.
        super().__init__(root_path=root_path, override=override)
        # Endpoint-specific configuration.
        self.base_url = "https://financialmodelingprep.com/api/v3/key-executives/{}?apikey={}"
        self.base_name = "key_executives"
        self.api_key = api_key
        # Tickers whose download failed.
        self.failures = []

class IncomeStatement(Downloader):
    """Downloader for the ``/api/v3/income-statement/{ticker}`` endpoint.

    The endpoint takes a ``period`` query parameter; the period is also used
    as the file-name suffix (``meta_name``). Files use the
    ``income_statement`` base name.
    """

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        self.api_key = api_key
        self.base_name = "income_statement"
        self.base_url = "https://financialmodelingprep.com/api/v3/income-statement/{}?period={}&apikey={}"
        self.failures = []
        super().__init__(root_path=root_path, override=override)

    def _request(self, ticker: str, period: str = 'quarter') -> "requests.Response":
        """Issue the GET request for ``ticker`` and ``period``."""
        url = self.base_url.format(ticker, period, self.api_key)
        # Always bound the request: without a timeout a stalled connection
        # blocks the calling thread forever. An optional ``timeout`` attribute
        # on the instance/class is honoured; the fallback is 30 seconds.
        return requests.get(url, timeout=getattr(self, "timeout", 30))

    def _download(self, ticker, period: str = 'quarter') -> Optional[Union[list, dict]]:
        """Fetch and decode the JSON payload for ``ticker`` and ``period``.

        Returns:
            The decoded JSON, or ``None`` when the request raised, the status
            code was not 200, or the payload was empty. In all of those cases
            ``ticker`` is appended to ``self.failures``.
        """
        self.meta_name = period  # for saving - not needed for all downloads
        try:
            response = self._request(ticker, period)
            payload = response.json() if response.status_code == 200 else None
        except Exception:
            # Best effort: a failing ticker is recorded, not raised.
            payload = None
        if not payload:
            self.failures.append(ticker)
            return None
        return payload

class BalanceSheets(Downloader):
    """Downloader for the ``/api/v3/balance-sheet-statement/{ticker}`` endpoint.

    The endpoint takes a ``period`` query parameter; the period is also used
    as the file-name suffix (``meta_name``). Files use the
    ``balance_sheets`` base name.
    """

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        self.api_key = api_key
        self.base_name = "balance_sheets"
        self.base_url = "https://financialmodelingprep.com/api/v3/balance-sheet-statement/{}?period={}&apikey={}"
        self.failures = []
        super().__init__(root_path=root_path, override=override)

    def _request(self, ticker: str, period: str = 'quarter') -> "requests.Response":
        """Issue the GET request for ``ticker`` and ``period``."""
        url = self.base_url.format(ticker, period, self.api_key)
        # Always bound the request: without a timeout a stalled connection
        # blocks the calling thread forever. An optional ``timeout`` attribute
        # on the instance/class is honoured; the fallback is 30 seconds.
        return requests.get(url, timeout=getattr(self, "timeout", 30))

    def _download(self, ticker, period: str = 'quarter') -> Optional[Union[list, dict]]:
        """Fetch and decode the JSON payload for ``ticker`` and ``period``.

        Returns:
            The decoded JSON, or ``None`` when the request raised, the status
            code was not 200, or the payload was empty. In all of those cases
            ``ticker`` is appended to ``self.failures``.
        """
        self.meta_name = period  # for saving - not needed for all downloads
        try:
            response = self._request(ticker, period)
            payload = response.json() if response.status_code == 200 else None
        except Exception:
            # Best effort: a failing ticker is recorded, not raised.
            payload = None
        if not payload:
            self.failures.append(ticker)
            return None
        return payload
        
class CashFlow(Downloader):
    """Downloader for the ``/api/v3/cash-flow-statement/{ticker}`` endpoint.

    The endpoint takes a ``period`` query parameter; the period is also used
    as the file-name suffix (``meta_name``). Files use the ``cash_flow``
    base name.
    """

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        self.api_key = api_key
        self.base_name = "cash_flow"
        self.base_url = "https://financialmodelingprep.com/api/v3/cash-flow-statement/{}?period={}&apikey={}"
        self.failures = []
        super().__init__(root_path=root_path, override=override)

    def _request(self, ticker: str, period: str = 'quarter') -> "requests.Response":
        """Issue the GET request for ``ticker`` and ``period``."""
        url = self.base_url.format(ticker, period, self.api_key)
        # Always bound the request: without a timeout a stalled connection
        # blocks the calling thread forever. An optional ``timeout`` attribute
        # on the instance/class is honoured; the fallback is 30 seconds.
        return requests.get(url, timeout=getattr(self, "timeout", 30))

    def _download(self, ticker, period: str = 'quarter') -> Optional[Union[list, dict]]:
        """Fetch and decode the JSON payload for ``ticker`` and ``period``.

        Returns:
            The decoded JSON, or ``None`` when the request raised, the status
            code was not 200, or the payload was empty. In all of those cases
            ``ticker`` is appended to ``self.failures``.
        """
        self.meta_name = period  # for saving - not needed for all downloads
        try:
            response = self._request(ticker, period)
            payload = response.json() if response.status_code == 200 else None
        except Exception:
            # Best effort: a failing ticker is recorded, not raised.
            payload = None
        if not payload:
            self.failures.append(ticker)
            return None
        return payload
        
class EPS(Downloader):
    """Downloader for the ``/api/v3/earnings/{ticker}`` endpoint (``eps`` files)."""

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        # Shared state (root path, override flag, meta name) comes from the base class.
        super().__init__(root_path=root_path, override=override)
        # Endpoint-specific configuration.
        self.base_url = "https://financialmodelingprep.com/api/v3/earnings/{}?apikey={}"
        self.base_name = "eps"
        self.api_key = api_key
        # Tickers whose download failed.
        self.failures = []

class SenateTrading(Downloader):
    """Downloader for the ``/api/v4/senate-trading`` endpoint (``senate_trading`` files)."""

    def __init__(self, api_key: str, root_path: str | None = None, override: bool = False):
        # Shared state (root path, override flag, meta name) comes from the base class.
        super().__init__(root_path=root_path, override=override)
        # Endpoint-specific configuration.
        self.base_url = "https://financialmodelingprep.com/api/v4/senate-trading?symbol={}&apikey={}"
        self.base_name = "senate_trading"
        self.api_key = api_key
        # Tickers whose download failed.
        self.failures = []


In [50]:
from concurrent.futures import ThreadPoolExecutor, as_completed

class DownloadManager:
    """Run a set of downloaders over many tickers using a thread pool.

    Args:
        downloaders: Objects exposing ``get(ticker=..., format=...)``.
        rate_limit: Either the maximum number of calls per 60 seconds (a new
            ``RateLimiter`` is built from it) or a ready ``RateLimiter``
            instance, which is then used as is.
        threads: Number of worker threads.
    """

    def __init__(self, downloaders, rate_limit=700, threads=5):
        self.downloaders = downloaders
        if isinstance(rate_limit, RateLimiter):
            # Accept a pre-built limiter instead of wrapping it as ``max_calls``.
            self.rate_limiter = rate_limit
        else:
            self.rate_limiter = RateLimiter(max_calls=rate_limit, per_seconds=60)
        self.threads = threads

    def _process_ticker(self, ticker: str, format: str):
        """Run every downloader for one ticker, respecting the rate limit."""
        for downloader in self.downloaders:
            self.rate_limiter.wait()
            # Record the call before sending it so concurrent workers see the
            # slot as taken while the request is still in flight.
            self.rate_limiter.add_call()
            downloader.get(ticker=ticker, format=format)

    def download(self, tickers: Union[str, List[str]], format: str = "csv") -> None:
        """Download data for ``tickers`` (one symbol or an iterable of symbols).

        An exception raised while processing a ticker is re-raised here.
        """
        # A bare string is one ticker, not an iterable of characters.
        tickers = [tickers] if isinstance(tickers, str) else list(tickers)
        with ThreadPoolExecutor(max_workers=self.threads) as executor, tqdm(total=len(tickers)) as progress:
            futures = {executor.submit(self._process_ticker, ticker, format): ticker for ticker in tickers}
            for future in as_completed(futures):
                ticker = futures[future]
                future.result()  # re-raises any exception from the worker
                progress.update(1)
                progress.set_description(f"Processing {ticker}")

    def from_text(self, fpath: str, format: str = "csv") -> None:
        """Read tickers from a text file (comma and/or whitespace separated) and download them."""
        with open(fpath, "r") as f:
            tickers = f.read().replace(",", " ").split()
        self.download(tickers, format)


In [51]:
# Usage
# Output directory shared by all downloaders. The name is kept exactly as
# before (including its spelling) so previously downloaded files are still found.
FUNDAMENTALS_DIR = "data/fundatmentals"

downloaders = [
    downloader_cls(api_key=API_KEY, root_path=FUNDAMENTALS_DIR)
    for downloader_cls in (
        Financials,
        StockPrice,
        CompanyProfile,
        ExecutiveHistory,
        Mergers,
        Employee,
        KeyExecutives,
        IncomeStatement,
        BalanceSheets,
        CashFlow,
        SenateTrading,
    )
]

rate_limiter = RateLimiter(max_calls=750, per_seconds=60)
# DownloadManager's second parameter is `rate_limit`, the calls-per-minute
# number it hands to RateLimiter(max_calls=...). Pass the number, not the
# limiter object, otherwise the limiter ends up being used as `max_calls`.
manager = DownloadManager(downloaders, rate_limit=rate_limiter.max_calls)

In [None]:
# `DownloadManager.from_text` is an instance method and returns None, so it
# cannot be called on the class to obtain the ticker list. Parse the file here
# with the same comma/whitespace splitting, then start the download.
tickers = Path('russel_2000.txt').read_text().replace(",", " ").split()
manager.download(tickers)