In [1]:
# Install/upgrade the notebook's dependencies in the active conda environment (Python pinned to 3.8), then print the environment details.
%conda install --yes -c defaults -c conda-forge --update-all python=3.8 aiohttp aiodns bs4 cchardet Faker lxml openpyxl pandas PyPDF2 python-dateutil regex tenacity
%conda info

Collecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... done

# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.

     active environment : UoA-ABS-RA
    active env location : C:\Users\laitingsheng\miniconda3\envs\UoA-ABS-RA
            shell level : 2
       user config file : C:\Users\laitingsheng\.condarc
 populated config files : C:\Users\laitingsheng\.condarc
          conda version : 4.10.3
    conda-build version : not installed
         python version : 3.8.10.final.0
       virtual packages : __cuda=11.4=0
                          __win=0=0
                          __archspec=1=x86_64
       base environment : C:\Users\laitingsheng\miniconda3  (writable)
      conda av data dir : C:\Users\laitingsheng\miniconda3\etc\conda
  conda av metadata url : None
           channel URLs : https://repo.anaconda.com/pkgs/main/win-64
                          https://repo.anaco

# SEC Constants

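This section regenerates `constants.py` from the SEC full-text search script. It doesn't need to be run frequently, so skip it (or comment it out) unless the constants need refreshing.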

In [1]:
import json
import re

from aiohttp import ClientSession
# Request headers sent with the script download below.
headers = {"User-Agent":"a1835057@student.adelaide.edu.au",'Accept-Encoding':"text","Host":"www.sec.gov"}

# Download the JavaScript behind the EDGAR full-text search page and regenerate constants.py from the
# two array literals it declares. This cell relies on the notebook's support for top-level `async with`.
async with ClientSession(raise_for_status=True,headers = headers) as c:
    async with c.get("https://www.sec.gov/edgar/search/js/edgar_full_text_search.js") as res:
        _script = await res.text()

    # NOTE(review): the values are serialised with json.dump but the file is imported as Python source;
    # this only round-trips while the data holds no JSON-only tokens (true/false/null) — confirm.
    with open("constants.py", "w", encoding="utf-8") as f:
        # _FORMS: the `const forms = [...]` array, re-keyed by each entry's "form" value
        # (the key is popped, so it is not repeated inside the entry).
        # SECURITY NOTE(review): eval() executes text downloaded from the network; a stricter parser
        # (e.g. ast.literal_eval, if the literal is compatible) would be safer — confirm before changing.
        f.write("_FORMS = ")
        json.dump({
            form.pop("form"): form
            for form in eval(re.search(
                R"^const forms = (\[\r?\n(?: {4}\{.*?\},*\r?\n)*(?: {4}\{.*?\})\r?\n\])\.sort",
                _script,
                re.MULTILINE
            )[1])
        }, f, indent=4)

        f.write('\n')

        # _LOCATIONS: the `const locationsArray = [...]` array of pairs, converted to a dict.
        # SECURITY NOTE(review): same eval() concern as above.
        f.write("_LOCATIONS = ")
        json.dump(dict(eval(re.search(
            R"^const locationsArray = (\[\r?\n(?: {4}\[.*?\],\r?\n)*(?: {4}\[.*?\])\r?\n\]);",
            _script,
            re.MULTILINE
        )[1])), f, indent=4)

        f.write('\n')

## Common Functions

The `chop_periods` function may be replaced by `pd.interval_range` once that function's capabilities are extended in a future pandas release.

In [19]:
from datetime import date
from typing import Any, Callable, Dict, Generator, Optional, Tuple

from dateutil.relativedelta import relativedelta


def chop_periods(
    start_date: date,
    end_date: date,
    interval: Optional[Dict[str, Any]],
    _format: Callable[[date], str]
) -> Generator[Tuple[str, str], None, None]:
    """Split ``[start_date, end_date]`` into consecutive, non-overlapping periods.

    Args:
        start_date: First day of the whole range.
        end_date: Last day of the whole range.
        interval: Keyword arguments for ``relativedelta`` giving the length of
            each period, or ``None`` to yield the whole range as one period.
        _format: Converts each boundary date to the string that is yielded.

    Yields:
        ``(start, end)`` pairs of formatted dates. Every period except the last
        ends one day before the next one starts; the last ends on ``end_date``.

    Raises:
        ValueError: If ``interval`` does not move the date forward (zero or
            negative length), which would otherwise loop forever.
    """
    if interval is None:
        yield _format(start_date), _format(end_date)
        return

    delta = relativedelta(**interval)
    # the end date of each period must have an offset of -1 day since the RESTful API is inclusive in both sides
    offset = relativedelta(days=1)
    curr_date = start_date
    while (next_date := curr_date + delta) < end_date:
        if next_date <= curr_date:
            # a non-advancing step can never reach end_date
            raise ValueError(f"interval {interval} does not advance the date")
        yield _format(curr_date), _format(next_date - offset)
        curr_date = next_date
    yield _format(curr_date), _format(end_date)

## Primary Section

In [70]:
import asyncio
import json
import logging
from datetime import date
from itertools import zip_longest
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple, Union

import pandas as pd
from aiohttp import ClientSession
from faker import Faker
import regex
from tenacity import retry

from constants import _FORMS, _LOCATIONS
import parsers

import logging
from io import BytesIO


# Runs of Unicode control (Cc) and format (Cf) characters; removed from extracted paragraphs
_CC_REGEX = regex.compile(R"[\p{Cc}\p{Cf}]+", regex.V1)

# parse the name and CIK from the result (same as the official JavaScript)
_DISPLAY_NAME_REGEX = regex.compile(R"(.*) \(CIK (\d{10})\)", regex.V1)

# NOTE(review): not referenced anywhere in the visible code — confirm whether it is still needed
_FAKER = Faker()

# Per output format: the file suffix, the suffix of the DataFrame `to_*` writer method,
# and the extra keyword arguments passed to that writer
_FORMAT_SPEC = {
    "csv": {
        "suffix": "csv",
        "function_suffix": "csv",
        "extra_args": {
            "encoding": "utf-8"
        }
    },
    "excel": {
        "suffix": "xlsx",
        "function_suffix": "excel",
        "extra_args": {
            "sheet_name": "output"
        }
    }
}

_LOGGER = logging.getLogger(__name__)

# HTTP status codes for which the request helpers raise (and are therefore retried) instead of skipping
_RETRY_SC = {403, 500, 502, 503, 504}

# Replace by a single space in text
_WHITESPACE_REGEX = regex.compile(R"\s+", regex.V1)


def _concat_to_url(cik: str, adsh: str, filename: str) -> str:
    return f"https://www.sec.gov/Archives/edgar/data/{cik}/{adsh}/{filename}"


def _decode(b: bytes, e: str) -> str:
    return b.decode(e)


@retry
async def _download(client: ClientSession, semaphore: asyncio.Semaphore, url: str) -> Tuple[bytes, str]:
    """Download ``url`` and return its raw body together with the response encoding.

    The request runs while holding ``semaphore`` and pauses for one second
    after the response arrives. A status listed in ``_RETRY_SC`` raises, so
    the ``@retry`` decorator issues the request again. Any other failure is
    logged and reported as an empty ASCII payload.
    """
    async with semaphore:
        async with client.get(url) as response:
            await asyncio.sleep(1)
            if response.ok:
                return await response.read(), response.get_encoding()
            if response.status in _RETRY_SC:
                response.raise_for_status()
            status, reason = response.status, response.reason
    # reached only for failures that are not worth retrying
    _LOGGER.warning(f"{url} file will be skipped: ({status}) {reason}")
    return b'', "ascii"


@retry
async def _fetch(
    client: ClientSession,
    semaphore: asyncio.Semaphore,
    fixed_query: Dict[str, Any],
    ciks: Optional[List[str]],
    start_date: str,
    end_date: str
) -> Tuple[bytes, str]:
    """Run one full-text search query and return the raw body plus its encoding.

    The JSON body is ``fixed_query`` extended with the date range and the CIK
    filter. The request runs while holding ``semaphore`` and pauses for one
    second after the response arrives. A status listed in ``_RETRY_SC``
    raises, so the ``@retry`` decorator issues the request again. Any other
    failure is logged and reported as an empty ASCII payload.
    """
    query = {
        **fixed_query,
        "startdt": start_date,
        "enddt": end_date,
        "ciks": ciks
    }
    async with semaphore:
        async with client.post("https://efts.sec.gov/LATEST/search-index", json=query) as response:
            await asyncio.sleep(1)
            if not response.ok:
                if response.status in _RETRY_SC:
                    response.raise_for_status()
                _LOGGER.warning(
                    f"{ciks}-{start_date}-{end_date} query will be skipped: ({response.status}) {response.reason}"
                )
                return b'', "ascii"
            return await response.read(), response.get_encoding()


def _iso(d: date):
    return d.isoformat()


def _parse_display_name(s: str, cik: str):
    if s is not None and (m := _DISPLAY_NAME_REGEX.fullmatch(s)):
        if (scik := m[2]) != cik:
            _LOGGER.warning(f"mismatched CIK: {scik} (parsed from \"{s}\") v.s. {cik}")
        return m[1], scik
    return s, cik


def _parse_hit(hit: Dict[str, Any]):
    """Convert one search hit into per-entity rows plus one filing-level record.

    Args:
        hit: A hit dictionary carrying an ``"_id"`` of the form
            ``"<adsh>:<filename>"`` and a ``"_source"`` dictionary of metadata.

    Returns:
        ``(rows, info)``. ``rows`` is a DataFrame with one row per zipped
        entity entry (name, CIK, location, incorporation state, file and film
        numbers). ``info`` is a Series describing the document (form, dates,
        URL and the parser function matching the file extension).
        For a falsy ``hit`` a pair of single-space strings is returned instead.
    """
    if not hit:
        # NOTE(review): this placeholder is a pair of strings, not (DataFrame, Series) — confirm callers handle it
        return " "," "
    _id = hit["_id"]
    source = hit["_source"]

    # the id is "<adsh>:<filename>"; the file name is split at its last dot
    adsh, filename = _id.split(':')
    filename_main, filename_ext = filename.rsplit('.', 1)
    xsl = source["xsl"]
    # for XML files with an xsl value, the xsl name becomes a leading path segment of the file name
    if xsl and filename_ext.lower() == "xml":
        filename_main = f"{xsl}/{filename_main}"
    filename = f"{filename_main}.{filename_ext}"

    file_nums = source["file_num"]
    film_nums = source["film_num"]
    # zip_longest pads the shorter lists with None, so there is one row per entry of the longest list;
    # file/film numbers may be a list, a single value, or empty, and are normalised to an iterable
    rows = pd.DataFrame((
        [_id, *_parse_display_name(display_name, cik), loc, _LOCATIONS.get(code, code), file_num, film_num]
        for display_name, cik, loc, code, file_num, film_num in zip_longest(
            source["display_names"],
            source["ciks"],
            source["biz_locations"],
            source["inc_states"],
            file_nums if isinstance(file_nums, list) else [file_nums] if file_nums else (),
            film_nums if isinstance(film_nums, list) else [film_nums] if film_nums else ()
        )
    ), columns=["id", "entity_name", "cik", "located", "incorporated", "file_num", "film_num"], dtype=str, copy=False)

    form = source["form"]
    root_form = source["root_form"]
    # append the form title in parentheses when the root form is listed in _FORMS
    form_title = ""
    if root_form in _FORMS:
        form_title = f" ({_FORMS[root_form]['title']})"
    # file type falls back to the file description, then to the file name
    file_type = source["file_type"]
    if not file_type:
        file_type = source["file_description"]
    if not file_type:
        file_type = filename
    ciks = rows["cik"]

    info = pd.Series({
        "id": _id,
        # the file type is appended only when it differs from the form
        "form_file": f"{form}{form_title}{'' if form == file_type else f' {file_type}'}",
        "file_date": source["file_date"],
        "period_ending": source.get("period_ending", None),
        "file_ext": filename_ext,
        # the URL uses the last non-null CIK of the rows and the adsh with its dashes removed
        "url": _concat_to_url(ciks[ciks.notnull()].iloc[-1], adsh.replace('-', ''), filename),
        # parser function named after the lower-cased extension in `parsers`; None when there is none
        "parser": getattr(parsers, f"_parse_{filename_ext.lower()}", None)
    }, dtype=object, copy=False)
    return rows, info


def _rename(index):
    return f"paragraph{index + 1}"


def _unwrap(hits: Dict[str, Any]):
    print(hits)
    total_hits = hits["total"]
    if total_hits["relation"] == "gte":
        _LOGGER.warning(f"The query returns a result exceeding the 10k limit")
    #print(hits["hits"])
    #hits =  json.loads(hits.decode('utf-8'))["hits"]["hits"]
    #print(hits)
    return hits["hits"]
    #return hits


def chop_ciks(
    ciks: Optional[Union[Path, int, str, List[Any]]],
    ciks_per_query: int
) -> Generator[Optional[List[str]], None, None]:
    """Normalise the CIK input and yield it in chunks of ``ciks_per_query``.

    Args:
        ciks: A ``Path`` to a text file with one CIK per line (blank lines are
            ignored), a list of CIKs, a single CIK as ``str`` or ``int``, or
            ``None``.
        ciks_per_query: Maximum number of CIKs per yielded chunk. Only checked
            when at least one CIK is provided.

    Yields:
        Lists of CIKs zero-padded to 10 digits, or a single ``None`` when no
        CIK is provided.

    Raises:
        ValueError: If the file cannot be read, a CIK is not an integer, or
            ``ciks_per_query`` is not positive while CIKs are provided.
    """
    # defaults to None
    _ciks: Optional[List[str]] = None
    # if the provided parameter is a Path, read the CIKs from the file
    if isinstance(ciks, Path):
        try:
            with open(ciks, "r", encoding="UTF-8") as f:
                try:
                    # blank lines (e.g. extra trailing newlines) carry no CIK and are skipped
                    _ciks = [f"{int(line):010}" for line in f.read().splitlines() if line.strip()]
                except ValueError as e:
                    raise ValueError(f"{ciks} contains invalid CIKs") from e
        except IOError as e:
            raise ValueError(f"{ciks} is not a valid file") from e
    # if it's a list of values, treat all values as CIKs
    elif isinstance(ciks, list):
        try:
            _ciks = [f"{int(cik):010}" for cik in ciks]
        except ValueError as e:
            raise ValueError(f"{ciks} is not a valid CIK list") from e
    # if it's a single string, consider it as a single CIK
    elif isinstance(ciks, str):
        try:
            _ciks = [f"{int(ciks):010}"]
        except ValueError as e:
            raise ValueError(f"{ciks} is not a valid CIK") from e
    # same as previous with the preferred (int) type
    elif isinstance(ciks, int):
        _ciks = [f"{ciks:010}"]

    if not _ciks:
        yield None
        return

    # a zero step makes range() fail obscurely and a negative one yields nothing at all
    if ciks_per_query < 1:
        raise ValueError(f"ciks_per_query must be a positive integer, got {ciks_per_query}")
    for i in range(0, len(_ciks), ciks_per_query):
        yield _ciks[i:i + ciks_per_query]


async def crawl(
    phrases: List[str],
    filing_types: List[str],
    start_date: date,
    end_date: date,
    interval: Optional[Dict[str, int]],
    ciks: Optional[Union[Path, int, str, List[Any]]],
    ciks_per_query: int,
    buffer_chunk_size: int,
    output_name: str,
    output_format: str,
    headers: dict
) -> None:
    """Search for filings, download them and write the matching paragraphs to a file.

    Args:
        phrases: Phrases to search for; each is quoted in the query and used
            (case-insensitively) to filter the paragraphs of downloaded files.
        filing_types: Values sent as the ``forms`` field of the query.
        start_date: First day of the search range.
        end_date: Last day of the search range.
        interval: Length of each queried period (see ``chop_periods``), or
            ``None`` to query the whole range at once.
        ciks: CIK filter in any form accepted by ``chop_ciks``.
        ciks_per_query: Number of CIKs sent per query.
        buffer_chunk_size: Number of files whose downloads are scheduled together.
        output_name: Output path without suffix; the suffix of the chosen
            format is appended as ``{output_name}.{suffix}``.
        output_format: A key of ``_FORMAT_SPEC``.
        headers: HTTP headers used for every request of the session.

    Nothing is written (a warning is logged) when no query returns a hit.
    """
    fixed_query: Dict[str, Any] = {
        "q": " ".join(f"\"{phrase}\"" for phrase in phrases),
        "category": "custom",
        "forms": filing_types,
        "dateRange": "custom"
    }

    phrases_regex = regex.compile(
        "|".join(f"(?:{phrase})" for phrase in map(regex.escape, phrases)),
        regex.V1 | regex.IGNORECASE
    )

    semaphore = asyncio.Semaphore(10)
    async with ClientSession(headers=headers) as c:
        # schedule one search query per (CIK chunk, period) combination
        fetch_tasks = [
            asyncio.create_task(_fetch(c, semaphore, fixed_query, cik_chunk, *period))
            for cik_chunk in chop_ciks(ciks, ciks_per_query)
            for period in chop_periods(start_date, end_date, interval, _iso)
        ]

        parsed = []
        for task in fetch_tasks:
            payload, encoding = await task
            if not payload:
                # _fetch reports a skipped query as an empty payload, which is not valid JSON
                continue
            # each non-empty hit is parsed exactly once
            parsed.extend(
                _parse_hit(hit)
                for hit in _unwrap(json.loads(_decode(payload, encoding))["hits"])
                if hit
            )

        if not parsed:
            _LOGGER.warning("The search returned no results; no output file will be written")
            return

        dfs, infos = zip(*parsed)

        # entity-level rows of all hits, de-duplicated and indexed by the hit id
        df = pd.concat(dfs, ignore_index=True, copy=False)
        df.drop_duplicates(inplace=True, ignore_index=True)
        df.set_index(keys="id", inplace=True, verify_integrity=False)
        del dfs

        # filing-level records; files without a matching parser are dropped
        info = pd.DataFrame(infos, dtype=object, copy=False)
        info.dropna(subset=["parser"], inplace=True)
        info.drop_duplicates(subset="id", inplace=True, ignore_index=True)
        info.set_index(keys="id", inplace=True)
        del infos

        dl_info = info[["url", "parser"]]
        del info["parser"]
        # download the files chunk by chunk; every file is parsed, split on blank lines into
        # paragraphs, normalised, and only the paragraphs mentioning one of the phrases are kept
        downloaded = pd.DataFrame([
            pd.Series(filter(phrases_regex.search, (
                _CC_REGEX.sub("", _WHITESPACE_REGEX.sub(" ", s).strip())
                for s in parser(*await task).split("\n\n")
            )), copy=False)
            for div_info in (
                dl_info.iloc[s:s + buffer_chunk_size]
                for s in range(0, info.shape[0], buffer_chunk_size)
            )
            for task, parser in zip([
                asyncio.create_task(_download(c, semaphore, url))
                for url in div_info["url"]
            ], div_info["parser"])
        ], index=info.index, dtype=str, copy=False)
        downloaded.dropna(how="all", inplace=True)
        downloaded.rename(columns=_rename, copy=False, inplace=True)
        del dl_info

    # join the three tables on the hit id and write them with the writer of the chosen format
    format_spec = _FORMAT_SPEC[output_format]
    getattr(
        df.join(info, how="left").join(downloaded, how="left"),
        f"to_{format_spec['function_suffix']}"
    )(
        Path(f"{output_name}.{format_spec['suffix']}"),
        header=True,
        index=False,
        # index=True,
        **format_spec["extra_args"]
    )

## Parameters

This section defines all customisable parameters.

- **PHRASES** (`List[str]`): A list of keywords or phrases to search for. Can be an empty list.

- **DATE_START** & **DATE_END** (both `date`): The first and last day of the search range. Both must conform to the ISO date format, i.e., YYYY-MM-DD, as shown in the example.

- **INTERVAL** (`Optional[Dict[str, int]]`): The interval of each period; `None` implies the whole period will be searched at once. Reducing the interval means more queries have to be made, but it is useful if the number of results returned exceeds the maximum capacity (10000) of one query.

- **FILING_TYPES** (`List[str]`): A list of filing types. A pre-check could be added for this variable, but since all inputs are assumed to be valid, no check is performed.

- **CIKS** (`Optional[Union[Path, int, str, List[Union[int, str]]]]`): A list of CIKs in no more than 10 digits, or it can be a path to the file containing all CIKs for the query.

- **CIKS_PER_QUERY** (`int`): Controls the number of CIKs included in one query. The recommended value is 5, but it can be reduced if the number of results returned exceeds the maximum capacity (10000) of one query.

- **BUFFER_CHUNK_SIZE**: The maximum number of files allowed to be cached in the memory.

- **OUTPUT_NAME**: The file name without the suffix of the output file.

- **OUTPUT_FORMAT**: The file format of the output file.

In [69]:
from datetime import date
from pathlib import Path
headers = {"User-Agent":"a1835057@student.adelaide.edu.au"}#,'Accept-Encoding':"text","Host":"www.sec.gov"

_PHRASES = ["data breach", "cyber security"]

_FILING_TYPES = [""]#"10-K", "10-Q"

_DATE_START = date.fromisoformat("2001-12-01")
_DATE_END = date.fromisoformat("2023-12-31")

# _INTERVAL = {
#     "years": 0,
#     "months": 1,
#     "weeks": 0,
#     "days": 0
# }
_INTERVAL = None # can be optional

# _CIKS = [1961, "0000003116"] # accept a plain list of the CIKs
#_CIKS = Path("sample_input_file1.txt") # accept a file path
# _CIKS = 1961 # accept a single CIK as an integer
_CIKS = "0000006201" # accept a single CIK as a string
# _CIKS = None # can be optional

_CIKS_PER_QUERY = 5 # will be ignored if no CIKs is provided

_BUFFER_CHUNK_SIZE = 100

_OUTPUT_NAME = "20231224.xlsx"

# _OUTPUT_FORMAT = "csv"
_OUTPUT_FORMAT = "excel"

await crawl(
    _PHRASES,
    _FILING_TYPES,
    _DATE_START,
    _DATE_END,
    _INTERVAL,
    _CIKS,
    _CIKS_PER_QUERY,
    _BUFFER_CHUNK_SIZE,
    _OUTPUT_NAME,
    _OUTPUT_FORMAT,
    headers
)

{'total': {'value': 1, 'relation': 'eq'}, 'max_score': 2.842336, 'hits': [{'_index': 'edgar_file', '_type': '_doc', '_id': '0001193125-23-085782:d261382dars.pdf', '_score': 2.842336, '_source': {'ciks': ['0000006201'], 'period_ending': '2022-12-31', 'root_form': 'ARS', 'file_num': ['001-08400'], 'display_names': ['American Airlines Group Inc.  (AAL)  (CIK 0000006201)'], 'xsl': None, 'sequence': '1', 'file_date': '2023-03-30', 'biz_states': ['TX'], 'sics': ['4512'], 'form': 'ARS', 'adsh': '0001193125-23-085782', 'film_num': ['23781867'], 'biz_locations': ['Fort Worth, TX'], 'file_type': 'ARS', 'file_description': 'ARS', 'inc_states': ['DE'], 'items': []}}]}


NameError: name 'PyPdfError' is not defined