## Loading chapter 1

In [3]:
import pymupdf # imports the pymupdf library
import os
path = "../data/raw/Chapter 1.pdf"# path to the PDF file
if os.path.exists(path):
    print("File exists, proceeding to open it.")

File exists, proceeding to open it.


In [4]:
doc = pymupdf.open(path) # open a document
for page in doc: # iterate the document pages
  text = page.get_text()

In [6]:
print(text) # print the text of the page

1/ See 9903.88.15.
Harmonized Tariff Schedule of the United States Revision 14 (2025)
Annotated for Statistical Reporting Purposes
I
Endnotes--page 1 - 8



In [1]:
import pdfplumber
import pandas as pd
import re

In [9]:
def clean_text(text):
    """Cleans cell text: replaces None with '', strips whitespace, replaces newlines."""
    if text is None:
        return ""
    return str(text).replace('\n', ' ').strip()

def remove_footnote_markers(text):
    """Removes footnote markers like 1/, 2/ etc."""
    if text is None:
        return ""
    # Regex to find a number followed by a slash, possibly with spaces
    return re.sub(r'\s*\d+/\s*', '', str(text)).strip()

def extract_hts_data(pdf_path, start_page_idx, end_page_idx):
    """
    Extracts HTS data from specified pages of a PDF.

    Args:
        pdf_path (str): Path to the PDF file.
        start_page_idx (int): 0-indexed start page.
        end_page_idx (int): 0-indexed end page.

    Returns:
        pandas.DataFrame: Extracted data.
    """
    all_rows_data = []
    # Define expected column headers based on visual inspection
    # Note: "Rates of Duty" is split, so we'll have more effective columns
    column_headers = [
        "Heading/Subheading",
        "Stat. Suffix",
        "Article Description",
        "Unit of Quantity",
        "Rates of Duty - General",
        "Rates of Duty - Special",
        "Rates of Duty - 2"
    ]

    current_main_heading = "" # To carry forward e.g., "0101"
    current_subheading_base = "" # To carry forward e.g., "0101.21.00" for stat. suffixes

    with pdfplumber.open(pdf_path) as pdf:
        for i in range(start_page_idx, end_page_idx + 1):
            if i >= len(pdf.pages):
                print(f"Warning: Page index {i} is out of bounds for the PDF.")
                continue
            
            page = pdf.pages[i]
            print(f"Processing page {i+1}...")

            # Use conservative table extraction settings
            # We might need to adjust these based on the PDF's specific structure
            table = page.extract_table(table_settings={
                "vertical_strategy": "lines_strict", # Use explicit vertical lines
                "horizontal_strategy": "lines_strict", # Use explicit horizontal lines
                "snap_tolerance": 5, # Tolerance for snapping to lines
                "join_tolerance": 5, # Tolerance for joining words
            })
            
            if not table:
                # Fallback if strict lines don't work, try text-based alignment
                table = page.extract_table(table_settings={
                    "vertical_strategy": "text",
                    "horizontal_strategy": "text",
                    "snap_tolerance": 5,
                    "join_tolerance": 5,
                })

            if not table:
                print(f"No table found on page {i+1} with current settings.")
                continue

            # The first row of the first processed page is usually the header
            # We are manually defining headers for robustness.
            # Skip actual header rows found in the table.
            
            is_first_data_row_on_page = True
            if page.page_number == start_page_idx + 1: # If it's the very first page of interest
                 # Skip the actual PDF header row if present in the extracted table
                if table and "Heading/" in clean_text(table[0][0]):
                    table = table[1:]
                    is_first_data_row_on_page = False


            for row_idx, raw_row in enumerate(table):
                # Check if it's a header row based on content and skip
                if "Heading/" in clean_text(raw_row[0]) or "Article Description" in clean_text(raw_row[2]):
                    if not (is_first_data_row_on_page and row_idx == 0 and page.page_number == start_page_idx + 1):
                        continue # Skip subsequent header rows

                cleaned_row = [clean_text(cell) for cell in raw_row]

                # Ensure row has enough columns, pad with empty strings if not
                while len(cleaned_row) < len(column_headers):
                    cleaned_row.append("")

                # Heuristic to identify data rows vs. sub-headers or empty lines
                # A data row usually has content in Article Description or Stat. Suffix or Rates
                if not any(cleaned_row[1:]): # If all cells after the first are empty
                    # Could be a main heading like "0101" or noise, let's try to capture it
                    potential_main_heading = cleaned_row[0]
                    if re.match(r'^\d{4}$', potential_main_heading) or \
                       re.match(r'^\d{4}\s*\(con\.\)$', potential_main_heading):
                        current_main_heading = potential_main_heading.replace('(con.)','').strip()
                        current_subheading_base = "" # Reset subheading base
                    continue

                # Handle combined Heading/Subheading and Stat. Suffix
                hs_text = cleaned_row[0]
                stat_suffix = cleaned_row[1]
                article_desc = remove_footnote_markers(cleaned_row[2])
                unit_qty = remove_footnote_markers(cleaned_row[3])
                rate_general = remove_footnote_markers(cleaned_row[4])
                rate_special = remove_footnote_markers(cleaned_row[5])
                rate_2 = remove_footnote_markers(cleaned_row[6])

                # Logic to construct full subheading
                full_subheading = ""

                if re.match(r'^\d{4}\.\d{2}\.\d{2}$', hs_text): # e.g., 0101.21.00
                    full_subheading = hs_text
                    current_main_heading = hs_text[:4]
                    current_subheading_base = hs_text
                elif re.match(r'^\d{4}$', hs_text): # e.g. 0101
                    current_main_heading = hs_text
                    # This row might be just the heading, description might be for its children
                    # Or it might be a general entry. Let's assume description belongs to it if present.
                    if not stat_suffix and article_desc:
                         full_subheading = current_main_heading # Or treat as placeholder
                elif hs_text: # Potentially just a suffix like ".29.00"
                    if current_main_heading and re.match(r'^\s*\.\d{2}\.\d{2}$', hs_text.replace(current_main_heading, '').strip()):
                        full_subheading = current_main_heading + hs_text.replace(current_main_heading, '').strip()
                        current_subheading_base = full_subheading
                    elif current_main_heading and not stat_suffix: # If it's like "Other..." under 0101
                         pass # full_subheading remains empty, will be inferred or context dependent

                if not full_subheading and stat_suffix and current_subheading_base:
                    # This is a statistical breakdown for the current_subheading_base
                    full_subheading = current_subheading_base
                elif not full_subheading and current_main_heading: # Default to main if nothing more specific
                    full_subheading = current_main_heading


                # Skip if no meaningful data after processing
                if not full_subheading and not stat_suffix and not article_desc:
                    continue
                
                # Consolidate Article Description if it's a continuation
                # (This can be tricky; pdfplumber often handles multiline cells correctly)
                # For now, we assume pdfplumber gives the full cell content.

                # If "Article Description" is empty but we have a suffix, it usually implies "Other" or inherits
                if stat_suffix and not article_desc and all_rows_data:
                    # Check if previous row was the parent subheading
                    prev_row = all_rows_data[-1]
                    if prev_row["Heading/Subheading"] == full_subheading and not prev_row["Stat. Suffix"]:
                         # Inherit from parent, or more commonly it's an "Other" category for that suffix
                         if "Other" in prev_row["Article Description"]: # If parent was "Other"
                             article_desc = f"Other ({stat_suffix})" # Be more specific
                         elif len(stat_suffix) == 2 and stat_suffix.isdigit(): # Common pattern
                             article_desc = "Other"

                # Special handling for "Cattle: (con.)" etc.
                if "(con.)" in hs_text and article_desc and not all_rows_data:
                     # This is likely a continuation of a previous item from another section or chapter start
                     article_desc = f"{hs_text} {article_desc}"
                elif "(con.)" in hs_text and all_rows_data:
                    # Try to prepend the relevant part of article description from a suitable parent
                    # This requires more complex state tracking. For now, just include "(con.)"
                    if not article_desc: # If the description itself is empty on a con. line
                        # Look for the last non-empty article description associated with the current_main_heading or base
                        for k in range(len(all_rows_data) -1, -1, -1):
                            if all_rows_data[k]["Heading/Subheading"].startswith(current_main_heading) and \
                               all_rows_data[k]["Article Description"]:
                                article_desc = f"{all_rows_data[k]['Article Description'].split(':')[0]}: (con.)"
                                break
                    else:
                        article_desc = f"{hs_text.split(':')[0] if ':' in hs_text else current_main_heading}: (con.) {article_desc}"


                # Special case for 0102 (con.) and similar on page 5
                # if current_main_heading and clean_text(raw_row[0]).endswith("(con.)") and not article_desc:
                #    # Try to find the base article description
                #    base_desc_found = False
                #    for k_prev in range(len(all_rows_data)-1, -1, -1):
                #        if all_rows_data[k_prev]["Heading/Subheading"].startswith(current_main_heading) and \
                #           ":" in all_rows_data[k_prev]["Article Description"]:
                #            article_desc = all_rows_data[k_prev]["Article Description"].split(":")[0] + ": (con.)"
                #            base_desc_found = True
                #            break
                #    if not base_desc_found:
                #        article_desc = f"{current_main_heading} (con.)"

                # Check for "Other" lines that belong to a previous heading.
                # Example: Under "0101.29.00 Other..."
                # Then "10 Imported for immediate slaughter."
                # Then "90 Other"
                # This "90 Other" should have a more specific article description if possible
                if stat_suffix and article_desc.lower() == "other" and all_rows_data:
                    parent_description = ""
                    # Look for the description of the parent subheading (no stat suffix)
                    for k_prev_row in range(len(all_rows_data)-1, -1, -1):
                        if all_rows_data[k_prev_row]["Heading/Subheading"] == full_subheading and \
                           not all_rows_data[k_prev_row]["Stat. Suffix"] and \
                           all_rows_data[k_prev_row]["Article Description"]:
                            parent_description = all_rows_data[k_prev_row]["Article Description"]
                            break
                    if parent_description and not parent_description.lower().endswith("other"):
                         article_desc = f"{parent_description} - Other"


                # Only add if there's some substance
                if full_subheading or stat_suffix or article_desc or rate_general:
                    all_rows_data.append({
                        "Heading/Subheading": full_subheading,
                        "Stat. Suffix": stat_suffix,
                        "Article Description": article_desc,
                        "Unit of Quantity": unit_qty,
                        "Rates of Duty - General": rate_general,
                        "Rates of Duty - Special": rate_special,
                        "Rates of Duty - 2": rate_2
                    })
                is_first_data_row_on_page = False


    return pd.DataFrame(all_rows_data, columns=column_headers)

In [None]:

# Page numbers from your document (1-indexed in PDF, 0-indexed for pdfplumber)
# Page 4 is index 3, Page 9 is index 8
start_page_human = 1
end_page_human = 4430

df_hts = extract_hts_data(path, start_page_human - 1, end_page_human - 1)

# Post-processing: remove rows that are entirely empty or just placeholders
df_hts.dropna(how='all', subset=df_hts.columns[1:], inplace=True) # Check all but first col
df_hts = df_hts[~((df_hts['Stat. Suffix'] == '') & \
                    (df_hts['Article Description'] == '') & \
                    (df_hts['Rates of Duty - General'] == ''))]

# Fill down Heading/Subheading if it's missing but previous rows indicate a continuation context
# This part is complex due to the HTS structure. A simple ffill might be too naive.
# The current logic tries to build `full_subheading` more intelligently.

# Remove footnote markers from all columns as a final pass (if missed)
for col in df_hts.columns:
    if df_hts[col].dtype == 'object': # Apply only to string columns
        df_hts[col] = df_hts[col].apply(lambda x: remove_footnote_markers(x))

output_csv_file = "hts_extracted_data.csv"
df_hts.to_csv(output_csv_file, index=False, encoding='utf-8')

print(f"\nData extracted and saved to {output_csv_file}")
print(f"Extracted {len(df_hts)} rows.")
print("\nFirst 10 rows of extracted data:")
print(df_hts.head(10))
print("\nLast 10 rows of extracted data:")
print(df_hts.tail(10))

In [15]:
df = pd.read_csv(output_csv_file)
# df.style.background_gradient()
# df.style.background_gradient(cmap='viridis', subset=['Rates of Duty - General', 'Rates of Duty - Special', 'Rates of Duty - 2'])

In [16]:
fullpdf = "../data/raw/finalCopy.pdf"
start_page_human = 1
end_page_human = 4430

df_hts = extract_hts_data(path, start_page_human - 1, end_page_human - 1)

# Post-processing: remove rows that are entirely empty or just placeholders
df_hts.dropna(how='all', subset=df_hts.columns[1:], inplace=True) # Check all but first col
df_hts = df_hts[~((df_hts['Stat. Suffix'] == '') & \
                    (df_hts['Article Description'] == '') & \
                    (df_hts['Rates of Duty - General'] == ''))]

# Fill down Heading/Subheading if it's missing but previous rows indicate a continuation context
# This part is complex due to the HTS structure. A simple ffill might be too naive.
# The current logic tries to build `full_subheading` more intelligently.

# Remove footnote markers from all columns as a final pass (if missed)
for col in df_hts.columns:
    if df_hts[col].dtype == 'object': # Apply only to string columns
        df_hts[col] = df_hts[col].apply(lambda x: remove_footnote_markers(x))

output_csv_file = "../data/processed/fullhts_extracted_data.csv"
df_hts.to_csv(output_csv_file, index=False, encoding='utf-8')

Processing page 1...
Processing page 2...
No table found on page 2 with current settings.
Processing page 3...
Processing page 4...
Processing page 5...
Processing page 6...
Processing page 7...
Processing page 8...
Processing page 9...
Processing page 10...
No table found on page 10 with current settings.


In [17]:
import re

In [9]:


# --- Configuration ---
# Columns for the final CSV
TARGET_COLUMN_HEADERS = [
    "Page Number",
    "Heading/Subheading",
    "Stat. Suffix",
    "Article Description",
    "Unit of Quantity",
    "Rates of Duty - General",
    "Rates of Duty - Special",
    "Rates of Duty - 2"
]

# Keywords to identify a HTS-like table header (lowercase for case-insensitive matching)
# These are crucial for identifying the tables of interest.
HTS_HEADER_KEYWORDS = [
    "heading/subheading", "heading/", "subheading", "stat.", "suf", "suffix",
    "article description", "article", "description",
    "unit of quantity", "unit", "quantity",
    "rates of duty", "rates", "duty", "general", "special"
]
MIN_HTS_HEADER_KEYWORDS_MATCH = 3 # Min distinct keywords from the list above to be found in a potential header row
MIN_COLUMNS_FOR_HTS_TABLE = 5   # A HTS table should have at least this many columns

# --- Helper Functions ---
def clean_text(text):
    """Cleans cell text: replaces None with '', strips whitespace, replaces newlines."""
    if text is None:
        return ""
    return str(text).replace('\n', ' ').strip()

def remove_footnote_markers(text):
    """Removes footnote markers like 1/, 2/ etc. from text."""
    if text is None:
        return ""
    return re.sub(r'\s*\d+/\s*', '', str(text)).strip()

def is_potential_hts_header_row(row_cells, keywords_list, min_matches):
    """
    Checks if a list of cell texts (a row) likely constitutes an HTS table header.
    """
    if not row_cells:
        return False
    
    match_count = 0
    # Combine all cell text in the row to check for keywords
    combined_text = " ".join(filter(None, [str(c).lower() for c in row_cells]))
    
    for keyword in keywords_list:
        if keyword in combined_text:
            match_count += 1
            
    return match_count >= min_matches

def map_extracted_row_to_target(raw_row_cells, page_num):
    """
    Maps the cells of an extracted raw_row to the TARGET_COLUMN_HEADERS structure.
    This is a heuristic-based mapping assuming HTS structure.
    It expects raw_row_cells to contain the data portion of the table.
    """
    mapped_data = {header: "" for header in TARGET_COLUMN_HEADERS}
    mapped_data["Page Number"] = page_num

    num_raw_cells = len(raw_row_cells)

    if num_raw_cells > 0:
        mapped_data["Heading/Subheading"] = remove_footnote_markers(clean_text(raw_row_cells[0]))
    if num_raw_cells > 1:
        mapped_data["Stat. Suffix"] = remove_footnote_markers(clean_text(raw_row_cells[1]))
    if num_raw_cells > 2:
        # Article Description can sometimes be split by pdfplumber or span visually
        # If fewer columns are detected than expected, later columns might be part of Article Description
        # For simplicity now, direct map. Robust solution would check number of cols vs expected.
        mapped_data["Article Description"] = remove_footnote_markers(clean_text(raw_row_cells[2]))
    if num_raw_cells > 3:
        mapped_data["Unit of Quantity"] = remove_footnote_markers(clean_text(raw_row_cells[3]))
    if num_raw_cells > 4:
        mapped_data["Rates of Duty - General"] = remove_footnote_markers(clean_text(raw_row_cells[4]))
    if num_raw_cells > 5:
        mapped_data["Rates of Duty - Special"] = remove_footnote_markers(clean_text(raw_row_cells[5]))
    if num_raw_cells > 6:
        mapped_data["Rates of Duty - 2"] = remove_footnote_markers(clean_text(raw_row_cells[6]))
    
    # If more cells exist, they are currently ignored. Could append to Article Description or a misc column.
    
    return mapped_data

def process_pdf_for_hts_tables(pdf_path):
    """
    Processes the entire PDF, identifies HTS tables, extracts data, and returns a DataFrame.
    """
    all_hts_rows_data = []
    
    # Global context for HTS codes (simplified)
    current_main_heading_context = ""
    current_subheading_base_context = ""

    if not os.path.exists(pdf_path):
        print(f"Error: PDF file not found at '{pdf_path}'")
        return pd.DataFrame()

    try:
        with pdfplumber.open(pdf_path) as pdf:
            total_pages = len(pdf.pages)
            print(f"Starting processing of '{os.path.basename(pdf_path)}' ({total_pages} pages)...")

            for i, page in enumerate(pdf.pages):
                page_num = i + 1
                if page_num % 100 == 0 or page_num == 1 or page_num == total_pages:
                    print(f"  Processing page {page_num}/{total_pages}...")

                try:
                    # Attempt to extract tables using strict line detection first
                    tables = page.extract_tables(table_settings={
                        "vertical_strategy": "lines_strict",
                        "horizontal_strategy": "lines_strict",
                        "snap_tolerance": 5, # Increased tolerance
                        "join_tolerance": 5,
                    })
                    if not tables: # Fallback to text-based if no lines found
                         tables = page.extract_tables(table_settings={
                            "vertical_strategy": "text",
                            "horizontal_strategy": "text",
                             "text_x_tolerance": 3,
                             "text_y_tolerance": 3,
                             "snap_tolerance": 5,
                             "join_tolerance": 5,
                        })
                except Exception as e:
                    # print(f"    Warning: Could not extract tables from page {page_num}. Error: {e}")
                    tables = [] # Ensure tables is an empty list to avoid further errors

                if not tables:
                    continue

                for table_idx, extracted_table in enumerate(tables):
                    if not extracted_table or len(extracted_table) < 2: # Need at least a header and one data row
                        continue
                    
                    # Check table width
                    if len(extracted_table[0]) < MIN_COLUMNS_FOR_HTS_TABLE:
                        continue

                    # Identify if this table is an HTS table by checking its first few rows for keywords
                    header_row_index = -1
                    for r_idx, row_content in enumerate(extracted_table[:3]): # Check first 3 rows
                        cleaned_row_cells = [clean_text(cell) for cell in row_content]
                        if is_potential_hts_header_row(cleaned_row_cells, HTS_HEADER_KEYWORDS, MIN_HTS_HEADER_KEYWORDS_MATCH):
                            header_row_index = r_idx
                            break
                    
                    if header_row_index == -1: # Not identified as an HTS table
                        continue

                    # If identified, process rows from (header_row_index + 1) onwards
                    for data_row_idx in range(header_row_index + 1, len(extracted_table)):
                        raw_data_cells = extracted_table[data_row_idx]
                        
                        # Basic validation: ensure it's not an empty or malformed row
                        if not any(raw_data_cells) or not any(clean_text(c) for c in raw_data_cells):
                            continue
                        
                        mapped_row = map_extracted_row_to_target(raw_data_cells, page_num)

                        # --- HTS Specific Context Logic (Simplified) ---
                        hs_text = mapped_row["Heading/Subheading"]
                        stat_suffix = mapped_row["Stat. Suffix"]
                        article_desc = mapped_row["Article Description"]
                        
                        full_subheading = ""
                        # Try to parse and maintain current HTS code context
                        if re.match(r'^\d{4}\.\d{2}\.\d{2}$', hs_text): # Full HTS e.g., 0101.21.00
                            full_subheading = hs_text
                            current_main_heading_context = hs_text[:4]
                            current_subheading_base_context = hs_text
                        elif re.match(r'^\d{4}(\s*\(con\.\))?$', hs_text): # Main heading e.g., 0101 or 0101 (con.)
                            potential_main = hs_text.replace('(con.)','').strip()
                            if re.match(r'^\d{4}$', potential_main):
                                current_main_heading_context = potential_main
                            full_subheading = current_main_heading_context # Use main as placeholder
                            # If it's a "(con.)" line for a main heading and article desc is empty, it's usually just a spacer
                        elif hs_text: # Other non-empty heading text
                            # Could be a partial like ".29.00" or just descriptive text if first col isn't code
                            if current_main_heading_context and re.match(r'^\s*\.\d{2}\.\d{2}$', hs_text.replace(current_main_heading_context, '', 1).strip()):
                                full_subheading = current_main_heading_context + hs_text.replace(current_main_heading_context, '', 1).strip()
                                current_subheading_base_context = full_subheading
                            else:
                                full_subheading = hs_text # Keep as is if can't resolve
                        
                        # If stat_suffix exists, the full_subheading should be the base HTS code
                        if not full_subheading and stat_suffix and current_subheading_base_context:
                            full_subheading = current_subheading_base_context
                        elif not full_subheading and current_main_heading_context: # Default if nothing else
                            full_subheading = current_main_heading_context
                        
                        mapped_row["Heading/Subheading"] = full_subheading
                        # --- End HTS Specific Context Logic ---

                        # Add row if it has some substance beyond just page number or context codes
                        if any([mapped_row["Stat. Suffix"], 
                                mapped_row["Article Description"], 
                                mapped_row["Unit of Quantity"],
                                mapped_row["Rates of Duty - General"]]):
                            all_hts_rows_data.append(mapped_row)
            
            print(f"  Finished processing all pages.")

    except Exception as e:
        print(f"An error occurred during PDF processing: {e}")
        return pd.DataFrame(all_hts_rows_data, columns=TARGET_COLUMN_HEADERS) # Return what we have so far

    if not all_hts_rows_data:
        print("No HTS-like tables meeting the criteria were found in the document.")
        return pd.DataFrame()

    df = pd.DataFrame(all_hts_rows_data, columns=TARGET_COLUMN_HEADERS)
    
    # Final cleanup of DataFrame
    # Drop rows where essential HTS data fields are all empty
    essential_cols = ["Heading/Subheading", "Stat. Suffix", "Article Description", 
                      "Unit of Quantity", "Rates of Duty - General"]
    df.dropna(how='all', subset=essential_cols, inplace=True)
    
    # Remove rows that might just be section headers or empty context propagations
    df = df[~((df['Stat. Suffix'] == '') & \
              (df['Article Description'] == '') & \
              (df['Unit of Quantity'] == '') & \
              (df['Rates of Duty - General'] == '') & \
              (df['Rates of Duty - Special'] == '') & \
              (df['Rates of Duty - 2'] == '')
             )]

    return df

In [12]:
fullpdf = "../data/raw/Chapter 1.pdf"
output_csv_file = "../data/processed/fullhts_extracted_data.csv"

final_df = process_pdf_for_hts_tables(fullpdf)

if not final_df.empty:
    try:
        final_df.to_csv(output_csv_file, index=False, encoding='utf-8')
        print(f"\nSuccessfully extracted {len(final_df)} HTS data rows.")
        print(f"Data saved to '{output_csv_file}'")
        print("\nExtracted data shape:")
        print(final_df.shape)
    except Exception as e:
        print(f"\nError saving data to CSV: {e}")
else:
    print("\nNo data was extracted or data was empty after cleanup.")

Starting processing of 'Chapter 1.pdf' (10 pages)...
  Processing page 1/10...
  Processing page 10/10...
  Finished processing all pages.

Successfully extracted 208 HTS data rows.
Data saved to '../data/processed/fullhts_extracted_data.csv'

Extracted data shape:
(208, 8)


In [14]:
output_csv_file = "../data/processed/fullhts_extracted_data.csv"

df = pd.read_csv(output_csv_file)

In [11]:
import os

In [15]:
df.shape, df.isna().sum()

((208, 8),
 Page Number                  0
 Heading/Subheading           1
 Stat. Suffix               105
 Article Description         62
 Unit of Quantity            67
 Rates of Duty - General    153
 Rates of Duty - Special    172
 Rates of Duty - 2          184
 dtype: int64)

In [17]:
# df.style

In [18]:
import pdfplumber
import re
import os
import sqlite3 # For SQLite database interaction
import time    # For progress and timing

# --- Configuration (same as before) ---
DB_NAME = "../data/processed/hts_schedule_data.db"
TABLE_NAME = "hts_entries"

# Columns for the database table and expected data order
# IMPORTANT: The order here must match the order of values when inserting
DB_COLUMN_DEFINITIONS = [
    ("Page_Number", "INTEGER"), # Using underscores for DB friendliness
    ("Heading_Subheading", "TEXT"),
    ("Stat_Suffix", "TEXT"),
    ("Article_Description", "TEXT"),
    ("Unit_of_Quantity", "TEXT"),
    ("Rates_of_Duty_General", "TEXT"),
    ("Rates_of_Duty_Special", "TEXT"),
    ("Rates_of_Duty_2", "TEXT")
]
# For DictWriter-like mapping from processed data (keys should match map_extracted_row_to_target output)
MAPPED_DATA_KEYS = [
    "Page Number", # These keys are from map_extracted_row_to_target
    "Heading/Subheading",
    "Stat. Suffix",
    "Article Description",
    "Unit of Quantity",
    "Rates of Duty - General",
    "Rates of Duty - Special",
    "Rates of Duty - 2"
]


HTS_HEADER_KEYWORDS = [
    "heading/subheading", "heading/", "subheading", "stat.", "suf", "suffix",
    "article description", "article", "description",
    "unit of quantity", "unit", "quantity",
    "rates of duty", "rates", "duty", "general", "special"
]
MIN_HTS_HEADER_KEYWORDS_MATCH = 3
MIN_COLUMNS_FOR_HTS_TABLE = 5

# --- Helper Functions (clean_text, remove_footnote_markers, is_potential_hts_header_row - same as before) ---
def clean_text(text):
    if text is None: return ""
    return str(text).replace('\n', ' ').strip()

def remove_footnote_markers(text):
    if text is None: return ""
    return re.sub(r'\s*\d+/\s*', '', str(text)).strip()

def is_potential_hts_header_row(row_cells, keywords_list, min_matches):
    if not row_cells: return False
    combined_text = " ".join(filter(None, [str(c).lower() for c in row_cells]))
    match_count = 0
    for keyword in keywords_list:
        if keyword in combined_text:
            match_count += 1
    return match_count >= min_matches

def map_extracted_row_to_target_dict(raw_row_cells, page_num):
    """
    Maps raw cells to a dictionary with predefined keys (MAPPED_DATA_KEYS).
    This output dictionary's keys will be used for database insertion.
    """
    mapped_data = {key: "" for key in MAPPED_DATA_KEYS} # Initialize with all keys
    mapped_data["Page Number"] = page_num # Key from MAPPED_DATA_KEYS
    
    num_raw_cells = len(raw_row_cells)
    
    if num_raw_cells > 0: mapped_data["Heading/Subheading"] = remove_footnote_markers(clean_text(raw_row_cells[0]))
    if num_raw_cells > 1: mapped_data["Stat. Suffix"] = remove_footnote_markers(clean_text(raw_row_cells[1]))
    if num_raw_cells > 2: mapped_data["Article Description"] = remove_footnote_markers(clean_text(raw_row_cells[2]))
    if num_raw_cells > 3: mapped_data["Unit of Quantity"] = remove_footnote_markers(clean_text(raw_row_cells[3]))
    if num_raw_cells > 4: mapped_data["Rates of Duty - General"] = remove_footnote_markers(clean_text(raw_row_cells[4]))
    if num_raw_cells > 5: mapped_data["Rates of Duty - Special"] = remove_footnote_markers(clean_text(raw_row_cells[5]))
    if num_raw_cells > 6: mapped_data["Rates of Duty - 2"] = remove_footnote_markers(clean_text(raw_row_cells[6]))
    return mapped_data


def setup_database(db_name, table_name, column_definitions):
    """Creates the SQLite database and table if they don't exist."""
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    
    # Create column definition string for CREATE TABLE statement
    cols_def_str = ", ".join([f'"{col_name}" {col_type}' for col_name, col_type in column_definitions])
    
    # Check if table exists
    cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
    if cursor.fetchone():
        print(f"Table '{table_name}' already exists. Appending data or consider deleting the DB file first if a fresh start is needed.")
    else:
        cursor.execute(f'''
            CREATE TABLE {table_name} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                {cols_def_str}
            )
        ''')
        print(f"Table '{table_name}' created in database '{db_name}'.")
    conn.commit()
    return conn, cursor

def insert_row_into_db(cursor, table_name, column_names_for_insert, data_dict):
    """Inserts a single row (as a dictionary) into the database."""
    # Ensure values are in the correct order corresponding to column_names_for_insert
    values_to_insert = [data_dict.get(key_from_mapped_data, "") for key_from_mapped_data in MAPPED_DATA_KEYS]
    
    placeholders = ', '.join(['?'] * len(column_names_for_insert))
    # column_names_for_insert are the actual DB column names (e.g., "Page_Number")
    cols_sql_str = ', '.join([f'"{col}"' for col in column_names_for_insert])

    sql = f"INSERT INTO {table_name} ({cols_sql_str}) VALUES ({placeholders})"
    cursor.execute(sql, values_to_insert)


def process_pdf_and_write_to_db(pdf_path, db_conn, db_cursor):
    """
    Processes PDF, identifies HTS tables, and writes data incrementally to SQLite.
    Returns the total number of rows written.
    """
    rows_written_count = 0
    
    current_main_heading_context = ""
    current_subheading_base_context = ""
    
    db_column_names = [col_def[0] for col_def in DB_COLUMN_DEFINITIONS] # e.g., ["Page_Number", "Heading_Subheading", ...]

    if not os.path.exists(pdf_path):
        print(f"Error: PDF file not found at '{pdf_path}'")
        return 0

    try:
        with pdfplumber.open(pdf_path) as pdf:
            total_pages = len(pdf.pages)
            print(f"Starting processing of '{os.path.basename(pdf_path)}' ({total_pages} pages)...")
            print(f"Data will be written incrementally to SQLite table '{TABLE_NAME}' in '{DB_NAME}'")
            start_time = time.time()

            for i, page in enumerate(pdf.pages):
                page_num = i + 1
                if page_num % 50 == 0 or page_num == 1 or page_num == total_pages:
                    elapsed_time = time.time() - start_time
                    print(f"  Processing page {page_num}/{total_pages}... (Elapsed: {elapsed_time:.2f}s)")

                page_rows_data = [] # Temporary list for rows from the current page

                try:
                    tables = page.extract_tables(table_settings={
                        "vertical_strategy": "lines_strict", "horizontal_strategy": "lines_strict",
                        "snap_tolerance": 5, "join_tolerance": 5,
                    })
                    if not tables:
                        tables = page.extract_tables(table_settings={
                            "vertical_strategy": "text", "horizontal_strategy": "text",
                            "text_x_tolerance": 3, "text_y_tolerance": 3,
                            "snap_tolerance": 5, "join_tolerance": 5,
                        })
                except Exception:
                    tables = []

                if not tables:
                    continue

                for extracted_table in tables:
                    if not extracted_table or len(extracted_table) < 2 or len(extracted_table[0]) < MIN_COLUMNS_FOR_HTS_TABLE:
                        continue

                    header_row_index = -1
                    for r_idx, row_content in enumerate(extracted_table[:3]):
                        cleaned_row_cells = [clean_text(cell) for cell in row_content]
                        if is_potential_hts_header_row(cleaned_row_cells, HTS_HEADER_KEYWORDS, MIN_HTS_HEADER_KEYWORDS_MATCH):
                            header_row_index = r_idx
                            break
                    
                    if header_row_index == -1:
                        continue

                    for data_row_idx in range(header_row_index + 1, len(extracted_table)):
                        raw_data_cells = extracted_table[data_row_idx]
                        if not any(raw_data_cells) or not any(clean_text(c) for c in raw_data_cells):
                            continue
                        
                        mapped_row_dict = map_extracted_row_to_target_dict(raw_data_cells, page_num)

                        # --- HTS Specific Context Logic (Simplified - same as before) ---
                        hs_text = mapped_row_dict["Heading/Subheading"]
                        full_subheading = ""
                        if re.match(r'^\d{4}\.\d{2}\.\d{2}$', hs_text):
                            full_subheading = hs_text
                            current_main_heading_context = hs_text[:4]
                            current_subheading_base_context = hs_text
                        elif re.match(r'^\d{4}(\s*\(con\.\))?$', hs_text):
                            potential_main = hs_text.replace('(con.)','').strip()
                            if re.match(r'^\d{4}$', potential_main):
                                current_main_heading_context = potential_main
                            full_subheading = current_main_heading_context
                        elif hs_text:
                            if current_main_heading_context and re.match(r'^\s*\.\d{2}\.\d{2}$', hs_text.replace(current_main_heading_context, '', 1).strip()):
                                full_subheading = current_main_heading_context + hs_text.replace(current_main_heading_context, '', 1).strip()
                                current_subheading_base_context = full_subheading
                            else:
                                full_subheading = hs_text
                        
                        if not full_subheading and mapped_row_dict["Stat. Suffix"] and current_subheading_base_context:
                            full_subheading = current_subheading_base_context
                        elif not full_subheading and current_main_heading_context:
                            full_subheading = current_main_heading_context
                        
                        mapped_row_dict["Heading/Subheading"] = full_subheading
                        # --- End HTS Specific Context Logic ---
                        
                        essential_cols_check = [mapped_row_dict["Stat. Suffix"], 
                                       mapped_row_dict["Article Description"], 
                                       mapped_row_dict["Unit of Quantity"],
                                       mapped_row_dict["Rates of Duty - General"]]
                        
                        if any(essential_cols_check):
                            page_rows_data.append(mapped_row_dict)
                
                # Batch insert rows from the current page
                if page_rows_data:
                    for row_dict in page_rows_data:
                         insert_row_into_db(db_cursor, TABLE_NAME, db_column_names, row_dict)
                    rows_written_count += len(page_rows_data)
                    db_conn.commit() # Commit after processing each page's data

            total_time = time.time() - start_time
            print(f"  Finished processing all pages. Total time: {total_time:.2f}s")

    except Exception as e:
        print(f"An error occurred during PDF processing or DB writing: {e}")
        db_conn.rollback() # Rollback any uncommitted changes if error
    
    return rows_written_count

In [2]:
import pdfplumber
import re
import os
import sqlite3 # For SQLite database interaction
import time    # For progress and timing

# --- Configuration (same as before) ---
DB_NAME = "../data/processed/fill_hts_schedule_data.db"
TABLE_NAME = "hts_entries"

# Columns for the database table and expected data order
# IMPORTANT: The order here must match the order of values when inserting
DB_COLUMN_DEFINITIONS = [
    ("Page_Number", "INTEGER"), # Using underscores for DB friendliness
    ("Heading_Subheading", "TEXT"),
    ("Stat_Suffix", "TEXT"),
    ("Article_Description", "TEXT"),
    ("Unit_of_Quantity", "TEXT"),
    ("Rates_of_Duty_General", "TEXT"),
    ("Rates_of_Duty_Special", "TEXT"),
    ("Rates_of_Duty_2", "TEXT")
]
# For DictWriter-like mapping from processed data (keys should match map_extracted_row_to_target output)
MAPPED_DATA_KEYS = [
    "Page Number", # These keys are from map_extracted_row_to_target
    "Heading/Subheading",
    "Stat. Suffix",
    "Article Description",
    "Unit of Quantity",
    "Rates of Duty - General",
    "Rates of Duty - Special",
    "Rates of Duty - 2"
]


HTS_HEADER_KEYWORDS = [
    "heading/subheading", "heading/", "subheading", "stat.", "suf", "suffix",
    "article description", "article", "description",
    "unit of quantity", "unit", "quantity",
    "rates of duty", "rates", "duty", "general", "special"
]
MIN_HTS_HEADER_KEYWORDS_MATCH = 3
MIN_COLUMNS_FOR_HTS_TABLE = 5

# --- Helper Functions (clean_text, remove_footnote_markers, is_potential_hts_header_row - same as before) ---
def clean_text(text):
    if text is None: return ""
    return str(text).replace('\n', ' ').strip()

def remove_footnote_markers(text):
    if text is None: return ""
    return re.sub(r'\s*\d+/\s*', '', str(text)).strip()

def is_potential_hts_header_row(row_cells, keywords_list, min_matches):
    if not row_cells: return False
    combined_text = " ".join(filter(None, [str(c).lower() for c in row_cells]))
    match_count = 0
    for keyword in keywords_list:
        if keyword in combined_text:
            match_count += 1
    return match_count >= min_matches

def map_extracted_row_to_target_dict(raw_row_cells, page_num):
    """
    Maps raw cells to a dictionary with predefined keys (MAPPED_DATA_KEYS).
    This output dictionary's keys will be used for database insertion.
    """
    mapped_data = {key: "" for key in MAPPED_DATA_KEYS} # Initialize with all keys
    mapped_data["Page Number"] = page_num # Key from MAPPED_DATA_KEYS
    
    num_raw_cells = len(raw_row_cells)
    
    if num_raw_cells > 0: mapped_data["Heading/Subheading"] = remove_footnote_markers(clean_text(raw_row_cells[0]))
    if num_raw_cells > 1: mapped_data["Stat. Suffix"] = remove_footnote_markers(clean_text(raw_row_cells[1]))
    if num_raw_cells > 2: mapped_data["Article Description"] = remove_footnote_markers(clean_text(raw_row_cells[2]))
    if num_raw_cells > 3: mapped_data["Unit of Quantity"] = remove_footnote_markers(clean_text(raw_row_cells[3]))
    if num_raw_cells > 4: mapped_data["Rates of Duty - General"] = remove_footnote_markers(clean_text(raw_row_cells[4]))
    if num_raw_cells > 5: mapped_data["Rates of Duty - Special"] = remove_footnote_markers(clean_text(raw_row_cells[5]))
    if num_raw_cells > 6: mapped_data["Rates of Duty - 2"] = remove_footnote_markers(clean_text(raw_row_cells[6]))
    return mapped_data


def setup_database_if_needed(db_name, table_name, column_definitions):
    """Creates the SQLite database and table only if they don't exist."""
    db_exists = os.path.exists(db_name)
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    
    if db_exists:
        # Check if table exists in existing database
        cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
        if cursor.fetchone():
            print(f"Database '{db_name}' and table '{table_name}' already exist. Will append new data.")
            return conn, cursor
        else:
            print(f"Database exists but table '{table_name}' not found. Creating table...")
    else:
        print(f"Creating new database '{db_name}'...")
    
    # Create table if it doesn't exist
    cols_def_str = ", ".join([f'"{col_name}" {col_type}' for col_name, col_type in column_definitions])
    cursor.execute(f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            {cols_def_str}
        )
    ''')
    conn.commit()
    print(f"Table '{table_name}' ready in database '{db_name}'.")
    return conn, cursor

def insert_row_into_db(cursor, table_name, column_names_for_insert, data_dict):
    """Inserts a single row (as a dictionary) into the database."""
    # Ensure values are in the correct order corresponding to column_names_for_insert
    values_to_insert = [data_dict.get(key_from_mapped_data, "") for key_from_mapped_data in MAPPED_DATA_KEYS]
    
    placeholders = ', '.join(['?'] * len(column_names_for_insert))
    # column_names_for_insert are the actual DB column names (e.g., "Page_Number")
    cols_sql_str = ', '.join([f'"{col}"' for col in column_names_for_insert])

    sql = f"INSERT INTO {table_name} ({cols_sql_str}) VALUES ({placeholders})"
    cursor.execute(sql, values_to_insert)


def process_pdf_and_write_to_db(pdf_path, db_conn, db_cursor):
    """
    Processes PDF, identifies HTS tables, and writes data directly to SQLite.
    Memory optimized - processes one row at a time without storing in memory.
    Returns the total number of rows written.
    """
    rows_written_count = 0
    
    current_main_heading_context = ""
    current_subheading_base_context = ""
    
    db_column_names = [col_def[0] for col_def in DB_COLUMN_DEFINITIONS] # e.g., ["Page_Number", "Heading_Subheading", ...]

    if not os.path.exists(pdf_path):
        print(f"Error: PDF file not found at '{pdf_path}'")
        return 0

    try:
        with pdfplumber.open(pdf_path) as pdf:
            total_pages = len(pdf.pages)
            print(f"Starting processing of '{os.path.basename(pdf_path)}' ({total_pages} pages)...")
            print(f"Data will be written directly to SQLite table '{TABLE_NAME}' in '{DB_NAME}'")
            start_time = time.time()

            for i, page in enumerate(pdf.pages):
                page_num = i + 1
                if page_num % 50 == 0 or page_num == 1 or page_num == total_pages:
                    elapsed_time = time.time() - start_time
                    print(f"  Processing page {page_num}/{total_pages}... (Rows written so far: {rows_written_count}, Elapsed: {elapsed_time:.2f}s)")

                try:
                    tables = page.extract_tables(table_settings={
                        "vertical_strategy": "lines_strict", "horizontal_strategy": "lines_strict",
                        "snap_tolerance": 5, "join_tolerance": 5,
                    })
                    if not tables:
                        tables = page.extract_tables(table_settings={
                            "vertical_strategy": "text", "horizontal_strategy": "text",
                            "text_x_tolerance": 3, "text_y_tolerance": 3,
                            "snap_tolerance": 5, "join_tolerance": 5,
                        })
                except Exception:
                    tables = []

                if not tables:
                    continue

                for extracted_table in tables:
                    if not extracted_table or len(extracted_table) < 2 or len(extracted_table[0]) < MIN_COLUMNS_FOR_HTS_TABLE:
                        continue

                    header_row_index = -1
                    for r_idx, row_content in enumerate(extracted_table[:3]):
                        cleaned_row_cells = [clean_text(cell) for cell in row_content]
                        if is_potential_hts_header_row(cleaned_row_cells, HTS_HEADER_KEYWORDS, MIN_HTS_HEADER_KEYWORDS_MATCH):
                            header_row_index = r_idx
                            break
                    
                    if header_row_index == -1:
                        continue

                    for data_row_idx in range(header_row_index + 1, len(extracted_table)):
                        raw_data_cells = extracted_table[data_row_idx]
                        if not any(raw_data_cells) or not any(clean_text(c) for c in raw_data_cells):
                            continue
                        
                        mapped_row_dict = map_extracted_row_to_target_dict(raw_data_cells, page_num)

                        # --- HTS Specific Context Logic (Simplified - same as before) ---
                        hs_text = mapped_row_dict["Heading/Subheading"]
                        full_subheading = ""
                        if re.match(r'^\d{4}\.\d{2}\.\d{2}$', hs_text):
                            full_subheading = hs_text
                            current_main_heading_context = hs_text[:4]
                            current_subheading_base_context = hs_text
                        elif re.match(r'^\d{4}(\s*\(con\.\))?$', hs_text):
                            potential_main = hs_text.replace('(con.)','').strip()
                            if re.match(r'^\d{4}$', potential_main):
                                current_main_heading_context = potential_main
                            full_subheading = current_main_heading_context
                        elif hs_text:
                            if current_main_heading_context and re.match(r'^\s*\.\d{2}\.\d{2}$', hs_text.replace(current_main_heading_context, '', 1).strip()):
                                full_subheading = current_main_heading_context + hs_text.replace(current_main_heading_context, '', 1).strip()
                                current_subheading_base_context = full_subheading
                            else:
                                full_subheading = hs_text
                        
                        if not full_subheading and mapped_row_dict["Stat. Suffix"] and current_subheading_base_context:
                            full_subheading = current_subheading_base_context
                        elif not full_subheading and current_main_heading_context:
                            full_subheading = current_main_heading_context
                        
                        mapped_row_dict["Heading/Subheading"] = full_subheading
                        # --- End HTS Specific Context Logic ---
                        
                        essential_cols_check = [mapped_row_dict["Stat. Suffix"], 
                                       mapped_row_dict["Article Description"], 
                                       mapped_row_dict["Unit of Quantity"],
                                       mapped_row_dict["Rates of Duty - General"]]
                        
                        if any(essential_cols_check):
                            # Insert row immediately to database (no memory storage)
                            insert_row_into_db(db_cursor, TABLE_NAME, db_column_names, mapped_row_dict)
                            rows_written_count += 1
                            
                            # Commit every 100 rows for better performance and data safety
                            if rows_written_count % 100 == 0:
                                db_conn.commit()

            # Final commit for any remaining uncommitted rows
            db_conn.commit()
            total_time = time.time() - start_time
            print(f"  Finished processing all pages. Total time: {total_time:.2f}s")

    except Exception as e:
        print(f"An error occurred during PDF processing or DB writing: {e}")
        db_conn.rollback() # Rollback any uncommitted changes if error
    
    return rows_written_count

In [3]:
# --- Main Execution Code ---
pdf_file_path = "../data/raw/finalCopy.pdf"  # <--- SET YOUR PDF FILE PATH HERE

# Setup database - only creates if doesn't exist, otherwise just connects
db_connection, db_cursor = setup_database_if_needed(DB_NAME, TABLE_NAME, DB_COLUMN_DEFINITIONS)

total_rows = 0
if db_connection and db_cursor:
    try:
        total_rows = process_pdf_and_write_to_db(pdf_file_path, db_connection, db_cursor)
    except Exception as e:
        print(f"Error during processing: {e}")
    finally:
        db_connection.commit() # Final commit
        db_cursor.close()
        db_connection.close()
        print("Database connection closed.")

if total_rows > 0:
    print(f"\nSuccessfully processed PDF. {total_rows} HTS data rows written to SQLite.")
    print(f"Data saved to table '{TABLE_NAME}' in database '{DB_NAME}'")
else:
    print("\nNo HTS data rows were written to the database.")

Creating new database '../data/processed/fill_hts_schedule_data.db'...
Table 'hts_entries' ready in database '../data/processed/fill_hts_schedule_data.db'.


CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox


Starting processing of 'finalCopy.pdf' (4430 pages)...
Data will be written directly to SQLite table 'hts_entries' in '../data/processed/fill_hts_schedule_data.db'
  Processing page 1/4430... (Rows written so far: 0, Elapsed: 0.00s)
  Processing page 50/4430... (Rows written so far: 0, Elapsed: 11.84s)
  Processing page 100/4430... (Rows written so far: 0, Elapsed: 27.99s)
  Processing page 150/4430... (Rows written so far: 0, Elapsed: 72.19s)
  Processing page 200/4430... (Rows written so far: 0, Elapsed: 121.68s)
  Processing page 250/4430... (Rows written so far: 0, Elapsed: 171.42s)
  Processing page 300/4430... (Rows written so far: 0, Elapsed: 189.82s)
  Processing page 350/4430... (Rows written so far: 0, Elapsed: 210.83s)
  Processing page 400/4430... (Rows written so far: 0, Elapsed: 229.79s)
  Processing page 450/4430... (Rows written so far: 0, Elapsed: 245.59s)
  Processing page 500/4430... (Rows written so far: 0, Elapsed: 260.24s)
  Processing page 550/4430... (Rows writt

CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox
CropBox missing from /Page, defaulting to MediaBox


Database connection closed.

Successfully processed PDF. 68051 HTS data rows written to SQLite.
Data saved to table 'hts_entries' in database '../data/processed/fill_hts_schedule_data.db'


In [4]:
import pandas as pd
import sqlite3

conn = sqlite3.connect(DB_NAME)
df = pd.read_sql_query("SELECT * FROM hts_entries", conn)
conn.close()

In [5]:
df.shape

(68051, 9)