In [0]:
%pip install pypdf langchain langchain-text-splitters
# Restart the Python process so the packages installed above can be imported by the cells below.
# NOTE(review): package versions are not pinned, so runs may pick up different releases over time — consider pinning.
dbutils.library.restartPython()

In [0]:
from pypdf import PdfReader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from datetime import datetime
import os

In [0]:
# Load the distinct file names already recorded in the tracker table.
# The ingestion loop further down checks membership in `existing_files` to skip them.
df_existing_file = spark.sql("select distinct file_name from workspace.input_data.drug_file_tracker")

# Pull the (small) result set to the driver as a plain Python list of names.
existing_files = []
for tracked_row in df_existing_file.collect():
    existing_files.append(tracked_row['file_name'])

In [0]:
# Ingest every not-yet-tracked PDF in the volume folder:
#   1. extract its text with pypdf,
#   2. split the text into overlapping chunks,
#   3. append the chunks to the Delta chunk table,
#   4. only then record the file in the tracker table.
# Recording the file last means a failure while reading or writing leaves it
# untracked, so the next run retries it instead of silently losing its chunks.

# Folder path
uc_location_path = "/Volumes/workspace/input_data/pdf_drugs/"

# Recursive splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=150,
    separators=["\n\n", "\n", ".", " ", ""]
)

# Named parameter marker instead of string interpolation: a file name that
# contains a quote can no longer break (or inject into) the statement.
tracker_insert_sql = (
    "INSERT INTO workspace.input_data.drug_file_tracker (file_name) "
    "VALUES (:file_name)"
)

# Files whose processing raised; they stay untracked and are reported at the end.
failed_files = []

# Iterating through the files (sorted so runs are reproducible; os.listdir order is arbitrary)
for file in sorted(os.listdir(uc_location_path)):
    # NOTE(review): the extension check is case-sensitive, so "X.PDF" is ignored — confirm this is intended.
    if not file.endswith(".pdf"):
        continue

    if file in existing_files:
        print(f"Skipping existing file: {file} as it has been already chunked")
        continue

    file_path = os.path.join(uc_location_path, file)
    file_name = file

    # Extract drug name from filename: everything before the FIRST dot.
    # NOTE(review): "a.b.pdf" yields "a", not "a.b" — confirm this matches the naming scheme.
    drug_name = file_name.split(".")[0]
    print(f"Processing: {file_name}")

    try:
        # Read PDF
        reader = PdfReader(file_path)

        # Concatenate the text of every page that yields any, one page per line group.
        page_texts = (page.extract_text() for page in reader.pages)
        full_text = "".join(page_text + "\n" for page_text in page_texts if page_text)

        # Skip empty documents, but still track them so they are not re-read on every run
        if not full_text.strip():
            spark.sql(tracker_insert_sql, args={"file_name": file_name})
            print(f"Skipped empty file: {file_name}")
            continue

        # Recursive chunking
        chunks = text_splitter.split_text(full_text)

        # Prepare dataframe: one row per chunk, all sharing the same timestamp
        current_time = datetime.now()
        data = [(file_name, drug_name, chunk, current_time) for chunk in chunks]

        df = spark.createDataFrame(
            data,
            ["file_name", "drug_name", "chunk_text", "created_at"]
        )

        # Append to Delta table
        df.write.mode("append").saveAsTable("workspace.input_data.drug_file_chunk")

        # Mark the file as processed only after its chunks are stored.
        # If this insert itself fails, a retry will append the chunks again;
        # that is preferred over tracking a file whose chunks were never written.
        spark.sql(tracker_insert_sql, args={"file_name": file_name})

        print(f"Inserted {len(chunks)} chunks from {file_name}")

    except Exception as exc:
        # Keep going so one unreadable PDF does not block the remaining files;
        # the failure is surfaced after the loop.
        failed_files.append(file_name)
        print(f"Failed to process {file_name}: {exc}")

if failed_files:
    # Fail the cell/job visibly rather than reporting success with missing data.
    raise RuntimeError(
        f"{len(failed_files)} file(s) could not be processed and were left untracked for retry: {failed_files}"
    )

print("All files processed successfully.")
