In [0]:
# Load IPython's autoreload extension and set it to mode 2, which reloads all imported
# modules before each cell executes, so edits to the local `utils` helpers are picked up
# without re-importing them.
%load_ext autoreload
%autoreload 2
# Learn more about autoreload on Databricks:
# https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run `%autoreload 0`.

In [0]:
from utils.get_path_of_files_modified_in_last_day import get_path_of_files_modified_in_last_day
from utils.extract_pdf_text_basic import extract_pdf_text_basic
from utils.get_form10k_metadata_from_pdf_text_with_llm import get_form10k_metadata_from_pdf_text_with_llm, is_llm_response_valid, parse_valid_llm_response
from utils.create_unique_file_hash import create_unique_pdf_hash
from utils.convert_pdf_to_markdown_mistral import convert_pdf_to_markdown_mistral

## What this notebook does
1. Scan Volume for PDFs modified in last 24 hours
2. Extract metadata from the document title page using an LLM
3. Create a unique hash from the PDF text and check whether the document has already been processed
4. If document is new then convert to markdown and chunk text
5. Save metadata and chunk text to tables

## Check volume for recently modified PDF files


In [0]:
# Volume directory that holds the Form 10-K PDFs to ingest.
pdfs_volume_path = "/Volumes/databricks_examples/financial_rag/form10k_pdfs"

# Paths of the PDFs under the volume that the helper reports as modified in the last day.
pdfs_to_process = get_path_of_files_modified_in_last_day(pdfs_volume_path)

# Later cells index pdfs_to_process[0]. When nothing was modified there is no work to do,
# so stop the notebook with a clear message instead of failing downstream with an IndexError.
if not pdfs_to_process:
    dbutils.notebook.exit(
        f"No recently modified PDFs found in {pdfs_volume_path}; nothing to process."
    )

## Convert PDF to text
Crude conversion to extract metadata via LLM and create unique document hash


In [0]:
# Run the basic (non-OCR) text extraction on the first recently modified PDF only; this
# notebook is still a single-file prototype and does not loop over pdfs_to_process yet.
# Later cells read pdf_text[0] and "".join(pdf_text), so the result is presumably a
# sequence of strings (one per page) — TODO confirm against the helper.
pdf_text = extract_pdf_text_basic(pdfs_to_process[0]) # start with one for now

## Extract First Page Metadata with LLM


In [0]:
# Send the first element of the extracted text (the title page, per the section heading)
# to the LLM helper. The second argument is presumably the model serving endpoint name —
# TODO confirm against the helper's signature.
raw_response = get_form10k_metadata_from_pdf_text_with_llm(pdf_text[0], "openai-completion-endpoint")

isResponseValid = is_llm_response_valid(raw_response)

if not isResponseValid:
    # Fail fast: parse_valid_llm_response must only ever see a validated response.
    # TODO: once files are processed in a loop, skip this file instead of aborting the run.
    raise ValueError(
        f"LLM returned an invalid Form 10-K metadata response for {pdfs_to_process[0]}"
    )

# Parsed metadata; later cells read json_metadata['fileNumber'] and add 'documentHash'.
json_metadata = parse_valid_llm_response(raw_response)


## If Document Already Exists Check For New Text Content


In [0]:
# Build a hash from the file number plus the full extracted text, so a changed PDF for the
# same filing yields a different hash.
file_number = json_metadata['fileNumber']
unique_document_hash = create_unique_pdf_hash(file_number, "".join(pdf_text))

# Named parameter markers (Spark 3.4+) keep the LLM-derived file number out of the SQL text,
# so quotes or other characters in the metadata cannot alter the statement.
query = """
SELECT COUNT(*)
FROM databricks_examples.financial_rag.pdf_metadata
WHERE fileNumber = :file_number
AND documentHash = :document_hash
"""

# if the file number and unique hash exist the file and existing content were already processed
doesRecordExist = spark.sql(
    query,
    args={
        "file_number": str(file_number),
        "document_hash": str(unique_document_hash),
    },
).collect()[0][0] > 0

if doesRecordExist:
    # Nothing changed for this filing: stop here so the notebook does not pay for another
    # Mistral conversion or rewrite rows that are already up to date.
    # TODO: once files are processed in a loop, skip to the next file instead of exiting.
    dbutils.notebook.exit(
        f"record exists: fileNumber {file_number} with hash {unique_document_hash} was already processed"
    )

# New or changed document: convert the PDF to per-page markdown with the Mistral helper.
# The API key is read from the Databricks secret scope rather than hardcoded.
pdf_markdown = convert_pdf_to_markdown_mistral(pdfs_to_process[0], dbutils.secrets.get(scope="mistral", key="apikey"))

In [0]:
# MERGE keyed on fileNumber: refresh the descriptive columns and hash when the filing is
# already in pdf_metadata, otherwise insert it as a new row.
upsert_query = """
MERGE INTO databricks_examples.financial_rag.pdf_metadata AS target
USING json_metadata_view AS source
ON target.fileNumber = source.fileNumber
WHEN MATCHED THEN
  UPDATE SET
    companyName = source.companyName,
    tradingSymbol = source.tradingSymbol,
    fiscalYearEndDate = source.fiscalYearEndDate,
    documentHash = source.documentHash
WHEN NOT MATCHED THEN
  INSERT (fileNumber, companyName, tradingSymbol, fiscalYearEndDate, documentHash)
  VALUES (source.fileNumber, source.companyName, source.tradingSymbol, source.fiscalYearEndDate, source.documentHash)
"""

# Attach the content hash so it is stored alongside the LLM-extracted fields.
json_metadata['documentHash'] = unique_document_hash

# Single-row DataFrame built from the metadata mapping.
metadata_rows = [json_metadata]
json_metadata_df = spark.createDataFrame(metadata_rows)

# Expose the row to SQL under the view name the MERGE statement reads from.
json_metadata_df.createOrReplaceTempView("json_metadata_view")

spark.sql(upsert_query)

In [0]:
# Replace all stored page text for this filing with the freshly converted markdown.

# Step 1: Build one row per page: (fileNumber, page markdown, 1-based page number)
page_data = [
    (json_metadata['fileNumber'], content.markdown, page_number)
    for page_number, content in enumerate(pdf_markdown.pages, start=1)
]

# Validate before touching the table: an empty conversion must not wipe the existing rows
# (and createDataFrame cannot infer a schema from an empty list anyway).
if not page_data:
    raise ValueError(
        f"PDF conversion returned no pages for fileNumber {json_metadata['fileNumber']}; "
        "existing pdf_text rows were left untouched"
    )

# Create DataFrame for all pages at once
page_df = spark.createDataFrame(page_data, ["edgar_pdf_metadata_fileNumber", "text", "pageNumber"])
page_df.createOrReplaceTempView("page_data_view")

# Step 2: Delete any existing rows for this fileNumber. The value is bound as a named
# parameter (Spark 3.4+) instead of being spliced into the SQL text.
# NOTE: the DELETE and INSERT are separate statements, so the replacement is not atomic.
drop_query = """
DELETE FROM databricks_examples.financial_rag.pdf_text
WHERE edgar_pdf_metadata_fileNumber = :file_number
"""
spark.sql(drop_query, args={"file_number": str(json_metadata['fileNumber'])})

# Step 3: Insert all pages at once
insert_query = """
INSERT INTO databricks_examples.financial_rag.pdf_text
(edgar_pdf_metadata_fileNumber, text, pageNumber)
SELECT edgar_pdf_metadata_fileNumber, text, pageNumber FROM page_data_view
"""
spark.sql(insert_query)

In [0]:
%sql
-- Creates the pdf_text table if it does not already exist. The ingestion cell in this
-- notebook writes one row per converted PDF page: the filing's fileNumber, the page's
-- markdown text, and a 1-based page number. `id` is an auto-generated identity column.
-- The table property turns on the Delta change data feed for this table.
-- NOTE(review): this DDL sits below the cells that DELETE from / INSERT into pdf_text and
-- above the cell that creates pdf_metadata, which the FOREIGN KEY references. On a fresh
-- workspace the tables need to exist first (pdf_metadata, then pdf_text) — TODO move the
-- DDL cells to the top of the notebook.
CREATE TABLE IF NOT EXISTS databricks_examples.financial_rag.pdf_text (
    id BIGINT GENERATED ALWAYS AS IDENTITY,
    edgar_pdf_metadata_fileNumber BIGINT,
    text STRING,
    pageNumber INT,
    FOREIGN KEY (edgar_pdf_metadata_fileNumber) REFERENCES databricks_examples.financial_rag.pdf_metadata(fileNumber)
) TBLPROPERTIES (delta.enableChangeDataFeed = true);

In [0]:
# Sanity check: look at a sample of the stored page text rather than the whole table.
query = "SELECT * FROM databricks_examples.financial_rag.pdf_text"
result_df = spark.sql(query)

# Cap the preview at 50 rows before rendering it.
preview_df = result_df.limit(50)
display(preview_df)

# 5bda4e72a9b82f27960c5d963d05b7d10802458b5911b867ac135eac91d94ab9
# (leftover note; looks like a document hash — TODO confirm or remove)

In [0]:
%sql
-- Creates the pdf_metadata table if it does not already exist: one row per filing, keyed
-- by fileNumber, holding the fields the notebook's MERGE statement writes (companyName,
-- tradingSymbol, fiscalYearEndDate) plus the content hash used for the duplicate check.
-- The table property turns on the Delta change data feed for this table.
-- NOTE(review): this DDL is the last table-related cell, but earlier cells already query
-- and MERGE into pdf_metadata, and the pdf_text DDL references it through a foreign key.
-- On a fresh workspace it has to run first — TODO move it to the top of the notebook.
-- NOTE(review): confirm BIGINT matches the fileNumber values the LLM extraction returns.
CREATE TABLE IF NOT EXISTS databricks_examples.financial_rag.pdf_metadata (
    fileNumber BIGINT PRIMARY KEY,
    companyName STRING,
    tradingSymbol STRING,
    fiscalYearEndDate STRING,
    documentHash STRING
) TBLPROPERTIES (delta.enableChangeDataFeed = true);


In [0]:
import sqlite3

# Scratch cell: (re)creates an empty GEEK table in a local SQLite file. It does not touch
# Spark or the financial_rag tables used by the rest of the notebook.

# Creating table
table = """ CREATE TABLE GEEK (
            Email VARCHAR(255) NOT NULL,
            First_Name CHAR(25) NOT NULL,
            Last_Name CHAR(25),
            Score INT
        ); """

# Open (or create) the SQLite database file in the current working directory.
connection_obj = sqlite3.connect('geek.db')
try:
    # cursor object
    cursor_obj = connection_obj.cursor()

    # Drop the GEEK table if already exists, so re-running the cell starts from scratch.
    cursor_obj.execute("DROP TABLE IF EXISTS GEEK")

    cursor_obj.execute(table)
    connection_obj.commit()
finally:
    # Always release the file handle, even if one of the statements above fails.
    connection_obj.close()

In [0]:
# Inspect the parsed metadata for the processed PDF. `json_metadata` is the name the
# extraction cell assigns; `json_data` is never defined in this notebook and raised NameError.
json_metadata