<a href="https://colab.research.google.com/github/dfulmer/pita-colab/blob/main/Amazon_Invoices_Processing_Notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Step One
First, upload the PDF(s) you want to process. Click the Files icon on the left to open the Files window, then drag and drop the file(s) in or click the "Upload to session storage" icon.

## Step Two
Click the arrow below to produce the EDI files.

In [None]:
!pip install pypdf
import logging
import os
import re
import sys
from datetime import datetime
from pathlib import Path

from pypdf import PdfReader


class PDFInvoiceScanner:
    """Turn invoices found in PDF files into one EDI file per invoice.

    ``run`` scans ``self.directory`` for PDF files, extracts their text with
    pypdf, splits the text into invoices, pulls the fields needed for the EDI
    message out of each invoice with regular expressions, and writes
    ``<invoice number>.edi`` next to the PDFs. Progress and problems are
    logged to both a log file and stdout.
    """

    # Everything from an invoice header up to the first following "FAQs".
    _INVOICE_BLOCK_RE = re.compile(
        r"Invoice\s+Invoice #\s+[\w-]+\s+\| .*?FAQs", re.DOTALL
    )
    _NUMBER_RE = re.compile(r"Invoice\s+Invoice #\s+([\w-]+)")
    # The day accepts one or two digits so "March 5, 2024" is not rejected.
    _DATE_RE = re.compile(
        r"Invoice\s+Invoice #\s+[\w-]+\s+\|\s*(\w+ \d{1,2}, \d{4})"
    )
    # The greedy ".*" means the LAST "PO #" inside the invoice text wins.
    _PO_RE = re.compile(
        r"Invoice\s+Invoice #\s+[\w-]+\s+\|\s*\w+ \d{1,2}, \d{4}.*PO #\s+([\w-]+)\s",
        re.DOTALL,
    )
    # First "Amount due" after the last "PO #"; the amount may carry
    # thousands separators ("1,234.56"), which are stripped after matching.
    _TOTAL_RE = re.compile(
        r"Invoice\s+Invoice #\s+[\w-]+\s+\|\s*\w+ \d{1,2}, \d{4}.*PO #\s+[\w-]+\s"
        r".*?Amount due\s*\$\s?(\d[\d,]*(?:\.\d+)?)",
        re.DOTALL,
    )
    _SHIPPING_RE = re.compile(r"Shipping & handling\s+\$\s+([\d.,]+)")
    # Group 1 is the quantity, group 2 the second dollar amount on the line.
    _LINE_RE = re.compile(
        r"Invoice details.*?(\d+)\s+\$[\d.,]+\s+\$([\d.,]+)\s+[\d.]+%\s+",
        re.DOTALL,
    )

    def __init__(self, log_file: str = "pita.log"):
        """Create a scanner that logs to ``log_file`` and works in the current directory."""
        self.log_file = log_file
        self.logger = self.setup_logger(log_file)
        self.directory = Path(".")

    def setup_logger(self, log_file: str) -> logging.Logger:
        """
        Builds the logger that writes to ``log_file`` and to stdout.

        Args:
          log_file (str): Path of the log file; also used as the logger name.

        Returns:
          logging.Logger: Logger with exactly one file and one console handler.
        """
        logger = logging.getLogger(log_file)
        logger.setLevel(logging.INFO)

        # The same named logger is returned on every call (e.g. when the
        # notebook cell is re-run), so drop the old handlers to avoid duplicate
        # lines, and close them so the previous log file handle is released.
        for stale_handler in list(logger.handlers):
            logger.removeHandler(stale_handler)
            stale_handler.close()

        formatter = logging.Formatter(
            "%(asctime)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
        )

        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        return logger

    def log(self, message):
        """Logs ``message`` at INFO level."""
        self.logger.info(message)

    def get_pdf_files(self):
        """
        Lists the PDF files directly inside ``self.directory``.

        The extension is compared case-insensitively so files named
        ``*.PDF`` are picked up as well.

        Returns:
          list[Path]: Sorted paths of the PDF files; empty if there are none.
        """
        if not self.directory.is_dir():
            return []
        return sorted(
            entry
            for entry in self.directory.iterdir()
            if entry.is_file() and entry.suffix.lower() == ".pdf"
        )

    def extract_text(self, file_path):
        """
        Extracts the text of every page of a PDF in pypdf "layout" mode.

        Args:
          file_path: Path of the PDF file.

        Returns:
          str: Page texts joined with newlines; pages without text contribute "".
        """
        reader = PdfReader(str(file_path))
        return "\n".join(
            page.extract_text(extraction_mode="layout") or "" for page in reader.pages
        )

    def extract_invoices(self, text):
        """
        Splits the text of a PDF into individual invoices.

        Args:
          text (str): The text of a whole PDF.

        Returns:
          list[str]: One string per invoice, each running from the invoice
          header through the first following "FAQs".
        """
        return self._INVOICE_BLOCK_RE.findall(text)

    @staticmethod
    def _strip_thousands(amount):
        """Removes thousands separators so "1,234.56" becomes "1234.56"."""
        return amount.replace(",", "")

    def extract_invoice_number(self, invoice_text):
        """
        Extracts the invoice number from the given invoice text using a regular expression.

        Args:
          invoice_text (str): The text of the invoice.

        Returns:
          str or None: The extracted invoice number, or None if not found.
        """
        match = self._NUMBER_RE.search(invoice_text)
        return match.group(1) if match else None

    def extract_invoice_date(self, invoice_text):
        """
        Extracts the invoice date from the given invoice text using a regular expression.

        Args:
          invoice_text (str): The text of the invoice.

        Returns:
          str or None: The date as written in the header (e.g. "March 05, 2024"),
          or None if not found.
        """
        match = self._DATE_RE.search(invoice_text)
        return match.group(1) if match else None

    def extract_invoice_purchase_order_line_number(self, invoice_text):
        """
        Extracts the invoice purchase order line number from the given invoice text using a regular expression.

        Args:
          invoice_text (str): The text of the invoice.

        Returns:
          str or None: The value following the last "PO #" in the invoice, or None if not found.
        """
        match = self._PO_RE.search(invoice_text)
        return match.group(1) if match else None

    def extract_invoice_total(self, invoice_text):
        """
        Extracts the invoice total from the given invoice text using a regular expression.

        Args:
          invoice_text (str): The text of the invoice.

        Returns:
          str or None: The "Amount due" figure without the dollar sign or
          thousands separators (e.g. "1234.56"), or None if not found.
        """
        match = self._TOTAL_RE.search(invoice_text)
        return self._strip_thousands(match.group(1)) if match else None

    def extract_invoice_shipping_and_handling(self, invoice_text):
        """
        Extracts the invoice shipping and handling from the given invoice text using a regular expression.

        Args:
          invoice_text (str): The text of the invoice.

        Returns:
          str or None: The shipping and handling amount without thousands
          separators, or None if not found.
        """
        match = self._SHIPPING_RE.search(invoice_text)
        return self._strip_thousands(match.group(1)) if match else None

    def extract_invoice_quantity(self, invoice_text):
        """
        Extracts the invoice quantity from the given invoice text using a regular expression.

        Args:
          invoice_text (str): The text of the invoice.

        Returns:
          str: The extracted quantity, or "1" when no quantity is found
          (this method never returns None).
        """
        match = self._LINE_RE.search(invoice_text)
        return match.group(1) if match else "1"

    def extract_invoice_line_price(self, invoice_text):
        """
        Extracts the invoice line price from the given invoice text using a regular expression.

        Args:
          invoice_text (str): The text of the invoice.

        Returns:
          str or None: The second dollar amount of the invoice detail line,
          without thousands separators, or None if not found.
        """
        match = self._LINE_RE.search(invoice_text)
        return self._strip_thousands(match.group(2)) if match else None

    def extract_yyyymmdd(self, invoice_date):
        """
        Converts a date such as "March 05, 2024" to "20240305".

        Args:
          invoice_date (str or None): Date in "<Month name> <day>, <year>" form.

        Returns:
          str or None: The date as YYYYMMDD, or None if ``invoice_date`` is
          None or does not parse.
        """
        try:
            return datetime.strptime(invoice_date, "%B %d, %Y").strftime("%Y%m%d")
        except (TypeError, ValueError):
            # TypeError: invoice_date is None; ValueError: unexpected format.
            return None

    def get_missing_invoice_fields(
        self,
        invoice_number,
        invoice_date,
        invoice_purchase_order_line_number,
        invoice_total,
        invoice_shipping_and_handling,
        invoice_quantity,
        invoice_line_price,
        invoice_date_yyyymmdd,
    ):
        """
        Returns a list of missing invoice fields.

        Returns:
          list[str]: Names of the arguments that are None, in parameter order.
        """
        fields = {
            "invoice_number": invoice_number,
            "invoice_date": invoice_date,
            "invoice_purchase_order_line_number": invoice_purchase_order_line_number,
            "invoice_total": invoice_total,
            "invoice_shipping_and_handling": invoice_shipping_and_handling,
            "invoice_quantity": invoice_quantity,
            "invoice_line_price": invoice_line_price,
            "invoice_date_yyyymmdd": invoice_date_yyyymmdd,
        }
        return [name for name, value in fields.items() if value is None]

    def _build_edi(
        self,
        invoice_number,
        invoice_date_yyyymmdd,
        invoice_purchase_order_line_number,
        invoice_total,
        invoice_shipping_and_handling,
        invoice_quantity,
        invoice_line_price,
    ):
        """Returns the EDI message text for one invoice, one segment per line."""
        # NOTE(review): the UNB date/time is the fixed literal 240101:0000, and
        # UNT declares 23 segments while the message below holds 16 from UNH
        # through UNT. Left untouched because the receiving system may depend
        # on this exact envelope - confirm before changing.
        segments = [
            "UNA:+.? '",
            "UNB+UNOC:3+AMAZ:31B+LIBRDMF:ZZ+240101:0000+110'",
            "UNH++INVOIC:D:96A:UN:EAN008'",
            f"BGM+380+{invoice_number}'",
            f"DTM+137:{invoice_date_yyyymmdd}:102'",
            "CUX+2:USD:4'",
            "ALC+C++++DL::28:Freight Charges'",
            f"MOA+8:{invoice_shipping_and_handling}'",
            "LIN+1'",
            f"QTY+47:{invoice_quantity}'",
            f"MOA+203:{invoice_line_price}'",
            f"PRI+AAB:{invoice_line_price}'",
            f"RFF+LI:{invoice_purchase_order_line_number}'",
            "UNS+S'",
            "CNT+2:2'",
            f"MOA+79:{invoice_total}'",
            f"MOA+9:{invoice_total}'",
            "UNT+23+1'",
            "UNZ+1+1'",
        ]
        return "\n".join(segments) + "\n"

    def _process_invoice(self, pdf_name, position, invoice):
        """Extracts one invoice's fields and writes its EDI file, logging the outcome."""
        invoice_number = self.extract_invoice_number(invoice)
        invoice_date = self.extract_invoice_date(invoice)
        invoice_purchase_order_line_number = (
            self.extract_invoice_purchase_order_line_number(invoice)
        )
        invoice_total = self.extract_invoice_total(invoice)
        invoice_shipping_and_handling = self.extract_invoice_shipping_and_handling(
            invoice
        )
        invoice_quantity = self.extract_invoice_quantity(invoice)
        invoice_line_price = self.extract_invoice_line_price(invoice)
        invoice_date_yyyymmdd = self.extract_yyyymmdd(invoice_date)

        missing = self.get_missing_invoice_fields(
            invoice_number,
            invoice_date,
            invoice_purchase_order_line_number,
            invoice_total,
            invoice_shipping_and_handling,
            invoice_quantity,
            invoice_line_price,
            invoice_date_yyyymmdd,
        )
        if missing:
            # An incomplete invoice is reported and skipped; no file is written.
            self.log(
                f"Missing fields in {pdf_name} invoice number {position}: {', '.join(missing)}"
            )
            return

        edifile = self._build_edi(
            invoice_number,
            invoice_date_yyyymmdd,
            invoice_purchase_order_line_number,
            invoice_total,
            invoice_shipping_and_handling,
            invoice_quantity,
            invoice_line_price,
        )

        edi_filename = self.directory / f"{invoice_number}.edi"
        try:
            with open(edi_filename, "w", encoding="utf-8") as edi_file:
                edi_file.write(edifile)
            self.log(f"Successfully wrote {edi_filename}")
        except Exception as e:
            # Best effort: a failed write is logged and the batch continues.
            self.log(f"Error writing {edi_filename}: {e}")

    def run(self):
        """
        Processes every PDF in ``self.directory`` and writes the EDI files.

        A PDF that cannot be read, a PDF without invoices, and an invoice with
        missing fields are each logged and skipped so the rest of the batch is
        still processed.
        """
        self.log("Starting PDF Invoice Scanner")
        pdfs = self.get_pdf_files()
        if not pdfs:
            self.log("No PDF files found.")
            return

        for pdf in pdfs:
            self.log(f"Processing {pdf.name}")
            try:
                text = self.extract_text(pdf)
            except Exception as e:
                # One unreadable upload must not abort the remaining PDFs.
                self.log(f"Error reading {pdf.name}: {e}")
                continue

            invoice_list = self.extract_invoices(text)
            if not invoice_list:
                self.log(f"No invoices found in {pdf.name}")
                continue  # Skip to next PDF

            for position, invoice in enumerate(invoice_list, start=1):
                self._process_invoice(pdf.name, position, invoice)

            # The count covers every invoice found, including skipped ones.
            self.log(
                f"Finished with pdf file: {pdf.name}. Invoices: {len(invoice_list)}"
            )


if __name__ == "__main__":
    # Script / notebook-cell entry point: build a scanner with the default
    # log file and process every PDF in the working directory.
    PDFInvoiceScanner().run()

## Step Three
After the arrow icon above stops spinning, click the arrow below to upload the EDI files.

In [None]:
!pip install paramiko

import paramiko
import os
import glob # Import the glob module
from google.colab import userdata # Import userdata to access secrets

# --- 1. Establish SSH Client and SFTP Connection ---
# Imported here so this cell stays self-contained.
import io
import posixpath

hostname = 'MY_HOSTNAME'
port = 10022
username = 'MY_USERNAME'

# Local directory that holds the generated .edi files
local_directory = '.' # Represents the current working directory in Colab

# Leave exactly ONE of the remote_base_path lines below uncommented.
# remote_base_path = 'sandbox/edi/' # The remote directory where EDI files will be stored. This is just for testing.
# remote_base_path = 'sandbox/edi/amazedi/tester/' # This is the folder to use in the Sandbox
remote_base_path = 'production/edi/amazedi' # This is the folder to use in Production

# Retrieve the private key content from Colab Secrets
private_key_content = userdata.get('ExL_sftp_is_rsa_file')
private_key_passphrase = None # Set to your passphrase if your key is encrypted

# Create an SSH client instance
ssh_client = paramiko.SSHClient()
# NOTE(security): AutoAddPolicy accepts whatever host key the server presents,
# so the server's identity is not verified. Consider pinning the known host
# key instead.
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

sftp_client = None  # Stays None until the SFTP session is actually opened
try:
    # Load the key straight from memory so the secret is never written to disk
    private_key = paramiko.RSAKey.from_private_key(
        io.StringIO(private_key_content), password=private_key_passphrase
    )
    print("Private key loaded successfully!")

    # Connect to the SFTP server using the loaded private key
    ssh_client.connect(
        hostname,
        port,
        username=username,
        pkey=private_key  # Use the loaded private key object
    )
    print("SSH connection established successfully using private key!")

    # Open an SFTP session
    sftp_client = ssh_client.open_sftp()
    print("SFTP session opened.")

    # --- Check and create remote directory if it doesn't exist ---
    # A flag is used instead of exit(): every failure path then falls through
    # to the cleanup in `finally`, and no upload is attempted on a directory
    # that could not be verified.
    remote_directory_ready = False
    try:
        try:
            sftp_client.stat(remote_base_path)
            print(f"Remote directory '{remote_base_path}' already exists.")
        except FileNotFoundError:
            print(f"Remote directory '{remote_base_path}' not found. Creating it...")
            # Only the final path component is created here; the parent
            # directories are expected to exist already.
            sftp_client.mkdir(remote_base_path)
            print(f"Remote directory '{remote_base_path}' created.")
        remote_directory_ready = True
    except Exception as e:
        # Covers failures of both the stat() check and the mkdir() call.
        print(f"Error checking or creating remote directory: {e}")

    # --- 2. Perform File Transfers: Upload all .edi files ---
    if remote_directory_ready:
        # Find all .edi files in the local directory
        edi_files = glob.glob(os.path.join(local_directory, '*.edi'))

        if not edi_files:
            print(f"No .edi files found in the local directory: {local_directory}")
        else:
            print(f"Found {len(edi_files)} .edi files to upload.")
            for local_file_path in edi_files:
                # Extract just the filename from the full path
                filename = os.path.basename(local_file_path)

                # Remote paths always use forward slashes, whatever the local OS
                remote_upload_path = posixpath.join(remote_base_path, filename)

                try:
                    sftp_client.put(local_file_path, remote_upload_path)
                    print(f"File '{filename}' uploaded to '{remote_upload_path}'")
                except Exception as e:
                    # One failed upload is reported; the remaining files are still tried.
                    print(f"Error uploading '{filename}': {e}")

except paramiko.AuthenticationException:
    print("Authentication failed. Please verify your username, private key secret, and passphrase (if applicable).")
except paramiko.SSHException as e:
    print(f"SSH connection failed: {e}")
except Exception as e:
    print(f"An error occurred: {e}")

# --- 3. Close Connections (runs on success and on every failure path) ---
finally:
    if sftp_client is not None:
        sftp_client.close()
        print("SFTP session closed.")
    ssh_client.close()
    print("SSH connection closed.")

## Step Four
After the arrow icon above stops spinning, run the EDI job in Alma.
In Alma, go to **Vendors > All > AMAZ**, click **AMAZ**, open the **EDI Information** tab, and click **Run Now** to start the EDI import job.


## Troubleshooting
To start fresh, click **Runtime > Disconnect and delete runtime**, then click **Yes**. This deletes every file in the session, including any uploaded PDF(s) and any .edi files that were created.

Then click **Reconnect** in the upper-right corner and start over from Step One.
