In [0]:
%pip install pdf2image==1.16.0  unidecode==1.3.6 beautifulsoup4==4.10.0 PyPDF2==3.0.1


In [0]:
dbutils.library.restartPython()

In [0]:
def get_pdf_text(content):
    from PyPDF2 import PdfReader
    from io import BytesIO
    pdf = PdfReader(BytesIO(content))
    page_texts = [pdf.pages[n].extract_text()  for n in range(len(pdf.pages))]
    return '-------page break ------'.join(page_texts)


In [0]:


from pyspark.sql.functions import *
from pyspark.sql.types import StringType


get_pdf_text_udf = udf(get_pdf_text, StringType())




In [0]:
df = spark.read.format("binaryFile").load('/Volumes/workspace/default/data/origin-statement-2025-09-24.pdf')


pdf_df = df.withColumn('text_content', get_pdf_text_udf('content'))
pdf_df.createOrReplaceTempView('pdf_df')

In [0]:
from pyspark.sql.functions import replace, lit, format_string

definition_promot = '''You are a text analyst who needs to extract entities and information from the data given below
    '''
data_content_prompt = '''
    ##### Data starts here ######
    %s
    ##### Data ends here ######
'''

instructions_prompt = '''
    Extract following information from the data: 
    - Name of the person
    - Address
    - Total Amount Due
    - Invoice Date
    - Due Date
    - List of Billing Periods: Period Start Data, Period End Date, kWh consumed
    - Total kWh consumed
    - Total Amount Due
'''

return_format_prompt = '''
    Return the information in the following format:
    {'name': '',
     'address': '',
     'total_amount_due': '',
     'invoice_date': '',
     'due_date': '',
     'billing_periods': [{'period_start_date': '', 'period_end_date': '', 'kwh_consumed': ''}, ...]'''

guard_rail_prompt = '''
    ### Important Instructions ###
    - Do not hallucinate any information
    - Do not add anything to the format other than the information in the data
    - Do not add any text before or after the format above as this needs to be parsed by a program'''

combined_prompt = definition_promot + data_content_prompt + instructions_prompt + return_format_prompt + guard_rail_prompt


add_prompt_df = pdf_df.withColumn('model_input', 
                              format_string(lit(combined_prompt), 
                                            pdf_df.text_content) 
                              )                               

add_prompt_df.createOrReplaceTempView('add_prompt_df')

parsed_df = spark.sql('''
SELECT ai_query('databricks-meta-llama-3-3-70b-instruct', model_input) AS ai_response, *
FROM add_prompt_df
''')


In [0]:
result = parsed_df.select('ai_response', 'path', 'modificationTime', 'length', 'text_content', 'model_input')\
    .withColumn('ai_response', 
                from_json('ai_response', 
                          'name string, address string, total_amount_due string, invoice_date string, due_date string, total_kwh_consumed string, billing_periods string'))\
    .withColumn('name', col('ai_response.name'))\
    .withColumn('address', col('ai_response.address'))\
    .withColumn('total_amount_due', col('ai_response.total_amount_due'))\
    .withColumn('invoice_date', col('ai_response.invoice_date'))\
    .withColumn('due_date', col('ai_response.due_date'))\
    .withColumn('total_kwh_consumed', col('ai_response.total_kwh_consumed'))\
    .withColumn('billing_periods', 
                from_json('ai_response.billing_periods', 
                          'array<struct<period_start_date: string, period_end_date: string, kwh_consumed: string>>'))\
    .drop('ai_response')

result.write.mode('overwrite').saveAsTable('origin_statement_details')



In [0]:
%sql

select text_content from origin_statement_details