In [17]:
import concurrent.futures
import json
import re
import sys
import time
from datetime import datetime
from urllib.parse import quote, unquote

import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm # Progress bar for visualizing data cleaning progress

# Global configuration shared by the functions defined in later cells.
csvFile = "./truncatedData.csv"        # Input: one merger per row, no header row
cikFile = "./cik-lookup-data.txt"
minDate = datetime(2001, 1, 1)         # Earliest lower-bound date used when searching
formTypes = ["PREM14A", "S-4", "SC 14D9", "SC TO-T"]
mainIndex = 0                          # Row of the CSV currently being processed

# Parse the CSV once and slice the needed columns from the same frame
# (column 1 = filed date, columns 2 and 3 = the two merging companies).
# Reading the file once avoids re-parsing it for every column and guarantees
# the three lists come from the same snapshot of the file.
_mergerRows = pd.read_csv(csvFile, header=None)
filedDate = _mergerRows.iloc[:, 1].tolist()
companyAList = _mergerRows.iloc[:, 2].tolist()
companyBList = _mergerRows.iloc[:, 3].tolist()

In [18]:
def getDateConstraints(date):
    """
    Build the [lower, upper] date window surrounding a filing date.

    The window is the given date padded by two calendar months in each
    direction.

    Args:
        date: Date string in "%m/%d/%Y" format.

    Returns:
        A two-element list [lowerBoundDate, upperBoundDate] of datetime
        objects. The lower bound is never earlier than the module-level
        minDate; the upper bound is not clamped.

    Raises:
        ValueError: If date does not match "%m/%d/%Y".
    """
    filed = datetime.strptime(date, "%m/%d/%Y")

    # Lower bound: step two months back. Working on a zero-based month lets
    # divmod produce both the year carry and the wrapped month.
    yearCarry, zeroBasedMonth = divmod(filed.month - 1 - 2, 12)
    lbYear, lbMonth = filed.year + yearCarry, zeroBasedMonth + 1
    try:
        lowerBoundDate = filed.replace(year=lbYear, month=lbMonth)
    except ValueError:
        # The day does not exist in the target month (e.g. Feb 30):
        # fall back to the first day of that month.
        lowerBoundDate = filed.replace(year=lbYear, month=lbMonth, day=1)

    # Never search earlier than the configured minimum date.
    lowerBoundDate = max(lowerBoundDate, minDate)

    # Upper bound: step two months forward, same month arithmetic.
    yearCarry, zeroBasedMonth = divmod(filed.month - 1 + 2, 12)
    ubYear, ubMonth = filed.year + yearCarry, zeroBasedMonth + 1
    try:
        upperBoundDate = filed.replace(year=ubYear, month=ubMonth)
    except ValueError:
        # The day does not exist in the target month: roll over to the
        # first day of the following month.
        upperBoundDate = filed.replace(year=ubYear, month=ubMonth + 1, day=1)

    return [lowerBoundDate, upperBoundDate]

In [19]:
def getCIKs(companyName):
    """
    Look up the CIK identifiers listed by the SEC CIK-lookup page for a company name.

    Args:
        companyName: Company name submitted as the "company" form field.

    Returns:
        A list of strings: the stripped text of every anchor found inside the
        second <pre> element of the response page.

    Raises:
        SystemExit: With the HTTP status code if the response is not 200, or
            with code 1 if the page contains fewer than two <pre> elements.
        requests.exceptions.RequestException: If the request fails or times out.
    """
    # Without a timeout a stalled connection would block the notebook forever.
    requestTimeout = 30  # seconds

    # Create a request that mimics browser activity
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "Referer": "https://www.sec.gov/search-filings/cik-lookup"
    }
    formData = {
        "company": companyName
    }
    url = "https://www.sec.gov/cgi-bin/cik_lookup"

    # Submit the search form and parse the returned HTML
    response = requests.post(url, headers=headers, data=formData, timeout=requestTimeout)
    if response.status_code != 200:
        sys.exit(response.status_code)

    soup = BeautifulSoup(response.text, "html.parser")

    # The CIK anchors are read from the second <pre> element of the page
    preTags = soup.find_all("pre")
    if len(preTags) < 2:
        print("ERROR: CIK not found!")
        sys.exit(1)

    return [anchor.text.strip() for anchor in preTags[1].find_all("a")]

In [20]:
def getDocumentJson(companyName, dateLB, dateUB, formTypes, cik):
    """
    Query the EDGAR full-text search endpoint and return the matching hits.

    Args:
        companyName: Search text for the "q" parameter. May already be
            percent-encoded (e.g. spaces as "%20").
        dateLB: Start date string for the "startdt" parameter.
        dateUB: End date string for the "enddt" parameter.
        formTypes: Form-type list as a single string for the "forms"
            parameter. May already be percent-encoded.
        cik: CIK used for the "filter_ciks" parameter.

    Returns:
        The list stored under data["hits"]["hits"] when the reported total is
        greater than zero, otherwise None.

    Raises:
        SystemExit: With the HTTP status code if the response is not 200.
        requests.exceptions.RequestException: If the request fails or times out.
    """
    # Without a timeout a stalled connection would block the worker forever.
    requestTimeout = 30  # seconds

    def encodeParam(value):
        # Decode first, then encode everything: values that are already
        # percent-encoded stay unchanged, while raw reserved characters such
        # as "&" or "#" no longer break the query string apart.
        return quote(unquote(str(value)), safe="")

    url = (
        "https://efts.sec.gov/LATEST/search-index"
        f"?q={encodeParam(companyName)}&dateRange=custom&category=custom"
        f"&startdt={encodeParam(dateLB)}&enddt={encodeParam(dateUB)}"
        f"&forms={encodeParam(formTypes)}&filter_ciks={encodeParam(cik)}"
    )
    print(url)

    # Create a request that mimics browser activity
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "Referer": "https://www.sec.gov/"
    }

    # Run the search and decode the JSON payload
    response = requests.get(url, headers=headers, timeout=requestTimeout)
    if response.status_code != 200:
        print("FATAL: getDocumentJson response yielded an error!")
        sys.exit(response.status_code)

    data = response.json()
    totalValue = data["hits"]["total"]["value"]

    if totalValue > 0:
        return data["hits"]["hits"]
    return None

In [21]:
def getSourceLinks(documentJson):
    """
    Build the source document (.txt) links from search results.

    Args:
        documentJson: A list with one entry per searched CIK; each entry is
            either a list of hit dicts (each with a "_source" dict holding
            "ciks" and "adsh") or None when that search returned nothing.

    Returns:
        A list of URL strings of the form
        https://www.sec.gov/Archives/edgar/data/<cik>/<adsh without dashes>/<adsh>.txt
        for every hit that carries both a CIK and an adsh. Hits with missing
        data are logged and skipped.
    """
    sourceLinks = []

    # Walk every per-CIK result list, not just the first one, and tolerate
    # the None entries produced by searches without hits.
    for hits in documentJson or []:
        if not hits:
            continue

        for document in hits:
            try:
                source = document["_source"]
                ciks = source["ciks"]
                adsh = source["adsh"]
            except KeyError as e:
                print(f"Missing key in document: {e}, result: {document}")
                continue  # Skip the document if there is a missing key; logged for further investigation

            # Use the last CIK when several are listed, without leading zeros.
            # str.lstrip returns a new string, so the result must be kept.
            validatedCik = str(ciks[-1]).lstrip("0") if ciks else ""
            if not validatedCik:
                print(f"Missing CIK in document, result: {document}")
                continue

            # The folder name is the adsh with the "-" characters removed
            truncatedADSH = adsh.replace("-", "")
            sourceLinks.append(f"https://www.sec.gov/Archives/edgar/data/{validatedCik}/{truncatedADSH}/{adsh}.txt")

    return sourceLinks

In [22]:
def restructCompanyName(companyName, use_url_encoding=False):
    """
    Normalize a company name for either the search query or the CIK lookup.

    - "Cos" as the 2nd word is expanded to "Companies" (both modes).
    - CIK lookup mode only: ".com" is replaced with " Com".
    - CIK lookup mode only: content within parentheses is removed.

    Args:
        companyName: Raw company name.
        use_url_encoding: When True, join the words with "%20" for use in a
            URL; when False, return the space-separated CIK-lookup form.

    Returns:
        The restructured company name as a string.
    """
    tokens = companyName.split()
    if len(tokens) >= 2 and tokens[1] == "Cos":
        tokens[1] = "Companies"

    # URL mode only needs the words glued together with "%20".
    if use_url_encoding:
        return "%20".join(tokens)

    # CIK-lookup mode: rewrite ".com", drop parenthesized content, and
    # collapse whatever whitespace is left behind.
    lookupName = " ".join(token.replace(".com", " Com") for token in tokens)
    lookupName = re.sub(r'\(.*?\)', '', lookupName).strip()
    return " ".join(lookupName.split())

In [23]:
def searchDocuments(searchCompany, pairCompany, dateLB, dateUB, formTypes):
    """
    Search for documents mentioning searchCompany, once per CIK of pairCompany.

    Args:
        searchCompany: Company whose URL-encoded name is used as the search text.
        pairCompany: Company whose CIKs (from getCIKs) filter each search.
        dateLB: Lower-bound date string passed through to getDocumentJson.
        dateUB: Upper-bound date string passed through to getDocumentJson.
        formTypes: Form-type string passed through to getDocumentJson.

    Returns:
        A list with one getDocumentJson result per CIK, in CIK order.
    """
    queryName = restructCompanyName(searchCompany, use_url_encoding=True)

    def fetchForCik(cik):
        # Every request shares the same query; only the CIK filter varies.
        return getDocumentJson(queryName, dateLB, dateUB, formTypes, cik)

    # One request per CIK, issued concurrently on a thread pool
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pairCiks = getCIKs(restructCompanyName(pairCompany, use_url_encoding=False))
        return list(pool.map(fetchForCik, pairCiks))

In [None]:
def main():
    """
    Search for documents for the merger at row mainIndex of the input data.

    Builds the date window and form-type string for the URL, searches with
    company A as the search text first, and falls back to company B when
    every result of the first search is None. Prints a message when neither
    search yields a result.
    """
    # TODO: loop over every row once the single-row flow is validated:
    # for mainIndex in tqdm(range(len(companyAList)), desc="Scanning", leave=False):

    # Format both window dates as YYYY-MM-DD for the URL
    dateTemplate = "{0.year}-{0.month:02}-{0.day:02}"
    restructLB, restructUB = (
        dateTemplate.format(bound) for bound in getDateConstraints(filedDate[mainIndex])
    )

    # Comma-join the form types, percent-encoding the comma and any spaces
    restructForms = "%2C".join(form.replace(" ", "%20") for form in formTypes)

    companyA = companyAList[mainIndex]
    companyB = companyBList[mainIndex]

    # Try company A as the search text first, then swap roles and retry.
    for searchCompany, pairCompany in ((companyA, companyB), (companyB, companyA)):
        results = searchDocuments(searchCompany, pairCompany, restructLB, restructUB, restructForms)
        if any(result is not None for result in results):
            break
    else:
        # Neither ordering produced a hit
        print("No results found for index:", mainIndex, "; companies: ", companyAList[mainIndex], " & ", companyBList[mainIndex])

    # TODO: print the resolved source links once the search flow is validated:
    # for link in getSourceLinks(results):
    #     print(link);

# Entry point: call main() only when this code runs as the top-level module
# (__name__ == '__main__'), not when it is imported from elsewhere.
if __name__ == '__main__':
    main();