In [0]:
%pip install lxml html5lib beautifulsoup4 pypdf weasyprint
%restart_python

In [0]:
import pickle
import pandas as pd
import html
from bs4 import BeautifulSoup
from pypdf import PdfReader, PdfWriter
import os
from io import BytesIO
from weasyprint import HTML
from weasyprint.document import DocumentMetadata
from datetime import datetime, timezone, timedelta

In [0]:
# Move a directory and its contents
# dbutils.fs.mv(
#     "dbfs:/Volumes/sandbox_catalog/default/globaledit-policy-docs/medicare-coverage-database/",
#     "dbfs:/Volumes/sandbox_catalog/default/non-pdf-policy-docs/medicare-coverage-database/",
#     recurse=True
# )


In [0]:
# Base directories. Every path below is derived from one of these two roots so
# that relocating a volume is a single-line change.
MCD_SOURCE_DIR = "/Volumes/sandbox_catalog/default/non-pdf-policy-docs/medicare-coverage-database"
POLICY_PDF_DIR = "/Volumes/sandbox_catalog/default/globaledit-policy-docs"

# LCD: pickled metadata frame, directory of source HTML files, PDF output directory.
LCD_PKL_PATH = f"{MCD_SOURCE_DIR}/lcd.pkl"
LCD_HTML_PATH = f"{MCD_SOURCE_DIR}/lcds"
LCD_PDF_PATH = f"{POLICY_PDF_DIR}/lcd"

# LCD articles: same layout as the LCD paths above.
LCD_ARTICLES_PKL_PATH = f"{MCD_SOURCE_DIR}/lcd_articles.pkl"
LCD_ARTICLES_HTML_PATH = f"{MCD_SOURCE_DIR}/lcd-articles"
LCD_ARTICLES_PDF_PATH = f"{POLICY_PDF_DIR}/lcd-articles"

# NCD: pickled metadata frame, CSV holding the document content, PDF output directory.
NCD_METADATA_PKL_PATH = f"{MCD_SOURCE_DIR}/ncd_metadata.pkl"
NCD_CSV_PATH = f"{MCD_SOURCE_DIR}/ncds/ncds.csv"
NCD_PDF_PATH = f"{POLICY_PDF_DIR}/ncd"

In [0]:
# Load the three pickled metadata objects.
# NOTE(security): pickle.load can execute arbitrary code embedded in the file;
# only load pickles written by a trusted step of this pipeline.
# The context managers guarantee each handle is closed even when unpickling raises.
with open(LCD_PKL_PATH, 'rb') as pkl_file:
    lcd = pickle.load(pkl_file)

with open(LCD_ARTICLES_PKL_PATH, 'rb') as pkl_file:
    lcd_articles = pickle.load(pkl_file)

with open(NCD_METADATA_PKL_PATH, 'rb') as pkl_file:
    ncd_metadata = pickle.load(pkl_file)

In [0]:
# Sanity check: preview the first rows of the unpickled lcd_articles object.
lcd_articles.head()

In [0]:
from pyspark.sql.functions import collect_list

# Source Delta tables, one row per (document, jurisdiction) pairing.
lcd_jurisdictions = spark.read.format("delta").table("sandbox_catalog.default.lcd_jurisdiction")
article_jurisdictions = spark.read.format("delta").table("sandbox_catalog.default.article_jurisdiction")

# Collapse each document's rows into two list-valued columns and bring the
# result to the driver as a pandas frame.
# NOTE: the alias "jurisidiction_cds" is misspelled, but it is a column name that
# other code may read, so it is reproduced exactly.
lcd_jurisdictions_grouped = (
    lcd_jurisdictions
    .groupBy("lcd_id")
    .agg(
        collect_list("jurisdiction_cd").alias("jurisidiction_cds"),
        collect_list("contract_number").alias("contract_numbers"),
    )
    .toPandas()
)
article_jurisdictions_grouped = (
    article_jurisdictions
    .groupBy("article_id")
    .agg(
        collect_list("jurisdiction_cd").alias("jurisidiction_cds"),
        collect_list("contract_number").alias("contract_numbers"),
    )
    .toPandas()
)

In [0]:
# Cast the join key to a numeric dtype on both frames before joining.
lcd = lcd.assign(document_id=pd.to_numeric(lcd["document_id"]))
lcd_articles = lcd_articles.assign(document_id=pd.to_numeric(lcd_articles["document_id"]))

# Attach the grouped jurisdiction lists. pandas' default join is "inner", so
# documents without a jurisdiction row are dropped here.
lcd = pd.merge(
    lcd,
    lcd_jurisdictions_grouped,
    left_on="document_id",
    right_on="lcd_id",
)
lcd_articles = pd.merge(
    lcd_articles,
    article_jurisdictions_grouped,
    left_on="document_id",
    right_on="article_id",
)

In [0]:
# Sanity check: preview lcd after the jurisdiction columns were merged in.
lcd.head()

In [0]:
def add_metadata(input_pdf, output_pdf, metadata):
    """Copy a PDF and stamp the standard document-information fields onto the copy.

    Args:
        input_pdf: Source PDF handed to ``PdfReader``.
        output_pdf: Path the stamped copy is written to (opened in ``"wb"`` mode).
        metadata: Mapping that may hold ``Title``, ``Author``, ``Subject``,
            ``Keywords``, ``Creator`` and ``Producer``. Any key that is absent
            is written as an empty string.
    """
    info_fields = ("Title", "Author", "Subject", "Keywords", "Creator", "Producer")

    source = PdfReader(input_pdf)
    stamped = PdfWriter()
    stamped.append_pages_from_reader(source)

    # PDF info-dictionary keys are the field names prefixed with "/".
    info = {f"/{field}": metadata.get(field, "") for field in info_fields}
    stamped.add_metadata(info)

    with open(output_pdf, "wb") as out_file:
        stamped.write(out_file)


In [0]:
def format_pdf_datetime(raw_date_str, tz_offset_minutes=-240):
    """
    Convert a timestamp string to the PDF metadata date format.

    Args:
        raw_date_str: Either ``YYYYMMDDhhmmss`` (14 digits, e.g. '20250523155244')
            or ``YYYY-MM-DD`` (e.g. '2025-05-29', interpreted as midnight).
            Leading and trailing whitespace is ignored.
        tz_offset_minutes: UTC offset in minutes that is rendered into the
            suffix. The parsed time itself is not shifted. Defaults to -240,
            i.e. a ``-04'00'`` suffix.

    Returns:
        A string of the form ``D:YYYYMMDDHHmmSS±HH'mm'``.

    Raises:
        ValueError: If the input has neither supported layout, or has the right
            layout but is not a valid date/time.
    """
    raw_date_str = raw_date_str.strip()

    # Detect the layout from length and content.
    if len(raw_date_str) == 14 and raw_date_str.isdigit():
        # Format: YYYYMMDDhhmmss
        dt = datetime.strptime(raw_date_str, "%Y%m%d%H%M%S")
    elif len(raw_date_str) == 10 and '-' in raw_date_str:
        # Format: YYYY-MM-DD. strptime already yields 00:00:00 for a date-only
        # pattern, so no explicit reset of the time fields is needed.
        dt = datetime.strptime(raw_date_str, "%Y-%m-%d")
    else:
        raise ValueError(f"Unsupported date format: {raw_date_str}")

    # Timezone suffix: sign, then zero-padded hours and minutes of the offset.
    sign = '+' if tz_offset_minutes >= 0 else '-'
    hours_offset, minutes_offset = divmod(abs(tz_offset_minutes), 60)

    return f"D:{dt.strftime('%Y%m%d%H%M%S')}{sign}{hours_offset:02d}'{minutes_offset:02d}'"

def html_to_pdf(html_path, output_path, metadata):
    """Render an HTML file to PDF and write it with document-information metadata.

    The HTML is parsed and re-serialised with BeautifulSoup, rendered to an
    in-memory PDF by WeasyPrint, then copied page-by-page through pypdf so the
    metadata can be attached before the file is written.

    Args:
        html_path: Path of the UTF-8 encoded HTML file to convert.
        output_path: Path the resulting PDF is written to.
        metadata: Mapping of info-dictionary field names without the leading
            slash (e.g. ``"Title"``) to their values.
    """
    # Name the parser explicitly. Without it BeautifulSoup emits
    # GuessedAtParserWarning and uses whichever parser happens to be installed,
    # so the same file can parse differently between environments. lxml is
    # installed by this notebook and is the parser BeautifulSoup picks first
    # when it is available, so the parse result is unchanged here.
    with open(html_path, "r", encoding="utf-8") as file:
        soup = BeautifulSoup(file, "lxml")
    # The source file is fully read at this point and no longer held open
    # while the (slow) rendering runs.
    cleaned_html = soup.prettify()

    # NOTE(review): HTML(string=...) receives no base_url, so relative links or
    # images in the source may not resolve. Confirm whether that matters here.
    buffer = BytesIO()
    HTML(string=cleaned_html).write_pdf(buffer)
    buffer.seek(0)

    reader = PdfReader(buffer)
    writer = PdfWriter()
    writer.append_pages_from_reader(reader)
    # PDF info-dictionary keys carry a leading "/".
    writer.add_metadata({f"/{k}": v for k, v in metadata.items()})
    with open(output_path, "wb") as f:
        writer.write(f)
            
def process_html(input_path, output_path, policy_metadata):
    """Convert every document listed in ``policy_metadata`` from HTML to PDF.

    For each row, ``{input_path}/{document_display_id}.html`` is rendered to
    ``{output_path}/{document_display_id}.pdf`` with metadata built from the row.

    Args:
        input_path: Directory containing the source ``.html`` files.
        output_path: Directory the PDFs are written to; created if missing.
        policy_metadata: DataFrame with the columns ``document_display_id``,
            ``title``, ``contractor_name_type``, ``document_type``,
            ``document_version``, ``note``, ``effective_date`` and
            ``updated_on_sort``.

    Raises:
        ValueError: If a row's ``effective_date`` or ``updated_on_sort`` is not
            in a layout accepted by ``format_pdf_datetime``.
    """
    def _text(value):
        # Missing cells (None / NaN) would otherwise be written into the PDF
        # as the literal text "None" / "nan".
        if value is None or (isinstance(value, float) and value != value):
            return ""
        return str(value)

    # Make sure the target directory exists before the first write.
    os.makedirs(output_path, exist_ok=True)

    for _, line in policy_metadata.iterrows():
        doc_id = line['document_display_id']
        pdf_filename = f"{output_path}/{doc_id}.pdf"
        html_filename = f"{input_path}/{doc_id}.html"
        metadata = {
            "Title": _text(line['title']),
            "Author": _text(line['contractor_name_type']),
            "Subject": f"{line['document_type']}, Version{line['document_version']}",
            "Keywords": _text(line['note']),
            "Creator": "GEMHTMLtoPDF",
            "Producer": "WeasyPrint",
            "CreationDate": format_pdf_datetime(str(line['effective_date'])),
            "ModDate": format_pdf_datetime(str(line['updated_on_sort']))
        }
        html_to_pdf(html_filename, pdf_filename, metadata)

In [0]:
# Convert every HTML document listed in lcd into a PDF under LCD_PDF_PATH.
process_html(LCD_HTML_PATH, LCD_PDF_PATH, lcd)


In [0]:
# Convert every HTML document listed in lcd_articles into a PDF under LCD_ARTICLES_PDF_PATH.
process_html(LCD_ARTICLES_HTML_PATH, LCD_ARTICLES_PDF_PATH, lcd_articles)

In [0]:
# Load the NCD content table. NCD_CSV_PATH holds exactly the path that was
# previously repeated here as a literal.
ncds = pd.read_csv(NCD_CSV_PATH)

In [0]:
# Sanity check: preview the first rows of the NCD content table read from CSV.
ncds.head()

In [0]:
# Sanity check: preview the first rows of the unpickled ncd_metadata object.
ncd_metadata.head()

In [0]:
def create_doc_text(metadata, data):
    """Build the HTML body for a single NCD document.

    Field values are interpolated as-is, with no HTML escaping performed here.

    Args:
        metadata: Object exposing a ``chapter`` attribute.
        data: Object exposing the document fields as attributes:
            ``document_display_id``, ``title``, ``document_version``,
            ``publication_number``, ``effective_date``, ``effective_end_date``,
            ``implementation_date``, ``qr_modifier_date``, ``benefit_category``,
            ``item_service_description``, ``indications_limitations``,
            ``cross_reference``, ``transmittal_number``, ``transmittal_url``,
            ``revision_history``, ``other_text``, ``ama_statement`` and
            ``reasons_for_denial``.

    Returns:
        The HTML fragment as a string.
    """
    text = f"""
    <h1>{data.document_display_id} {data.title}</h1>
    <h2>Chapter {metadata.chapter}</h2>
    <br>Version:<br>
    {data.document_version}
    <br>Publication Number:<br>
    {data.publication_number}
    <br>Effective Date:<br>
    {data.effective_date}
    <br>Effective End Date:<br>
    {data.effective_end_date}
    <br>Implementation Date:<br>
    {data.implementation_date}
    <br>QR Modifier Date:<br>
    {data.qr_modifier_date}
    <br>Benefit Category: <br>
    {data.benefit_category}
    <br>Item Service Description: <br>
    {data.item_service_description}
    <br>Indications Limitations: <br>
    {data.indications_limitations}
    <br>Cross Reference: <br>
    {data.cross_reference}
    <br>Transmittal Number:<br>
    {data.transmittal_number}
    <br>Transmittal URL:<br>
    {data.transmittal_url}
    <small>
    <br>Revision History:<br>
    {data.revision_history}
    </small>
    <br>Other Text:<br>
    {data.other_text}
    <br>AMA Statement:<br>
    {data.ama_statement}
    <br>Reasons for Denial:<br>
    {data.reasons_for_denial}
    """
    return text

In [0]:
# Render one PDF per row of ncd_metadata, taking the document content from the
# matching row of ncds (matched on document_display_id).

# Make sure the target directory exists before the first write.
os.makedirs(NCD_PDF_PATH, exist_ok=True)

for _, row in ncd_metadata.iterrows():
    doc_id = row['document_display_id']
    line_data = ncds.loc[ncds['document_display_id'] == doc_id]
    if line_data.empty:
        # Same exception type that .iloc[0] raised here, but naming the document.
        raise IndexError(f"No row in ncds for document_display_id {doc_id!r}")

    pdf_filename = f"{NCD_PDF_PATH}/ncd-{doc_id}.pdf"

    # When several rows match, the first one is used.
    text = create_doc_text(row, line_data.iloc[0])
    html_text = HTML(string=html.unescape(text))

    # Render first so the metadata can be set on the document. A
    # DocumentMetadata object used to be built here but was never attached,
    # so the PDFs were written without title, description or author.
    document = html_text.render()
    document.metadata.title = row['title']
    document.metadata.description = f"{row['document_type']}, Version{row['document_version']}"
    document.metadata.authors = ["CMS"]

    document.write_pdf(pdf_filename)
