%md

# Scaling up parsing routines

Let's look at how to scale up parsing by leveraging a Spark cluster.

As general guidance, if you have fewer than roughly a couple of dozen files, it may not be worth distributing the work across a cluster.
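
For a small batch, a plain loop on the driver is often enough. The following is a minimal sketch, assuming a parsing callable that takes a file path and returns text; `parse_files_locally` and its parameter names are illustrative and not part of this notebook.

```python
from typing import Callable, Dict, Iterable


def parse_files_locally(
    paths: Iterable[str],
    parse_fn: Callable[[str], str],
) -> Dict[str, str]:
    """Parse files one after another on a single machine (hypothetical helper)."""
    results: Dict[str, str] = {}
    for path in paths:
        # parse_fn is whichever parser you choose, e.g. a function wrapping pymupdf4llm
        results[path] = parse_fn(path)
    return results
```

In this notebook it could be called with full paths built from `example_files` and a parser built around `pymupdf4llm.to_markdown`.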

In [None]:
%pip install -U pymupdf4llm langchain databricks-langchain
%restart_python

In [None]:
import os
import pymupdf4llm

catalog = 'brian_ml_dev'
schema = 'parsing_tests'
volume = 'raw_data'

# two candidate serving endpoints; the second assignment is the one that takes effect
llm_model_name = 'brian_serving_test'
llm_model_name = 'databricks-meta-llama-3-1-70b-instruct'

full_vol_path = f'/Volumes/{catalog}/{schema}/{volume}'

example_files = [f for f in os.listdir(full_vol_path) if f.endswith('.pdf')]
example_files

In [None]:
# load all PDF files into a dataframe (parsing happens in a later step)

raw_files_df = (
    spark.read.format("binaryFile")
    .option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "*.pdf")
    .load(full_vol_path)
)

display(raw_files_df)

# Parsing Function

At its most basic level, a parsing function just takes an input and produces an output.

In this case, we take a file path and output markdown text.

In [None]:


def pymupdf_parse(file_path: str) -> str:
    """Convert the PDF at file_path to markdown text."""
    markdown_text = pymupdf4llm.to_markdown(file_path)

    return markdown_text

We then need to convert this to a PySpark UDF in order to distribute it on Spark.

This requires declaring the function's return type.

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# for our basic example, the return type is a string

pymupdf_parse_udf = F.udf(
    pymupdf_parse,
    returnType=T.StringType()
)
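
An unhandled exception inside a UDF fails the Spark task, so a single corrupt PDF can stop the whole run. One option is to catch errors inside the function so that a bad file yields a null instead, which is still compatible with a string return type. A minimal sketch, assuming a hypothetical wrapper named `make_safe_parser` that is not part of this notebook:

```python
from typing import Callable, Optional


def make_safe_parser(
    parse_fn: Callable[[str], str],
) -> Callable[[str], Optional[str]]:
    """Wrap a parser so failures return None instead of raising (hypothetical helper)."""

    def safe_parse(file_path: str) -> Optional[str]:
        try:
            return parse_fn(file_path)
        except Exception:
            # Swallowing the error keeps the batch running; the row gets a null value.
            return None

    return safe_parse
```

The wrapped function could then be registered the same way, for example `F.udf(make_safe_parser(pymupdf_parse), returnType=T.StringType())`, and failed files found afterwards by filtering for rows where the result is null.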

In [None]:
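# binaryFile paths come back as 'dbfs:/Volumes/...'; strip the scheme so the
# parser can open the file through the local filesystem
parsed_files = raw_files_df \
    .withColumn('file_path', F.regexp_replace('path', '^dbfs:', '')) \
    .withColumn('markdown', pymupdf_parse_udf('file_path'))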
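
The `file_path` step is there because, in this setup, the `path` column from the `binaryFile` reader starts with a `dbfs:` scheme, while the parser opens files through the local filesystem. The same transformation in plain Python, as an illustrative sketch (`strip_dbfs_scheme` is a hypothetical name, not something defined in this notebook):

```python
def strip_dbfs_scheme(path: str) -> str:
    """Drop a leading 'dbfs:' so the path can be opened with local file APIs."""
    prefix = "dbfs:"
    # Only strip when the prefix is present, so already-local paths pass through unchanged.
    return path[len(prefix):] if path.startswith(prefix) else path
```

Checking for the prefix before stripping avoids silently truncating paths that never had a scheme.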

In [None]:
display(parsed_files)