# POC 2.0: AI_Query in Databricks
- AI_Query calls the LLM of choice
- This notebook extracts data from customer invoice PDF files into a structured format
- The structured data is written to a Delta table for analysis

## Convert PDF Invoices to Delta table in Structured JSON format using AI_Query

## 01. Bronze Layer
#### Store unstructured PDF extracts in structured format
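The bronze cell in this section asks the model to return only JSON and then strips Markdown code fences with `regexp_replace`. Those patterns are anchored at the start and end of the string, so leading whitespace or stray commentary would slip through. A minimal Python sketch of a more defensive cleanup, useful for testing the idea outside the pipeline, could look like this. The helper name `clean_model_json` and the sample string are hypothetical and not part of the notebook.

```python
import json
import re

# Three backticks, built indirectly so this example stays fence-safe.
FENCE = "`" * 3


def clean_model_json(raw: str) -> str:
    """Strip Markdown code fences and keep the outermost {...} span."""
    text = raw.strip()
    # Drop a leading fence (with or without the "json" tag) and a trailing fence.
    text = re.sub(r"^" + FENCE + r"(?:json)?\s*", "", text)
    text = re.sub(r"\s*" + FENCE + r"$", "", text)
    # Keep only the text between the first "{" and the last "}", inclusive.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in model output")
    return text[start : end + 1]


# Hypothetical model output, fenced the way chat models often respond.
sample = FENCE + 'json\n{"InvoiceNumber": "123456789012", "Totals": {"Tax": 1.5}}\n' + FENCE
invoice = json.loads(clean_model_json(sample))
print(invoice["InvoiceNumber"])
```

This applies the prompt rule "keep the data between first { and last }" in code rather than relying on the model to follow it.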

In [0]:
import dlt
import urllib3
import re

@dlt.table(
    comment="Bronze: Raw structured and summarized invoice data extracted using AI from invoice PDFs"
)
def dlt_bronze_invoices_raw():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "binaryFile")
            .load("/Volumes/pdev/shaurya/documents/Invoices/")
            .selectExpr(
                "path",
                """
                regexp_replace(
                    regexp_replace(
                        ai_query(
                            "databricks-claude-sonnet-4",
                            "You are an invoice parser. Output only a compact JSON object with keys:
                             InvoiceNumber, InvoiceDate, CustomerID, DueDate, BillTo, ShipTo,
                             LineItems:[{Product,Description,Qty,UnitPrice,ExtendedPrice,SalesTax,Total}],
                             Totals:{Subtotal,Tax,TotalInvoice}.
                             Rules:
                               - Infer InvoiceNumber from the file path appended after these rules, following dbfs:/Volumes/pdev/shaurya/documents/Invoices; it sits between two underscores and is usually the longest number, 12 digits long.
                                 Do NOT use 'Document details' (e.g., CON…).
                               - For InvoiceDate, use the value after Date: at the top; do not include any other text.
                               - Return only JSON, no commentary.
                               - Extract every line item; some invoices may have 120 line items.
                               - Only keep the data between the first { and the last }, inclusive. "
                            || ' ' || CAST(path AS STRING) || ' ' || CAST(ai_parse_document(content) AS STRING)
                        ),
                        '^```json', ''   -- remove leading ```json
                    ),
                    '```$', ''          -- remove trailing ```
                ) AS structured_invoice
                """,
                """
                ai_query(
                    "databricks-meta-llama-3-3-70b-instruct",
                    "Summarize this invoice document in plain text for a human reader. Document: "
                    || CAST(ai_parse_document(content) AS STRING)
                ) AS raw_text
                """
            )
    )


## 02. Silver Layer
#### Get Insights from Invoice PDF extracts and Customer information available online
- Get relevant data in structured format from invoice PDFs
- Get relevant company information by scraping Wikipedia
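The first Silver step parses the `structured_invoice` JSON and turns each entry of `LineItems` into its own row, repeating the invoice header on every row. A minimal plain-Python sketch of that flattening is shown here; the function name `flatten_invoice` and the sample invoice are hypothetical, and only a few of the columns are carried over to keep it short.

```python
import json
from typing import Any


def flatten_invoice(structured_invoice: str) -> list[dict[str, Any]]:
    """Return one row per line item, with header fields repeated on each row."""
    invoice = json.loads(structured_invoice)
    totals = invoice.get("Totals") or {}
    header = {
        "invoice_num": invoice.get("InvoiceNumber"),
        "invoice_date": invoice.get("InvoiceDate"),
        "customer_id": invoice.get("CustomerID"),
        "invoice_total": totals.get("TotalInvoice"),
    }
    rows = []
    # Like LATERAL VIEW explode, a missing or empty LineItems array yields no rows.
    for item in invoice.get("LineItems") or []:
        rows.append(
            {
                **header,
                "line_product": item.get("Product"),
                "line_qty": item.get("Qty"),
                "line_total": item.get("Total"),
            }
        )
    return rows


# Hypothetical invoice with two line items.
sample = json.dumps(
    {
        "InvoiceNumber": "123456789012",
        "InvoiceDate": "2024-01-31",
        "CustomerID": "C1",
        "LineItems": [
            {"Product": "A", "Qty": 1, "Total": 10.0},
            {"Product": "B", "Qty": 2, "Total": 20.0},
        ],
        "Totals": {"TotalInvoice": 30.0},
    }
)
rows = flatten_invoice(sample)
print(len(rows))
```

One consequence worth noting: an invoice whose `LineItems` could not be extracted disappears from the flattened output entirely, so a row-count check against the bronze table may be worthwhile.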

In [0]:
@dlt.table(
    comment="Silver: Structured and enriched invoice data extracted from PDF using AI"
)
def dlt_silver_invoices_info():
    return spark.sql("""
        WITH pdf_parsed AS (
            SELECT
                structured_invoice,
                from_json(
                    structured_invoice,
                    'struct<
                        InvoiceNumber:string,
                        InvoiceDate:string,
                        CustomerID:string,
                        DueDate:string,
                        BillTo:string,
                        ShipTo:string,
                        LineItems:array<struct<
                            Product:string,
                            Description:string,
                            Qty:double,
                            UnitPrice:double,
                            ExtendedPrice:double,
                            SalesTax:double,
                            Total:double
                        >>,
                        Totals:struct<
                            Subtotal:double,
                            Tax:double,
                            TotalInvoice:double
                        >
                    >'
                ) AS parsed
            FROM LIVE.dlt_bronze_invoices_raw
        ),
        final AS (
            SELECT DISTINCT 
                parsed.InvoiceNumber                                                       AS invoice_num,
                parsed.InvoiceDate                                                         AS invoice_date,
                parsed.CustomerID                                                          AS customer_id,
                parsed.DueDate                                                             AS due_date,
                parsed.BillTo                                                              AS bill_to,
                parsed.ShipTo                                                              AS ship_to,
                parsed.Totals.Subtotal                                                     AS Invoice_subtotal,
                parsed.Totals.Tax                                                          AS Invoice_tax,
                parsed.Totals.TotalInvoice                                                 AS Invoice_total,
                li.Product                                                                 AS line_product,
                li.Description                                                             AS line_description,
                li.Qty                                                                     AS line_qty,
                li.UnitPrice                                                               AS line_unit_price,
                li.ExtendedPrice                                                           AS line_extended_price,
                li.SalesTax                                                                AS line_sales_tax,
                li.Total                                                                   AS line_total
            FROM pdf_parsed
            LATERAL VIEW explode(parsed.LineItems) li_tbl AS li
        )
        SELECT DISTINCT
            coalesce(cad.Cus_AR_Num ,pdf.customer_id)                                  AS Extracted_customer_id,
            cad1.Cus_Acct_Name                                                         AS Enriched_Customer_Name,
            cad.Cus_Acct_Name                                                          AS EDW_Customer_Name,
            pd.Prod_Name                                                               AS EDW_product_name,
            svcpd.Prod_Name                                                            AS EDW_covered_product_name,
            pdf.invoice_num                                                            AS Extracted_invoice_num,
            coalesce(invoice_date.Calendar_Date ,pdf.invoice_date)                     AS Extracted_invoice_date,
            pdf.due_date                                                               AS Extracted_due_date,
            pdf.bill_to                                                                AS Extracted_bill_to,
            pdf.ship_to                                                                AS Extracted_ship_to,
            pdf.Invoice_subtotal                                                       AS Extracted_Invoice_subtotal,
            sum(f.Invoice_Ln_Tax_Amt_TC)                                               AS EDW_Invoice_tax,
            pdf.Invoice_tax                                                            AS Extracted_Invoice_tax,
            f.Total_Invoice_Amt_TC                                                     AS EDW_Invoice_total,
            pdf.Invoice_total                                                          AS Extracted_Invoice_total,
            pdf.line_product                                                           AS Extracted_line_product,
            pdf.line_description                                                       AS Extracted_line_description,
            pdf.line_qty                                                               AS Extracted_line_qty,
            pdf.line_unit_price                                                        AS Extracted_line_unit_price,
            pdf.line_extended_price                                                    AS Extracted_line_extended_price,
            pdf.line_sales_tax                                                         AS Extracted_line_sales_tax,
            pdf.line_total                                                             AS Extracted_line_total
        FROM final pdf 
        LEFT JOIN edw.customer_invoice_line_fact f
               ON f.invoice_num = pdf.invoice_num
        LEFT JOIN edw.customer_account_dim cad1 
               ON pdf.customer_id = CASE WHEN cad1.Cus_AR_Num IS NOT NULL AND cad1.Cus_AR_Num <> -1 
                                        THEN cad1.Cus_AR_Num
                                        ELSE cad1.Cus_D365_Account_Num
                                        END
        LEFT JOIN edw.customer_account_dim cad 
               ON f.customer_account_dim_id = cad.customer_account_dim_id 
        LEFT JOIN edw.product_dim pd 
               ON f.product_dim_id = pd.product_dim_id 
        LEFT JOIN edw.product_dim svcpd 
               ON f.covered_product_dim_id = svcpd.product_dim_id
        LEFT JOIN edw.date_dim invoice_date 
               ON f.Invoice_Document_Date_Dim_Id = invoice_date.date_dim_id 
        GROUP BY ALL
        ORDER BY Extracted_customer_id, Extracted_invoice_num
    """)

In [0]:
import dlt
import urllib3
import re
import requests
from pyspark.sql.functions import coalesce, col, udf
from pyspark.sql.types import StringType

@dlt.table(
    comment="Silver: Wikipedia enrichment for customer companies"
)
def dlt_silver_wikipedia_customer_info():
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    @udf(returnType=StringType())
    def search_wikipedia_and_get_content(search_term):
        if search_term is None:
            return "No results found for your search term."
        search_url = "https://en.wikipedia.org/w/api.php"
        headers = {
            "User-Agent": "UKG-InvoiceParser/1.0 (shaurya.rawat@ukg.com)"
        }
        search_params = {
            "action": "query",
            "format": "json",
            "list": "search",
            "srsearch": search_term
        }
        try:
            search_response = requests.get(
                search_url,
                params=search_params,
                headers=headers,
                verify=False,
                timeout=10
            ).json()
            if not search_response["query"]["search"]:
                return "No results found for your search term."
            page_title = search_response["query"]["search"][0]["title"]
            content_params = {
                "action": "query",
                "format": "json",
                "prop": "extracts",
                "titles": page_title,
                "exlimit": 1,
                "exsectionformat": "wiki"
            }
            content_response = requests.get(
                search_url,
                params=content_params,
                headers=headers,
                verify=False,
                timeout=10
            ).json()
            page_id = list(content_response["query"]["pages"].keys())[0]
            return content_response["query"]["pages"][page_id]["extract"]
        except Exception as e:
            return f"Error fetching Wikipedia data: {e}"

    @udf(returnType=StringType())
    def extract_company_name(bill_to: str):
        if not bill_to or not isinstance(bill_to, str):
            return None
        text = bill_to.strip()
        match = re.search(r'\b(?:Inc|LLC|Ltd|Corp|Co)\b', text, flags=re.IGNORECASE)
        if match:
            end_idx = match.end()
            return text[:end_idx].strip()
        text = re.split(r'[:.,]', text)[0]
        return text.strip()

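    # Read as batch: the upstream table is defined by a batch query, and the
    # distinct() below is not a supported streaming operation.
    invoices_df = dlt.read("dlt_silver_invoices_info")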
    
    # Apply PySpark UDFs using DataFrame transformations
    wiki_df = (
        invoices_df
        .select(
            "Enriched_Customer_Name",
            "Extracted_customer_id",
            "EDW_Customer_Name",
            "Extracted_bill_to"
        )
        .distinct()
        .withColumn("company_name", extract_company_name(col("Extracted_bill_to")))
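        # The lookup UDF never returns NULL, so coalesce the inputs (not the outputs)
        # to make the bill-to company name an effective fallback.
        .withColumn(
            "wikipedia_text",
            search_wikipedia_and_get_content(
                coalesce(col("Enriched_Customer_Name"), col("company_name"))
            )
        )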
    )

    wiki_df.createOrReplaceTempView("staging_customer_wiki_data")

    return spark.sql("""
        SELECT
            Extracted_customer_id,
            Enriched_Customer_Name,
            COALESCE(get_json_object(structured_info, '$.industry'), 'Unknown')          AS Wikipedia_industry,
            COALESCE(get_json_object(structured_info, '$.facilities_count'), '0')        AS Wikipedia_facilities_count,
            COALESCE(get_json_object(structured_info, '$.employee_count'), '0')          AS Wikipedia_employee_count,
            COALESCE(get_json_object(structured_info, '$.country_count'), '0')           AS Wikipedia_country_count,
            COALESCE(get_json_object(structured_info, '$.revenue'), '0')                 AS Wikipedia_revenue,
            COALESCE(category_info, 'Unknown')                                           AS Wikipedia_category,
            COALESCE(get_json_object(structured_info, '$.competitors'), '[]')            AS Wikipedia_competitors
        FROM (
            SELECT
                Extracted_customer_id,
                coalesce(Enriched_Customer_Name, EDW_Customer_Name) as Enriched_Customer_Name,
                ai_query(
                    "databricks-meta-llama-3-3-70b-instruct",
                    "You are extracting structured company info. \\n\\n
                    Extract into JSON with fields:\\n
                      - industry\\n
                      - facilities_count\\n
                      - employee_count\\n
                      - country_count\\n
                      - competitors\\n
                      - revenue\\n\\n
                    - Industry is the primary work of company like software solution, consulting service, healthcare, retail store, fitness technology etc\\n
                    - employee count is number of employees that the company has overall globally, employee count if 0, set to null\\n
                    - country count is number of countries that the company is operating in, set to null if it is 0 \\n
                    - competitors are the alternative solutions that the company is competing with in same industry\\n
                    - get revenue of company in a format such as billion usd, million usd, hundred thousand usd\\n
                    - revenue can't be 0; if it is 0, set it to UNKNOWN\\n
                    - Only return JSON data (no commentary, no ```json fences)\\n\\n
                    Text: " || wikipedia_text
                ) AS structured_info,

                ai_query(
                    "databricks-meta-llama-3-3-70b-instruct",
                    "Classify this company into a single category based on its overall size and reach. \\n
                     Categories:\\n
                       - Small Business (< 500 employees)\\n
                       - Mid-Market (500 – 5,000 employees)\\n
                       - Enterprise (5,000 – 50,000 employees or global revenue > 10 billion dollar)\\n
                       - Global Enterprise (> 50,000 employees or operates in multiple continents)\\n\\n
                     - Return only the category name (Small Business, Mid-Market, Enterprise, Global Enterprise) based on the company, not its parent company\\n
                     - Do not show any other content\\n
                     Text: " || wikipedia_text
                ) AS category_info
            FROM staging_customer_wiki_data
        )
    """)
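The category prompt above defines size bands by employee count. As a cross-check on the model's answer, the same bands can be applied deterministically to `Wikipedia_employee_count`. The sketch below is an assumption-laden illustration: the function name `size_category` is hypothetical, it ignores the revenue and multi-continent criteria, and it resolves the overlapping boundaries by placing exactly 5,000 employees in Enterprise and exactly 50,000 in Enterprise as well.

```python
from typing import Optional


def size_category(employee_count: Optional[int]) -> str:
    """Map an employee count to the size bands named in the prompt."""
    # A missing or non-positive count cannot be classified.
    if employee_count is None or employee_count <= 0:
        return "Unknown"
    if employee_count < 500:
        return "Small Business"
    if employee_count < 5_000:
        return "Mid-Market"
    # Assumption: both boundary values (5,000 and 50,000) count as Enterprise.
    if employee_count <= 50_000:
        return "Enterprise"
    return "Global Enterprise"


for count in (120, 2_500, 5_000, 75_000, None):
    print(count, size_category(count))
```

Rows where this rule and the model disagree are natural candidates for manual review.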

## 03. Gold Layer
#### Create production-ready, curated, and enriched customer insights from the invoice PDF and Wikipedia sources
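The Gold table is a left join of the invoice rows to the Wikipedia rows on `Extracted_customer_id`. A small plain-Python sketch of left-join semantics shows two properties worth keeping in mind: invoices without a match are kept with empty Wikipedia columns, and a customer with several Wikipedia rows multiplies the matching invoice rows. The helper name `left_join` and the sample rows are hypothetical, and only `Wikipedia_industry` is carried over.

```python
from collections import defaultdict
from typing import Any

Row = dict[str, Any]


def left_join(invoices: list[Row], wiki: list[Row], key: str = "Extracted_customer_id") -> list[Row]:
    """Left-join invoice rows to Wikipedia rows on a shared key."""
    by_key: dict[Any, list[Row]] = defaultdict(list)
    for row in wiki:
        # As in SQL, a NULL key never matches anything.
        if row.get(key) is not None:
            by_key[row[key]].append(row)
    joined: list[Row] = []
    for inv in invoices:
        matches = by_key.get(inv.get(key), []) if inv.get(key) is not None else []
        if not matches:
            # Unmatched invoices are kept, with the enrichment column empty.
            joined.append({**inv, "Wikipedia_industry": None})
            continue
        # Every matching Wikipedia row produces one output row.
        for w in matches:
            joined.append({**inv, "Wikipedia_industry": w["Wikipedia_industry"]})
    return joined


# Hypothetical rows: customer C1 appears twice on the Wikipedia side, C2 not at all.
invoices = [
    {"Extracted_customer_id": "C1", "Extracted_invoice_num": "100"},
    {"Extracted_customer_id": "C2", "Extracted_invoice_num": "200"},
]
wiki = [
    {"Extracted_customer_id": "C1", "Wikipedia_industry": "Retail"},
    {"Extracted_customer_id": "C1", "Wikipedia_industry": "Retail"},
]
result = left_join(invoices, wiki)
print(len(result))
```

If the Wikipedia table can hold more than one row per customer, deduplicating it on `Extracted_customer_id` before the join avoids inflating invoice totals in the Gold table.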

In [0]:
@dlt.table(
    comment="Gold: Final enriched invoices with invoice + Wikipedia data"
)
def dlt_gold_enriched_customer_insights():
    return spark.sql("""
      SELECT
        inv.*,
        wiki.Wikipedia_industry         AS Wikipedia_industry,
        wiki.Wikipedia_facilities_count AS Wikipedia_facilities_count,
        wiki.Wikipedia_employee_count   AS Wikipedia_employee_count,
        wiki.Wikipedia_country_count    AS Wikipedia_country_count,
        wiki.Wikipedia_competitors      AS Wikipedia_competitors,
        wiki.Wikipedia_revenue          AS Wikipedia_revenue,
        wiki.Wikipedia_category         AS Wikipedia_category
      FROM LIVE.dlt_silver_invoices_info inv
      LEFT JOIN LIVE.dlt_silver_wikipedia_customer_info wiki
        ON inv.Extracted_customer_id = wiki.Extracted_customer_id
    """)