In [1]:
import pdfplumber
import pandas as pd

def extract_tables_from_pdf(pdf_path):
    tables = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_number, page in enumerate(pdf.pages, start=1):
            print(f"Processing page {page_number}...")
            # Extract tables from the page
            page_tables = page.extract_tables()
            for i, table in enumerate(page_tables):
                print(f"Extracted table {i + 1} from page {page_number}")
                # Convert the table into a Pandas DataFrame
                df = pd.DataFrame(table)
                tables.append(df)
                print(df.head())  # Display the first few rows of the table
    return tables

# Example usage
pdf_path = "example.pdf"  # Replace with your PDF file
tables = extract_tables_from_pdf(pdf_path)

# Save the first table to a CSV file (optional)
if tables:
    tables[0].to_csv("extracted_table.csv", index=False)
    print("First table saved to 'extracted_table.csv'")
else:
    print("No tables found in the PDF.")


FileNotFoundError: [Errno 2] No such file or directory: 'example.pdf'

In [8]:
showConfidence = True

from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential

# Azure credentials
endpoint = "https://westeurope.api.cognitive.microsoft.com/"
key = "7614bee5e8c042439d637938ae2bb3af"

# Initialize the Document Analysis Client
document_analysis_client = DocumentAnalysisClient(
    endpoint=endpoint, credential=AzureKeyCredential(key)
)

# Path to your PDF document
document_path = "9484630 Barnehurst Junior School Risk Assessment 22.pdf"

# Read the document content
with open(document_path, "rb") as f:
    document = f.read()

# Analyze the document using the prebuilt-document model to detect explicit tables
poller = document_analysis_client.begin_analyze_document(
    model_id="prebuilt-document", document=document
)
result = poller.result()

# Print detected tables
if result.tables:
    print("\nDetected Tables:")
    for idx, table in enumerate(result.tables):
        print(f"\nTable {idx + 1}:")
        print(f"Dimensions: {table.row_count} rows x {table.column_count} columns")
        
        # Initialize a 2D grid to store table data
        table_data = [["" for _ in range(table.column_count)] for _ in range(table.row_count)]
        
        # Populate the grid with cell content
        for cell in table.cells:
            table_data[cell.row_index][cell.column_index] = cell.content
        
        # Print the table content row by row
        for row in table_data:
            print("\t".join(row))
else:
    print("No tables detected in the document.")


###############

# Analyze the document using layout mode for more granular structure
poller = document_analysis_client.begin_analyze_document(
    model_id="prebuilt-layout", document=document
)
layout_result = poller.result()

# Gather detected table regions
detected_table_regions = []
for table in result.tables:
    for region in table.bounding_regions:
        detected_table_regions.append(region)

# Analyze lines to identify high-density regions (potential missing tables)
potential_table_regions = []

for page in layout_result.pages:
    print(f"Analyzing Page {page.page_number} for potential tables...")
    line_density_threshold = 5  # Number of lines per region to consider as a table
    line_groups = {}

    # Group lines by proximity based on spans
    for line in page.lines:
        # Use the span's offset as a unique identifier
        line_span = line.spans[0]  # Assuming one span per line
        span_start = line_span.offset
        span_end = span_start + line_span.length

        # Group lines based on vertical proximity (using span offsets)
        found_group = False
        for group_key in line_groups:
            group_start, group_end = group_key
            if abs(group_start - span_start) < 10 or abs(group_end - span_end) < 10:
                line_groups[group_key].append(line)
                found_group = True
                break

        if not found_group:
            line_groups[(span_start, span_end)] = [line]

    # Check each group for line density
    for group_key, lines in line_groups.items():
        if len(lines) > line_density_threshold:
            potential_table_regions.append((page.page_number, group_key, lines))

# Compare detected tables with potential table regions
missing_tables = []
for page_number, group_key, lines in potential_table_regions:
    span_start, span_end = group_key
    is_detected = False
    for detected_region in detected_table_regions:
        if (
            detected_region.page_number == page_number
            and detected_region.bounding_box[0][1] <= span_start
            and detected_region.bounding_box[2][1] >= span_end
        ):
            is_detected = True
            break

    if not is_detected:
        missing_tables.append((page_number, span_start, span_end, lines))

# Output missing tables
if missing_tables:
    print("\nPotential missing tables identified:")
    for page_number, span_start, span_end, lines in missing_tables:
        print(f"Page {page_number}, Region: {span_start} - {span_end}")
        for line in lines:
            print(f"  Line: {line.content}")
else:
    print("\nNo potential missing tables found.")



################
# # Analyze the document using the prebuilt model
# poller = document_analysis_client.begin_analyze_document(
#     model_id="prebuilt-layout", document=document
# )
# result = poller.result()

# low_confidence_flag = False
# low_confidence_threshold = 0.5  # Adjust as needed

# # Iterate through detected tables
# for table_idx, table in enumerate(result.tables):
#     print(f"\nChecking Table {table_idx + 1}:")
#     for cell in table.cells:
#         # Collect confidences of words within the cell
#         cell_confidences = []

#         if cell.spans:
#             for span in cell.spans:
#                 span_start = span.offset
#                 span_end = span.offset + span.length
#                 for page in result.pages:
#                     for word in page.words:
#                         word_start = word.span.offset
#                         word_end = word.span.offset + word.span.length
#                         if word_start >= span_start and word_end <= span_end:
#                             cell_confidences.append(word.confidence)

#         # Calculate the average confidence for the cell
#         if cell_confidences:
#             avg_confidence = sum(cell_confidences) / len(cell_confidences)
#         else:
#             avg_confidence = None

#         # Flag if confidence is below the threshold
#         if avg_confidence is not None and avg_confidence < low_confidence_threshold:
#             low_confidence_flag = True
#             print(
#                 f"Low-confidence cell detected in Table {table_idx + 1} "
#                 f"at Row {cell.row_index + 1}, Column {cell.column_index + 1} "
#                 f"(Conf: {avg_confidence:.2f}): {cell.content}"
#             )

# if low_confidence_flag:
#     print("\nWarning: Some tables in the document contain low-confidence cells.")
# else:
#     print("\nAll detected tables have acceptable confidence levels.")

##########

# # Iterate through tables and extract cell data with confidence scores
# for idx, table in enumerate(result.tables):
#     print(f"\nTable {idx + 1}:")

#     # Get the number of rows and columns
#     row_count = table.row_count
#     column_count = table.column_count
#     print(f"Dimensions: {row_count} rows x {column_count} columns")

#     # Create a 2D list to hold cell contents and confidences
#     cells = [["" for _ in range(column_count)] for _ in range(row_count)]
#     confidences = [[None for _ in range(column_count)] for _ in range(row_count)]

#     # Populate the cells and confidences
#     for cell in table.cells:
#         row = cell.row_index
#         column = cell.column_index
#         cells[row][column] = cell.content

#         # Initialize a list to hold confidences of words in the cell
#         cell_confidences = []

#         # Access the elements (words) within the cell
#         if cell.spans:
#             for span in cell.spans:
#                 # Get the start and end offsets of the span
#                 span_start = span.offset
#                 span_end = span.offset + span.length

#                 # Find words within the span
#                 for page in result.pages:
#                     for word in page.words:
#                         word_start = word.span.offset
#                         word_end = word.span.offset + word.span.length
#                         if (word_start >= span_start) and (word_end <= span_end):
#                             cell_confidences.append(word.confidence)
#         # Calculate average confidence
#         if cell_confidences:
#             avg_confidence = sum(cell_confidences) / len(cell_confidences)
#         else:
#             avg_confidence = None

#         confidences[row][column] = avg_confidence


#     # Print the table with cell contents and confidence scores
#     for row_idx in range(row_count):
#         row_cells = cells[row_idx]
#         row_confidences = confidences[row_idx]
#         row_output = ""
#         for col_idx in range(column_count):
#             cell_text = row_cells[col_idx]
#             confidence = row_confidences[col_idx]
#             if confidence is not None:
#                 if showConfidence:
#                     row_output += f"[{cell_text} (Conf: {confidence:.2f})]\t"
#                 else:
#                     row_output += f"[{cell_text}]\t"
#             else:
#                 if showConfidence:
#                     row_output += f"[{cell_text} (Conf: N/A)]\t"
#                 else:
#                     row_output += f"[{cell_text}]\t"
#         print(row_output)

    
# print("--------------------------------")

# low_confidence_flag = False

# for page in result.pages:
#     for word in page.words:
#         if word.confidence < 0.5:  # Adjust threshold as needed
#             low_confidence_flag = True
#             print(f"Low-confidence word detected: {word.content} (Conf: {word.confidence:.2f})")

# if low_confidence_flag:
#     print("Warning: Some areas in the document have low confidence.")



Detected Tables:

Table 1:
Dimensions: 21 rows x 2 columns
Management Details	
This risk assessment has been conducted on behalf of:	T & D Barrs Plumbing & Heating Ltd
	Unit 7 Thundridge Business Park Thundridge Nr Ware Hertfordshire
	SG12 0SS
	
Site Details	
This risk assessment relates to the hot and cold water services	Barnehurst Junior School Barnehurst Close
	Erith
for the following site:	DA8 3NL
Site Contact	Mr L Polden
Responsible Person	Mr L Polden
Date of Survey	29/10/2022
Surveyor(s)	Chris Helmore, Jamie Helmore
Survey Review Date	29/10/2024
	
Administrative Details	
Job Reference	9484630
Risk Assessment Produced by	Chris Helmore
Helmore Water Site Contacts	lan Helmore - 07778 381851
	Chris Helmore - 07540 403870
	Jamie Helmore - 07810 501335 Head Office - 01462 895588

Table 2:
Dimensions: 7 rows x 2 columns
General Building information	
Site name:	Barnehurst Junior School
Building Use:	School
Number of Floors:	2
Age of Building	1920s
Building Occupancy	Occupied on a daily 

In [9]:
import boto3
import io
from PIL import Image
import streamlit as st

def upload_to_s3(uploaded_file, bucket):
    """
    Upload a local file to S3
    
    Args:
        uploaded_file: File uploaded by user
        bucket (str): S3 bucket name
    
    Returns:
        str: S3 object key
    """
    # s3_client = boto3.client('s3')

    s3_client = boto3.client(
        's3',
        aws_access_key_id='AKIAYXWBNYPDQSE47NE7',
        aws_secret_access_key='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',
        region_name='us-east-1'
    )
    
    
    # Generate a unique filename
    filename = f"uploads/{uploaded_file.name}"
    
    # Upload file to S3
    s3_client.upload_fileobj(uploaded_file, bucket, filename)
    
    return filename

def extract_tables_from_pdf(bucket, document):
    """
    Extract tables from a PDF document stored in an S3 bucket
    
    Args:
        bucket (str): S3 bucket name
        document (str): PDF document path in S3
    
    Returns:
        list: Extracted tables
    """
    # Create Textract client
    textract = boto3.client('textract')
    
    try:
        # Analyze document for tables
        response = textract.analyze_document(
            Document={
                'S3Object': {
                    'Bucket': bucket,
                    'Name': document
                }
            },
            FeatureTypes=['TABLES']
        )
        
        # Process and extract tables
        tables = []
        for page in response['Blocks']:
            if page['BlockType'] == 'TABLE':
                current_table = []
                
                # Get table cells
                table_cells = [cell for cell in response['Blocks'] 
                               if cell['BlockType'] == 'CELL' and cell['TableIndex'] == page['TableIndex']]
                
                # Sort cells by row and column
                table_cells.sort(key=lambda x: (x['RowIndex'], x['ColumnIndex']))
                
                # Track current row
                current_row = []
                prev_row_index = 1
                
                for cell in table_cells:
                    # Extract cell text
                    cell_text = ''
                    for relationship in cell.get('Relationships', []):
                        if relationship['Type'] == 'CHILD':
                            for child_id in relationship['Ids']:
                                for word in response['Blocks']:
                                    if word['Id'] == child_id and word['BlockType'] == 'WORD':
                                        cell_text += word['Text'] + ' '
                    cell_text = cell_text.strip()
                    
                    # Handle row changes
                    if cell['RowIndex'] != prev_row_index:
                        if current_row:
                            tables.append(current_row)
                        current_row = []
                        prev_row_index = cell['RowIndex']
                    
                    current_row.append(cell_text)
                
                # Add last row
                if current_row:
                    tables.append(current_row)
        
        return tables
    
    except Exception as e:
        st.error(f"Error extracting tables: {e}")
        return []

def print_tables(tables):
    """
    Print extracted tables in a readable format
    
    Args:
        tables (list): List of tables to print
    """
    if not tables:
        st.warning("No tables found.")
        return
    
    st.write(f"Total Tables Found: {len(tables)}")
    
    for i, table in enumerate(tables, 1):
        st.write(f"\n### Table {i}")
        
        # Display table using Streamlit
        st.dataframe(table)

def main():
    st.title("Amazon Textract PDF Table Extractor")
    
    # AWS Configuration
    BUCKET_NAME = st.secrets["AWS_S3_BUCKET"]
    
    # File uploader
    uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")
    
    if uploaded_file is not None:
        # Upload file to S3
        try:
            st.info("Uploading file to S3...")
            s3_filename = upload_to_s3(uploaded_file, BUCKET_NAME)
            st.success("File uploaded successfully!")
            
            # Extract tables
            st.info("Extracting tables...")
            tables = extract_tables_from_pdf(BUCKET_NAME, s3_filename)
            
            # Print tables
            print_tables(tables)
        
        except Exception as e:
            st.error(f"An error occurred: {e}")

if __name__ == "__main__":
    main()

2024-12-10 14:27:23.806 
  command:

    streamlit run /Users/roelrotteveel/Documents/Odyss/Legionella/venv/lib/python3.11/site-packages/ipykernel_launcher.py [ARGUMENTS]
