Extracting Email Content

In [None]:
import imapclient
import email
from email import policy
from email.header import decode_header
import os
import pdfplumber
from PIL import Image
import pandas as pd

In [None]:
from imapclient import IMAPClient
def connect_to_email(username, password, imap_server):
    """Open an SSL IMAP session and log in.

    Args:
        username: Account name passed to ``login``.
        password: Password (or app password) passed to ``login``.
        imap_server: Host name of the IMAP server to connect to.

    Returns:
        The logged-in ``imapclient.IMAPClient`` instance. The caller owns the
        session and is responsible for logging out.
    """
    # Use the server the caller asked for; the host was previously hard-coded
    # to "imap.gmail.com" and this argument was silently ignored.
    client = imapclient.IMAPClient(imap_server, ssl=True)
    client.login(username, password)
    return client

In [None]:
from email import message_from_bytes
from email.policy import default as email_policy

def fetch_emails(client, folder="INBOX", max_emails=10):
    """Lazily yield unread messages from ``folder``, highest UID first.

    Args:
        client: IMAP client exposing ``select_folder``, ``search`` and ``fetch``.
        folder: Mailbox folder to select before searching.
        max_emails: Upper bound on the number of messages yielded.

    Yields:
        Parsed messages built with ``message_from_bytes`` and ``email_policy``.
    """
    client.select_folder(folder)

    # X-GM-RAW search string: unread mail in the "primary" category.
    unread_uids = client.search(['X-GM-RAW', 'category:primary is:unread'])

    # Descending UID order so the most recently numbered mail comes first.
    newest_first = sorted(unread_uids, reverse=True)

    for already_yielded, uid in enumerate(newest_first):
        if already_yielded >= max_emails:
            # Limit reached: stop before fetching anything else.
            return
        fetched = client.fetch(uid, "RFC822")
        raw_bytes = fetched[uid][b"RFC822"]
        yield message_from_bytes(raw_bytes, policy=email_policy)


In [None]:
import re
def clean_email_text(email_body):
    """Strip ``[Attachment: ...]`` markers and collapse all whitespace runs.

    Args:
        email_body: Text to clean.

    Returns:
        The text with every ``[Attachment: ...]`` marker removed and every run
        of whitespace (including newlines and tabs) replaced by a single space,
        with no leading or trailing whitespace.
    """
    without_markers = re.sub(r'\[Attachment: .*?\]', '', email_body)
    # str.split() with no argument splits on any whitespace and drops empties.
    words = without_markers.split()
    return ' '.join(words)

In [None]:
# Function to process each email and extract subject, body, and attachments
def _decode_bytes(data, charset):
    """Decode ``data`` with ``charset``, falling back to UTF-8; never raises on bad bytes."""
    try:
        return data.decode(charset or "utf-8", errors="replace")
    except LookupError:
        # Unknown or misspelled charset name in the message.
        return data.decode("utf-8", errors="replace")


def _decode_part_text(part):
    """Return a non-multipart part's payload as text, honouring its declared charset."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    return _decode_bytes(payload, part.get_content_charset())


def process_email(message):
    """Extract the subject, text body and attachments from a parsed email.

    Args:
        message: A parsed ``email`` message object.

    Returns:
        A tuple ``(subject, body, attachments, has_attachments)`` where
        ``subject`` and ``body`` have been passed through ``clean_email_text``,
        ``attachments`` is a list of ``(filename, bytes)`` pairs (``filename``
        may be ``None`` when the part declares none) and ``has_attachments``
        is ``True`` when at least one attachment part was found.
    """
    # A message may have no Subject header at all; treat that as an empty subject.
    raw_subject = message["subject"]
    subject_fragments = []
    if raw_subject is not None:
        # decode_header may return several fragments, each with its own charset;
        # join all of them instead of keeping only the first.
        for value, charset in decode_header(str(raw_subject)):
            if isinstance(value, bytes):
                value = _decode_bytes(value, charset)
            subject_fragments.append(value)
    subject = "".join(subject_fragments)

    body = ""
    attachments = []
    has_attachments = False

    if message.is_multipart():
        # walk() descends into nested containers (e.g. multipart/mixed holding
        # multipart/alternative), so text parts below the first level are found.
        for part in message.walk():
            if part.is_multipart():
                continue  # containers carry no payload of their own

            # Compare the main type rather than substring-matching the full
            # content type, which also matched types merely containing "text".
            if part.get_content_maintype() == "text":
                body += _decode_part_text(part)

            if part.get_content_disposition() == "attachment":
                has_attachments = True
                filename = part.get_filename()
                attachment_data = part.get_payload(decode=True)
                attachments.append((filename, attachment_data))
    else:
        body = _decode_part_text(message)

    body = clean_email_text(body)
    subject = clean_email_text(subject)
    return subject, body, attachments, has_attachments


In [None]:
def prepare_input(subject, body, has_attachment):
    """Build the single text string fed to the classifier.

    Args:
        subject: Email subject line.
        body: Email body text.
        has_attachment: Truthy when the email carries an attachment.

    Returns:
        ``"Subject Prefix: <PO|GENERIC> Subject: <subject> Body: <body>
        [ATTACHMENT_SIGNAL: <PRESENT|ABSENT>]"`` on one line. The prefix is
        ``PO`` when the subject contains "purchase order" (case-insensitive).
    """
    attachment_state = 'PRESENT' if has_attachment else 'ABSENT'
    subject_prefix = 'PO' if 'purchase order' in subject.lower() else 'GENERIC'
    return (
        f"Subject Prefix: {subject_prefix} "
        f"Subject: {subject} "
        f"Body: {body}"
        f" [ATTACHMENT_SIGNAL: {attachment_state}]"
    )

def tokenize_input(text, tokenizer):
    """Call ``tokenizer`` on ``text`` with fixed settings.

    The tokenizer is invoked with ``max_length=512``, ``truncation=True``,
    ``padding="max_length"`` and ``return_tensors="pt"``; its result is
    returned unchanged.
    """
    encoding_options = {
        "max_length": 512,
        "truncation": True,
        "padding": "max_length",
        "return_tensors": "pt",
    }
    return tokenizer(text, **encoding_options)

Preparing Data and Model Fine-tuning

In [None]:
from transformers import BertForSequenceClassification
import torch
from transformers import BertTokenizer

In [None]:
# "bert-base-uncased" checkpoint with a 2-label sequence-classification head.
# In the visible notebook this object is referenced only by the commented-out
# Trainer cell; inference goes through `usable_model` instead.
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
# Tokenizer for the same checkpoint; passed to the datasets and to the pipeline.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

In [None]:
def load_data_from_csv(csv_file, attachment_frac=0.2, random_state=42):
    """Load texts and labels from a CSV and synthesise a ``has_attachment`` flag.

    The flag is not read from the file. It is derived from the label: every
    ``'PO'`` row is flagged, plus a random ``attachment_frac`` share of the
    ``'Non-PO'`` rows.

    Args:
        csv_file: Path or file-like object accepted by ``pandas.read_csv``.
            Must contain ``text`` and ``label`` columns.
        attachment_frac: Fraction of ``'Non-PO'`` rows that are flagged as
            having an attachment.
        random_state: Seed for the ``'Non-PO'`` sample, for reproducibility.

    Returns:
        Three parallel lists ``(texts, labels, flags)``.

    Raises:
        KeyError: If the ``text`` or ``label`` column is missing.
    """
    df = pd.read_csv(csv_file)

    # Drop rows that lack a text or a label (this includes fully empty rows):
    # they can be neither tokenized nor mapped to a class id downstream.
    df = df.dropna(subset=['text', 'label'])

    df['has_attachment'] = False
    df.loc[df['label'] == 'PO', 'has_attachment'] = True

    non_po_true_indices = (
        df[df['label'] == 'Non-PO']
        .sample(frac=attachment_frac, random_state=random_state)
        .index
    )
    df.loc[non_po_true_indices, 'has_attachment'] = True

    texts = df['text'].tolist()
    labels = df['label'].tolist()
    flags = df['has_attachment'].tolist()

    return texts, labels, flags


In [None]:
from torch.utils.data import Dataset
import torch

class EmailDataset(Dataset):
    """Torch dataset that tokenizes one email text per item.

    Each item is a dict holding the tokenizer's output tensors (with the
    leading batch dimension squeezed away) plus ``'labels'`` (long tensor) and
    ``'flags'`` (bool tensor) taken from the parallel input lists.
    """

    def __init__(self, texts, labels, flags, tokenizer, max_length=512):
        """Store the parallel lists, the tokenizer and the padding length."""
        self.texts, self.labels, self.flags = texts, labels, flags
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of samples, defined by the length of ``texts``."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and return it with its label and flag tensors."""
        text, label, flag = self.texts[idx], self.labels[idx], self.flags[idx]

        encoded = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors="pt",
        )

        # The tokenizer is asked for "pt" tensors; squeeze(0) removes dim 0
        # when it has size 1 so each item is a single un-batched sample.
        sample = {name: tensor.squeeze(0) for name, tensor in encoded.items()}
        sample['labels'] = torch.tensor(label, dtype=torch.long)
        sample['flags'] = torch.tensor(flag, dtype=torch.bool)
        return sample


In [None]:
# Raw strings: these Windows paths contain backslash sequences such as "\P" and
# "\T" that are invalid escapes in a normal string literal. The values are
# unchanged; only the warning goes away.
train_csv = r"E:\Projects\GenAI\AI-Powered Automated PO Tool\AI-Powered-Purchase-Order-Parser\Train Data.csv"
test_csv = r"E:\Projects\GenAI\AI-Powered Automated PO Tool\AI-Powered-Purchase-Order-Parser\Test Data.csv"

In [None]:
# Read both splits; each call returns parallel lists of texts, string labels
# and the has_attachment flags produced by load_data_from_csv.
train_texts, train_labels, train_flags = load_data_from_csv(train_csv)
test_texts, test_labels, test_flags = load_data_from_csv(test_csv)

In [None]:
# Class ids used for training and inference: 1 = PO, 0 = Non-PO.
label_mapping = {"PO": 1, "Non-PO": 0}


def _encode_labels(labels):
    """Map string labels to class ids, passing through ids that are already encoded.

    ``train_labels`` / ``test_labels`` are overwritten in place below, so a
    second run of this cell would otherwise look up ``label_mapping[1]`` and
    fail with ``KeyError``. Values that are neither a known label nor a known
    id still raise ``KeyError``.
    """
    known_ids = set(label_mapping.values())
    encoded = []
    for label in labels:
        if label in label_mapping:
            encoded.append(label_mapping[label])
        elif label in known_ids:
            encoded.append(label)
        else:
            raise KeyError(label)
    return encoded


train_labels = _encode_labels(train_labels)
test_labels = _encode_labels(test_labels)



In [None]:
# Wrap each split in an EmailDataset; max_length is left at the class default.
train_dataset = EmailDataset(
    texts=train_texts,
    labels=train_labels,
    flags=train_flags,
    tokenizer=tokenizer,
)
test_dataset = EmailDataset(
    texts=test_texts,
    labels=test_labels,
    flags=test_flags,
    tokenizer=tokenizer,
)

In [None]:
from transformers import Trainer, TrainingArguments
# Settings for the fine-tuning run: 3 epochs, batch size 4 per device for both
# training and evaluation, outputs under ./results and logs under ./logs.
# In the visible notebook they are consumed only by the commented-out Trainer cell.
# NOTE(review): newer transformers releases appear to rename `evaluation_strategy`
# to `eval_strategy` — confirm against the installed version before changing it.
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    evaluation_strategy="epoch",
    logging_dir='./logs',
    save_total_limit=3,
)

In [None]:
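# Fine-tuning run, currently disabled. Uncomment to train on train_dataset and
# write the result to ./po_email_model, the directory the next cell loads.
#trainer = Trainer(
#    model=model,
#    args=training_args,
#    train_dataset=train_dataset,
#    eval_dataset=test_dataset
#)

#trainer.train()

#model.save_pretrained('./po_email_model')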

In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# Same directory the commented-out training cell writes with save_pretrained;
# it must already exist on disk for this cell to run.
model_path = "./po_email_model" 
# Classifier used for inference by classify_email / the pipeline.
usable_model = AutoModelForSequenceClassification.from_pretrained(model_path)

In [None]:
# Commented-out helper for manually checking the classifier on text typed at a prompt.


# def classify_text(text):
#     # Tokenize the input text
#     encoding = tokenizer.encode_plus(
#         text,
#         add_special_tokens=True,
#         max_length=128,
#         padding='max_length',
#         truncation=True,
#         return_tensors='pt'
#     )

#     # Get input ids and attention mask
#     input_ids = encoding['input_ids']
#     attention_mask = encoding['attention_mask']

#     # Predict using the model
#     with torch.no_grad():
#         outputs = usable_model(input_ids=input_ids, attention_mask=attention_mask)
#         logits = outputs.logits
#         prediction = torch.argmax(logits, dim=1).item()  # Get the predicted class

#     return prediction

# # Get custom input text
# input_text = input("Enter the text to classify: ")

# # Classify the input text
# predicted_class = classify_text(input_text)
# print(f"Predicted Class: {predicted_class}")


Email Classifier

In [None]:
def classify_email(text, usable_model, tokenizer):
    """Classify one prepared email text as PO or Non-PO.

    Args:
        text: Combined input string (see ``prepare_input``).
        usable_model: Sequence-classification model whose output exposes ``.logits``.
        tokenizer: Tokenizer handed to ``tokenize_input``.

    Returns:
        ``"PO Email"`` when the highest-scoring class is 1, otherwise
        ``"Non-PO Email"``.
    """
    inputs = tokenize_input(text, tokenizer)

    # Pure inference: disable gradient tracking so no autograd graph is built
    # and kept alive for every classified email.
    with torch.no_grad():
        logits = usable_model(**inputs).logits

    prediction = torch.argmax(logits, dim=1).item()
    return "PO Email" if prediction == 1 else "Non-PO Email"

In [None]:
import pdfplumber
import os

def extract_text_from_pdf(pdf_content):
    """Extract the text of a PDF supplied as raw bytes.

    Args:
        pdf_content: The PDF file content as ``bytes``.

    Returns:
        The result of ``_format_as_table`` when ``_is_table_like`` accepts the
        extracted lines, otherwise the extracted text as one newline-joined
        string. Returns ``None`` (after printing the error) if anything fails
        while reading the PDF.
    """
    import io  # local import so this cell does not depend on a later cell's imports

    try:
        # Read straight from memory rather than round-tripping through a
        # fixed "temp.pdf" in the working directory, which could clobber an
        # existing file and clashes if two extractions overlap.
        with pdfplumber.open(io.BytesIO(pdf_content)) as pdf:
            page_texts = [page.extract_text() or "" for page in pdf.pages]

        # Join pages with a newline so the last line of one page is not fused
        # with the first line of the next.
        text = "\n".join(page_texts)
        lines = text.split('\n')

        if _is_table_like(lines):
            return _format_as_table(lines)
        return '\n'.join(lines)

    except Exception as e:
        # Best effort: report the problem and let the caller carry on.
        print(f"Error processing PDF: {e}")
        return None


def _is_table_like(lines):
    
    return all('\t' in line for line in lines)

def _format_as_table(lines):

    return [line.split('\t') for line in lines]


In [None]:
import easyocr
from PIL import Image
import io
import numpy as np

# Readers are expensive to build, so keep one per language set.
_OCR_READERS = {}


def _get_ocr_reader(languages=('en',)):
    """Return a cached ``easyocr.Reader`` for ``languages``, creating it on first use."""
    key = tuple(languages)
    if key not in _OCR_READERS:
        _OCR_READERS[key] = easyocr.Reader(list(key))
    return _OCR_READERS[key]


def extract_text_from_image(image_content, line_tolerance=20):
    """Run OCR on an image supplied as raw bytes and rebuild its text lines.

    Detected fragments are sorted by the (y, x) of their top-left corner, then
    grouped into a line while their top edge stays within ``line_tolerance``
    pixels of the fragment that started the line. Fragments within a line are
    ordered left to right and joined with spaces.

    Args:
        image_content: Encoded image file content as ``bytes``.
        line_tolerance: Maximum vertical distance (in pixels) between a
            fragment's top edge and the line's first fragment for the two to
            be treated as the same line.

    Returns:
        The result of ``_format_as_table`` when ``_is_table_like`` accepts the
        lines, otherwise the lines joined with newlines. An empty string when
        no text is detected.
    """
    image = Image.open(io.BytesIO(image_content))

    # Hand the OCR engine a plain RGB array rather than the PIL object, so
    # every format PIL can open (PNG, palette, RGBA, ...) goes down one path.
    pixels = np.array(image.convert("RGB"))

    result = _get_ocr_reader().readtext(pixels)

    # Nothing recognised: avoid indexing into an empty result below.
    if not result:
        return ""

    # Each result item is (bounding box, text, ...); box[0] is the top-left corner.
    result.sort(key=lambda x: (x[0][0][1], x[0][0][0]))

    lines = []
    current_line = []
    current_y = result[0][0][0][1]

    for item in result:
        text = item[1]
        top_left = item[0][0]

        if abs(top_left[1] - current_y) < line_tolerance:
            current_line.append((top_left[0], text))
        else:
            # Vertical jump: flush the finished line and start a new one.
            current_line.sort(key=lambda x: x[0])
            lines.append(' '.join(fragment[1] for fragment in current_line))
            current_line = [(top_left[0], text)]
            current_y = top_left[1]

    if current_line:
        current_line.sort(key=lambda x: x[0])
        lines.append(' '.join(fragment[1] for fragment in current_line))

    if _is_table_like(lines):
        return _format_as_table(lines)
    return '\n'.join(lines)

def _is_table_like(lines):
   
    delimiter_count = sum(1 for line in lines if '|' in line or '\t' in line)
    return delimiter_count > len(lines) * 0.5

def _format_as_table(lines):
    
    table_lines = []
    for line in lines:
        
        parts = [p.strip() for p in line.replace('|', '\t').split('\t') if p.strip()]
        table_lines.append('\t'.join(parts))
    
    return '\n'.join(table_lines)


In [None]:
def email_classification_pipeline(username, password, imap_server, usable_model, tokenizer):
    """Fetch unread emails, classify each one and extract text from PO attachments.

    For every message returned by ``fetch_emails`` the subject, body and
    attachment flag are combined with ``prepare_input`` and classified with
    ``classify_email``. For emails classified as ``'PO Email'``, PDF and
    JPG/JPEG/PNG attachments are run through the matching extractor and the
    result is printed. Nothing is returned.

    Args:
        username: IMAP login name.
        password: IMAP password or app password.
        imap_server: IMAP host name.
        usable_model: Classifier passed to ``classify_email``.
        tokenizer: Tokenizer passed to ``classify_email``.
    """
    client = connect_to_email(username, password, imap_server)
    try:
        for message in fetch_emails(client):
            subject, body, email_attachments, attachment_flag = process_email(message)
            combined_text = prepare_input(subject, body, attachment_flag)
            result = classify_email(combined_text, usable_model, tokenizer)

            print(f"Email classified as: {result}")

            if result != 'PO Email':
                print(f"Skipping attachment processing for non-PO email: {subject}")
                continue

            for filename, attachment in email_attachments:
                # Attachment parts are not guaranteed to declare a filename or
                # carry a decodable payload; there is nothing to dispatch on then.
                if not filename or attachment is None:
                    continue

                lowered_name = filename.lower()
                if lowered_name.endswith(".pdf"):
                    print(f"Processing PDF attachment: {filename}")
                    order_details = extract_text_from_pdf(attachment)
                    print(f"Extracted order details: {order_details}")
                elif lowered_name.endswith((".jpg", ".jpeg", ".png")):
                    print(f"Processing image attachment: {filename}")
                    order_details = extract_text_from_image(attachment)
                    print(f"Extracted order details: {order_details}")
    finally:
        # Always end the IMAP session, including when processing fails midway.
        client.logout()


In [None]:
# Credentials are read from the environment so real secrets never need to be
# typed into the notebook. The fallbacks are the original placeholder values
# and must be overridden (EMAIL_USERNAME, EMAIL_APP_PASSWORD, IMAP_SERVER)
# before a real run.
email_classification_pipeline(
    os.environ.get("EMAIL_USERNAME", "you_email_id@gmail.com"),
    os.environ.get("EMAIL_APP_PASSWORD", "app_password"),
    os.environ.get("IMAP_SERVER", "imap.gmail.com"),
    usable_model,
    tokenizer,
)