In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SEC_Filings_Processor") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2") \
    .getOrCreate()

hadoop_conf = spark._jsc.hadoopConfiguration()

# List of properties that might have duration strings
duration_props = [
    "fs.s3a.threads.keepalivetime",
    "hadoop.security.groups.shell.command.timeout",
    "hadoop.service.shutdown.timeout",
    "yarn.resourcemanager.delegation-token-renewer.thread-timeout", 
    "yarn.federation.gpg.webapp.connect-timeout",
    "yarn.federation.gpg.webapp.read-timeout",
    "fs.s3a.retry.interval",
    "fs.s3a.retry.throttle.interval",
    "fs.s3a.connection.ttl",
    "fs.s3a.multipart.purge.age"
]

print("Clearing problematic properties...")

hadoop_conf.set(
    "fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
)
for prop in duration_props:
    hadoop_conf.unset(prop)
    print(f"  Unset: {prop}")

# setting them with numeric values where needed
numeric_props = {
    "fs.s3a.threads.keepalivetime": "60",  # seconds as number
    "hadoop.security.groups.shell.command.timeout": "0",  # 0 seconds
    "fs.s3a.retry.interval": "500",  # milliseconds
    "fs.s3a.retry.throttle.interval": "100",  # milliseconds
    "fs.s3a.connection.ttl": "300000",  # 5 minutes in ms
}

for prop, value in numeric_props.items():
    hadoop_conf.set(prop, value)
    print(f"  Set {prop} = {value}")

# Configure MinIO
minio_configs = {
    "fs.s3a.endpoint": "http://minio:9000",
    "fs.s3a.access.key": "admin",
    "fs.s3a.secret.key": "password", 
    "fs.s3a.path.style.access": "true",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.connection.timeout": "60000",
    "fs.s3a.socket.timeout": "60000",
    "fs.s3a.connection.establish.timeout": "5000",
}

print("\nSetting MinIO configuration...")
for key, value in minio_configs.items():
    hadoop_conf.set(key, value)
    print(f"  Set {key} = {value}")

print("\nConfiguration complete!")

Clearing problematic properties...
  Unset: fs.s3a.threads.keepalivetime
  Unset: hadoop.security.groups.shell.command.timeout
  Unset: hadoop.service.shutdown.timeout
  Unset: yarn.resourcemanager.delegation-token-renewer.thread-timeout
  Unset: yarn.federation.gpg.webapp.connect-timeout
  Unset: yarn.federation.gpg.webapp.read-timeout
  Unset: fs.s3a.retry.interval
  Unset: fs.s3a.retry.throttle.interval
  Unset: fs.s3a.connection.ttl
  Unset: fs.s3a.multipart.purge.age
  Set fs.s3a.threads.keepalivetime = 60
  Set hadoop.security.groups.shell.command.timeout = 0
  Set fs.s3a.retry.interval = 500
  Set fs.s3a.retry.throttle.interval = 100
  Set fs.s3a.connection.ttl = 300000

Setting MinIO configuration...
  Set fs.s3a.endpoint = http://minio:9000
  Set fs.s3a.access.key = admin
  Set fs.s3a.secret.key = password
  Set fs.s3a.path.style.access = true
  Set fs.s3a.impl = org.apache.hadoop.fs.s3a.S3AFileSystem
  Set fs.s3a.connection.ssl.enabled = false
  Set fs.s3a.connection.timeout 

In [5]:
from pyspark.sql.functions import col, split, explode, udf, regexp_extract
from pyspark.sql.types import StringType
from bs4 import BeautifulSoup
import re

# s3a://raw-data/sec-edgar-filings-raw/AAPL/10-K/0000320193-24-000123.txt -- path
path = "s3a://raw-data/sec-edgar-filings-raw/*/*/*"

raw_df = spark.sparkContext.wholeTextFiles(path).toDF(["filepath", "content"])

                                                                                

In [6]:
raw_df.show()

                                                                                

+--------------------+--------------------+
|            filepath|             content|
+--------------------+--------------------+
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
+--------------------+--------------------+



In [7]:
# Split by <DOCUMENT> tag
docs_df = raw_df.withColumn("raw_doc", split(col("content"), "<DOCUMENT>")) \
    .select(
        col("filepath"), # Keep the filepath so we know the source
        explode(col("raw_doc")).alias("doc_content")
    )

# Filter out empty artifacts from the split
docs_df = docs_df.filter(col("doc_content").rlike("\S"))

In [26]:
docs_df.write.csv("docs_df", header=True, mode="overwrite")

                                                                                

In [8]:
docs_df.show()

+--------------------+--------------------+
|            filepath|         doc_content|
+--------------------+--------------------+
|s3a://raw-data/se...|<SEC-DOCUMENT>000...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|
|s3a://raw-data/se...|\n<TYPE>EX-4.1\n<...|
|s3a://raw-data/se...|\n<TYPE>EX-10.19\...|
|s3a://raw-data/se...|\n<TYPE>EX-10.20\...|
|s3a://raw-data/se...|\n<TYPE>EX-10.21\...|
|s3a://raw-data/se...|\n<TYPE>EX-10.22\...|
|s3a://raw-data/se...|\n<TYPE>EX-19.1\n...|
|s3a://raw-data/se...|\n<TYPE>EX-21.1\n...|
|s3a://raw-data/se...|\n<TYPE>EX-23.1\n...|
|s3a://raw-data/se...|\n<TYPE>EX-31.1\n...|
|s3a://raw-data/se...|\n<TYPE>EX-31.2\n...|
|s3a://raw-data/se...|\n<TYPE>EX-32.1\n...|
|s3a://raw-data/se...|\n<TYPE>EX-97.1\n...|
|s3a://raw-data/se...|\n<TYPE>EX-101.SC...|
|s3a://raw-data/se...|\n<TYPE>EX-101.CA...|
|s3a://raw-data/se...|\n<TYPE>EX-101.DE...|
|s3a://raw-data/se...|\n<TYPE>EX-101.LA...|
|s3a://raw-data/se...|\n<TYPE>EX-101.PR...|
|s3a://raw-data/se...|\n<TYPE>GR

Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
  File "/opt/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 87, in worker
BrokenPipeError: [Errno 32] Broken pipe


In [9]:
# Extract the <TYPE> value
docs_with_type = docs_df.withColumn(
    "report_type", 
    regexp_extract(col("doc_content"), r"<TYPE>(.*?)\n", 1)
)

# Keep only the main 10-K report
ten_k_df = docs_with_type.filter(col("report_type").contains("10-K"))

print("Filtered for 10-K reports.")

Filtered for 10-K reports.


In [10]:
ten_k_df.show()

                                                                                

+--------------------+--------------------+-----------+
|            filepath|         doc_content|report_type|
+--------------------+--------------------+-----------+
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|
+--------------------+--------------------+-----------+



In [11]:
# Define the cleaning function using BeautifulSoup
def clean_html_content(html_text):
    if not html_text:
        return ""
    # Parse HTML
    soup = BeautifulSoup(html_text, "html.parser")
    
    # Remove script/style tags
    for script in soup(["script", "style", "noscript"]):
        script.decompose()
    text = soup.get_text(separator=" ")
    return " ".join(text.split())

# Register as Spark UDF
clean_udf = udf(clean_html_content, StringType())

# Apply cleaning
ten_k_df_cleaned = ten_k_df.withColumn("cleaned_text", clean_udf(col("doc_content")))

In [12]:
ten_k_df_cleaned.show()

                                                                                

+--------------------+--------------------+-----------+--------------------+
|            filepath|         doc_content|report_type|        cleaned_text|
+--------------------+--------------------+-----------+--------------------+
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 aapl-20240...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 aapl-20250...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 amzn-20231...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 amzn-20241...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 goog-20231...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 goog-20241...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 msft-20240...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 msft-20250...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 nflx-20231...|
|s3a://raw-data/se...|\n<TYPE>10-K\n<SE...|       10-K|10-K 1 nflx-20241...|

In [17]:
from pyspark.sql.functions import regexp_extract, col, upper

# pattern: .../AAPL/10-K/...
ticker_pattern = r"\/([^\/]+)\/10-K"

final_df = ten_k_df_cleaned.withColumn(
    "ticker", 
    upper(regexp_extract(col("filepath"), ticker_pattern, 1))
)

output_df = final_df.select("ticker", "report_type", "cleaned_text")

[Stage 10:>                                                         (0 + 1) / 1]

+------+-----------+--------------------+
|ticker|report_type|        cleaned_text|
+------+-----------+--------------------+
|  AAPL|       10-K|10-K 1 aapl-20240...|
|  AAPL|       10-K|10-K 1 aapl-20250...|
|  AMZN|       10-K|10-K 1 amzn-20231...|
|  AMZN|       10-K|10-K 1 amzn-20241...|
| GOOGL|       10-K|10-K 1 goog-20231...|
| GOOGL|       10-K|10-K 1 goog-20241...|
|  MSFT|       10-K|10-K 1 msft-20240...|
|  MSFT|       10-K|10-K 1 msft-20250...|
|  NFLX|       10-K|10-K 1 nflx-20231...|
|  NFLX|       10-K|10-K 1 nflx-20241...|
|  NVDA|       10-K|10-K 1 nvda-20240...|
|  NVDA|       10-K|10-K 1 nvda-20250...|
+------+-----------+--------------------+

root
 |-- ticker: string (nullable = true)
 |-- report_type: string (nullable = false)
 |-- cleaned_text: string (nullable = true)



                                                                                

In [None]:
# result
output_df.show()
output_df.printSchema()

In [25]:
from pyspark.sql.functions import explode, monotonically_increasing_id
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, IntegerType

def redact_pii(text):
    if not text: return ""
    # Regex to find email addresses
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
    # Regex to find phone numbers (simple US format)
    phone_pattern = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
    
    text = re.sub(email_pattern, "[EMAIL_REDACTED]", text)
    text = re.sub(phone_pattern, "[PHONE_REDACTED]", text)
    return text

# split text into 1000-character chunks with a little overlap.
def chunk_text(text, chunk_size=1000, overlap=100):
    if not text: return []
    chunks = []
    for i in range(0, len(text), chunk_size - overlap):
        chunks.append(text[i:i + chunk_size])
    return chunks

# Register UDFs
redact_udf = udf(redact_pii, StringType())
chunk_udf = udf(chunk_text, ArrayType(StringType()))

print("Logic defined.")

Logic defined.


In [27]:
redacted_df = output_df.withColumn("redacted_text", redact_udf(col("cleaned_text")))

chunked_df = redacted_df.withColumn("text_chunks", chunk_udf(col("redacted_text")))

# chunks so each chunk gets its own row for the LLM
final_chunks_df = chunked_df.select(
    "ticker", 
    "report_type", 
    explode(col("text_chunks")).alias("chunk_content")
).withColumn("chunk_id", monotonically_increasing_id())

In [21]:
redacted_df.show()

                                                                                

+------+-----------+--------------------+--------------------+
|ticker|report_type|        cleaned_text|       redacted_text|
+------+-----------+--------------------+--------------------+
|  AAPL|       10-K|10-K 1 aapl-20240...|10-K 1 aapl-20240...|
|  AAPL|       10-K|10-K 1 aapl-20250...|10-K 1 aapl-20250...|
|  AMZN|       10-K|10-K 1 amzn-20231...|10-K 1 amzn-20231...|
|  AMZN|       10-K|10-K 1 amzn-20241...|10-K 1 amzn-20241...|
| GOOGL|       10-K|10-K 1 goog-20231...|10-K 1 goog-20231...|
| GOOGL|       10-K|10-K 1 goog-20241...|10-K 1 goog-20241...|
|  MSFT|       10-K|10-K 1 msft-20240...|10-K 1 msft-20240...|
|  MSFT|       10-K|10-K 1 msft-20250...|10-K 1 msft-20250...|
|  NFLX|       10-K|10-K 1 nflx-20231...|10-K 1 nflx-20231...|
|  NFLX|       10-K|10-K 1 nflx-20241...|10-K 1 nflx-20241...|
|  NVDA|       10-K|10-K 1 nvda-20240...|10-K 1 nvda-20240...|
|  NVDA|       10-K|10-K 1 nvda-20250...|10-K 1 nvda-20250...|
+------+-----------+--------------------+--------------

In [29]:
# Show the result
print(f"Total Chunks Generated: {final_chunks_df.count()}")
final_chunks_df.show(5, truncate=50)

                                                                                

Total Chunks Generated: 4269


[Stage 26:>                                                         (0 + 1) / 1]

+------+-----------+--------------------------------------------------+--------+
|ticker|report_type|                                     chunk_content|chunk_id|
+------+-----------+--------------------------------------------------+--------+
|  AAPL|       10-K|10-K 1 aapl-20240928.htm 10-K aapl-20240928 fal...|       0|
|  AAPL|       10-K|urrent http://fasb.org/us-gaap/2024#OtherLiabil...|       1|
|  AAPL|       10-K| 2024-09-28 [PHONE_REDACTED] aapl:A0.500Notesdu...|       2|
|  AAPL|       10-K|nalPaidInCapitalMember 2022-09-24 [PHONE_REDACT...|       3|
|  AAPL|       10-K| us-gaap:RetainedEarningsMember 2024-09-28 [PHO...|       4|
+------+-----------+--------------------------------------------------+--------+
only showing top 5 rows


Traceback (most recent call last):                                              
  File "/opt/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
  File "/opt/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 87, in worker
BrokenPipeError: [Errno 32] Broken pipe


In [30]:
# Write to the 'processed-data' folder
output_path = "s3a://raw-data/processed-data"

final_chunks_df.write \
    .mode("overwrite") \
    .partitionBy("ticker") \
    .parquet(output_path)

print(f"Success! Data saved to {output_path}")

26/01/08 17:49:27 WARN Base64: JAXB is unavailable. Will fallback to SDK implementation which may be less performant.If you are using Java 9+, you will need to include javax.xml.bind:jaxb-api as a dependency.
                                                                                

Success! Data saved to s3a://raw-data/processed-data


In [31]:
# Read back the processed data
gold_df = spark.read.parquet("s3a://raw-data/processed-data")

In [32]:
gold_df.printSchema()

root
 |-- report_type: string (nullable = true)
 |-- chunk_content: string (nullable = true)
 |-- chunk_id: long (nullable = true)
 |-- ticker: string (nullable = true)



In [33]:
print(f"Total Chunks Available: {gold_df.count()}")

Total Chunks Available: 4269


In [34]:
gold_df.groupBy("ticker").count().show()

+------+-----+
|ticker|count|
+------+-----+
|  NVDA|  813|
| GOOGL|  842|
|  AMZN|  695|
|  NFLX|  601|
|  AAPL|  492|
|  MSFT|  826|
+------+-----+

