In [0]:
from pyspark.sql import SparkSession
# Get (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("trading_sentiment_platform")
    .getOrCreate()
)

# Load the raw filings table and preview its first five rows.
raw_file = spark.read.table("sec_filings.raw_filing_doc")
display(raw_file.limit(5))

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Column layout for cleaned filings: filing metadata followed by the full text
# and the two extracted sections. Every column is nullable.
cleaned_filing_schema = (
    StructType()
    .add("cik", StringType(), True)
    .add("company_name", StringType(), True)
    .add("form_type", StringType(), True)
    .add("filing_date", DateType(), True)
    .add("accessionNumber", StringType(), True)
    .add("filing_url", StringType(), True)
    .add("text", StringType(), True)
    .add("risks_text", StringType(), True)
    .add("mda_text", StringType(), True)
)

In [0]:
from bs4 import BeautifulSoup
import requests
import re

# HTTP headers sent with every filing download performed by parsing().
# NOTE(review): the contact identity is hard-coded here; presumably it is the
# identifying User-Agent the filing host expects -- consider loading it from
# notebook configuration instead of committing a personal address.
headers = {"User-Agent": "Joel Doh joeljuniordoh19@gmail.com"}

def _text_from_heading(heading_pattern, upper_text):
    """Return ``upper_text`` starting at the heading occurrence to search from.

    When the heading occurs more than once, the second occurrence is used
    (presumably the first one is a table-of-contents entry -- confirm).
    A single occurrence is used as-is. With no occurrence the whole text is
    returned unchanged so the caller's section search simply finds nothing.
    """
    starts = [m.start() for m in re.finditer(heading_pattern, upper_text)]
    if not starts:
        return upper_text
    return upper_text[starts[1] if len(starts) > 1 else starts[0]:]


def parsing(filing_url, timeout=30):
    """Fetch a filing document and extract its text, risk-factor and MD&A sections.

    Args:
        filing_url: URL of the filing HTML document.
        timeout: Seconds to wait on the HTTP request before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        A ``(cleaned_text, risks_text, mda_text)`` tuple of strings.
        ``cleaned_text`` is the page text with runs of whitespace collapsed to
        single spaces. ``risks_text`` and ``mda_text`` are upper-cased section
        bodies, or ``""`` when the section is not found. ``("", "", "")`` is
        returned for a missing URL, a non-200 response, or any error raised
        while fetching or parsing (the error is printed, not re-raised).
    """
    empty = ("", "", "")

    # Nothing to fetch: skip the request instead of logging a spurious error.
    if not filing_url:
        return empty

    try:
        response = requests.get(filing_url, headers=headers, timeout=timeout)

        if response.status_code != 200:
            return empty
        soup = BeautifulSoup(response.text, "html.parser")
        text = soup.get_text(separator=" ", strip=True)

        cleaned_text = re.sub(r"\s+", " ", text)
        # Section headings are matched against an upper-cased copy of the text.
        upper_text = cleaned_text.upper()

        # Use the same fallback as the MD&A section: a filing with zero or one
        # "Item 1A" heading must not abort the whole parse.
        upper_text_risk = _text_from_heading(r"ITEM\s*1A\.?\s*RISK FACTORS", upper_text)
        risks_match = re.search(r"(?is)ITEM\s*1A\.?\s*RISK FACTORS(.*?)(ITEM\s*1B\.|ITEM\s*2\.|ITEM\s*3\.|ITEM\s*4\.)", upper_text_risk)
        risks_text = risks_match.group(1).strip() if risks_match else ""

        upper_text_mda = _text_from_heading(r"ITEM\s*(7|2)\.?\s*MANAGEMENT[^A-Z]{0,5}?S\s+DISCUSSION\s+AND\s+ANALYSIS", upper_text)
        mda_match = re.search(r"(?is)ITEM\s*(7|2)\.?\s*MANAGEMENT[^A-Z]{0,5}?S\s+DISCUSSION\s+AND\s+ANALYSIS(.*?)(?=ITEM\s*(7A|3)\.|ITEM\s*8\.|PART\s*III\.)", upper_text_mda)
        # Group 1 is the item number (7|2); the section body is group 2.
        mda_text = mda_match.group(2).strip() if mda_match else ""

        return (cleaned_text, risks_text, mda_text)

    except Exception as e:
        # Best-effort: one bad filing yields empty fields rather than failing the job.
        print(f"Error parsing {filing_url}: {e}")

        return empty



In [0]:
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
# Manual spot-check of parsing(): it returns (cleaned_text, risks_text, mda_text),
# so only indices 0-2 are valid (higher indices raise IndexError).
# print(parsing("https://www.sec.gov/Archives/edgar/data/320193/000032019325000079/aapl-20250927.htm")[1])
# print("")
# print(parsing("https://www.sec.gov/Archives/edgar/data/320193/000032019325000079/aapl-20250927.htm")[2])

# Struct-returning UDF around parsing(); the field order matches the tuple it returns.
# parsing() performs an HTTP request, so the UDF is declared non-deterministic:
# Spark treats UDFs as deterministic by default and may then eliminate or
# duplicate invocations while optimising the plan, which here would mean
# downloading the same filing more than once.
parse_udf = udf(parsing, StructType([
    StructField("cleaned_text", StringType()),
    StructField("risks_text", StringType()),
    StructField("mda_text", StringType())
])).asNondeterministic()

# Parse every filing URL, then flatten the returned struct into top-level
# columns next to the filing metadata carried over from raw_file.
parsed_df = (
    raw_file
    .withColumn("parsed", parse_udf(F.col("filing_url")))
    .select(
        "cik",
        "company_name",
        "form_type",
        "filing_date",
        "accessionNumber",
        "filing_url",
        *[
            F.col("parsed")[field].alias(alias)
            for field, alias in (
                ("cleaned_text", "text"),
                ("risks_text", "risks_text"),
                ("mda_text", "mda_text"),
            )
        ],
    )
)

# Preview the first 50 parsed rows.
display(parsed_df.limit(50))

In [0]:
# Persist the parsed filings as a Delta table, replacing any existing data and schema.
(
    parsed_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("sec_filings.clean_filing_doc")
)