# Invoice Parsing Notebook
This notebook automatically extracts structured data from PDF invoices issued by advertising platforms such as Meta, Google Ads, Microsoft Ads, and DerbySoft.
<br>
After extraction, the data can optionally be written to a table for further querying and transformation, and optionally sent as a file attachment to a specified email address. The data can also be downloaded directly from the notebook's Output window.
<br>
<br>
### **About the solution** <br>

The code implements a flexible invoice parsing system that extracts structured data from diverse invoice formats, including tables with missing headers, multiple currency and decimal formats, and vendor-specific variations. It identifies the required attributes (invoice number, invoice currency, advertising company, and account name/ID), normalizes amounts, and optionally filters out summary or regulatory rows, which reduces manual processing time and the risk of errors. The design is modular: adapting it to a new invoice layout is mostly a matter of editing the configuration lists rather than rewriting core logic.
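
As a rough illustration of the decimal-format handling mentioned above, the sketch below normalizes a few amount strings using a simplified form of the separator heuristics in the notebook's `clean_amount` function. The name `normalize_amount` and the sample strings are illustrative only.

```python
import re

def normalize_amount(raw):
    """Convert an amount string such as '1.234,56' or '1,234.56' to a float (sketch)."""
    if not raw:
        return None
    # Keep only digits, separators, and the minus sign (drops currency symbols and codes)
    cleaned = re.sub(r"[^\d.,\-]", "", raw)
    commas, dots = cleaned.count(","), cleaned.count(".")
    if commas and dots == 1 and cleaned.find(",") < cleaned.find("."):
        cleaned = cleaned.replace(",", "")                    # 1,234.56 -> 1234.56
    elif dots and commas == 1 and cleaned.find(".") < cleaned.find(","):
        cleaned = cleaned.replace(".", "").replace(",", ".")  # 1.234,56 -> 1234.56
    elif commas == 1 and dots == 0:
        cleaned = cleaned.replace(",", ".")                   # 12,50 -> 12.50
    try:
        return round(float(cleaned), 2)
    except ValueError:
        # Nothing numeric was left after cleaning
        return None

for sample in ["1,234.56", "1.234,56", "12,50", "EUR 99.90", "n/a"]:
    print(f"{sample!r} -> {normalize_amount(sample)}")
```

Strings that contain no parsable number, such as `"n/a"`, come back as `None`, which lets the caller skip the row instead of failing.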

### **Usage**  
- The notebook can be run in any Databricks environment (including Free Edition). Ensure the `ai_parse_document` function is available in your Databricks environment.
- Modify the `INVOICE_KEYWORDS`, `CURRENCY_CODES`, `ACCOUNT_KEYWORDS`, `MANUAL_OVERRIDES`, `VENDOR_PATTERNS`, `ROW_EXCLUDE_KEYWORDS`, `ROW_OPTIONAL_EXCLUDE_KEYWORDS`, `EXACT_EXCLUDE`, `HOTEL_DESC_KEYWORDS`, `DESC_KEYWORDS`, and `AMT_KEYWORDS` settings to customize the code to fit your needs.
- To improve OCR accuracy, you can convert the PDFs to a higher resolution in a local environment (e.g., PyCharm): install the latest [Poppler library for Windows](https://github.com/oschwartz10612/poppler-windows/releases/), import the _pdf2image_ module, and run your conversion code. Then run `%pip install pdf2image` in Databricks before executing the notebook with the higher-resolution files. To check a PDF's DPI, you can use [this DPI analyzer](https://apitemplate.io/pdf-tools/free-pdf-dpi-analyzer/).
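
To illustrate how the header keyword lists named in the list above are used, the following minimal sketch locates the description and amount columns in a header row. It mirrors the notebook's substring matching on normalized header text; `find_columns`, the shortened keyword lists, and the sample headers are hypothetical.

```python
DESC_KEYWORDS = ["description", "item", "campaign", "beschreibung"]  # subset of the notebook's list
AMT_KEYWORDS = ["amount", "total", "cost", "gesamt"]                 # subset of the notebook's list

def normalize_header(text):
    """Lowercase a header and drop spaces and underscores before matching."""
    return text.lower().replace(" ", "").replace("_", "")

def find_columns(headers, desc_keywords=DESC_KEYWORDS, amt_keywords=AMT_KEYWORDS):
    """Return (description_index, amount_index); either is None if no keyword matches."""
    normalized = [normalize_header(h) for h in headers]
    desc_idx = next(
        (i for i, h in enumerate(normalized) if any(k in h for k in desc_keywords)),
        None,
    )
    amt_idx = next(
        (i for i, h in enumerate(normalized) if any(k in h for k in amt_keywords)),
        None,
    )
    return desc_idx, amt_idx

print(find_columns(["Campaign Name", "Clicks", "Total_Cost"]))
print(find_columns(["Beschreibung", "Menge", "Gesamt"]))
```

Because matching is by substring, short keywords (the notebook's lists include `"ad"` and `"net"`) also match longer headers such as "address" or "network", so it is worth keeping newly added keywords specific.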

### **Additional notes**<br>

`ai_parse_document` can be cluster-dependent.
Different Databricks clusters may run different versions of the AI runtime or model, so even two clusters in the same workspace can parse the same document slightly differently, especially PDFs. Small differences in how the AI runtime handles tables, whitespace, fonts, or line breaks can all lead to slightly different outputs.
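
One way to make keyword matching less sensitive to such whitespace and line-break differences is to normalize text before comparing it. The notebook's fallback account matcher already collapses whitespace in this way; the helper name and sample strings below are hypothetical.

```python
def normalize_text(text):
    """Collapse all runs of whitespace (spaces, tabs, newlines, non-breaking spaces) and lowercase."""
    return " ".join(text.split()).lower()

# Two hypothetical parses of the same line that differ only in whitespace
parse_a = "Invoice  number:\n INV-12345"
parse_b = "Invoice number: INV-12345"

print(normalize_text(parse_a) == normalize_text(parse_b))
```

This only smooths over whitespace differences; changes in table structure or element ordering between runtimes still need to be checked against the raw parse output.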
<br>
<br>

## Workflow

**Create a Volume and upload the files**
<br>
Use the UI or a notebook to create the volume and upload the files.

**Convert the PDFs (Optional)**<br>Install Poppler library<br>Install the module<br>Run the code in your chosen environment to convert to DPI 300
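
The conversion step could look roughly like the sketch below, which is meant to run locally rather than in Databricks. It assumes that "converting to DPI 300" means rasterizing each page with `pdf2image` and saving the pages back into a single PDF with Pillow; the function names, the output naming scheme, and the paths are hypothetical.

```python
from pathlib import Path

def output_path_for(pdf_path, out_dir, dpi=300):
    """Build the output file name, e.g. invoice.pdf -> <out_dir>/invoice_300dpi.pdf."""
    return Path(out_dir) / f"{Path(pdf_path).stem}_{dpi}dpi.pdf"

def convert_to_high_res(pdf_path, out_dir, dpi=300):
    """Rasterize each page at the given DPI and save the pages as one PDF."""
    # Imported here so the helper above works even where pdf2image is not installed.
    from pdf2image import convert_from_path  # needs the Poppler binaries

    pages = convert_from_path(str(pdf_path), dpi=dpi)  # one PIL image per page
    if not pages:
        raise ValueError(f"No pages found in {pdf_path}")

    target = output_path_for(pdf_path, out_dir, dpi)
    target.parent.mkdir(parents=True, exist_ok=True)

    # Pillow writes a multi-page PDF when save_all=True and append_images is given
    rgb_pages = [page.convert("RGB") for page in pages]
    rgb_pages[0].save(
        target,
        "PDF",
        resolution=float(dpi),
        save_all=True,
        append_images=rgb_pages[1:],
    )
    return target

# Example call (hypothetical paths):
# convert_to_high_res("invoices/invoice_001.pdf", "invoices_highres")
```

If Poppler is not on the system `PATH`, `convert_from_path` also accepts a `poppler_path` argument pointing at its `bin` folder.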

**Install the pdf2image module in Databricks (Optional)**

In [0]:
# Check if the module is installed already
try:
    import pdf2image
    print("pdf2image is installed")
except ImportError:
    print("pdf2image is NOT installed")

In [0]:
# Run the following command to install the module
%pip install pdf2image

**Execute the code**<br>Modify the necessary variables
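
As an example of a configuration-level change, the sketch below adds a vendor to a shortened copy of `VENDOR_PATTERNS` and runs the same first-match keyword lookup that the notebook's vendor detection uses. The vendor "Example DSP", its keyword, and the function name `detect_vendor` are made up for illustration.

```python
VENDOR_PATTERNS = [
    ("Meta", ["meta platforms", "facebook"]),
    ("Google Ads", ["google ads", "adwords"]),
    ("Example DSP", ["example dsp gmbh"]),  # hypothetical vendor added for illustration
]

def detect_vendor(texts, patterns=VENDOR_PATTERNS):
    """Return the first vendor whose keyword appears anywhere in the combined text."""
    full_text = " ".join(t.lower() for t in texts if t)
    for company, keywords in patterns:
        if any(k in full_text for k in keywords):
            return company
    return "Unknown"

print(detect_vendor(["Invoice", "Example DSP GmbH, Berlin"]))
print(detect_vendor(["Paid via Facebook", "Google Ads"]))
print(detect_vendor(["Acme Corp"]))
```

The order of the entries matters: when keywords from several vendors appear in one invoice, the vendor listed first wins.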

In [0]:
"""
This code and its underlying logic are the intellectual property of Anete Mikelsone.
Unauthorized use, modification of core logic, or redistribution is prohibited without explicit permission.

The code may be used internally by the team for internal business purposes, provided that:
- The original author is credited
- The code is not redistributed publicly

Configuration-level adjustments (such as modifying keyword lists, vendor names, or filtering parameters)
are permitted for internal use and do not constitute modification of the core logic.

Created by: Anete Mikelsone
Date: January 13, 2026
"""

from bs4 import BeautifulSoup
from pyspark.sql.functions import udf, explode, col, expr
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType
import re

# =========================
# INVOICE NUMBER EXTRACTION
# =========================

# Put "invoice #" as the first keyword because Meta has "Invoice number must be referenced..." phrase
INVOICE_KEYWORDS = [
    "invoice #",
    "invoice number",
    "rechnungsnummer",
    "invoice no",
    "rechnung nr"
]


def extract_invoice_number(elements):
    if not elements or not isinstance(elements, list):
        return None

    # Normalize elements into plain dicts
    norm_elements = []
    for el in elements:
        if isinstance(el, dict):
            norm_elements.append(el)
        else:
            norm_elements.append(el.asDict())

    for i, el in enumerate(norm_elements):
        content = el.get("content")
        if not content:
            continue

        content_lower = content.lower()

        for kw in INVOICE_KEYWORDS:
            if kw not in content_lower:
                continue

            # Combine current + next element in case it's split between two elements
            combined = content
            if i + 1 < len(norm_elements):
                combined += " " + (norm_elements[i + 1].get("content") or "")

            # Strict regex - MUST have a keyword + separator
            match = re.search(
                rf"{re.escape(kw)}\s*[:#\-]\s*([A-Za-z0-9\-]+)",
                combined,
                flags=re.IGNORECASE
            )
            if match:
                return match.group(1)

            # Relaxed regex - allows no separator, but MUST contain digits
            match = re.search(
                rf"{re.escape(kw)}.*?([A-Za-z]{{0,3}}\d{{5,}})",
                combined,
                flags=re.IGNORECASE
            )
            if match:
                return match.group(1)

    return None

invoice_udf = udf(extract_invoice_number, StringType())

# =========================
# CURRENCY CODE EXTRACTION
# =========================

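CURRENCY_CODES = ["EUR", "USD", "GBP", "CHF", "CAD", "AUD", "JPY"]

# Build the pattern from CURRENCY_CODES so that editing the list also updates the regex
_CURRENCY_ALTERNATION = "|".join(re.escape(code) for code in CURRENCY_CODES)
INLINE_CURRENCY_REGEX = re.compile(
    rf"\b({_CURRENCY_ALTERNATION})\b|\(({_CURRENCY_ALTERNATION})\)",
    re.IGNORECASE
)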


def extract_currency(elements):
    if not elements:
        return None

    texts = []
    for el in elements:
        try:
            text = el["content"]
        except Exception:
            text = None

        if text:
            texts.append(text)

    # Look for explicit currency codes
    for text in texts:
        m = INLINE_CURRENCY_REGEX.search(text)
        if m:
            return (m.group(1) or m.group(2)).upper()

    # Look for currency symbols
    for text in texts:
        if "€" in text:
            return "EUR"
        if "£" in text:
            return "GBP"
        if "$" in text:
            return "USD"

    return None

currency_udf = udf(extract_currency, StringType())

# =========================
# ACCOUNT ID/NAME EXTRACTION
# =========================

ACCOUNT_KEYWORDS = [
    "account id",
    "account id / group",
    "account name",
    "billing account",
    "account:",
    "customer",
    "advertiser",
    "werbekunde"
]
# Hard-coded overrides for tricky invoices
MANUAL_OVERRIDES = {
    "facebook": "951120803112976",
    "derbysoft": "Jakala UK Ltd"
}


def extract_advertising_account(elements):
    if not elements:
        return None

    norm_elements = [
        el if isinstance(el, dict) else el.asDict()
        for el in elements
    ]

    # =========================
    # META: Account Id / Group is split across table cells
    # =========================
    for el in norm_elements:
        content = el.get("content") or ""
        if el.get("type") == "table" and "account id / group" in content.lower():
            soup = BeautifulSoup(content, "html.parser")
            for tr in soup.find_all("tr"):
                tds = tr.find_all("td")
                if len(tds) == 2:
                    label = tds[0].get_text(strip=True).lower()
                    value = tds[1].get_text(strip=True)
                    if "account id / group" in label:
                        m = re.search(r"\b\d{6,}\b", value)
                        if m:
                            return m.group(0)

    # =========================
    # GOOGLE ADS: dashed account id (e.g., 138-458-8188)
    # =========================
    for el in norm_elements:
        content = el.get("content") or ""
        if "account id" in content.lower():
            m = re.search(r"\b\d{3}-\d{3}-\d{4}\b", content)
            if m:
                return m.group(0)
    # ==========================
    # MICROSOFT ADS account id
    # =========================
    BING_ACCOUNT_ID_LABELS = ["kontonummer", "account number", "account id"]
    BING_ACCOUNT_ID_EXCLUDE_CONTENT = ["swift", "iban","bank"]
    BING_ACCOUNT_ID_REGEX = re.compile(r"\b[A-Z0-9]{6,12}\b")

    for i, el in enumerate(norm_elements):
        content = (el.get("content") or "").strip().lower()
        if el.get("type") != "text":
            continue

        if any(lbl in content for lbl in BING_ACCOUNT_ID_LABELS):
            # Look ahead a few elements (ordering is not guaranteed)
            for j in range(i + 1, min(i + 4, len(norm_elements))):
                candidate = (norm_elements[j].get("content") or "").strip()
                candidate_lower = candidate.lower()

                if any(excl in candidate_lower for excl in BING_ACCOUNT_ID_EXCLUDE_CONTENT):
                    continue

                # Return only the matched ID, not any trailing text in the element
                id_match = BING_ACCOUNT_ID_REGEX.match(candidate)
                if id_match:
                    return id_match.group(0)

    # =========================
    # Fallback: keyword + value in the same string
    # =========================
    for el in norm_elements:
        content = el.get("content") or ""
        text_clean = " ".join(content.split())
        text_lower = text_clean.lower()

        for kw in ACCOUNT_KEYWORDS:
            if kw not in text_lower:
                continue

            m = re.search(
                rf"{re.escape(kw)}\s*[:\-]?\s*([A-Za-z0-9&.,()'\-/ ]{{3,80}})",
                text_clean,
                flags=re.IGNORECASE
            )
            if m:
                candidate = m.group(1).split("/")[0].strip()
                candidate_norm = candidate.lower().replace(".", "").replace(",", "")
                if "derbysoft" in candidate_norm:
                    continue
                return candidate
    # =========================
    # MANUAL OVERRIDES
    # =========================
    for el in norm_elements:
        text_lower = (el.get("content") or "").lower()
        for keyword, value in MANUAL_OVERRIDES.items():
            if keyword in text_lower:
                return value
    return None

advertising_account_udf = udf(extract_advertising_account, StringType())

# =========================
# ADVERT.COMPANY EXTRACTION
# =========================

VENDOR_PATTERNS = [
    ("Meta", [
        "meta platforms",
        "facebook",
        "instagram",
        "payment@fb.com"
    ]),
    ("Google Ads", [
        "google ads",
        "google ireland",
        "google llc",
        "adwords"
    ]),
    ("Microsoft Ads", [
        "microsoft advertising",
        "bing ads",
        "microsoft ireland"
    ]),
    ("Derbysoft", [
        "derbysoft"
    ])
]


def extract_advertising_company(elements):
    if not elements:
        return None

    texts = []
    for el in elements:
        try:
            text = el["content"]
        except Exception:
            text = None

        if text:
            texts.append(text.lower())

    full_text = " ".join(texts)

    for company, keywords in VENDOR_PATTERNS:
        if any(k in full_text for k in keywords):
            return company

    return "Unknown"

advertising_company_udf = udf(extract_advertising_company, StringType())

# =========================
# AMOUNT NORMALIZATION
# =========================
def clean_amount(amt_raw):
    if not amt_raw:
        return None

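    # Remove any character that is not a digit, dot, comma, or minus sign
    amt_raw = re.sub(r"[^\d.,\-]", "", amt_raw)
    # Comma as thousands separator, dot as decimal (e.g. 1,234.56)
    if amt_raw.count(",") > 0 and amt_raw.count(".") == 1 and amt_raw.find(",") < amt_raw.find("."):
        amt_clean = amt_raw.replace(",", "")
    # Dot as thousands separator, comma as decimal (e.g. 1.234,56)
    elif amt_raw.count(".") > 0 and amt_raw.count(",") == 1 and amt_raw.find(".") < amt_raw.find(","):
        amt_clean = amt_raw.replace(".", "").replace(",", ".")
    # A single comma and no dot: treat the comma as the decimal separator (e.g. 12,50)
    elif amt_raw.count(",") == 1 and amt_raw.count(".") == 0:
        amt_clean = amt_raw.replace(",", ".")
    # Several commas and no dot: commas are thousands separators (e.g. 1,234,567)
    elif amt_raw.count(",") > 1 and amt_raw.count(".") == 0:
        amt_clean = amt_raw.replace(",", "")
    # Several dots and no comma: dots are thousands separators (e.g. 1.234.567)
    elif amt_raw.count(".") > 1 and amt_raw.count(",") == 0:
        amt_clean = amt_raw.replace(".", "")
    # Otherwise (at most one dot, no comma): parse as is; invalid input returns None below
    else:
        amt_clean = amt_raw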
    try:
        return round(float(amt_clean), 2)
    except ValueError:
        return None

# =========================
# HTML ELEMENT EXTRACTION
# =========================
def extract_items_from_table_html(html, advertising_company=None):
    if not html:
        return []
    soup = BeautifulSoup(html, "html.parser")
    items = []

    # Filtering out rows with fee, vat, etc. in the description as an option
    ROW_EXCLUDE_KEYWORDS = [
        "subtotal",
        "grand total",
        "total in",
        "vat",
        "tax",
    ]

    ROW_OPTIONAL_EXCLUDE_KEYWORDS = [
        "fee",
        "regulatory"
    ]

    # Define keywords for fallback filtering; use only when headers are missing to pick up valid line items using kw-s
    # ITEMS_KEYWORDS = ["kempinski"]

    EMBEDDED_AMOUNT_REGEX = re.compile(
        r"(USD|EUR|GBP)\s*([0-9,]+\.\d{2})",
        re.IGNORECASE
    )
    EXCLUDE_OPTIONAL_ROWS = True  # set to False if you want to keep fee rows

    def should_exclude_row(description: str, amount: float, advertising_company: str) -> bool:
        if not description:
            return True

        # Normalize description for comparison
        def normalize(s):
            return " ".join(s.lower().replace(",", ".").split())

        desc_norm = normalize(description)
        # desc_norm = " ".join(description.lower().split())
        # For Meta: only include rows with 'instagram' or 'kempinski'
        if advertising_company == "Meta":
            if "instagram" in desc_norm or "kempinski" in desc_norm:
                return False
            return True

        # Always exclude summary keywords
        if any(k in desc_norm for k in ROW_EXCLUDE_KEYWORDS):
            return True

        # Optionally exclude fee / regulatory rows
        if EXCLUDE_OPTIONAL_ROWS and any(k in desc_norm for k in ROW_OPTIONAL_EXCLUDE_KEYWORDS):
            return True

        # Exact description + amount exclusions
        EXACT_EXCLUDE = [
            ("meta platforms ireland limited", 181958.06),
            ("meta platforms ireland limited", 0),
            ("acct name: meta platforms ireland limited", 3),
            ("acct name: meta platforms ireland limited", 218349.69),
            ("acct name: meta platforms ireland limited", 9399006600000000.0),
            ("meta platforms ireland limited", 0),
            ("bank of america europe dac", 0),
            ("bank of america europe dac", 181958.06),
            ("2 park place", 0),
            ("2 park place", 181958.06),
            ("account number (iban: ie93 bofa 9900 6154 8780 17", 218349.69),
            ("account number (iban: ie93 bofa 9900 6154 8780 17", 31758.26),
            ("account number iban: ie93 bofa 9900 6154 8780 17", 218349.69),
            ("account number: iban: ie93 bofa 9900 6154 8780 17", 218349.69),
            ("merrion road", 0),
            ("merrion road", 181958.06),
            ("merrion road", 22),
            ("dublin 2", 9399006600000000.0),
            ("dublin 4, d04 x2k5", 36391.62),
            ("dublin 4, d04 x2k5", 0),
            ("dublin 2", 36391.62),
            ("dublin 2", 9399006600000000.0),
            ("hatch street", 36391.62),
            ("hatch street", 0),
            ("ireland", 218349.69),
            ("ireland", 9399006600000000.0),
            ("ireland", 36391.62),
            ("ireland", 20),
            ("ireland", 0),
            ("ie9692928f", 9399006600000000.0),
            ("payment@fb.com", None),
            ("account number iban: ie93 bofa 9900 6154 8780 17", 31758.26),
            ("swift code: bofaie3x", 31758.26),
            ("swift code: bofaie3x", 218349.69),
            ("swift code (international transfer): bofaie3x", 218349.69),
            ("swift code (international transfer): bofaie3x", 31758.26),
        ]
        for ex_desc, ex_amt in EXACT_EXCLUDE:
            ex_desc_norm = normalize(ex_desc)
            if ex_desc_norm in desc_norm and (ex_amt is None or abs(ex_amt - amount) < 0.01):
                return True

        return False

    header_indices = None
    HOTEL_DESC_KEYWORDS = [
        "hotelname",
        "hotel name",
        "property",
        "propertyname"
        ]
    DESC_KEYWORDS = [
            "description", "item", "lineitem", "product", "service",
            "campaign", "ad", "details","beschreibung"
        ]
    AMT_KEYWORDS = [
            "amount", "total", "cost", "price", "charge", "net", "subtotal","gesamt"
        ]
    for table in soup.find_all("table"):
        rows = table.find_all("tr")
        if not rows or len(rows) < 2:
            continue

        merged_rows = rows
        # =========================
        # MS ADS ROW EXTRACTION
        # =========================
        if advertising_company == "Microsoft Ads":
            merged_rows = []
            i = 0
            while i < len(rows):
                tr = rows[i]
                tds = tr.find_all("td")

                # Extract description and amount from current row
                desc = " ".join([td.get_text(strip=True) for td in tds])
                amt = None
                for td in tds:
                    # Match the full decimal part so amounts are not truncated (e.g. 1,234.56)
                    m = re.search(r"-?[0-9,]+\.\d+", td.get_text())
                    if m:
                        amt = float(m.group().replace(",", ""))

                # Case 1: Empty amount, next row is Gutschrift with negative amount
                if (amt is None or abs(amt) < 0.01) and i + 1 < len(rows):
                    next_tr = rows[i + 1]
                    next_tds = next_tr.find_all("td")
                    next_desc = " ".join([td.get_text(strip=True) for td in next_tds])
                    next_amt = None
                    for td in next_tds:
                        m = re.search(r"-?[0-9,]+\.\d+", td.get_text())
                        if m:
                            next_amt = float(m.group().replace(",", ""))
                    if "gutschrift" in next_desc.lower() and next_amt is not None and next_amt < 0:
                        # Use Gutschrift row's description and amount only
                        next_tr.merged_text = next_desc
                        next_tr.merged_amount = next_amt
                        merged_rows.append(next_tr)
                        i += 2
                        continue
                """
                # Case 2: Current row has amount, next row has same description and negative amount
                if amt is not None and amt > 0 and i + 1 < len(rows):
                    next_tr = rows[i + 1]
                    next_tds = next_tr.find_all("td")
                    next_desc_raw = " ".join([td.get_text(strip=True) for td in next_tds])
                    next_amt = None
                    for td in next_tds:
                        m = re.search(r"-?[0-9,]+\\.\\d+", td.get_text())
                        if m:
                            next_amt = float(m.group().replace(",", ""))
                    # If next row is Gutschrift or has same description and negative amount
                    if (
                        (next_desc_raw == desc or "gutschrift" in next_desc_raw.lower())
                        and next_amt is not None and next_amt < 0
                    ):
                        # Optionally, concatenate Gutschrift description
                        if "gutschrift" in next_desc_raw.lower():
                            desc = desc + " " + next_desc_raw
                        tr.merged_text = desc
                        tr.merged_amount = amt
                        merged_rows.append(tr)
                        # Add the negative row as a separate item
                        next_tr.merged_text = next_desc_raw
                        next_tr.merged_amount = next_amt
                        merged_rows.append(next_tr)
                        i += 2
                        continue
                """
                # Otherwise, keep the row as is
                tr.merged_text = desc
                tr.merged_amount = amt
                merged_rows.append(tr)
                i += 1
        if merged_rows is not None:
            rows = merged_rows


        # =========================
        # GOOGLE ADS ROW EXTRACTION (stream-based robust Invalid activity handling)
        # =========================
        if advertising_company == "Google Ads":
            merged_rows = []
            i = 0
            while i < len(rows):
                tr = rows[i]
                tds = tr.find_all("td")
                desc = tds[0].get_text(strip=True) if tds else ""
                amt = clean_amount(tds[-1].get_text(strip=True)) if tds else None

                # Start merge: accumulate description until we hit an amount
                merged_desc = desc
                merged_amt = amt
                j = i + 1
                while j < len(rows):
                    next_tr = rows[j]
                    next_tds = next_tr.find_all("td")
                    next_desc = next_tds[0].get_text(strip=True) if next_tds else ""
                    next_amt = clean_amount(next_tds[-1].get_text(strip=True)) if next_tds else None

                    # Heuristic: a continuation row has no amount, or a zero amount once an amount has been captured
                    if (next_amt is None) or (merged_amt is not None and abs(next_amt) < 0.01):
                        merged_desc += " " + next_desc
                        # pick the amount if current merged_amt is None
                        if merged_amt is None and next_amt is not None:
                            merged_amt = next_amt
                        j += 1
                    else:
                        break

                # Append merged row
                tr.merged_text = merged_desc
                tr.merged_amount = merged_amt
                merged_rows.append(tr)
                i = j  # skip merged rows

            rows = merged_rows


        # =========================
        # END
        # =========================
        # Find the header row: the first row that contains a <th> cell
        header_row = next((r for r in rows if r.find("th")), None)
        desc_idx = amt_idx = None

        if header_row:
            headers = [
                cell.get_text(strip=True).lower().replace(" ", "").replace("_", "")
                for cell in header_row.find_all(["th", "td"])
            ]

            desc_idx = next(
                (i for i, h in enumerate(headers) if any(k in h for k in HOTEL_DESC_KEYWORDS)),
                None
            )
            if desc_idx is None:
                desc_idx = next(
                    (i for i, h in enumerate(headers) if any(k in h for k in DESC_KEYWORDS)),
                    None
                )

            amt_idx = next(
                (i for i, h in enumerate(headers) if any(k in h for k in AMT_KEYWORDS)),
                None
            )

            if desc_idx is not None:
                header_indices = (desc_idx, amt_idx)

        if header_indices is None:
            continue

        desc_idx, amt_idx = header_indices

        for tr in rows[1:]:
            tds = tr.find_all("td")

            # Use merged text/amount if available
            desc = getattr(tr, "merged_text", None)

            if not desc and len(tds) > desc_idx:
                desc = tds[desc_idx].get_text(strip=True)

            amt = getattr(tr, "merged_amount", None)
            # Normal amount column
            if amt is None and amt_idx is not None and len(tds) > amt_idx:
                amt = clean_amount(tds[amt_idx].get_text(strip=True))

            # Embedded amount fallback
            if amt is None:
                for td in tds:
                    m = EMBEDDED_AMOUNT_REGEX.search(td.get_text())
                    if m:
                        amt = clean_amount(m.group(2))
                        break

            if amt is None or not desc:
                continue

            # Filter rows using should_exclude_row (defined above)
            if should_exclude_row(desc, amt, advertising_company):
                continue
            items.append({
                "Description": desc,
                "Amount": amt
            })
    return items

items_schema = ArrayType(
    StructType([
        StructField("Description", StringType()),
        StructField("Amount", FloatType())
    ])
)

items_udf = udf(extract_items_from_table_html, items_schema)

# =========================
# SPARK DATAFRAMES
# =========================

df = spark.read.format("binaryFile").load("/Volumes/workspace/default/invoices_highres/*.pdf")

df = df.withColumn(
    "parsed",
    expr("ai_parse_document(content, map('version', '2.0'))")
)

df = df.withColumn(
    "elements_struct",
    expr("""
        try_cast(
            parsed:document:elements
            AS ARRAY<STRUCT<content:STRING,type:STRING>>
        )
    """)
)

df = df.withColumn(
    "invoice_number",
    invoice_udf(col("elements_struct"))
)

df = df.withColumn(
    "advertising_company",
    advertising_company_udf(col("elements_struct"))
)

df = df.withColumn(
    "advertising_account",
    advertising_account_udf(col("elements_struct"))
)

df = df.withColumn(
    "currency",
    currency_udf(col("elements_struct"))
)

df = df.withColumn(
    "table_html",
    expr("""
        concat_ws(
            '\n\n',
            transform(
                filter(
                    try_cast(parsed:document:elements AS ARRAY<VARIANT>),
                    e -> cast(e:type as string) = 'table'
                ),
                e -> cast(e:content as string)
            )
        )
    """)
)

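# Pass the detected vendor so the vendor-specific row handling is applied
df_items = df.withColumn("items", items_udf(col("table_html"), col("advertising_company")))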

df_flat = df_items.select(
    "path",
    "invoice_number",
    "advertising_company",
    "advertising_account",
    "currency",
    explode("items").alias("item")
).select(
    "path",
    "invoice_number",
    "advertising_company",
    "advertising_account",
    "currency",
    "item.Description",
    "item.Amount"
)

display(df_flat)

**Explore how the PDFs get parsed**<br>Retrieve the path, parsed result (raw), text
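
When reading the raw result returned by the SQL below, it helps to keep the element shape in mind. The sketch uses plain Python dictionaries with the `type` and `content` fields that the notebook casts to (`ARRAY<STRUCT<content:STRING,type:STRING>>`); the sample values and the helper names are made up for illustration.

```python
# Hypothetical elements, shaped like the (content, type) structs used in the notebook
elements = [
    {"type": "text", "content": "Invoice #: 12345678"},
    {"type": "table", "content": "<table><tr><th>Description</th><th>Amount</th></tr></table>"},
    {"type": "text", "content": None},
]

def join_text(elements):
    """Join every non-empty element into one text blob, like the parsed_text column."""
    return "\n\n".join(el["content"] for el in elements if el.get("content"))

def join_tables(elements):
    """Join only the table elements, like the table_html column."""
    return "\n\n".join(
        el["content"]
        for el in elements
        if el.get("type") == "table" and el.get("content")
    )

print(join_text(elements))
print(join_tables(elements))
```

Tables arrive as HTML strings, which is why the item extraction parses them with BeautifulSoup rather than with regular expressions alone.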

In [0]:
%sql
WITH parsed_documents AS (
  SELECT
    path,
    ai_parse_document(
      content,
      map('version', '2.0')
    ) AS parsed
  FROM READ_FILES(
    '/Volumes/workspace/default/invoices_highres/*.{pdf}',
    format => 'binaryFile'
  )
), parsed_text AS (
  SELECT
    path,
    parsed,
    concat_ws(
      '\n\n',
      transform(
        try_cast(parsed:document:elements AS ARRAY<VARIANT>),
        element -> try_cast(element:content AS STRING)
      )
    ) AS parsed_text
  FROM parsed_documents
  WHERE try_cast(parsed:error_status AS STRING) IS NULL
)
SELECT path, parsed, parsed_text FROM parsed_text;

### Next steps

**Extract the result using the Output window**<br>Download as CSV, Excel, Copy results to clipboard, add to notebook dashboard

**Perform further transformations to the DF as necessary**<br>E.g., Drop null values, drop duplicates, filter results, perform computations and aggregations
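
A minimal sketch of such transformations is shown below. It assumes the `df_flat` columns produced earlier in the notebook (`invoice_number`, `advertising_company`, `currency`, `Amount`); the function name and the particular filter and grouping are only examples, not part of the original solution.

```python
def summarize_invoice_items(df_flat):
    """Drop incomplete and duplicate rows, then total the amounts per invoice."""
    cleaned = (
        df_flat
        .dropna(subset=["invoice_number", "Amount"])  # drop rows missing key fields
        .dropDuplicates()                             # drop exact duplicate rows
        .filter("Amount <> 0")                        # keep non-zero line items only
    )
    return (
        cleaned
        .groupBy("invoice_number", "advertising_company", "currency")
        .agg({"Amount": "sum"})                       # Spark names this column sum(Amount)
        .withColumnRenamed("sum(Amount)", "total_amount")
    )
```

In the notebook this could be run as `display(summarize_invoice_items(df_flat))`.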

**Write the DF to a table for further querying / transformation**<br> Run SQL commands to further transform the dataset

In [0]:
# Writes the dataframe to a table
df_flat.write.mode("overwrite").saveAsTable("CATALOG.SCHEMA.TABLE_NAME")

**Send the output as a file attachment via email**<br>Install and configure Databricks CLI; create and use a Databricks secret
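
Before connecting to an SMTP server, it can be useful to build the message and inspect the attachment locally. The sketch below does that with the standard library only; the addresses, the sample CSV text, and the function name `build_csv_email` are placeholders, and the CSV is attached from a string as `text/csv`, which is an alternative to the notebook's file-based approach.

```python
from email.message import EmailMessage

def build_csv_email(sender, to_email, subject, body, csv_text, filename="invoice_items.csv"):
    """Build (but do not send) an email with the CSV text attached as text/csv."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.set_content(body)
    # Passing a str creates a text/* part; subtype="csv" makes it text/csv
    msg.add_attachment(csv_text, subtype="csv", filename=filename)
    return msg

message = build_csv_email(
    "sender@example.com",
    "recipient@example.com",
    "Invoice Items Export",
    "Please find the invoice items attached.",
    "Description,Amount\nCampaign A,12.5\n",
)

# List the attachments to confirm the file name and MIME type
for part in message.iter_attachments():
    print(part.get_filename(), part.get_content_type())
```

The resulting `message` object can be passed to `smtplib.SMTP.send_message` once the credentials are in place.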

In [0]:
# Retrieve Gmail app password from Databricks secrets
# This securely loads your app password for use in the email function

gmail_app_password = dbutils.secrets.get(scope="gmail", key="app_password")


In [0]:
# Save the output DataFrame to a CSV file for email attachment
# This ensures the file is accessible by Python's open()

df_flat.toPandas().to_csv('/tmp/invoice_items.csv', index=False)


In [0]:
"""
STEPS
1. Create an app password with your email provider (Microsoft, Google).
2. Generate a token in Databricks (go to the account settings).
3. Open a command line and configure the Databricks CLI.

    EXAMPLE (Gmail):
    pip install databricks-cli

    databricks --version

    databricks configure --token
    (provide the host name and the token generated earlier)

    databricks secrets create-scope --scope gmail
    databricks secrets put --scope gmail --key app_password
    (store the email app password as the secret value; the scope and key must match
    the ones read with dbutils.secrets.get)

"""

import os
import smtplib
from email.message import EmailMessage

SENDER_EMAIL = 'SENDER@gmail.com'  # replace with the sending Gmail address

def send_email_with_attachment(to_email, subject, body, file_path, password):
    msg = EmailMessage()
    msg['From'] = SENDER_EMAIL
    msg['To'] = to_email
    msg['Subject'] = subject
    msg.set_content(body)

    # Attach the file passed in file_path under its own file name
    with open(file_path, 'rb') as f:
        msg.add_attachment(
            f.read(),
            maintype='application',
            subtype='octet-stream',
            filename=os.path.basename(file_path)
        )

    # Connect to Gmail SMTP over STARTTLS and send the message
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login(SENDER_EMAIL, password)
        server.send_message(msg)
        print("Email sent successfully!")

# Call the function
send_email_with_attachment(
    to_email='RECIPIENT@gmail.com',
    subject='Invoice Items Export',
    body='Please find the invoice items attached.',
    file_path='/tmp/invoice_items.csv',
    password=gmail_app_password
)
