### TASK_01
#### Coding Assignment Task # 1
- Dataset: IEDGAR SEC filings (public data) -
- https://huggingface.co/datasets/eloukas/edgar-corpus
- Language: Python
- Implementation: Pyspark
- Submission: Github repository containing code + plots (jpeg).
- Expected Maximum Duration: 3 hours

Given a set of documents: create a solution that allows the end user to understand the documents in a two dimensional space and to identify outliers.
Task #2 – Gen AI

Dataset

( Year: 2018-2020

( Filing type: 10K

( Sections: All

( Company: Choose 1.

( Choose 5 data attributes to extract from a single year.

Steps

( Convert documents to chunks

( Covert chunks to embeddings

( Create a query

( Create a prompt to extract data from chunks from a specific year.

( Create a validation dataset (5 true values from chunks).

( Demonstrate that your LLM can retrieve the correct chunks from your

embedding object for the correct year

#### Dataset card:
This dataset card is based on the paper EDGAR-CORPUS: Billions of Tokens Make The World Go Round authored by Lefteris Loukas et.al, as published in the ECONLP 2021 workshop.
This dataset contains the annual reports of public companies from 1993-2020 from SEC EDGAR filings.
There is supported functionality to load a specific year.
Care: since this is a corpus dataset, different train/val/test splits do not have any special meaning. It's the default HF card format to have train/val/test splits.
If you wish to load specific year(s) of specific companies, you probably want to use the open-source software which generated this dataset, EDGAR-CRAWLER: https://github.com/nlpaueb/edgar-crawler.

In [1]:
# import os
import regex as re
from collections import defaultdict
import seaborn as sns
from matplotlib import pyplot as plt
from matplotlib.patches import Patch
import colorcet as cc
from scipy.spatial.distance import cdist

import datasets
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler as sklearnScaler
from sklearn.decomposition  import PCA as sklearnPCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans, DBSCAN
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import silhouette_score


import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import size, col, count, when, length, concat_ws, \
                udf, explode, pandas_udf, lit, count, avg, max as spark_max, min as spark_min, length, expr
from pyspark.sql.types import ArrayType, StringType, FloatType,  StructType, StructField, DoubleType

from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.ml.feature import StandardScaler, PCA

from pyspark.ml.functions import vector_to_array


from transformers import AutoTokenizer
from sentence_transformers import SentenceTransformer
import requests
from dotenv import load_dotenv
from openai import OpenAI



import spacy
NER = spacy.load("en_core_web_sm")
from math import sqrt

import torch
print(torch.cuda.is_available())





# Load environnt variables from .env file
# from dotenv import load_dotenv
# load_dotenv()
# os.environ["HUGGINGFACE_API_KEY"] = os.getenv("HUGGINGFACE_API_KEY")
import os
os.environ['JAVA_HOME'] = r'D:\Softwares\Microsoft\jdk-17.0.15.6-hotspot'

# import sys


# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from IPython.display import display, HTML
display(HTML("<style>div.output_scroll { height: 44em; }</style>"))


  from .autonotebook import tqdm as notebook_tqdm


False


In [2]:
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
print(sys.executable)

D:\Softwares\Anaconda3\envs\coding_task_venv\python.exe


In [4]:
# Start Spark session
spark = SparkSession.builder \
    .appName("AIG_SEC_Filing_Analysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.python.worker.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.sql.execution.pyspark.udf.faulthandler.enabled","true") \
    .config("spark.python.worker.faulthandler.enabled", "true").getOrCreate()


# spark = SparkSession.builder \
#     .appName("AIG_SEC_Filing_Analysis").getOrCreate()

spark.sparkContext.setLogLevel("WARN")



### Based on the observations above :
- The text will retain context if split with "\n"
- It might also help to consider ":" as that sometime is identifying the start of a sub-section [ Text between "\n" and ":" will be the header]
- ";" might signal the end of a contextually similar section
- Size of each chunk should be less than 512 tokens or about 2000 characters (to begin with) as I am planning to use BERT based sentence transformers
- An overlap startegy of 200 characters to preserve context

In [5]:
def create_custom_chunks(text:str, overlap: int = 200, chunk_size = 2000)-> list:
    if not isinstance(text, str):
        return []
    text = re.sub(r'[\x00-\x09\x0B-\x1F\x7F-\x9F]','', text)
    chunks = []

    
    if len(text) <= chunk_size:
        return [text.strip()] if text.strip() else []
        
    prev_chunk_len = 0  
    
    while len(text) > chunk_size:
        right = chunk_size
        
        last_newline_index = text[:chunk_size].rfind("\n")
        
        if last_newline_index > 0:
            right = last_newline_index

        # to ensure chunks have complete words instead of cut words
        # if right > 0 and text[right-1].isalnum() and text[right].isalnum():
        #     last_space = text[:right].rfind(' ')
        #     if last_space > right-50 and last_space > 0:
        #         right = last_space
            
        chunk = text[:right].strip()
        if chunk:
            chunks.append(chunk)
        
        if len(chunks) > 1 and prev_chunk_len < 1000 and len(chunks[-1]) < 1000 :
            chunks[-2]+=" " + chunks[-1]
            chunks = chunks[:-1]
        
        prev_chunk_len = len(chunks[-1])
        
        if right > overlap:
            start = right - overlap
            # Prevent index error
            while start < len(text):
                if start == 0:
                    break
                if not (text[start].isalnum() and text[start-1].isalnum()):
                    break
                start += 1
            text = text[start:]
        else:
            text = text[right:]
            
    if text:
        rem = text.strip()
        if rem:
            chunks.append(rem)        

    return chunks


In [6]:
df_all_companies = spark.read.parquet("D:\\PracticeProjects\\TASK_01\\Data\\data.parquet")
df = df_all_companies.limit(1)

In [7]:
df.printSchema()

root
 |-- filename: string (nullable = true)
 |-- cik: string (nullable = true)
 |-- year: string (nullable = true)
 |-- section_1: string (nullable = true)
 |-- section_1A: string (nullable = true)
 |-- section_1B: string (nullable = true)
 |-- section_2: string (nullable = true)
 |-- section_3: string (nullable = true)
 |-- section_4: string (nullable = true)
 |-- section_5: string (nullable = true)
 |-- section_6: string (nullable = true)
 |-- section_7: string (nullable = true)
 |-- section_7A: string (nullable = true)
 |-- section_8: string (nullable = true)
 |-- section_9: string (nullable = true)
 |-- section_9A: string (nullable = true)
 |-- section_9B: string (nullable = true)
 |-- section_10: string (nullable = true)
 |-- section_11: string (nullable = true)
 |-- section_12: string (nullable = true)
 |-- section_13: string (nullable = true)
 |-- section_14: string (nullable = true)
 |-- section_15: string (nullable = true)



In [8]:
df.count()

1

In [9]:
# chunking_udf = udf(create_custom_chunks, ArrayType(StringType()))       
@pandas_udf(returnType = ArrayType(StringType()))
def chunk_pandas_udf(series):
    return series.apply(create_custom_chunks)



In [10]:
# columns = [column for column in df.columns if column not in ["filename", "cik", "year"]]
# for column in columns:
#     df = df.withColumn(f"{column}"+f"_chunked", chunk_pandas_udf(col(column)))

# for column in columns:
#     df_chunked = df.select("cik", "filename", explode(col(f"{column}_chunked")).alias("chunk")).withColumn("section_name", column))
# # for column in columns:
# #     df = df.withColumn(f"{column}"+f"_chunked", chunking_udf(col(column)))


columns = [column for column in df.columns if column not in ["filename", "cik", "year"]]

for column in columns:
    df = df.withColumn(column, when(col(column).isNotNull(), col(column)).otherwise(lit("")))
# Create chunks
for column in columns:
    df = df.withColumn(f"{column}_chunked", chunk_pandas_udf(col(column)))

# Explode and collect all sections
all_chunks = []
for column in columns:
    df_section = df.select(
        "cik",
        "filename",
        "year",
        explode(col(f"{column}_chunked")).alias("chunk")
    ).withColumn("section_name", lit(column)).withColumn("chunk_id", expr("uuid()"))
    
    all_chunks.append(df_section)

# Union all
df_exploded = all_chunks[0]
for chunk_df in all_chunks[1:]:
    df_exploded = df_exploded.union(chunk_df)



In [11]:
print(df_exploded.columns)

['cik', 'filename', 'year', 'chunk', 'section_name', 'chunk_id']


In [12]:
df_exploded_pd = df_exploded.toPandas()

In [13]:
texts = df_exploded_pd["chunk"].tolist()

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2', device=device)

embeddings = model.encode(
    texts, batch_size=64, show_progress_bar=True, convert_to_numpy=True
)

Batches: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:29<00:00, 14.84s/it]


In [14]:
has_nan = np.isnan(embeddings).any()
print(f"Contains NaNs? {has_nan}")

# Check if any embedding vectors are empty or zero vectors
empty_vectors = np.sum(embeddings, axis=1) == 0
print(f"Number of empty embeddings: {np.sum(empty_vectors)}")

Contains NaNs? False
Number of empty embeddings: 0


In [15]:
embeddings.shape

(107, 768)

In [16]:
df_exploded_pd["embeddings"] = embeddings.tolist()

In [17]:
df_cleaned = spark.createDataFrame(df_exploded_pd)

In [18]:
df_cleaned.printSchema()

root
 |-- cik: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- year: string (nullable = true)
 |-- chunk: string (nullable = true)
 |-- section_name: string (nullable = true)
 |-- chunk_id: string (nullable = true)
 |-- embeddings: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [19]:
df_embeddings = df_cleaned

In [51]:
query = "What does the bank compete with?"
extracted_year = "2020"

In [52]:
query_embedding = model.encode(query, convert_to_numpy = True)
query_embedding = query_embedding.reshape(1,-1)
query_embedding.shape

(1, 768)

In [53]:
# def find_year(text):
#     if not isinstance(text, str):
#         return None
#     entities = NER(text).ents
#     year = [ent.text for ent in entities if ent.label_ == "DATE"]
#     pattern = r'20\d{2}|19\d{2}'
#     if year:
#         extracted_year = re.findall(pattern, year[0])[0]
#         return extracted_year
#     else:
#         return None
# extracted_year = find_year(query)       

In [54]:
if extracted_year:
    filtered_embeddings_df = df_embeddings.filter(col("year") == extracted_year)
else:
    filtered_embeddings_df = df_embeddings
    

In [55]:
filtered_embeddings_df.printSchema()

root
 |-- cik: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- year: string (nullable = true)
 |-- chunk: string (nullable = true)
 |-- section_name: string (nullable = true)
 |-- chunk_id: string (nullable = true)
 |-- embeddings: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [56]:
# def cosine_similarity(embedding1, embedding2):
#     dot_product = sum(a * b for a,b in zip(embedding1, embedding2))
#     norm1 = sqrt(sum(a * a for a in embedding1))
#     norm2 = sqrt(sum(b * b for b in embedding2))
#     return dot_product/(norm1 * norm2)

# @pandas_udf(returnType = DoubleType())
# def similarity_pandas_udf(embedding_series, query_embedding):
#     query_embedding = query_broadcast.value
#     return embedding_series.apply(cosine_similarity, query_embedding)
    

In [57]:
# Precompute query vector
query_vector = Vectors.dense(query_embedding)

@udf(DoubleType())
def cosine_similarity_udf(vec):
    # Convert list to DenseVector
    if isinstance(vec, list):
        vec = Vectors.dense(vec)
    
    dot = float(vec.dot(query_vector))
    norm1 = float(vec.norm(2))
    norm2 = float(query_vector.norm(2))
    
    return dot / (norm1 * norm2) if norm1 != 0 and norm2 != 0 else 0.0

df_result = filtered_embeddings_df.withColumn("score", cosine_similarity_udf(col("embeddings")))

In [58]:
df_result.printSchema()

root
 |-- cik: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- year: string (nullable = true)
 |-- chunk: string (nullable = true)
 |-- section_name: string (nullable = true)
 |-- chunk_id: string (nullable = true)
 |-- embeddings: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- score: double (nullable = true)



In [59]:
top_chunks_df = df_result.orderBy(col("score").desc()).select("cik","year", "section_name","chunk","chunk_id").limit(3)

In [60]:
top_chunks_df.count()

Py4JJavaError: An error occurred while calling o860.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 50.0 failed 1 times, most recent failure: Lost task 10.0 in stage 50.0 (TID 321) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:624)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:599)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:945)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:925)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:263)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:265)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:333)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.writeNextInputToStream(PythonUDFRunner.scala:69)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:844)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:767)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:98)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:90)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.convert.JavaCollectionWrappers$IteratorWrapper.hasNext(JavaCollectionWrappers.scala:32)
	at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:795)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:42)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$doExecute$12(limit.scala:361)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:107)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: An established connection was aborted by the software in your host machine
	at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:54)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:76)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:855)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:767)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:933)
	... 57 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:462)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:318)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:316)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:312)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:624)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:599)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:945)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:925)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:263)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:265)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:333)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.writeNextInputToStream(PythonUDFRunner.scala:69)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:844)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:767)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:98)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:90)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.convert.JavaCollectionWrappers$IteratorWrapper.hasNext(JavaCollectionWrappers.scala:32)
	at org.sparkproject.guava.collect.Ordering.leastOf(Ordering.java:795)
	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:42)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$doExecute$12(limit.scala:361)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:107)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	... 3 more
Caused by: java.io.IOException: An established connection was aborted by the software in your host machine
	at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:54)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:76)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:855)
	at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:767)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:933)
	... 57 more


In [None]:
top_chunks_df.printSchema()

In [None]:
top_k_context = top_chunks_df.collect()
context = "\n\n".join([row["chunk"] for row in top_k_context])

In [None]:
top_k_context

In [None]:
prompt = f"""This is a SEC 10K filing context. As an expert financial analyst, extract information about the query {query}. 
Use the below context to answer with just the exact dollar amount. If answer is not found, reply - not found.
Context: {context}
Answer:"""

In [None]:
prompt

In [None]:
load_dotenv()

In [None]:
opeai_token = os.getenv("OPENAI_API_KEY")

In [None]:
llm_client = OpenAI()

In [None]:
response = llm_client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}]
)
answer = response.choices[0].message.content

In [None]:
print(answer)

### Validation

In [303]:
validation_data = [
    {
        "query": "What is the outstanding amount of Subordinated Debentures in 2020?",
        "expected_value": "$12,887,000",
        "year": "2020",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    },
    {
        "query": "What is the aggregate market value of the voting stock held by non-affiliates in 2020 ?",
        "expected_value": "$65,684,788",
        "year": "2020",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    },
    {
        "query": "How much money was allocated under the HHSB Act in 2020 ?",
        "expected_value": "$284.45 billion",
        "year": "2020",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    },    
    {
        "query": "What is the FDIC’s standard maximum deposit insurance amount in 2020 ?",
        "expected_value": "$250,000",
        "year": "2020",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    },
    {
        "query": "What is the aggregate market value of the voting stock held by non-affiliates in 2018 ?",
        "expected_value": "$82,097,082",
        "year": "2018",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    },
    {
        "query": "What amount of wholesale deposit was purchased in 2018 ?",
        "expected_value": "$35.3 million",
        "year": "2018",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    }
    {
        "query": "How much money was allocated under the HHSB Act in 2018 ?",
        "expected_value": "not found",
        "year": "2020",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    },
    {
        "query": "How much money was allocated under the CARES Act in 2018 ?",
        "expected_value": "not found",
        "year": "2020",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    },
    {
        "query": "What is the total FDIC insurance assessment in 2019 ?",
        "expected_value": "$64,079",
        "year": "2020",
        "chunk_id": "actual_chunk_id_here",
        "section": "item_8"
    },
]
   
        
    