In [1]:
from datetime import date
import time

SEARCHAPIENDPOINT = "https://efts.sec.gov/LATEST/search-index/" 
ARCHIVESBASEURL = "https://www.sec.gov/Archives/edgar/data/"

#SEARCHAPIENDPOINT = "https://data.sec.gov/submissions/"
#ARCHIVESBASEURL = "https://www.sec.gov/Archives/edgar/daily-index/"

SLEEPTIME = 0.2
MAXRETRIES = 10
DATE_FORMAT_TOKENS = "%Y-%m-%d"
AFTER_DATE = date(2000, 1, 1)
BEFORE_DATE = date.today()

## First step
obtain the URLs of the filing we want to download

In [2]:
from collections import namedtuple
from pathlib import Path
from typing import List

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import pandas as pd
from fake_useragent import UserAgent
from faker import Faker


retries = Retry(
    total=MAXRETRIES,
    backoff_factor=SLEEPTIME,
    status_forcelist=[403, 500, 502, 503, 504],
)

ROOT_SAVE_FOLDER_NAME = "test"
FILING_FULL_SUBMISSION_FILENAME = "full-submission.txt"
FILING_DETAILS_FILENAME_STEM = "filing-details"

FilingMetadata = namedtuple(
    "FilingMetadata",
    [ "cik",
      "file_date",
      "period_end",
      "accession_number",
      "full_submission_url",
      "filing_details_url",
      "filing_details_filename",
    ],
)

fake = Faker()

def form_request(
    ticker: str,
    filing_types: List[str],
    start_date: str,
    end_date: str,
    start_index: int,
    query: str,
) -> dict:
    request = {
        "dateRange": "custom",
        "startdt": start_date,
        "enddt": end_date,
        "entityName": ticker,
        "forms": filing_types,
        "from": start_index,
        "q": query,
    }
    return request

def user_agent() -> str:
    return f"{fake.first_name()} {fake.last_name()} {fake.email()}"

def filing_metadata(hit: dict) -> FilingMetadata:
    accession_number, filing_details_filename = hit["_id"].split(":", 1)
    cik = hit["_source"]["ciks"][-1]
    file_date= hit["_source"]["file_date"]
    period_ending=hit["_source"]["period_ending"]
    accession_number_no_dashes = accession_number.replace("-", "", 2)
    submission_base_url = (
        f"{ARCHIVESBASEURL}/{cik}/{accession_number_no_dashes}"
    )
    full_submission_url = f"{submission_base_url}/{accession_number}.txt"
    filing_details_url = f"{submission_base_url}/{filing_details_filename}"
    filing_details_filename_extension = Path(filing_details_filename).suffix.replace(
        "htm", "html"
    )
    filing_details_filename = (
        f"{FILING_DETAILS_FILENAME_STEM}{filing_details_filename_extension}"
    )
 
    return FilingMetadata(
        cik,
        file_date,
        period_ending,
        accession_number=accession_number,
        full_submission_url=full_submission_url,
        filing_details_url=filing_details_url,
        filing_details_filename=filing_details_filename,
    )
 
filings_to_download: List[FilingMetadata] = []
 
def get_filing_urls(    #obtain the filing urls we want to download
    filing_type: str,
    ticker: str,
    num_filings_to_download: int,
    after_date: str,
    before_date: str,
    include_amends: bool,
    query: str = "",
) -> List[FilingMetadata]:

    start_index = 0
    # create a session to connect to the API
    client = requests.Session()   
    client.mount("http://", HTTPAdapter(max_retries=retries))

    try:
            while len(filings_to_download)<num_filings_to_download:
                payload = form_request(
                    ticker,
                    [filing_type],
                    after_date,
                    before_date,
                    start_index,
                    query,
                )
                headers = {
                    "User-Agent": user_agent(),
                    "Accept-Encoding": "gzip, deflate",
                    "Host": "efts.sec.gov",
                }
                resp = client.post(  # send the request and data to the server
                SEARCHAPIENDPOINT, json=payload, headers=headers # 為了要精確的搜尋，所以要指定 1.API的網址 2.要傳送的資料 3.要傳送的標頭 
                )
                resp.raise_for_status()
                queryresults = resp.json() # return the json object of the result
                print(queryresults) # the result is stored in the "queryresults" variable

                queryhits = queryresults["hits"]["hits"]
                print(queryhits)

                if not queryhits:
                    break

                for hit in queryhits:
                    filing_type = hit["_source"]["file_type"]

                    is_amend = filing_type[-2:] == "/A"
                    if not include_amends and is_amend:
                        continue

                    if not is_amend and filing_type != filing_type:
                        continue

                metadata = filing_metadata(hit)  # the filing we want to download is stored in the "metadata" variable
                filings_to_download.append(metadata)

                if len(filings_to_download) == num_filings_to_download:
                    return filings_to_download


            query_size = queryresults["query"]["size"]
            start_index += query_size

            time.sleep(SLEEPTIME)
    finally:
            client.close()

            return filings_to_download
    

## return URLs
for all 10-K filings

In [3]:
get_filing_urls(
    "10-K",
    "AAPL",
    999999999999999999999999,
    "2012-01-01",
    "2022-01-01",
    include_amends=True,
)

[]

In [6]:
from datetime import date
import time
from pathlib import Path
from typing import List
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter

def resolve_relative_urls(filing_text: str, download_url: str) -> str:
     soup = BeautifulSoup(filing_text, "lxml")
     base_url = f"{download_url.rsplit('/', 1)[0]}/"

     for url in soup.find_all("a", href=True):
         if url["href"].startswith("#") or url["href"].startswith("http"):
            continue
         url["href"] = urljoin(base_url, url["href"])

     for image in soup.find_all("img", src=True):
         image["src"] = urljoin(base_url, image["src"])

     if soup.original_encoding is None:
         return soup

     return soup.encode(soup.original_encoding)

def download_filings(   # download the 10-K filings
    download_folder: Path,
    ticker: str,
    filing_type: str,
    num_filings_to_download: int,
    after_date: str,
    before_date: str,
    include_filing_details: bool,
) -> None:
    client = requests.Session()
    client.mount("http://", HTTPAdapter(max_retries=retries))
    client.mount("https://", HTTPAdapter(max_retries=retries)) # call another function to download the filing
    try:
        for filing in filings_to_download: # the URLs is stored in the "filings_to_download" list
            try:
                download(
                    client,
                    download_folder,
                    ticker,
                    filing.accession_number,
                    filing_type,
                    filing.full_submission_url,
                    FILING_FULL_SUBMISSION_FILENAME, # download TXT filings , HTML filings depending on the format of the filing
                )
            except requests.exceptions.HTTPError as e:
                print(
                    "Skipping full submission download for "
                    f"'{filing.accession_number}' due to network error: {e}."
                )

            if include_filing_details:
                try:
                    download(
                        client,
                        download_folder,
                        ticker,
                        filing.accession_number,
                        filing_type,
                        filing.filing_details_url,
                        filing.filing_details_filename,
                        resolve_urls=True,
                    )   
                except requests.exceptions.HTTPError as e:
                    print(
                        f"Skipping filing detail download for "
                        f"'{filing.accession_number}' due to network error: {e}."
                    )
    finally:
        client.close()

def download(
    client: requests.Session,
    download_folder: Path,
    ticker: str,
    accession_number: str,
    filing_type: str,
    download_url: str,
    save_filename: str,
    *,
    resolve_urls: bool = False,
) -> None:
     headers = {
         "User-Agent": user_agent(),
         "Accept-Encoding": "gzip, deflate",
         "Host": "www.sec.gov",
    }
     resp = client.get(download_url, headers=headers) # get the response and the status from the URLs
     resp.raise_for_status()
     filing_text = resp.content  # return the content of the response in bytes

     if resolve_urls and Path(save_filename).suffix == ".html":
       filing_text = resolve_relative_urls(filing_text, download_url)

     save_path = (
        download_folder
        / ROOT_SAVE_FOLDER_NAME
        / ticker
        / filing_type
        / accession_number
        / save_filename
    )
     save_path.parent.mkdir(parents=True, exist_ok=True)
     save_path.write_bytes(filing_text)

     time.sleep(SLEEPTIME)

cwd = Path.cwd()
downloader=cwd/"/Users/andrewhsu/Documents/fintech_10_K"
download_filings(
    downloader,
    "AAPL",
    "10-K",
    10,
    "2012-01-01",
    "2023-01-01",
    include_filing_details="TRUE")