In [0]:
# Install required libraries
%pip install pdfplumber python-docx

In [0]:
%pip install striprtf

In [0]:
%restart_python

In [0]:
# Databricks Notebook: 01_ingest_documents (converted for Databricks notebook cell use)

# COMMAND ----------
from pyspark.sql.functions import lit, monotonically_increasing_id
import os
import pdfplumber
import docx

# COMMAND ----------
# Directory holding the uploaded source documents (files can be uploaded through the Databricks UI).
# NOTE(review): the /Volumes/<catalog>/<schema>/<volume> form looks like a Unity Catalog
# volume path rather than a classic dbfs:/ path -- confirm against the workspace setup.
DOCS_PATH = "/Volumes/docai-dbx/source_data/source_files"

# COMMAND ----------
# Function to extract text from PDF
def extract_text_from_pdf(file_path):
    """Extract the text of every page of a PDF, joined by newlines.

    Pages for which ``extract_text()`` returns nothing (``None`` or an empty
    string) are left out of the result.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The page texts joined with ``"\\n"``, or ``""`` when the file could
        not be processed. Failures are printed, never raised, so one bad
        file does not stop the caller.
    """
    try:
        with pdfplumber.open(file_path) as pdf:
            # Call extract_text() once per page and reuse the result for both
            # the emptiness check and the join, instead of extracting twice.
            page_texts = (page.extract_text() for page in pdf.pages)
            # The join consumes the generator while the PDF is still open.
            return "\n".join(text for text in page_texts if text)
    except Exception as e:
        # Deliberate best-effort: report the problem and return no text.
        print(f"Error processing {file_path}: {e}")
        return ""

# COMMAND ----------
# Function to extract text from DOCX
def extract_text_from_docx(file_path):
    """Return the paragraph text of a DOCX file, one paragraph per line.

    Args:
        file_path: Path of the DOCX file to read.

    Returns:
        The text of each paragraph in ``doc.paragraphs`` joined with
        ``"\\n"``, or ``""`` when the file could not be processed (the error
        is printed, not raised).
    """
    try:
        document = docx.Document(file_path)
        paragraph_texts = []
        for paragraph in document.paragraphs:
            paragraph_texts.append(paragraph.text)
        return "\n".join(paragraph_texts)
    except Exception as err:
        # Best-effort: report the failure and fall back to an empty string.
        print(f"Error processing {file_path}: {err}")
        return ""

# COMMAND ----------
# Function to read TXT (RTF content stored in a .txt file is converted to plain text)
# COMMAND ----------
def extract_text_from_txt(file_path):
    """Read a text file, converting it to plain text if it contains RTF.

    The file is decoded as UTF-8. A leading byte-order mark is dropped, and
    bytes that are not valid UTF-8 are replaced with U+FFFD rather than
    causing the whole document to be discarded.

    Args:
        file_path: Path of the ``.txt`` file to read.

    Returns:
        The file content (RTF markup stripped when the content starts with
        an ``{\\rtf`` signature), or ``""`` when the file could not be
        processed (the error is printed, not raised).
    """
    try:
        # utf-8-sig strips a UTF-8 BOM, which str.strip() does not remove and
        # which would otherwise hide the RTF signature checked below.
        # errors="replace" keeps the readable part of a file that contains a
        # few non-UTF-8 bytes instead of failing the whole read.
        with open(file_path, "r", encoding="utf-8-sig", errors="replace") as f:
            content = f.read()

        # Check for RTF signature (leading whitespace is ignored).
        if content.lstrip().startswith("{\\rtf"):
            # Imported lazily so plain-text files do not need striprtf.
            from striprtf.striprtf import rtf_to_text
            return rtf_to_text(content)
        return content
    except Exception as e:
        # Deliberate best-effort: report the problem and return no text.
        print(f"Error processing {file_path}: {e}")
        return ""

# COMMAND ----------
# Load all documents and extract text
def load_documents(doc_dir):
    """Extract the text of every supported document in a directory.

    Supported types are selected by file extension, compared without regard
    to case: ``.pdf``, ``.docx`` and ``.txt``. Any other entry is ignored.
    Only the directory itself is scanned; subdirectories are not descended.

    Args:
        doc_dir: Directory containing the documents.

    Returns:
        A list of ``(file_name, text)`` tuples ordered by file name. ``text``
        is ``""`` for a file whose extraction failed.

    Raises:
        OSError: If ``doc_dir`` cannot be listed.
    """
    docs = []
    # os.listdir returns entries in arbitrary order; sorting keeps the output
    # (and anything derived from row order downstream) reproducible.
    for file in sorted(os.listdir(doc_dir)):
        path = os.path.join(doc_dir, file)
        # Compare on a lowercased copy so "REPORT.PDF" is not silently skipped;
        # the original name is what gets stored.
        lowered = file.lower()
        if lowered.endswith(".pdf"):
            text = extract_text_from_pdf(path)
        elif lowered.endswith(".docx"):
            text = extract_text_from_docx(path)
        elif lowered.endswith(".txt"):
            text = extract_text_from_txt(path)
        else:
            continue
        docs.append((file, text))
    return docs

# COMMAND ----------
# Ingest documents: a list of (file_name, raw_text) tuples, one per supported file.
doc_data = load_documents(DOCS_PATH)

# COMMAND ----------
# Convert to Spark DataFrame
# An explicit schema is passed because Spark cannot infer column types from an
# empty list, so a bare list of column names fails when no documents are found.
df_docs = spark.createDataFrame(doc_data, schema="file_name STRING, raw_text STRING")
# monotonically_increasing_id() gives unique, increasing ids that are not
# guaranteed to be consecutive.
df_docs = df_docs.withColumn("doc_id", monotonically_increasing_id())

# COMMAND ----------
# Save to Delta Lake
# mode("overwrite") replaces the existing table contents on every run.
(df_docs
 .write
 .format("delta")
 .mode("overwrite")
 .saveAsTable("`docai-dbx`.bronze.documentData"))

# COMMAND ----------
print("✅ Document ingestion completed.")