# Task Description

- **Task Requirements:**  
  The objective of this task is to develop a Gen AI solution that can extract specific data attributes from a selected company's 10-K filings for a given year. The workflow includes:
  - Converting the documents into chunks.
  - Converting the chunks into embeddings.
  - Creating a query.
  - Designing a prompt to extract data from chunks for a specific year.
  - Creating a validation dataset containing five true values extracted from the chunks.
  - Demonstrating that the LLM can retrieve the correct chunks from the embedding object for the specified year.

- **Dataset Description:**  
  - The dataset consists of 10-K filings from the years 2018 to 2020.
  - It includes all sections of the 10-K filing.
  - The analysis will focus on one selected company’s filings from a single year.
  - Five key data attributes will be extracted from the chosen company's annual report.

- **Selection Strategy:**  
  - A single company will be chosen for analysis.
  - The focus will be on one year’s 10-K filing to ensure precision in retrieval.
  - Five specific data attributes will be selected for extraction to validate the effectiveness of the LLM-driven retrieval and extraction process.

> **Note:** This approach ensures that the task remains focused on validating the LLM's capability to retrieve and extract relevant data attributes from embeddings while maintaining scope control.


In [1]:
!spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.5
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.26
Branch HEAD
Compiled by user ubuntu on 2025-02-23T20:30:46Z
Revision 7c29c664cdc9321205a98a14858aaf8daaa19db2
Url https://github.com/apache/spark
Type --help for more information.


In [2]:
import sys
sys.path.append("utils")

import re
import math
import json
import datasets
from typing import Any, Dict, List, Optional
from datasets import load_dataset, concatenate_datasets

import ollama
from ollama import chat, ChatResponse
from pydantic import BaseModel

# Spark / Spark NLP
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import E5Embeddings

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, size, udf, expr, split, rand, mean, stddev,
    percentile_approx, when
)
from pyspark.sql.types import ArrayType, StringType, FloatType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, VectorUDT


# Local imports
from convert_dataset_to_spark import convert_dataset_to_spark_df
from section_processing import SectionProcessor
from plot_word_counts import plot_section_word_counts

# TQDM progress
from tqdm.auto import tqdm


# Initialize & Load Data

In [3]:
# 1. Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("EDGARCorpus")
    .master("local[*]")
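    .config("spark.driver.memory", "8g")
    # In local[*] mode the executors run inside the driver JVM, so the driver
    # memory setting bounds the heap; spark.executor.memory has no effect here.
    .config("spark.executor.memory", "32g")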
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.5.3")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("OFF")
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.sparkContext.addPyFile("utils/section_processing.py")


:: loading settings :: url = jar:file:/Users/yuxiangwang/venv/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/yuxiangwang/.ivy2/cache
The jars for the packages stored in: /Users/yuxiangwang/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f8939018-dba4-43b1-9516-96ecd616a606;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.5.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.5

In [4]:
def load_sampled_cik_data(
    years: List[int],
    spark: SparkSession,
    batch_size: int = 300,
    output_dir: str = "parquet_batches_task2",
    dataset_name: str = "eloukas/edgar-corpus",
    split: str = "train",
    random_seed: int = 66
):
    """
    Load multiple 'year_*' configurations for a given Hugging Face dataset, 
    convert them to Spark, ensure only 'cik' with all specified years are retained,
    then randomly pick a single 'cik' and return a subset Spark DataFrame.
    
    Args:
        years (List[int]): List of year values, e.g. [2018, 2019, 2020].
        spark (SparkSession): An active Spark session.
        batch_size (int, optional): Number of HF rows to process per batch 
                                    when converting to Spark. Defaults to 300.
        output_dir (str, optional): Path where parquet batches are written 
                                    during conversion. Defaults to "parquet_batches_task2".
        dataset_name (str, optional): Dataset name on Hugging Face hub. 
                                      Defaults to "eloukas/edgar-corpus".
        split (str, optional): The split name to load (e.g., "train"). Defaults to "train".
        random_seed (int, optional): Seed for reproducible random sampling. Defaults to 66.
    
    Returns:
        DataFrame: A Spark DataFrame containing only one sampled 'cik' 
                   that has data for all requested years.
    
    Raises:
        ValueError: If no data could be loaded for the given years, 
                    or no 'cik' has all specified years.
    """
    # -------------------------------------------------------
    # Step 1: Load multiple Hugging Face 'year_*' configs and concatenate
    # -------------------------------------------------------
    dataset_list = []
    for year in years:
        config_name = f"year_{year}"
        try:
            ds = load_dataset(dataset_name, config_name, split=split)
            dataset_list.append(ds)
            print(f"Loaded data for year={year}.")
        except Exception as e:
            print(f"Warning: Could not load data for year={year}. Error: {e}")

    if not dataset_list:
        raise ValueError(f"No datasets were loaded successfully for years={years}.")

    combined_dataset = concatenate_datasets(dataset_list)
    print(f"Successfully concatenated {len(dataset_list)} dataset(s).")

    # -------------------------------------------------------
    # Step 2: Convert the combined HF dataset to Spark
    # -------------------------------------------------------
    spark_df = convert_dataset_to_spark_df(
        combined_dataset,
        spark,
        batch_size=batch_size,
        output_dir=output_dir
    )
    print(f"Converted dataset to Spark DataFrame with {spark_df.count()} rows.")

    # -------------------------------------------------------
    # Step 3: Filter 'cik' with complete coverage of all years
    # -------------------------------------------------------
    grouped_df = spark_df.groupBy("cik").agg(F.collect_set("year").alias("years"))
    valid_ciks = grouped_df.filter(F.size("years") == len(years))

    # If none has all the specified years, raise an error
    if valid_ciks.count() == 0:
        raise ValueError(
            f"No 'cik' found that contains data for all these years: {years}."
        )

    # -------------------------------------------------------
    # Step 4: Randomly pick one 'cik'
    # -------------------------------------------------------
    sampled_cik_row = valid_ciks.orderBy(F.rand(random_seed)).limit(1).first()
    sampled_cik = sampled_cik_row["cik"]
    print(f"Sampled CIK is: {sampled_cik}")

    # -------------------------------------------------------
    # Step 5: Filter the original Spark DataFrame to just that 'cik' + the chosen years
    # -------------------------------------------------------
    sampled_df = spark_df.filter(
        (F.col("cik") == sampled_cik)
        & (F.col("year").isin([str(y) for y in years]))  # 'year' is a string column
    )

    sampled_df.cache()
    final_count = sampled_df.count()
    print(f"Final sampled_df has {final_count} rows for CIK={sampled_cik} and years={years}.")

    # Return the final Spark DataFrame for further processing
    return sampled_df
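The CIK selection in Steps 3 and 4 (keep only filers with a filing in every requested year, then sample one with a fixed seed) can be sketched in plain Python for a quick sanity check on small data. The helper names `complete_ciks` and `pick_complete_cik` and the toy `rows` are hypothetical; this version filters to the requested years explicitly (the Spark code gets that implicitly because only those `year_*` configs were loaded), and `random.Random(seed)` will not reproduce the CIK that `F.rand(seed)` picks.

```python
import random
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Tuple


def complete_ciks(rows: Iterable[Tuple[str, str]], years: List[int]) -> List[str]:
    """Return CIKs that have at least one filing in every requested year.

    Mirrors groupBy("cik").agg(collect_set("year")) plus a size filter;
    years are compared as strings because the dataset stores them as strings.
    """
    wanted = {str(y) for y in years}
    seen: Dict[str, Set[str]] = defaultdict(set)
    for cik, year in rows:
        if year in wanted:
            seen[cik].add(year)
    # A CIK qualifies only if its set of years covers every requested year.
    return sorted(cik for cik, ys in seen.items() if ys == wanted)


def pick_complete_cik(rows, years, seed: int = 66) -> Optional[str]:
    """Deterministically sample one qualifying CIK, or None if there is none."""
    candidates = complete_ciks(rows, years)
    if not candidates:
        return None
    return random.Random(seed).choice(candidates)


rows = [("1035976", "2018"), ("1035976", "2019"), ("1035976", "2020"),
        ("111", "2018"), ("111", "2020")]
print(complete_ciks(rows, [2018, 2019, 2020]))  # ['1035976']
```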

In [5]:
years_list = [2018, 2019, 2020]
sampled_df = load_sampled_cik_data(
    years=years_list,
    spark=spark,
    batch_size=300,
    output_dir="parquet_batches_task2",
    dataset_name="eloukas/edgar-corpus",
    split="train",
    random_seed=66
)

Loaded data for year=2018.
Loaded data for year=2019.
Loaded data for year=2020.
Successfully concatenated 3 dataset(s).



Converted dataset to Spark DataFrame with 16342 rows.



Sampled CIK is: 1035976
Final sampled_df has 3 rows for CIK=1035976 and years=[2018, 2019, 2020].


In [6]:
sampled_df.printSchema()
sampled_df.cache()
_ = sampled_df.count()  

root
 |-- filename: string (nullable = true)
 |-- cik: string (nullable = true)
 |-- year: string (nullable = true)
 |-- section_1: string (nullable = true)
 |-- section_1A: string (nullable = true)
 |-- section_1B: string (nullable = true)
 |-- section_2: string (nullable = true)
 |-- section_3: string (nullable = true)
 |-- section_4: string (nullable = true)
 |-- section_5: string (nullable = true)
 |-- section_6: string (nullable = true)
 |-- section_7: string (nullable = true)
 |-- section_7A: string (nullable = true)
 |-- section_8: string (nullable = true)
 |-- section_9: string (nullable = true)
 |-- section_9A: string (nullable = true)
 |-- section_9B: string (nullable = true)
 |-- section_10: string (nullable = true)
 |-- section_11: string (nullable = true)
 |-- section_12: string (nullable = true)
 |-- section_13: string (nullable = true)
 |-- section_14: string (nullable = true)
 |-- section_15: string (nullable = true)



In [7]:
sampled_df

filename,cik,year,section_1,section_1A,section_1B,section_2,section_3,section_4,section_5,section_6,section_7,section_7A,section_8,section_9,section_9A,section_9B,section_10,section_11,section_12,section_13,section_14,section_15
1035976_2020.htm,1035976,2020,Item 1.\nBusiness...,Item 1A.\nRisk Fa...,Item 1B.\nUnresol...,Item 2.\nProperti...,Item 3.\nLegal Pr...,Item 4.\nMine Saf...,Item 5.\nMarket f...,Item 6.\nSelected...,Item 7.\nManageme...,Item 7A. Quantita...,Item 8. Financial...,Item 9. Changes i...,Item 9A. Controls...,Item 9B.\nOther I...,Item 10.\nDirecto...,Item 11.\nExecuti...,Item 12.\nSecurit...,Item 13.\nCertain...,Item 14.\nPrincip...,Item 15.\nExhibit...
1035976_2018.htm,1035976,2018,Item 1.\nBusiness...,Item 1A.\nRisk Fa...,Item 1B.\nUnresol...,Item 2.\nProperti...,Item 3.\nLegal Pr...,Item 4.\nMine Saf...,Item 5.\nMarket f...,Item 6.\nSelected...,Item 7.\nManageme...,Item 7A. Quantita...,Item 8. Financial...,Item 9. Changes i...,Item 9A. Controls...,Item 9B.\nOther I...,Item 10.\nDirecto...,Item 11.\nExecuti...,Item 12.\nSecurit...,Item 13.\nCertain...,Item 14.\nPrincip...,Item 15.\nExhibit...
1035976_2019.htm,1035976,2019,Item 1.\nBusiness...,Item 1A.\nRisk Fa...,Item 1B.\nUnresol...,Item 2.\nProperti...,Item 3.\nLegal Pr...,Item 4.\nMine Saf...,Item 5.\nMarket f...,Item 6.\nSelected...,Item 7.\nManageme...,Item 7A. Quantita...,Item 8. Financial...,Item 9. Changes i...,Item 9A. Controls...,Item 9B.\nOther I...,Item 10.\nDirecto...,Item 11.\nExecuti...,Item 12.\nSecurit...,Item 13.\nCertain...,Item 14.\nPrincip...,Item 15.\nExhibit...


# Convert Documents into Chunks

In [8]:
# 1. Convert Documents into Chunks
processor = SectionProcessor(max_words=300, overlap=1)

processed_df = processor.process_all_sections(
    sampled_df, 
    show_progress=True,
    force_action=False
)

chunk_cols = [c for c in processed_df.columns if "_chunk_" in c]
print(f"Found {len(chunk_cols)} chunk columns.")


Found 282 chunk columns.
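`SectionProcessor` lives in `utils/section_processing.py`, which is not shown here. As a hedged illustration of what a `max_words`/`overlap` chunker typically does, here is a word-window version. Treating `overlap` as a count of words is an assumption: the real helper may split on sentences or paragraphs instead. The name `chunk_words` is hypothetical.

```python
from typing import List


def chunk_words(text: str, max_words: int = 300, overlap: int = 1) -> List[str]:
    """Split text into windows of at most max_words words.

    Consecutive windows share `overlap` words, so a fact straddling a
    boundary still appears intact in at least one chunk.
    """
    if max_words <= 0 or not 0 <= overlap < max_words:
        raise ValueError("need max_words > 0 and 0 <= overlap < max_words")
    words = text.split()
    step = max_words - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        if start + max_words >= len(words):
            break  # this window already reached the end of the text
    return chunks


print(chunk_words("a b c d e f g", max_words=3, overlap=1))
# ['a b c', 'c d e', 'e f g']
```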


In [9]:
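# Stack chunk columns into long format: (chunk_col_name, text).
# stack(n, 'name_1', col_1, ..., 'name_n', col_n) emits one row per (name, value) pair;
# backticks quote the column references, and `c` avoids shadowing pyspark's col().
stack_expr = "stack({0}, {1}) as (chunk_col_name, text)".format(
    len(chunk_cols),
    ", ".join([f"'{c}', `{c}`" for c in chunk_cols])
)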

df_long = processed_df.select("cik", "year", expr(stack_expr))
df_long = df_long.filter(df_long["text"].isNotNull())

df_long.printSchema()
df_long.cache()
_ = df_long.count()

root
 |-- cik: string (nullable = true)
 |-- year: string (nullable = true)
 |-- chunk_col_name: string (nullable = true)
 |-- text: string (nullable = true)




In [10]:
df_long

cik,year,chunk_col_name,text
1035976,2020,section_1_chunk_1,Item 1.\nBusiness...
1035976,2020,section_1_chunk_2,●\nPeople - A tea...
1035976,2020,section_1_chunk_3,Telephone banking...
1035976,2020,section_1_chunk_4,The Bank also off...
1035976,2020,section_1_chunk_5,Residential Mortg...
1035976,2020,section_1_chunk_6,"At December 31, 2..."
1035976,2020,section_1_chunk_7,FNCB also partici...
1035976,2020,section_1_chunk_8,FNCB relies prima...
1035976,2020,section_1_chunk_9,The cost of regul...
1035976,2020,section_1_chunk_10,"In general, these..."
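The `stack(...)` expression used to build `df_long` unpivots the 282 wide chunk columns into one row per chunk. A plain-Python sketch of the same transformation on a single record, useful for checking the generated SQL string and the null filtering; `build_stack_expr` and `unpivot_row` are hypothetical helper names, and the toy record is illustrative only.

```python
from typing import Dict, List, Optional, Tuple


def build_stack_expr(chunk_cols: List[str]) -> str:
    """Build the Spark SQL stack() expression; backticks guard unusual column names."""
    pairs = ", ".join(f"'{c}', `{c}`" for c in chunk_cols)
    return f"stack({len(chunk_cols)}, {pairs}) as (chunk_col_name, text)"


def unpivot_row(row: Dict[str, Optional[str]],
                chunk_cols: List[str]) -> List[Tuple[str, str, str, str]]:
    """Emulate select("cik", "year", stack(...)) plus the isNotNull() filter for one row."""
    return [
        (row["cik"], row["year"], c, row[c])
        for c in chunk_cols
        if row.get(c) is not None  # chunk columns this filing does not fill are null
    ]


cols = ["section_1_chunk_1", "section_1_chunk_2"]
print(build_stack_expr(cols))
# stack(2, 'section_1_chunk_1', `section_1_chunk_1`, 'section_1_chunk_2', `section_1_chunk_2`) as (chunk_col_name, text)
```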


# Convert Chunks to Embeddings

In [11]:
# Create a single pipeline with DocumentAssembler + E5 embeddings
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
embedding = E5Embeddings.pretrained("e5_large", "en") \
    .setInputCols(["document"]) \
    .setOutputCol("embeddings")

pipeline = Pipeline().setStages([document_assembler, embedding])

# Fit on a single row: neither DocumentAssembler nor E5Embeddings learns from
# data, so fit() only wraps the stages into a reusable PipelineModel.
fit_sample_df = df_long.limit(1)
pipeline_model = pipeline.fit(fit_sample_df)


e5_large download started this may take some time.
Approximate size to download 759.2 MB
Download done! Loading the resource.
[OK!]


In [12]:
def transform_embeddings(input_df, pipeline_model):
    """
    Transform the input_df using a pre-fitted pipeline_model.
    Extract the embedding array into a Spark ML Vector column 'embeddings_vector'.
    """
    transformed_df = pipeline_model.transform(input_df)
    
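    # UDF to convert the Python list of floats into a Spark ML DenseVector.
    # Rows without an embedding become null rather than a zero-length vector,
    # which would fail the dimension check when computing cosine similarity.
    def array_to_vector(arr):
        return Vectors.dense(arr) if arr else None
    to_vector_udf = udf(array_to_vector, VectorUDT())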
    
    # Extract embeddings from the annotation
    df_embeddings = (
        transformed_df
        .withColumn("embeddings_array", expr("transform(embeddings, x -> x.embeddings)[0]"))
        .withColumn("embeddings_vector", to_vector_udf("embeddings_array"))
        .select("cik", "year", "chunk_col_name", "text", "embeddings_vector")
    )
    
    return df_embeddings

In [13]:
df_embeddings = transform_embeddings(df_long, pipeline_model)

In [14]:
df_embeddings.printSchema()

df_embeddings.cache()
_ = df_embeddings.count()

root
 |-- cik: string (nullable = true)
 |-- year: string (nullable = true)
 |-- chunk_col_name: string (nullable = true)
 |-- text: string (nullable = true)
 |-- embeddings_vector: vector (nullable = true)





Using CPUs



In [15]:
df_embeddings

cik,year,chunk_col_name,text,embeddings_vector
1035976,2020,section_1_chunk_1,Item 1.\nBusiness...,[0.00406089192256...
1035976,2020,section_1_chunk_2,●\nPeople - A tea...,[0.00559781398624...
1035976,2020,section_1_chunk_3,Telephone banking...,[-0.0019283706787...
1035976,2020,section_1_chunk_4,The Bank also off...,[0.00454328162595...
1035976,2020,section_1_chunk_5,Residential Mortg...,[0.00315549760125...
1035976,2020,section_1_chunk_6,"At December 31, 2...",[-0.0166966784745...
1035976,2020,section_1_chunk_7,FNCB also partici...,[0.00743138557299...
1035976,2020,section_1_chunk_8,FNCB relies prima...,[0.00184497167356...
1035976,2020,section_1_chunk_9,The cost of regul...,[-0.0049952873960...
1035976,2020,section_1_chunk_10,"In general, these...",[-0.0050230352208...


# Create a query

In [16]:
query_text = "What is the company's net income for the year 2018?"
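E5 checkpoints were trained with asymmetric input prefixes: the upstream `intfloat/e5-large` model card (which Spark NLP's `e5_large` is presumably converted from) asks for `"query: "` on search queries and `"passage: "` on documents, while this notebook embeds both query and chunks without them. A small helper for applying the prefixes, assuming you re-run the embedding cells on prefixed text (for example `F.concat(F.lit("passage: "), col("text"))` before `transform_embeddings`); whether this changes retrieval quality on these filings has not been measured here. `with_e5_prefix` is a hypothetical name.

```python
E5_PREFIXES = {"query": "query: ", "passage": "passage: "}


def with_e5_prefix(text: str, kind: str) -> str:
    """Prepend the E5 role prefix ("query" or "passage") unless it is already present."""
    try:
        prefix = E5_PREFIXES[kind]
    except KeyError:
        raise ValueError(f"kind must be one of {sorted(E5_PREFIXES)}, got {kind!r}") from None
    return text if text.startswith(prefix) else prefix + text


example_query = "What is the company's net income for the year 2018?"
print(with_e5_prefix(example_query, "query"))
# query: What is the company's net income for the year 2018?
```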

In [17]:
from pyspark.ml.linalg import DenseVector

def get_query_embedding(query_text: str, pipeline_model) -> DenseVector:
    """
    Given a single query string, transform it with a pre-fitted Spark NLP
    embedding pipeline model to obtain a Spark ML Vector.
    Uses the global `spark` session to build the one-row input DataFrame.
    
    Args:
        query_text (str): The input query text.
        pipeline_model: A fitted Spark ML PipelineModel (including a text embedding stage).
    
    Returns:
        DenseVector: The query embedding.
    """
    # Create a single-row DataFrame to hold the query text
    df_query = spark.createDataFrame([(query_text, )], ["text"])
    
    # Transform the query text with the fitted pipeline model
    transformed_query = pipeline_model.transform(df_query)
    
    # UDF to convert a Python array to Spark DenseVector
    def array_to_vector(arr):
        return Vectors.dense(arr) if arr else Vectors.dense([])
    to_vector_udf = udf(array_to_vector, VectorUDT())
    
    # Extract the embeddings array from the annotation output
    df_q_emb = (
        transformed_query
        .withColumn("embeddings_array", expr("transform(embeddings, x -> x.embeddings)[0]"))
        .withColumn("query_vector", to_vector_udf(col("embeddings_array")))
        .select("query_vector")
    )
    
    # Collect the single row
    row = df_q_emb.collect()[0]
    return row["query_vector"]


In [18]:
from pyspark.ml.linalg import DenseVector

def create_cosine_similarity_udf(query_vector: DenseVector):
    """
    Create a Spark UDF that calculates the cosine similarity between
    an 'embeddings_vector' column and the provided query_vector.
    
    Args:
        query_vector (DenseVector): The query embedding.
    
    Returns:
        A PySpark SQL UDF that takes a DenseVector and returns a double
        (the similarity), or null when the chunk has no embedding.
    """
    # Broadcast the query vector once so each executor reuses the same copy
    bc_query_vec = spark.sparkContext.broadcast(query_vector)
    
    def cos_sim(chunk_vec):
        """
        Compute the cosine similarity between chunk_vec and the broadcast query vector.
        """
        if chunk_vec is None or len(chunk_vec) == 0:
            return None  # no embedding for this chunk; nulls sort last under desc()
        q = bc_query_vec.value
        dot_val = chunk_vec.dot(q)
        norm_mult = chunk_vec.norm(2) * q.norm(2)
        if norm_mult == 0.0:
            return 0.0
        return float(dot_val / norm_mult)
    
    return udf(cos_sim, DoubleType())
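The UDF scores each chunk c against the query q as cos(q, c) = (q · c) / (‖q‖ ‖c‖). The same scoring plus top-k selection in plain Python is handy for spot-checking a few rows collected to the driver. `cosine` and `top_k_chunks` are hypothetical names, and vectors here are plain lists of floats rather than Spark `DenseVector`s.

```python
import heapq
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; 0.0 if either vector has zero norm (same rule as the UDF)."""
    if len(a) != len(b):
        raise ValueError(f"dimension mismatch: {len(a)} vs {len(b)}")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 0.0 if norm == 0.0 else dot / norm


def top_k_chunks(query: Sequence[float],
                 chunks: Dict[str, Sequence[float]],
                 k: int = 5) -> List[Tuple[str, float]]:
    """Return the k (chunk_name, similarity) pairs with the highest similarity."""
    scored = ((name, cosine(query, vec)) for name, vec in chunks.items())
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])


chunks = {"a": [1.0, 0.0], "b": [1.0, 1.0], "c": [0.0, 1.0]}
print(top_k_chunks([1.0, 0.0], chunks, k=2))  # 'a' scores 1.0, 'b' about 0.7071
```

Using `heapq.nlargest` avoids sorting every score when only the top few are needed, which mirrors the `orderBy(...).limit(top_k)` step in Spark.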

# Create a prompt to extract data from chunks for a specific year

In [19]:
# 1. Define a Pydantic model for the LLM output schema
class QAPair(BaseModel):
    """
    A simple schema with 'question' and 'answer' fields. After manual review,
    the generated pairs serve as the validation dataset.
    """
    question: str
    answer: str

In [20]:
def extract_qa_from_chunk(
    year: str,
    chunk_col_name: str,
    chunk_text: str,
    model: str = "llama3.3"
) -> Optional[QAPair]:
    """
    Calls Ollama with a structured JSON format prompt, asking the LLM
    to return a JSON object matching QAPair's schema.
    
    Args:
        year (str): The year for this text chunk, e.g. "2018".
        chunk_col_name (str): The chunk column name or identifier.
        chunk_text (str): The actual text of the chunk from the 10-K.
        model (str): Ollama model name, default='llama3.3'.
    
    Returns:
        QAPair: If parsing is successful, returns a QAPair object with
                question and answer attributes.
        None: If the response is not valid JSON or fails validation.
    """
    
    # 2. Build a user prompt that instructs the LLM to produce JSON
    prompt = f"""You are a financial analysis assistant.
You are given a 10-K chunk from a company's filing with the following data:
- Year: {year}
- Chunk Name: {chunk_col_name}
- Text Content:

\"\"\"{chunk_text}\"\"\"

From this chunk, please:
#1. Formulate the MOST relevant or significant question about this chunk's financial data.
#2. Provide an answer to that question, based on the chunk.

Your response MUST be valid JSON in the format:
{{
  "question": "...",
  "answer": "..."
}}

Do not include extra keys. No extra commentary—just valid JSON.
"""

    # 3. Call Ollama, requesting the QAPair schema as the output format
    response = chat(
        messages=[{'role': 'user', 'content': prompt}],
        model=model,
        format=QAPair.model_json_schema(),
    )
    # 4. Parse the JSON response using Pydantic
    try:
        qa_obj = QAPair.model_validate_json(response.message.content)
        return qa_obj
    except Exception as e:
        print(f"Failed to parse JSON into QAPair: {e}")
        print("Raw LLM output was:", response.message.content)
        return None
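Because the validation set is meant to contain true values, a cheap first-pass filter before manual review is to check that every number the LLM quotes in its answer actually appears in the source chunk. A sketch using only the standard library; `numbers_in` and `ungrounded_numbers` are hypothetical helpers, and the regex is a simplification that ignores spelled-out numbers and unit changes (for example "$40.0 million" versus "$40,000 thousand"). The sample sentences are made up for illustration.

```python
import re
from typing import List, Set

# Matches figures like 3.27, 344.9, 1,035,976 or 75 (commas as thousands separators).
_NUMBER = re.compile(r"\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?")


def numbers_in(text: str) -> Set[str]:
    """Return the numeric tokens in text with thousands separators removed."""
    return {m.replace(",", "") for m in _NUMBER.findall(text)}


def ungrounded_numbers(answer: str, chunk_text: str) -> List[str]:
    """Numbers quoted in the answer that never appear in the chunk (possible hallucinations)."""
    return sorted(numbers_in(answer) - numbers_in(chunk_text))


chunk = "Borrowings from the FHLB of Pittsburgh could total up to $344.9 million."
answer = "The company could borrow up to $344.9 million from the FHLB, or 12% of assets."
print(ungrounded_numbers(answer, chunk))  # ['12']
```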

# Create a validation dataset (5 true values from chunks)

In [21]:
# Step 1: Randomly select 10 chunks from df_embeddings
df_meta_val = df_embeddings.orderBy(rand(42)).limit(10)  
rows = df_meta_val.collect()

validation_data = []  # We'll store final results here

In [22]:
# Step 2: For each chunk, call extract_qa_from_chunk with a progress bar
for row in tqdm(rows, desc="Extracting Q/A from chunks"):
    # Convert 'year' to a string if necessary
    year_str = str(row["year"])
    
    chunk_name = row["chunk_col_name"]
    chunk_text = row["text"]
    
    # Call the function that prompts the LLM to extract Q/A in JSON
    qa_result = extract_qa_from_chunk(
        year=year_str,
        chunk_col_name=chunk_name,
        chunk_text=chunk_text,
        model="deepseek-r1:70b"  # or another Ollama model if desired
    )
    
    # Store the result; keep a placeholder row (None question/answer) if parsing failed
    validation_data.append({
        "year": year_str,
        "chunk_col_name": chunk_name,
        "text": chunk_text,
        "question": qa_result.question if qa_result is not None else None,
        "answer": qa_result.answer if qa_result is not None else None,
    })



In [23]:
# Step 3: Display and store the validation dataset

# Step 3.1: Just print them directly (as Python dicts)
for i, item in enumerate(validation_data, start=1):
    print(f"=== Validation Chunk #{i} ===")
    print("Year:", item["year"])
    print("Chunk Column:", item["chunk_col_name"])
    print("\n")
    print("Question:", item["question"])
    print("Answer:", item["answer"])
    print("\n")
    print("---Text ---")
    print(item["text"])
    print("================================\n")


=== Validation Chunk #1 ===
Year: 2020
Chunk Column: section_8_chunk_75


Question: What is the methodology used by FNCB to determine estimated fair value amounts, and what factors could affect these estimates?
Answer: FNCB determines its estimated fair value amounts using available market information and appropriate valuation methodologies. However, management judgment is required to interpret data and develop these estimates. The use of different market assumptions and/or estimation methodologies may have a material effect on the estimated fair value amounts.


---Text ---
The following estimated fair value amounts have been determined using available market information and appropriate valuation methodologies. However, management judgment is required to interpret data and develop fair value estimates. Accordingly, the estimates below are not necessarily indicative of the amounts FNCB could realize in a current market exchange. The use of different market assumptions and/or estimation

In [24]:
validation_data_json = """
[
  {
    "question": "What was the trend in the company's tax-equivalent net interest margin and rate spread in 2019 compared to 2018, and what were the key factors contributing to these changes?",
    "answer": "The company's tax-equivalent net interest margin increased by 5 basis points to 3.27% in 2019 from 3.22% in 2018. The rate spread remained stable at 3.07% for both years. The increase in the net interest margin was driven by a rise in funding costs, particularly an increase in rates paid on interest-bearing deposits and borrowed funds. Specifically, rates on interest-bearing demand deposits increased by 24 basis points, time deposits by 37 basis points, and borrowed funds by 44 basis points, contributing to higher interest expense."
  },
  {
    "question": "What is the company's stance on potential mergers or acquisitions, and how might this impact its shareholders?",
    "answer": "The company has implemented anti-takeover provisions that could discourage, delay, or prevent mergers or acquisitions. These include requiring 75% shareholder approval for certain transactions unless approved by the board of directors, authorizing the board to oppose tender offers, and classifying the board into three classes with staggered terms. These measures may protect the company's strategic direction but could also diminish shareholder value by limiting opportunities for favorable mergers or acquisitions."
  },
  {
    "question": "What methods does management use to determine the fair value of its investment portfolio?",
    "answer": "Management utilizes various inputs, primarily unadjusted quoted market prices in active markets (Level 1) or observable inputs like quotes for similar assets or model outputs (Level 2). When necessary, especially with illiquid markets or unobservable inputs, valuation techniques based on assumptions such as cash flows and discount rates are applied (Level 3)."
  },
  {
    "question": "What is the company's maximum borrowing capacity under different credit facilities as of December 31, 2018?",
    "answer": "As of December 31, 2018, the company's maximum borrowing capacity was $40.0 million under federal funds lines of credit and $9.9 million through the Federal Reserve Discount Window. Additionally, their agreement with the FHLB of Pittsburgh allowed for borrowings up to $344.9 million based on pledged loans."
  },
  {
    "question": "What is the company's organizational structure?",
    "answer": "The company is a registered bank holding company incorporated under Pennsylvania law in 1997 and serves as the parent company to a bank and its subsidiaries, which include realty-related entities. The term 'the company' refers to both the holding company and its consolidated subsidiaries unless otherwise specified. The bank operates several full-service branch locations in its primary market area."
  }
]
"""

validation_data = json.loads(validation_data_json)

for qa in validation_data:
    print("Q:", qa["question"])
    print("A:", qa["answer"])
    print()

Q: What was the trend in the company's tax-equivalent net interest margin and rate spread in 2019 compared to 2018, and what were the key factors contributing to these changes?
A: The company's tax-equivalent net interest margin increased by 5 basis points to 3.27% in 2019 from 3.22% in 2018. The rate spread remained stable at 3.07% for both years. The increase in the net interest margin was driven by a rise in funding costs, particularly an increase in rates paid on interest-bearing deposits and borrowed funds. Specifically, rates on interest-bearing demand deposits increased by 24 basis points, time deposits by 37 basis points, and borrowed funds by 44 basis points, contributing to higher interest expense.

Q: What is the company's stance on potential mergers or acquisitions, and how might this impact its shareholders?
A: The company has implemented anti-takeover provisions that could discourage, delay, or prevent mergers or acquisitions. These include requiring 75% shareholder appro

# Demonstrate that your LLM can retrieve the correct chunks from your embedding object for the correct year

In [25]:
def filter_by_year_and_search(
    year: int,
    query: str,
    top_k: int = 5
) -> str:
    """
    Filters the df_embeddings DataFrame by the specified year, computes the query embedding,
    calculates the cosine similarity with the 'embeddings_vector' column, 
    and selects the top_k most similar chunks. It then concatenates these chunks into a prompt 
    string following the specified structure:
    
    Prompt Structure:
      1) Role: "You are a financial analysis assistant..."
      2) '### Context (Top {top_k} Chunks) ###' containing, per chunk:
         - [Chunk Info] (year, chunk_col_name, similarity)
         - [Chunk Text]
      3) A '### Requirements for Response Format ###' section instructing the model
         to respond in JSON format with keys: "year", "financial_factor", "answer", "reference".
      4) '### Question ###' with the user's query, followed by an instruction to
         answer "unknown" when the text does not state the answer.
    
    Args:
        year (int): The year to filter (e.g., 2018).
        query (str): The user query (e.g., "What is the company's net income for 2018?").
        top_k (int, optional): Number of top similar chunks to include (default=5).
    
    Returns:
        str: A single string containing the assembled prompt with context chunks and instructions.
    """
    # Relies on the notebook globals df_embeddings and pipeline_model (used by
    # get_query_embedding); pass them in explicitly if this moves into a module.

    # Tool-call arguments may arrive as strings (e.g. "2018", "5"); limit() needs an int.
    top_k = int(top_k)

    # 1) Filter the DataFrame by the specified year ('year' is a string column)
    df_filtered = df_embeddings.filter(col("year") == str(year))
    
    # 2) Compute the query embedding
    query_vec = get_query_embedding(query, pipeline_model)
    cos_sim_udf = create_cosine_similarity_udf(query_vec)
    
    # 3) Compute similarity, sort descending, and pick top_k
    df_with_sim = df_filtered.withColumn("similarity", cos_sim_udf(col("embeddings_vector")))
    df_topk = df_with_sim.orderBy(col("similarity").desc()).limit(top_k)
    
    # 4) Build context blocks for the top_k chunks
    rows = df_topk.collect()
    context_blocks = []
    for i, r in enumerate(rows, start=1):
        block = (
            f"[Chunk Info]\n"
            f"- year: {r['year']}\n"
            f"- chunk_col_name: {r['chunk_col_name']}\n"
            f"- similarity: {r['similarity']:.4f}\n\n"
            f"[Chunk Text]\n{r['text']}\n"
            "----\n"
        )
        context_blocks.append(block)
    context_str = "\n".join(context_blocks)
    
    # 5) Construct the final prompt
    prompt_result = f"""You are a financial analysis assistant.
Given the following text from a 10-K filing, please answer the question.

### Context (Top {top_k} Chunks) ###

{context_str}

### Requirements for Response Format ###
Your response MUST be valid JSON in the format:
{{
  "year": "...",
  "financial_factor": "...",
  "answer": "...",
  "reference": {{
    "chunk_col_name": "...",
    "text_snippet": "..."
  }}
}}

### Question ###
{query}

If the text does not explicitly answer the question, set "answer" to "unknown".

"""
    return prompt_result
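The prompt asks for a JSON object with `year`, `financial_factor`, `answer` and a nested `reference`. A hedged sketch of checking a model reply against that contract, including that the reported year matches the year retrieval was filtered on and that the cited chunk was among the retrieved ones. `check_response` is a hypothetical helper and the sample reply is a toy value, not model output.

```python
import json
from typing import Iterable, List

REQUIRED_KEYS = {"year", "financial_factor", "answer", "reference"}
REFERENCE_KEYS = {"chunk_col_name", "text_snippet"}


def check_response(raw: str, year: int, retrieved_chunks: Iterable[str]) -> List[str]:
    """Return a list of problems with an LLM reply; an empty list means it passed."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(data, dict):
        return ["top-level value is not an object"]
    problems = [f"missing key: {k}" for k in sorted(REQUIRED_KEYS - set(data))]
    if str(data.get("year")) != str(year):
        problems.append(f"year {data.get('year')!r} does not match requested {year}")
    ref = data.get("reference")
    if not isinstance(ref, dict):
        problems.append("reference is not an object")
    else:
        problems += [f"missing reference key: {k}" for k in sorted(REFERENCE_KEYS - set(ref))]
        if ref.get("chunk_col_name") not in set(retrieved_chunks):
            problems.append("cited chunk was not among the retrieved chunks")
    return problems


reply = json.dumps({
    "year": "2018", "financial_factor": "net income", "answer": "unknown",
    "reference": {"chunk_col_name": "section_7_chunk_3", "text_snippet": "..."},
})
print(check_response(reply, 2018, ["section_7_chunk_3", "section_8_chunk_10"]))  # []
```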

In [26]:
def run_ollama_with_filter_tool(
    query_text: str,
    model: str = "llama3.3"
) -> str:
    """
    A single function that:
      1) Defines the 'tools' list with filter_by_year_and_search.
      2) Calls ollama.chat(...) using the given model (default='llama3.3').
      3) Iterates through response.tool_calls to execute the needed function.
      4) Returns the final 'result_str' produced by the filter_by_year_and_search tool.
    
    Args:
        query_text (str): The user's query or prompt message.
        model (str, optional): The Ollama model name to use. Defaults to 'llama3.3'.
        
    Returns:
        str: The prompt returned by the filter_by_year_and_search tool, or None if the model made no tool call.
    """
    # 1) Define the tools list (includes your filter_by_year_and_search function)
    tools = [filter_by_year_and_search]

    # 2) Make the Ollama call, passing the 'tools' argument
    response = ollama.chat(
        model=model,
        messages=[{'role': 'user', 'content': query_text}],
        tools=tools,
    )

    # 3) Prepare a lookup dict for function references
    available_functions = {
        'filter_by_year_and_search': filter_by_year_and_search,
    }

    # 4) Iterate over any tool calls that the LLM made
    result_str = None
    for tool_call in response.message.tool_calls or []:
        func = available_functions.get(tool_call.function.name)
        if func:
            # We assume the LLM will supply correct arguments. 
            # For example: {'year': 2018, 'query': "...", 'top_k': 5}
            result_str = func(**tool_call.function.arguments)
            print(result_str)
        else:
            print("Function not found:", tool_call.function.name)
    
    return result_str
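The dispatch loop above passes the model's arguments through unchanged. A more defensive variant could normalize them first, since a model may emit `year` as a string (e.g. `"2018"`) or send `top_k` as text. The sketch below is a minimal, Ollama-free illustration, assuming tool calls arrive as plain dicts with `name` and `arguments`; `dispatch_tool_calls` and the stub `fake_filter_by_year_and_search` are hypothetical stand-ins, not the notebook's functions.

```python
from typing import Any, Callable, Dict, List, Optional


def fake_filter_by_year_and_search(year: int, query: str, top_k: int = 5) -> str:
    """Hypothetical stub standing in for filter_by_year_and_search."""
    return f"year={year} top_k={top_k} query={query}"


def dispatch_tool_calls(
    tool_calls: List[Dict[str, Any]],
    available_functions: Dict[str, Callable[..., str]],
) -> Optional[str]:
    """Run each known tool call with normalized arguments; return the last result (or None)."""
    result: Optional[str] = None
    for call in tool_calls:
        func = available_functions.get(call["name"])
        if func is None:
            print("Function not found:", call["name"])
            continue
        args = dict(call.get("arguments", {}))
        # Models sometimes emit numbers as strings, so coerce the numeric fields
        if "year" in args:
            args["year"] = int(args["year"])
        if "top_k" in args:
            args["top_k"] = int(args["top_k"])
        result = func(**args)
    return result


calls = [{"name": "filter_by_year_and_search",
          "arguments": {"year": "2018", "query": "borrowing capacity", "top_k": "3"}}]
funcs = {"filter_by_year_and_search": fake_filter_by_year_and_search}
print(dispatch_tool_calls(calls, funcs))  # year=2018 top_k=3 query=borrowing capacity
```

Returning `None` when no known tool was called lets the caller detect that case instead of sending an empty prompt to the next model.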

In [27]:
query_text = validation_data[3]["question"]
print(query_text)

What is the company's maximum borrowing capacity under different credit facilities as of December 31, 2018?


In [28]:
print(validation_data[3]["answer"])

As of December 31, 2018, the company's maximum borrowing capacity was $40.0 million under federal funds lines of credit and $9.9 million through the Federal Reserve Discount Window. Additionally, their agreement with the FHLB of Pittsburgh allowed for borrowings up to $344.9 million based on pledged loans.
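One lightweight way to compare a model answer against a validation answer like the one above is to check whether the dollar amounts in the ground truth also appear in the prediction. The sketch below is a hypothetical check, not part of the original validation code, assuming amounts are written as `$<number> million` or `$<number> billion`; it ignores all other wording.

```python
import re
from typing import List

# Matches amounts such as "$40.0 million" or "$1.238 billion"
AMOUNT_PATTERN = re.compile(r"\$\d[\d,]*(?:\.\d+)?\s(?:million|billion)")


def extract_amounts(text: str) -> List[str]:
    """Return all dollar amounts with a million/billion unit, in order of appearance."""
    return AMOUNT_PATTERN.findall(text)


def amount_coverage(prediction: str, ground_truth: str) -> float:
    """Fraction of ground-truth amounts that also appear in the prediction (1.0 if none exist)."""
    gold = extract_amounts(ground_truth)
    if not gold:
        return 1.0
    predicted = set(extract_amounts(prediction))
    return sum(1 for amount in gold if amount in predicted) / len(gold)


truth = ("As of December 31, 2018, the company's maximum borrowing capacity was $40.0 million "
         "under federal funds lines of credit and $9.9 million through the Federal Reserve "
         "Discount Window. Additionally, their agreement with the FHLB of Pittsburgh allowed "
         "for borrowings up to $344.9 million based on pledged loans.")
print(extract_amounts(truth))  # ['$40.0 million', '$9.9 million', '$344.9 million']
print(amount_coverage("Capacity: $40.0 million and $9.9 million.", truth))
```

This only checks numeric recall; it does not verify that each amount is attached to the right facility.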


In [29]:
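prompt = run_ollama_with_filter_tool(query_text)  # None if the model made no tool call
if prompt is None:
    raise ValueError("The model did not call filter_by_year_and_search; no prompt was built.")
response: ChatResponse = chat(model='deepseek-r1:70b',
                              messages=[{'role': 'user', 'content': prompt}],
                              options={"temperature": 0})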

print(response.message.content)


You are a financial analysis assistant.
Given the following text from a 10-K filing, please answer the question.

### Context (Top 5 Chunks) ###

[Chunk Info]
- year: 2018
- chunk_col_name: section_7_chunk_12
- similarity: 0.8668

[Chunk Text]
FNCB paid dividends to holders of common stock of $0.17 per share and $0.13 per share for the years ended December 31, 2018 and 2017, respectively. Balance Sheet Profile
Total assets increased $75.4 million, or 6.5%, to $1.238 billion at December 31, 2018 from $1.162 billion at December 31, 2017. The increase in total assets primarily reflected strong growth in interest-earning assets. Specifically, loans, net of net deferred costs and unearned income, increased $68.5 million, or 8.9%, to $839.1 million at December 31, 2018 from $770.6 million at December 31, 2017. In addition, securities available for sale increased $6.5 million, or 2.3%, to $296.0 million at December 31, 2018 from $289.5 million at the end of 2017. The asset growth was funded w

In [30]:
# Stop Spark
spark.stop()

# Summary

In this task, I reused some utility functions from Task 1.

## Key Objective  
The main goal of this task is to **demonstrate that the LLM can retrieve the correct chunks from the embedding object for the specified year**. This involves two key aspects:  
1. **Retrieving the correct chunk** based on the query.  
2. **Identifying the correct year from the user query.**  

For the second aspect, I adopted a **function-calling (tool-use) approach**: the custom `filter_by_year_and_search` function is exposed to the LLM as a tool, and the LLM extracts the year from the query and passes it as an argument, instead of relying on **hard-coded logic** or **NER-based methods**.
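With function calling, the model only sees a description of the tool. Recent versions of the Ollama Python client can derive that description from a Python function's signature and docstring when the function itself is passed in `tools`, as the notebook does; an explicit schema is an alternative that makes the `year` parameter's contract visible. A sketch, assuming the parameters `year`, `query`, and `top_k` (taken from the example arguments in `run_ollama_with_filter_tool`) and assuming `top_k` has a default; the descriptions are illustrative.

```python
import json

# Hypothetical explicit schema for the filter_by_year_and_search tool.
# Parameter names follow the example arguments used in the notebook;
# the descriptions and the "required" list are assumptions.
FILTER_TOOL_SCHEMA = {
    "type": "function",
    "function": {
        "name": "filter_by_year_and_search",
        "description": "Filter 10-K chunks by fiscal year and build a prompt from the most similar chunks.",
        "parameters": {
            "type": "object",
            "properties": {
                "year": {"type": "integer", "description": "Fiscal year mentioned in the user query, e.g. 2018."},
                "query": {"type": "string", "description": "The user's question."},
                "top_k": {"type": "integer", "description": "Number of chunks to retrieve."},
            },
            "required": ["year", "query"],
        },
    },
}

print(json.dumps(FILTER_TOOL_SCHEMA, indent=2))
```

Passing `tools=[FILTER_TOOL_SCHEMA]` to `ollama.chat(...)` would replace the function reference; the dispatch loop would still map the returned tool name to the Python function.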

## Scalability Considerations  
Given the potential expansion of the dataset to cover **more years, more companies, and more table types**, this function-based approach makes **subset selection** (for example, filtering by year before similarity search) a flexible, core part of the retrieval logic.
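The same subset-selection idea extends to additional metadata: each filter argument the LLM extracts (year today, possibly company or section later) narrows the candidate chunks before similarity ranking. A minimal sketch, assuming chunk metadata is held in dicts with `company` and `section` fields, which the current single-company dataset does not have; `select_subset` is a hypothetical helper.

```python
from typing import Dict, List, Optional


def select_subset(
    chunks: List[Dict[str, str]],
    year: Optional[str] = None,
    company: Optional[str] = None,
    section: Optional[str] = None,
) -> List[Dict[str, str]]:
    """Keep chunks matching every filter that is provided; None means 'no constraint'."""
    filters = {"year": year, "company": company, "section": section}
    active = {key: value for key, value in filters.items() if value is not None}
    return [c for c in chunks if all(c.get(key) == value for key, value in active.items())]


# Illustrative metadata only
chunks = [
    {"year": "2018", "company": "FNCB", "section": "section_7", "text": "..."},
    {"year": "2019", "company": "FNCB", "section": "section_7", "text": "..."},
    {"year": "2018", "company": "FNCB", "section": "section_1", "text": "..."},
]
print(len(select_subset(chunks, year="2018")))  # 2
print(len(select_subset(chunks, year="2018", section="section_7")))  # 1
```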

However, this task does not yet explore challenges related to **cross-table and cross-year retrieval**. If needed, an **agent-based approach** for complex query decomposition could be considered.

## Dataset Limitations  
A crucial challenge in this task is the **poor parsing quality** of the dataset. Without access to the **original filing forms**, some critical **first-hand information** (such as heading levels and structured table data) is lost.

## Future Work  
Potential future improvements include:  
- **Testing different models** to evaluate their retrieval performance.  
- **Expanding the dataset** to enhance generalization.  
- **Evaluating retrieval quality** using metrics such as **recall@N** and **mAP (mean Average Precision)**.
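Both retrieval metrics can be computed from a ranked list of retrieved chunk IDs and a set of relevant IDs per query (for example, derived from the validation dataset). A minimal sketch, assuming each query has a ranked list of retrieved `chunk_col_name` values and a set of relevant ones; average precision here is normalized by the total number of relevant chunks, which is one common convention, and the chunk IDs in the usage example are toy values.

```python
from typing import List, Sequence, Set


def recall_at_n(retrieved: Sequence[str], relevant: Set[str], n: int) -> float:
    """Fraction of relevant items that appear among the top-n retrieved items."""
    if not relevant:
        return 0.0
    hits = sum(1 for item in retrieved[:n] if item in relevant)
    return hits / len(relevant)


def average_precision(retrieved: Sequence[str], relevant: Set[str]) -> float:
    """Mean of precision@k over the ranks k at which a relevant item is retrieved."""
    if not relevant:
        return 0.0
    hits = 0
    precision_sum = 0.0
    for rank, item in enumerate(retrieved, start=1):
        if item in relevant:
            hits += 1
            precision_sum += hits / rank
    return precision_sum / len(relevant)


def mean_average_precision(runs: List[Sequence[str]], relevants: List[Set[str]]) -> float:
    """mAP: average of per-query average precision."""
    scores = [average_precision(r, rel) for r, rel in zip(runs, relevants)]
    return sum(scores) / len(scores) if scores else 0.0


ranked = ["section_7_chunk_12", "section_7_chunk_30", "section_8_chunk_4"]
gold = {"section_7_chunk_30", "section_8_chunk_4"}
print(recall_at_n(ranked, gold, n=2))  # 0.5
print(average_precision(ranked, gold))
```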
