In [None]:
!pwd

In [None]:
import pandas as pd
import os
from azure.storage.blob import BlobServiceClient
from pypdf import PdfReader, PdfWriter
from io import BytesIO
import logging
from dotenv import load_dotenv

# Set up logging: INFO level on the root logger, plus a module-specific logger
# used by everything below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load the .env file one directory above the working directory. This must run
# before the os.getenv() calls below, which read the values it provides.
load_dotenv("../.env")

# Azure Storage credentials; either value is None when its variable is unset.
AZURE_ACCOUNT_NAME = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
AZURE_SAS_TOKEN = os.getenv("AZURE_SAS_TOKEN")


class AzurePDFPageExtractor:
    """Download PDFs from an Azure Blob container and save single pages locally.

    Extracted pages are written to
    ``<output_directory>/pdf/<document_type>/<output_filename>``. The storage
    account name and SAS token are read from the module-level
    ``AZURE_ACCOUNT_NAME`` and ``AZURE_SAS_TOKEN`` values.
    """

    def __init__(self, container_name, output_directory):
        """
        Initialize the PDF page extractor

        Args:
            container_name (str): Name of the blob container
            output_directory (str): Local directory to save extracted pages
        """
        if not AZURE_ACCOUNT_NAME:
            # The account URL below would be built from "None"; say so up front
            # instead of only surfacing it as a per-file download error.
            logger.warning(
                "AZURE_STORAGE_ACCOUNT_NAME is not set; blob downloads will fail."
            )

        account_url = f"https://{AZURE_ACCOUNT_NAME}.blob.core.windows.net"
        self.blob_service_client = BlobServiceClient(
            account_url=account_url,
            credential=AZURE_SAS_TOKEN,
            connection_timeout=60,
            read_timeout=300,
        )
        self.container_name = container_name
        self.output_directory = output_directory

        # Create output directory if it doesn't exist
        os.makedirs(output_directory, exist_ok=True)

    def _page_output_path(self, document_type, output_filename):
        """Return the local path an extracted page of this type is saved under."""
        return os.path.join(
            self.output_directory, "pdf", document_type, output_filename
        )

    def extract_page_from_pdf(self, pdf_bytes, page_number):
        """
        Extract a specific page from PDF bytes

        Args:
            pdf_bytes (bytes): PDF file content as bytes
            page_number (int): Page number to extract (1-based)

        Returns:
            bytes: PDF bytes containing only the specified page, or None when
                the page number is out of range or the PDF cannot be processed
        """
        try:
            # Read PDF from bytes
            pdf_reader = PdfReader(BytesIO(pdf_bytes))
            total_pages = len(pdf_reader.pages)

            # Check if page number is valid
            if not 1 <= page_number <= total_pages:
                logger.warning(
                    f"Page {page_number} not found. PDF has {total_pages} pages."
                )
                return None

            # Create a new PDF with only the specified page
            pdf_writer = PdfWriter()
            pdf_writer.add_page(
                pdf_reader.pages[page_number - 1]
            )  # Convert to 0-based index

            # Write to bytes
            output_buffer = BytesIO()
            pdf_writer.write(output_buffer)
            return output_buffer.getvalue()

        except Exception as e:
            logger.error(f"Error extracting page {page_number}: {str(e)}")
            return None

    def download_and_extract_page(
        self, file_path, page_number, output_filename, document_type
    ):
        """
        Download file from Azure Blob and extract specific page

        Args:
            file_path (str): Path to file in blob storage
            page_number (int): Page number to extract (1-based)
            output_filename (str): Name for the output file
            document_type (str): Sub-folder under ``<output_directory>/pdf``
                that the page is saved in

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Download blob
            blob_client = self.blob_service_client.get_blob_client(
                container=self.container_name, blob=file_path
            )

            logger.info(f"Downloading {file_path}...")
            blob_data = blob_client.download_blob().readall()

            # Extract the specified page
            page_pdf_bytes = self.extract_page_from_pdf(blob_data, page_number)
            if not page_pdf_bytes:
                logger.error(f"Failed to extract page {page_number} from {file_path}")
                return False

            # Only create the type folder once there is something to store in it.
            output_path = self._page_output_path(document_type, output_filename)
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            with open(output_path, "wb") as f:
                f.write(page_pdf_bytes)

            logger.info(f"Successfully extracted page {page_number} to {output_path}")
            return True

        except Exception as e:
            logger.error(f"Error processing {file_path}: {str(e)}")
            return False

    def process_csv_file(self, csv_file_path):
        """
        Process the CSV file and extract pages for each file

        The CSV must contain ``filename`` and ``page_number`` columns. An
        optional ``document_type`` column selects the output sub-folder; rows
        without a value (or a CSV without the column) use ``"no_type"``. Rows
        whose output file already exists are counted as successful without
        downloading again. Rows with a missing filename or an unparsable page
        number are logged and skipped.

        Args:
            csv_file_path (str): Path to the CSV file

        Returns:
            int: Number of rows whose page was extracted or already present.
                0 if the CSV could not be read or lacks a required column.
        """
        success_count = 0
        try:
            # Read the CSV file
            df = pd.read_csv(csv_file_path)

            # Validate required columns
            required_columns = ["filename", "page_number"]
            if not all(col in df.columns for col in required_columns):
                raise ValueError(f"CSV must contain columns: {required_columns}")

            total_files = len(df)

            # Count rows by position so the progress message stays correct
            # whatever index labels the DataFrame carries.
            for position, (_, row) in enumerate(df.iterrows(), start=1):
                filename = row["filename"]
                if pd.isna(filename):
                    logger.error(f"Row {position}/{total_files} has no filename. Skipping.")
                    continue

                try:
                    page_number = int(row["page_number"])
                except (TypeError, ValueError):
                    logger.error(
                        f"Invalid page number {row['page_number']!r} for {filename}. Skipping."
                    )
                    continue

                # document_type is optional: the column may be absent entirely
                # or empty for an individual row.
                document_type = row.get("document_type")
                document_type = (
                    "no_type" if pd.isna(document_type) else str(document_type)
                )

                # Create output filename (original name + page number)
                base_name = os.path.splitext(os.path.basename(filename))[0]
                output_filename = f"{base_name}_page_{page_number}.pdf"

                if os.path.exists(
                    self._page_output_path(document_type, output_filename)
                ):
                    logger.info(
                        f"File {output_filename} already exists. Skipping download."
                    )
                    success_count += 1
                    continue

                logger.info(f"Processing file {position}/{total_files}: {filename}")

                if self.download_and_extract_page(
                    filename, page_number, output_filename, document_type
                ):
                    success_count += 1

            logger.info(
                f"Processing complete! {success_count}/{total_files} files processed successfully."
            )

        except Exception as e:
            logger.error(f"Error processing CSV file: {str(e)}")

        return success_count

In [None]:
def main():
    """Run page extraction for the rows of the hard-coded samples CSV.

    Builds an extractor for the "bronze" container that writes under the
    local output folder, then hands it the CSV to process.
    """
    blob_container = "bronze"
    output_root = "/Users/odunayoogundepo/Downloads/train_images_karanta"
    samples_csv = "/Users/odunayoogundepo/Downloads/train_images_karanta/karanta_doc_stats - train_samples.csv"

    page_extractor = AzurePDFPageExtractor(
        container_name=blob_container,
        output_directory=output_root,
    )
    page_extractor.process_csv_file(samples_csv)


# Run the extraction only when this code is executed directly, not when it is
# imported from elsewhere.
if __name__ == "__main__":
    main()

# Generate JSON Output Using Azure

In [None]:
# Alternate locations, left commented out so they can be swapped in by hand:
# input_directory = "/Users/odunayoogundepo/Downloads/test_images_karanta/pdf"
# output_directory = "/Users/odunayoogundepo/Downloads/test_images_karanta/json_output"

# Folder containing one sub-folder of PDFs per document type.
input_directory = "/Users/odunayoogundepo/Downloads/train_images_karanta/pdf"
# Folder that receives the generated JSON, in a matching per-type sub-folder.
output_directory = "/Users/odunayoogundepo/Downloads/train_images_karanta/json_output"

# Sub-folder names under input_directory to process.
document_types = ["legal"]

In [None]:
import os
import json
from pathlib import Path
from tqdm import tqdm

import sys

sys.path.append("..")

from karanta.data.utils import openai_response_format_schema_multipages
from karanta.data.test_prompts import (
    test_build_page_query_azure,
    test_build_page_query_openai,
)

# Prompt configuration file handed to both the Azure and the OpenAI query
# builders; the path is relative to the working directory.
prompt_path = "../configs/prompts/open_ai_data_generation.yaml"


async def run_extraction():
    """Generate one JSON output file per PDF for every configured document type.

    For each name in the module-level ``document_types`` list, PDFs are read
    from ``<input_directory>/<type>/`` and the result for page 1 is written to
    ``<output_directory>/<type>/<pdf stem>.json``. A PDF whose JSON already
    exists is skipped, so the function can be re-run to resume. A document
    type whose input folder is missing is reported and skipped.

    The Azure query is tried first. If it raises, the error is printed and the
    OpenAI query is used instead. If that fails too, the error is printed and
    the file is skipped.
    """
    for directory in document_types:
        dir_path = Path(os.path.join(input_directory, directory))

        if not (dir_path.exists() and dir_path.is_dir()):
            print(f"Directory {dir_path} does not exist.")
            # Nothing to list for this type; move on instead of crashing.
            continue

        file_names = os.listdir(dir_path)
        print(f"Number of files in {directory}: {len(file_names)}")

        for file_name in tqdm(file_names):
            if not file_name.lower().endswith(".pdf"):
                print(f"Skipping non-PDF file: {file_name}")
                continue

            input_file = dir_path / file_name

            # Drop only the final extension; earlier dots stay in the stem.
            file_name_stem = file_name.rsplit(".", 1)[0]
            output_json = Path(output_directory) / directory / f"{file_name_stem}.json"

            if output_json.exists():
                print(f"Output JSON already exists for {file_name}, skipping...")
                continue

            print(f"Processing {file_name}...")

            try:
                ocr_output = await test_build_page_query_azure(
                    local_pdf_path=str(input_file),
                    page=1,
                    response_schema=openai_response_format_schema_multipages,
                    prompt_path=prompt_path,
                    convert_to_grayscale=True,
                    verbose=False,
                )
                # Serialise before touching the file system: a failure here
                # must not leave an empty JSON file that a later run would
                # treat as already processed.
                payload = ocr_output[0].to_json()
                output_json.parent.mkdir(parents=True, exist_ok=True)
                output_json.write_text(payload, encoding="utf-8")
            except Exception as azure_error:
                print(
                    f"Azure extraction failed for {file_name}: {azure_error}. "
                    "Falling back to OpenAI..."
                )
                try:
                    ocr_output = test_build_page_query_openai(
                        local_pdf_path=str(input_file),
                        page=1,
                        response_schema=openai_response_format_schema_multipages,
                        prompt_path=prompt_path,
                        convert_to_grayscale=True,
                        verbose=False,
                        model_name="gpt-4.1-2025-04-14",
                    )

                    ocr_output = {
                        "generation": json.loads(ocr_output.choices[0].message.content),
                        "model": ocr_output.model,
                    }

                    # ensure_ascii=False keeps non-ASCII text readable, so the
                    # file is written as UTF-8 explicitly.
                    payload = json.dumps(ocr_output, ensure_ascii=False)
                    output_json.parent.mkdir(parents=True, exist_ok=True)
                    output_json.write_text(payload, encoding="utf-8")

                except Exception as e:
                    print(
                        f"Error processing {file_name} with both Azure and OpenAI: {e}"
                    )
                    continue


# Top-level `await` is only accepted by interpreters that allow it, such as a
# Jupyter/IPython cell. In a plain script this would have to be
# asyncio.run(run_extraction()) instead.
if __name__ == "__main__":
    await run_extraction()