In [None]:
!pip install pdfplumber


In [None]:
!pip install tabula-py

# LinkedIn Invoice PDF Table Extraction

In [21]:
import pdfplumber # Library for extraction from PDF files
import pandas as pd
import re
import os

**Goal**: To extract line-item data from LinkedIn PDF invoice and sum up campaigns per region, and exported as an Excel file

This code has been assisted by AI [Claude 3.7 Sonnet & Gemini 2.5 pro preview 05-06]

Disclaimer: There are some unnecessary lines of code for verification processes that I decided to leave to use as references for future projects

**Important**: the code consider the next specific formats:
- Campaign name format: "region_channel_name_date"
- A LinkedIn invoice of 5 pages (If the invoice is larger, you should modify the "if" condition in line 47 of the Main Script)

In [22]:
PDF_FILE_PATH = "SampleInvoice_LinkedIn.pdf"

## Helper Functions

In [23]:
# --- Helper Functions (mostly unchanged) ---
# Extract the region from the campaign name
def extract_region(desc_text):
    if not isinstance(desc_text, str) or not desc_text.strip(): # Is actually a string
        return 'unknown'
    # This is because the "Description" cell can have multiple lines
    first_line = desc_text.split('\n')[0].strip()
    # Campaign name format: "region_channel_name_date"
    match = re.search(r'Campaign:\s*([a-zA-Z0-9_]+)', first_line, re.IGNORECASE) # Match object
    if match:
        full_campaign_tag = match.group(1)
        region = full_campaign_tag.split('_')[0]
        return region.lower()
    return 'unknown'

# Convert monetary amount into float
def clean_billed_amount(amount_text):
    if pd.isna(amount_text):
        return 0.0
    if isinstance(amount_text, (int, float)):
        return float(amount_text)
    if isinstance(amount_text, str):
        try:
            return float(amount_text.replace(',', '').replace('\n', '').strip())
        except ValueError:
            print(f"Warning: Could not convert '{amount_text}' to float.")
            return 0.0
    return 0.0

# Find a desired column name within a list of extracted column names
# NOTE: Only useful when the header names are not defined
def find_column_name(available_cols, target_keywords, exact_match_first=None):
    #Robustly finds a column name
    if not available_cols: return None

    cleaned_available_cols = [str(col).replace('\n', ' ').strip() for col in available_cols if col]
    # specific name to prioritize if found exactly
    if exact_match_first:
        for col_name in cleaned_available_cols:
            if exact_match_first.lower() == col_name.lower():
                return col_name # Return the cleaned version that matched

    for keyword in target_keywords:
        for col_name in cleaned_available_cols:
            if keyword.lower() in col_name.lower():
                return col_name # Return the cleaned version that matched
    return None

## Main Script

In [24]:
# -- Main Script --
if not os.path.exists(PDF_FILE_PATH):
    print(f"Error: PDF file not found at '{PDF_FILE_PATH}'")
    exit()

all_extracted_dataframes = []
# List of header keywords expected
TABLE_HEADER_KEYWORDS = ["Line", "Description", "Qty", "Unit Price", "Billed Amount"] # Key parts of the target table header

print(f"Opening PDF: {PDF_FILE_PATH}")

try:
    with pdfplumber.open(PDF_FILE_PATH) as pdf:
        pages_to_process = pdf.pages[:-1] # Exclude the last LinkedIn Invoice summary page

        for i, page in enumerate(pages_to_process):
            page_num_for_display = i + 1
            print(f"--- Processing Page {page_num_for_display}/{len(pages_to_process)} ---")

            table_data = None
            crop_box = None

            ## 1. Find the top of the actual table by looking for its header ##
            header_line_top = None
            # Using extract_words for more granular control if needed, but lines is often good.
            # Each item in text_lines is a dictionary containing information about the line
            text_lines = page.extract_text_lines(layout=True, strip=True, return_chars=False)
            if not text_lines: text_lines = [] # Ensure it's iterable

            for line_info in text_lines:
                line_text_lower = line_info["text"].lower()
                # Check if a good number of table header keywords are in this line
                if sum(keyword.lower() in line_text_lower for keyword in TABLE_HEADER_KEYWORDS) >= 3: # Heuristic
                    header_line_top = line_info["top"] # Stores y-coordinate of this presumed header line
                    print(f"  Found potential table header line: '{line_info['text']}' at top: {header_line_top}")
                    break

            if header_line_top:
              # *PDF COORDINATES: in pdfplumber, the origin [0,0] is at the bottom-left of the page.
                # 2. Define crop box
                crop_y_start = header_line_top - 10 # Start slightly above the detected header
                crop_y_end = page.height * 0.92    # Default end, before typical footers

                ## -- NOTE:  adjust this part depending on how many pages the LinkedIn invoice has
                ## and where the table ends
                # Adjust crop_y_end for page 5 specifically
                if page_num_for_display == 5:
                    temp_crop_y_end_page5 = crop_y_end # Start with default
                    for line_info in text_lines:
                        line_text_lower = line_info["text"].lower()
                        # Look for markers indicating end of table data on page 5
                        # "special instructions" the end of the table
                        if "special instructions" in line_text_lower or \
                           ("total" in line_text_lower and line_info["x0"] > page.width * 0.4): # "Total" on right side
                            # If this marker is above our current crop_y_start
                            if line_info["top"] > crop_y_start: # Ensure this end-of-table marker is below the starting point
                                temp_crop_y_end_page5 = min(temp_crop_y_end_page5, line_info["top"] - 5)
                    if temp_crop_y_end_page5 < crop_y_end : # If we found a better end
                        crop_y_end = temp_crop_y_end_page5
                        print(f"  Page 5: Adjusted crop_y_end to {crop_y_end} based on content.")

                # For a valid crop crop_y_end should be less than crop_y_start
                if crop_y_start < crop_y_end :
                  # pdfplumber's .crop() method expects (x0, top, x1, bottom)
                    crop_box = (page.bbox[0] + 10, crop_y_start, page.bbox[2] - 10, crop_y_end) # Slight L/R margin
                    print(f"  Applying crop_box: {crop_box}")

                    try:
                        cropped_page = page.crop(crop_box)

                        # 3. Extract table from the cropped page
                        # Try text-based strategy first on the cleaner, cropped area
                        crop_table_settings_text = {
                            "vertical_strategy": "text", "horizontal_strategy": "text",
                            "text_x_tolerance": 3, "text_y_tolerance": 3,
                            "snap_tolerance": 3, "join_tolerance": 3,
                        }
                        table_data = cropped_page.extract_table(table_settings=crop_table_settings_text)

                        # If text strategy fails or yields little, try lines strategy (as table has some lines)
                        if not table_data or len(table_data) < 2:
                            print(f"  Text strategy on cropped page yielded minimal data. Trying lines strategy...")
                            crop_table_settings_lines = {
                                "vertical_strategy": "lines", "horizontal_strategy": "lines",
                                "snap_tolerance": 3, "join_tolerance": 3,
                            }
                            table_data = cropped_page.extract_table(table_settings=crop_table_settings_lines)
                    except Exception as crop_ex:
                        print(f"  Error during cropping or extracting from cropped page: {crop_ex}")
                        table_data = None # Ensure table_data is None if crop fails
                else:
                    print(f"  Invalid crop box (start_y >= end_y) for page {page_num_for_display}. Crop_y_start: {crop_y_start}, Crop_y_end: {crop_y_end}. Skipping crop.")
            else:
                print(f"  Could not find table header keywords to define crop_top on page {page_num_for_display}.")


            # 4. Validate and process extracted table_data
            if table_data and len(table_data) >= 1:
                raw_header = table_data[0]
                data_rows = table_data[1:]
                cleaned_header = [str(col).replace('\n', ' ').strip() if col is not None else f"Unnamed_{j}" for j, col in enumerate(raw_header)]

                print(f"  Extracted with Cleaned Header: {cleaned_header}")

                # Validate if this header looks like our target table's header
                header_match_count = sum(keyword.lower() in str(ch).lower() for ch in cleaned_header for keyword in TABLE_HEADER_KEYWORDS)

                if header_match_count >= 3 and len(data_rows) > 0: # Expect at least 3 keyword matches
                    df_page = pd.DataFrame(data_rows, columns=cleaned_header)
                    all_extracted_dataframes.append(df_page)
                    print(f"  SUCCESS: Valid table extracted and added from page {page_num_for_display}.")

                else:
                    print(f"  WARNING: Extracted table header (match_count: {header_match_count}) does not sufficiently match target. Discarding.")
                    # For debugging:
                    # if 'cropped_page' in locals() and cropped_page:
                    #     img = cropped_page.to_image(resolution=150)
                    #     img.save(f"debug_discarded_crop_page_{page_num_for_display}.png")
                    #     print(f"  Saved debug_discarded_crop_page_{page_num_for_display}.png")
            else:
                print(f"  No valid table data extracted from page {page_num_for_display} with current methods.")
                # If crop was attempted but failed, or no header found, save an image of the full page for review
                # img = page.to_image(resolution=150)
                # img.save(f"debug_full_page_failed_extraction_{page_num_for_display}.png")
                # print(f"  Saved debug_full_page_failed_extraction_{page_num_for_display}.png for review.")

except Exception as e:
    print(f"\nAn error occurred during PDF processing loop: {e}")
    import traceback
    traceback.print_exc()

Opening PDF: SampleInvoice_LinkedIn.pdf
--- Processing Page 1/5 ---
  Found potential table header line: 'Line Description                        Qty    Unit Price Billed Amount VAT Amount' at top: 266.5621609450001
  Applying crop_box: (10, 256.5621609450001, 602, 728.6400561522)
  Extracted with Cleaned Header: ['Line', 'Description', 'Qty Unit', 'Price Billed', 'Amount VAT', 'Amount']
  SUCCESS: Valid table extracted and added from page 1.
--- Processing Page 2/5 ---
  Found potential table header line: 'Line Description                        Qty    Unit Price Billed Amount VAT Amount' at top: 123.762160945
  Applying crop_box: (10, 113.762160945, 602, 728.6400561522)
  Extracted with Cleaned Header: ['Line', 'Description', 'Qty Unit', 'Price Billed', 'Amount VAT', 'Amount']
  SUCCESS: Valid table extracted and added from page 2.
--- Processing Page 3/5 ---
  Found potential table header line: 'Line Description                        Qty    Unit Price Billed Amount VAT Amount' at t

In [29]:
# --- This section will now only proceed if tables were actually extracted ---
if not all_extracted_dataframes:
    print("\nCRITICAL ERROR: No tables were successfully extracted from the PDF after all attempts.")
    print("This means the cropping and table extraction logic could not identify and parse the target table.")
    print("Suggestions for debugging:")
    print("1. Uncomment the `.to_image().save(...)` lines in the loop to visually inspect what is being cropped and what `pdfplumber` 'sees'.")
    print("2. Check the `TABLE_HEADER_KEYWORDS` and the logic for finding `header_line_top` (e.g., the `sum(...) >= 3` heuristic).")
    print("3. Fine-tune the `crop_y_start` and `crop_y_end` calculations, especially the bottom boundary for page 5.")
    print("4. Experiment with different `crop_table_settings_*` on the `cropped_page`.")
    print("5. Ensure `pdfplumber` is up-to-date: `pip install --upgrade pdfplumber`.")
    exit() # Explicitly exit to prevent the pd.concat error

# Combine all successfully extracted DataFrames
df_combined = pd.concat(all_extracted_dataframes, ignore_index=True)

print("\n--- Combined DataFrame (First 5 rows before processing) ---")
print(df_combined.head())
print("\n--- Combined DataFrame Info ---")
df_combined.info()
## Renaming headers correctly
df_combined.columns = ['Line', 'Description', 'Qty', 'Unit Price', 'Billed Amount', 'VAT Amount']
print("\n--- Updated Column Names ---")
print(df_combined.columns)
print("\n--- Combined DataFrame Columns ---")
extracted_cols = df_combined.columns.tolist()
print(f"Original extracted columns: {extracted_cols}")


#NOTE: Only use if the column names change or there are variations with original names
# Dynamically find the actual column names for 'Description' and 'Billed Amount'
# using the cleaned column names we would have created during table processing
#actual_description_col_name = find_column_name(extracted_cols, ["description"], exact_match_first="Description")
#actual_billed_amount_col_name = find_column_name(extracted_cols, ["billed amount", "amount"], exact_match_first="Billed Amount")

# Using Col names
actual_description_col_name = "Description"
actual_billed_amount_col_name = "Billed Amount"

# Merge multi-line descriptions
print("\n--- Merging multi-line descriptions ---")
actual_line_col_name = "Line"





# Note: Leaving the If condition only for future references
# Fallback to positional guessing if keyword search fails (less ideal)
if not actual_description_col_name and len(extracted_cols) > 1:
    actual_description_col_name = extracted_cols[1] # Assuming Description is often 2nd col
    print(f"  Warning: 'Description' column identified by position: {actual_description_col_name}")
if not actual_billed_amount_col_name and len(extracted_cols) > 4:
    actual_billed_amount_col_name = extracted_cols[4] # Assuming Billed Amount is often 5th col
    print(f"  Warning: 'Billed Amount' column identified by position: {actual_billed_amount_col_name}")

if not actual_description_col_name:
    print(f"\nFATAL ERROR: 'Description' column could not be identified. Aborting. Available: {extracted_cols}")
    exit()
if not actual_billed_amount_col_name:
    print(f"\nFATAL ERROR: 'Billed Amount' column could not be identified. Aborting. Available: {extracted_cols}")
    exit()


print(f"\nUsing column '{actual_description_col_name}' for descriptions.")
print(f"Using column '{actual_billed_amount_col_name}' for billed amounts.")
print(f"Using column '{actual_line_col_name}' as the Line item identifier.")

# Ensure description and line columns exist
if actual_description_col_name not in df_combined.columns or actual_line_col_name not in df_combined.columns:
    print("Error: 'Description' or 'Line' column not found in df_combined. Skipping description merge.")
else:
    processed_rows = []
    current_description_parts = []
    current_line_item_data = None

    for index, row in df_combined.iterrows():
        line_value = row[actual_line_col_name]
        description_text = str(row[actual_description_col_name] or '').strip() # Handle None and strip

        # Check if line_value is not NaN, not None, and not an empty string
        is_new_line_item = pd.notna(line_value) and str(line_value).strip() != ""

        if is_new_line_item:
            # If there was a previous item being built, finalize and add it
            if current_line_item_data is not None:
                # Join collected description parts, ensuring no leading/trailing newlines from empty parts
                full_description = "\n".join(filter(None, current_description_parts)) # filter(None,...) removes empty strings
                current_line_item_data[actual_description_col_name] = full_description
                processed_rows.append(current_line_item_data)

            # Start a new item
            current_line_item_data = row.to_dict() # Store the entire row data for the main line item
            current_description_parts = [description_text] if description_text else [] # Start new list, only if text exists
        else:
            # This is a continuation of the description for the current_line_item
            if current_line_item_data is not None: # Only append if we are actively tracking an item
                if description_text: # Only append if there's actual text
                    current_description_parts.append(description_text)
            # else:
                # This is a row without a line number and no active item,
                # could be an orphaned description part.
                # Depending on requirements, you might want to handle this (e.g., log it or try to assign it)
                # For now, it will be ignored if it doesn't follow a line item.

    # Add the last processed item after the loop finishes
    if current_line_item_data is not None:
        full_description = "\n".join(filter(None, current_description_parts))
        current_line_item_data[actual_description_col_name] = full_description
        processed_rows.append(current_line_item_data)

    if processed_rows:
        df_combined = pd.DataFrame(processed_rows)
        print("  SUCCESS: Multi-line descriptions merged.")
        print("\n--- Combined DataFrame (First 5 rows after description merge) ---")
        print(df_combined.head())
        print("\n--- Combined DataFrame Info (After description merge) ---")
        df_combined.info()
    else:
        print("  WARNING: No rows were processed during description merge. df_combined might be empty or incorrectly structured.")


# --- Data Processing on the combined DataFrame ---
df_combined['Region'] = df_combined[actual_description_col_name].apply(extract_region)
df_combined['Processed Billed Amount'] = df_combined[actual_billed_amount_col_name].apply(clean_billed_amount)

print("\n--- DataFrame with 'Region' and 'Processed Billed Amount' (First 10 rows) ---")
# Ensure the columns exist before trying to display them
cols_to_display = []
if actual_description_col_name in df_combined.columns: cols_to_display.append(actual_description_col_name)
if 'Region' in df_combined.columns: cols_to_display.append('Region')
if actual_billed_amount_col_name in df_combined.columns: cols_to_display.append(actual_billed_amount_col_name)
if 'Processed Billed Amount' in df_combined.columns: cols_to_display.append('Processed Billed Amount')

if cols_to_display:
    print(df_combined[cols_to_display].head(10))
else:
    print("Could not display processed columns as key columns were not found.")


# Sum by region
# At this point df_combined should have Region and Billed Amount columns
# Just an extra verification
if 'Region' in df_combined.columns and 'Processed Billed Amount' in df_combined.columns:
    region_totals = df_combined.groupby('Region')['Processed Billed Amount'].sum().reset_index()
    region_totals = region_totals.sort_values(by='Processed Billed Amount', ascending=False)
    print("\n--- Totals by Region ---")
    print(region_totals)
else:
    print("\nCould not calculate totals by region due to missing 'Region' or 'Processed Billed Amount' columns.")

print("\n--- Script Finished ---")

# Export as Excel file
df_combined.to_excel("LinkedIn_Invoice_table2.xlsx", index=False)


--- Combined DataFrame (First 5 rows before processing) ---
  Line                               Description Qty Unit Price Billed  \
0                                                                        
1    1  Campaign: americas_paid-social_document-   4665.4            1   
2                                                                        
3                       management_gartner-mq-24                         
4                                                                        

  Amount VAT Amount  
0                    
1   4,665.40   0.00  
2                    
3             0.00%  
4                    

--- Combined DataFrame Info ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 434 entries, 0 to 433
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   Line          434 non-null    object
 1   Description   434 non-null    object
 2   Qty Unit      434 non-null    object
 3   Price Bill