In [1]:
import pandas as pd
from sec_edgar_downloader import Downloader
import time
import glob
import requests
from pathlib import Path
import re
import html2text
import codecs
from sec_edgar_downloader._utils import (
    download_filings,
    get_filing_urls_to_download,
    get_number_of_unique_filings,
    is_cik,
    validate_date_format,
    FilingMetadata,
     generate_random_user_agent
)

In [10]:
def _sec_request_headers() -> dict:
    """Build the headers sent with each request (a fresh random User-Agent per call)."""
    return {
        "User-Agent": generate_random_user_agent(),
        "Accept-Encoding": "gzip, deflate",
        "Host": "www.sec.gov",
    }


def download_and_save_filing(
    download_folder: Path,
    text_maker: html2text.HTML2Text,
    ticker_or_cik: str,
    accession_number: str,
    # filing_type: str,
    download_url: str,
    file_name: str,
) -> bool:
    """Download a filing's HTML document and save it as plain text.

    The function first fetches ``download_url`` and searches the response text
    for a ``<FILENAME>`` tag whose value ends in ``.htm``/``.html``. If one is
    found, ``file_name`` inside ``download_url`` is replaced by that HTML file
    name, the resulting URL is fetched, the HTML is converted to text with
    ``text_maker`` and the text is written to
    ``download_folder / ticker_or_cik / file_name``.

    Args:
        download_folder: Root directory under which the text file is written.
        text_maker: Configured ``html2text.HTML2Text`` converter.
        ticker_or_cik: Name of the sub-directory for this company.
        accession_number: Not used by the function body; kept so existing
            callers keep working.
        download_url: URL of the first document to fetch; must contain
            ``file_name`` so the HTML URL can be derived from it.
        file_name: Name of the output file (also the part of ``download_url``
            that gets swapped for the HTML file name).

    Returns:
        ``True`` if an HTML document was found, converted and saved;
        ``False`` if no ``<FILENAME>`` entry ending in ``.htm``/``.html`` was found.

    Raises:
        requests.HTTPError: If either HTTP request returns an error status.
    """
    # The context manager closes the session (and its pooled connections)
    # even when one of the requests raises.
    with requests.Session() as client:
        resp = client.get(download_url, headers=_sec_request_headers())
        resp.raise_for_status()

        # `.` does not match newlines, so the match is limited to a single line.
        search_html = re.search(r'<FILENAME>(.*[.]html?)', resp.text)
        if not search_html:
            print("html not found")
            return False

        html_name = search_html.group(1).strip()
        html_url = download_url.replace(file_name, html_name)

        resp = client.get(html_url, headers=_sec_request_headers())
        resp.raise_for_status()
        filing_html = resp.content

    # Not every filing is valid UTF-8; substitute undecodable bytes instead of
    # aborting the whole download with UnicodeDecodeError.
    filing_text = text_maker.handle(filing_html.decode("utf-8", errors="replace"))

    # Create all parent directories as needed and write the extracted text.
    save_path = download_folder / ticker_or_cik / file_name
    save_path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the platform default may be unable to encode the text.
    save_path.write_text(filing_text, encoding="utf-8")

    return True


In [11]:
# Load the filing index and pick the reports to download.
filing = pd.read_csv("edgar_10K_filing_2009_2021.csv")
start_year = 2020
end_year = 2023

# reports to be downloaded: 10-K forms with start_year <= fy < end_year,
# excluding SIC codes 6021 and 6022.
# NOTE(review): `.iloc[:1]` keeps only the first matching row — this looks like
# a debugging limit; remove it to download the full selection.
to_download = filing[(~filing.sic.isin([6021, 6022])) & (filing.fy>=start_year) & (filing.fy<end_year) & (filing.form == '10-K')].iloc[:1]

# if restart, change this index (label in `to_download.index` to resume from)
start_idx = 0

text_maker = html2text.HTML2Text()
text_maker.ignore_links = True
text_maker.bypass_tables = False

# Loop-invariant: resolve the output root once instead of on every iteration.
download_folder = Path("10-K").expanduser().resolve()

cnt = 0             # total reports processed
batch = 0           # reports processed since the last pause
complete_rows = []  # (row label, success flag) for every processed report
start = time.time()

try:
    for idx, row in to_download.loc[start_idx:].iterrows():

        ticker_or_cik = str(row['cik'])
        accession_number = row["adsh"]
        # filing_type = "10-K"

        download_url= "https://www.sec.gov/Archives/edgar/data/" \
                      + ticker_or_cik \
                      + "/" + accession_number.replace("-","") \
                      + "/" + accession_number \
                      + ".txt"
        file_name  = accession_number + ".txt"

        success = download_and_save_filing(
                                    download_folder,
                                    text_maker,
                                    ticker_or_cik,
                                    accession_number,
                                    # filing_type,
                                    download_url,
                                    file_name)

        complete_rows.append((idx, success))

        if not success:
            print(f"failed: {idx}, {ticker_or_cik}, {download_url}")

        cnt += 1

        # Throttle: pause for one second after every 5 reports.
        batch += 1
        if batch == 5:
            # Reset the counter (the original used `==`, a no-op comparison,
            # so the pause only ever happened once).
            batch = 0
            time.sleep(1)

        # Periodic checkpoint so progress survives an interrupted run.
        if cnt%100==0:
            print(f"completed {cnt} reports in {(time.time()-start):.2f}")
            pd.DataFrame(complete_rows, columns=["rid","status"]).to_csv("task_status.csv", header=True, index=False)
finally:
    # Always persist the status table, even if a download raised, so the run
    # can be resumed via `start_idx`; the exception still propagates.
    pd.DataFrame(complete_rows, columns=["rid","status"]).to_csv("task_status.csv", header=True, index=False)