<a href="https://colab.research.google.com/github/EashwarPrabu/gaied-lpmcai-main/blob/main/GenAI_LPMC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Colab setup: %pip installs into the running kernel's environment (unlike !pip, which
# shells out to whatever `pip` is first on PATH). Versions are pinned where the recorded
# run of this cell shows which release was resolved.
%pip install -q streamlit==1.44.0 transformers accelerate bitsandbytes==0.45.4 xformers==0.0.29.post3 pyYAML

Collecting streamlit
  Downloading streamlit-1.44.0-py3-none-any.whl.metadata (8.9 kB)
Collecting bitsandbytes
  Downloading bitsandbytes-0.45.4-py3-none-manylinux_2_24_x86_64.whl.metadata (5.0 kB)
Collecting xformers
  Downloading xformers-0.0.29.post3-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (1.0 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.0.0->accelerate)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=2.0.0->accelerate)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.

In [2]:
import torch
# Sanity-check the runtime: report whether CUDA is usable and which GPU is attached.
cuda_available = torch.cuda.is_available()
device_label = torch.cuda.get_device_name(0) if cuda_available else "No GPU found"
print("CUDA Available:", cuda_available)
print("Device:", device_label)

CUDA Available: True
Device: Tesla T4


In [None]:
# Fetch ipv4.icanhazip.com quietly (-q) and write the response body to stdout (-O -).
# NOTE(review): presumably used to obtain this runtime's public IPv4 address for the
# localtunnel page opened by the last cell — confirm.
!wget -q -O - ipv4.icanhazip.com

In [4]:
%%writefile app.py
import streamlit as st
import email
import os
import logging
from email import policy
from email.parser import BytesParser
from email.message import Message
import base64
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import torch
import json
import yaml
import ast

# Set up logging: configure the root logger at INFO level and create the module-level
# `logger` that the functions below use for progress and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@st.cache_resource
def load_model():
    """Load the 4-bit quantized Mistral-7B-Instruct model and its tokenizer.

    The function is wrapped in ``st.cache_resource``, so Streamlit keeps the
    returned objects and reuses them instead of reloading on every script rerun.

    Returns:
        tuple: ``(model, tokenizer)`` as returned by
        ``AutoModelForCausalLM.from_pretrained`` and
        ``AutoTokenizer.from_pretrained``.
    """
    # Announce the load *before* the slow download/quantization work so the log
    # shows progress while it is happening rather than after it has finished.
    logger.info("Loading model from cache...")

    quantization = BitsAndBytesConfig(
        load_in_4bit=True,  # Use 4-bit quantization
        bnb_4bit_compute_dtype=torch.float16,  # Use float16 for computation
        bnb_4bit_quant_type="nf4",  # More efficient quantization type
        bnb_4bit_use_double_quant=True  # Extra compression
    )

    model_name = "mistralai/Mistral-7B-Instruct-v0.3"
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        quantization_config=quantization,
        device_map="auto"  # let the loader decide device placement
    )
    return model, tokenizer

# Module-level model/tokenizer handles used by invoke(); load_model() is decorated with
# st.cache_resource, so this line is executed on every script run but the load is cached.
model, tokenizer = load_model()

def config_parser():
    """Load the request-type configuration from ``DetectionTypes.yml``.

    The file is read from the current working directory as UTF-8 and parsed
    with ``yaml.safe_load``.

    Returns:
        dict: the parsed top-level mapping. Its keys are later used as the
        list of request types offered to the classifier.

    Raises:
        FileNotFoundError: if ``DetectionTypes.yml`` does not exist.
        ValueError: if the file is empty or its top level is not a mapping,
            since callers need ``config.keys()``.
    """
    logger.info("Parsing configuration from yml file...")
    # Explicit encoding: do not depend on the platform's locale default.
    with open("DetectionTypes.yml", "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load returns None for an empty document and may return a list or a
    # scalar; fail here with a clear message instead of a later AttributeError.
    if not isinstance(config, dict):
        raise ValueError(
            "DetectionTypes.yml must contain a top-level mapping of request types, "
            f"got {type(config).__name__}"
        )
    return config


# Parsed configuration, plus its top-level keys as the list of request types
# that classify() interpolates into the classification prompt.
config = config_parser()
request_types = list(config.keys())

# Prompt template for classification. classify() fills the placeholders with str.format:
#   {request_types} - list of top-level request types taken from the YAML config
#   {outputFormat}  - classificationOutputFormat (defined below), rendered via its repr
#   {config}        - the whole parsed config (request types, keywords, sub-request types)
#   {input_text}    - the text to classify
classification_prompt = """
You are a expert helping users to categorize the text based on predefined classes. Only categorize the content based on the available list of classes.
Given an input text, classify it into one of the **main request types**: {request_types} and, if applicable, further classify it into a **sub-request** type.
Your response **must only** be a JSON dictionary with: {outputFormat}

### **Classification Rules**:
- **Request Type:** Match the text to the most relevant **Request type** based on its semantics, meaning and the sample list of keywords.
- **Sub-Request Type:** If the text aligns with a sub-category, classify it accordingly.
- The following are some of the example Request-Types, their sample reference keywords and Sub-Request Types.
{config}

Text: "{input_text}"

Output:
"""

# Prompt template for key/value extraction. extract() fills {input_text} with str.format.
ner_prompt = """
You are a financial expert who is responsible identifying all the important key value pairs associated with Finance/Banking, etc.
Extract financial details from the given content and return them **strictly as a dictionary of key-value pairs**.
Text: "{input_text}"

Output:
"""

# Shape of the answer requested from the model in classification_prompt. The values are
# type-name placeholders shown to the model, not real data.
classificationOutputFormat = {
    "output": {
    "category": "string",
    "subcategory": ["string"],
    "probability": "float",
    "reasoning": "string"
  }
}


def invoke(prompt, max_length=4096):
    """Run the language model on ``prompt`` and return only the newly generated text.

    Args:
        prompt: Fully formatted prompt string.
        max_length: Forwarded to ``model.generate`` as ``max_length``. Per the
            Transformers documentation this budget counts the prompt tokens as
            well as the generated ones. Defaults to 4096.

    Returns:
        str: The decoded completion with special tokens removed and
        surrounding whitespace stripped. The prompt itself is not included.
    """
    # generate() needs a pad token id; fall back to EOS when the tokenizer has none.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Tokenize and move both the ids and the attention mask to the model's device.
    inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
    input_ids = inputs["input_ids"].to(model.device)
    attention_mask = inputs["attention_mask"].to(model.device)

    output = model.generate(
        input_ids,
        attention_mask=attention_mask,
        max_length=max_length,
        pad_token_id=tokenizer.pad_token_id,
    )

    # The returned sequence starts with the prompt tokens. Drop them by position
    # instead of splitting the decoded text on the literal "Output:", which would
    # truncate any answer that happens to contain that text and would return the
    # whole prompt when the marker is missing.
    prompt_token_count = input_ids.shape[-1]
    completion_ids = output[0][prompt_token_count:]

    return tokenizer.decode(completion_ids, skip_special_tokens=True).strip()


def _parse_model_dict(raw_output):
    """Parse model output as JSON, falling back to a Python literal.

    The prompts ask for a "JSON dictionary" / "dictionary", and the model may
    answer in either syntax (``true``/``null`` vs ``True``/``None``, double vs
    single quotes). Accept both so classify() and extract() behave the same.

    Raises:
        ValueError or SyntaxError: if the text is neither valid JSON nor a
            valid Python literal (raised by ``ast.literal_eval``).
    """
    text = raw_output.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return ast.literal_eval(text)


def classify(input_text):
    """Classify ``input_text`` into a request type / sub-request type.

    Formats ``classification_prompt`` with the text, the expected output
    format, the list of request types and the full config, sends it through
    ``invoke`` and parses the model's answer.

    Args:
        input_text: Text to classify.

    Returns:
        The parsed model answer (expected to be a dict).

    Raises:
        ValueError or SyntaxError: if the model's answer cannot be parsed.
    """
    formatted_prompt = classification_prompt.format(
        input_text=input_text,
        outputFormat=classificationOutputFormat,
        request_types=request_types,
        config=config)
    classifiedOutput = invoke(formatted_prompt)
    logger.info(f"Classified results: {classifiedOutput}")
    return _parse_model_dict(classifiedOutput)

def extract(input_text):
    """Extract financial key/value pairs from ``input_text``.

    Formats ``ner_prompt`` with the text, sends it through ``invoke`` and
    parses the model's answer.

    Args:
        input_text: Text to extract details from.

    Returns:
        The parsed model answer (expected to be a dict of key-value pairs).

    Raises:
        ValueError or SyntaxError: if the model's answer cannot be parsed.
    """
    formatted_prompt = ner_prompt.format(input_text=input_text)
    extractedOutput = invoke(formatted_prompt)
    logger.info(f"Extracted results: {extractedOutput}")
    return _parse_model_dict(extractedOutput)

def classifyAndExtract(input_text):
    """Run classification and then extraction on the same text.

    Args:
        input_text: Text handed unchanged to ``classify`` and ``extract``.

    Returns:
        dict: ``{"classification": <classify result>, "extraction": <extract result>}``.
        The same dict is also printed to stdout.
    """
    classification = classify(input_text)
    extraction = extract(input_text)

    logger.info(f"Final classification results: {classification}")
    logger.info(f"Final extraction results: {extraction}")

    combined = dict(classification=classification, extraction=extraction)
    print(combined)
    return combined

def _decode_text_part(part):
    """Return a text MIME part's payload as ``str``, honouring its declared charset."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Unknown charset label in the message: fall back to UTF-8.
        return payload.decode("utf-8", errors="ignore")


def parse_eml(file):
    """Parse an ``.eml`` file into header fields, plain-text body and attachments.

    Args:
        file: Binary file-like object containing the raw message.

    Returns:
        tuple: ``(email_data, attachments)``.

        * ``email_data`` is a dict with ``subject``, ``from``, ``to``, ``date``,
          ``body`` (all ``text/plain`` parts concatenated) and ``attachments``
          (attachment file names only). On failure it is ``{"error": <message>}``.
        * ``attachments`` is a list of dicts with ``name``, ``type``
          (``"text"`` or ``"binary"``), ``content`` (decoded text, or Base64 for
          binary data), ``size`` in bytes and ``raw_data`` (the original bytes).
          It is empty on failure.

    Any exception raised while parsing is logged and reported through the
    ``{"error": ...}`` return value rather than propagated.
    """
    try:
        msg: Message = BytesParser(policy=policy.default).parse(file)
        logger.info("Email content classifier")

        subject = msg["subject"] or "No Subject"
        sender = msg["from"]
        recipient = msg["to"]
        date = msg["date"]
        body = ""
        attachments = []

        # Extract body and attachments
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/plain":
                body += _decode_text_part(part)
            elif not content_type.startswith("multipart"):
                attachment_name = part.get_filename()
                attachment_data = part.get_payload(decode=True)
                if not attachment_name or not attachment_data:
                    continue  # Skip if no valid attachment

                try:
                    # Strict decode: with errors="ignore" this could never fail, so
                    # binary files were mislabelled as text and the Base64 branch
                    # below was unreachable.
                    attachment_content = attachment_data.decode("utf-8")
                    attachment_type = "text"
                except UnicodeDecodeError:
                    # Not valid UTF-8 text: keep it transportable as Base64.
                    attachment_content = base64.b64encode(attachment_data).decode("ascii")
                    attachment_type = "binary"

                logger.info(f"Attachment found: {attachment_name} (Type: {attachment_type}, Size: {len(attachment_data)} bytes)")

                attachments.append({
                    "name": attachment_name,
                    "type": attachment_type,
                    "content": attachment_content,  # String or Base64
                    "size": len(attachment_data),
                    "raw_data": attachment_data  # For file downloads
                })

        email_data = {
            "subject": subject,
            "from": sender,
            "to": recipient,
            "date": date,
            "body": body,
            # Display names only: the full records (including raw bytes) are
            # returned separately as the second element of the tuple.
            "attachments": [attachment["name"] for attachment in attachments]
        }
        logger.info(f"Email components extracted: {subject}")

        return email_data, attachments
    except Exception as e:
        logger.error(f"Error parsing email: {e}")
        return {"error": str(e)}, []

# Streamlit UI: upload an .eml file, parse it, then show the parsed email together with
# the model's classification and extraction results as JSON.
st.title("Email content classifier")

uploaded_file = st.file_uploader("Upload an .eml file", type=["eml"])

if uploaded_file:
    with uploaded_file:
        email_data, attachments = parse_eml(uploaded_file)

    if "error" in email_data:
        # parse_eml reports failures as {"error": ...}; that dict has no "body" key,
        # so show the problem instead of failing with a KeyError below.
        st.error(f"Could not parse the uploaded email: {email_data['error']}")
    else:
        with st.spinner("Processing email... Please wait ⏳"):
            result = classifyAndExtract(email_data["body"])
        output = {
            "emailContents": email_data,
            "result": result
        }
        st.json(output)

Writing app.py


In [5]:
# Interactive Hugging Face login; the token is typed at the prompt rather than stored in
# the notebook. NOTE(review): presumably required so app.py can download the
# mistralai/Mistral-7B-Instruct-v0.3 weights — confirm the model's access requirements.
!huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) n
Token is valid (permission: fineGrained).
The token `Hackathon-25` has been saved to /root/.cache/huggingface/stored_tokens
Your token has been saved to /root/.cache/huggingface/token
Login successful.
The current active token is: `Hacka

In [7]:
# Start the Streamlit app written by the %%writefile cell in the background (&) and open a
# localtunnel to port 8501, the port Streamlit reports as its local URL in the output below.
!streamlit run app.py & npx localtunnel --port 8501


Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8501[0m
[34m  External URL: [0m[1mhttp://34.145.121.132:8501[0m
[0m
[1G[0K⠦[1G[0K[1G[0JNeed to install the following packages:
localtunnel@2.0.2
Ok to proceed? (y) [20Gy

[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0Kyour url is: https://wide-queens-attack.loca.lt
tokenizer_config.json: 100% 141k/141k [00:00<00:00, 2.28MB/s]
tokenizer.model: 100% 587k/587k [00:00<00:00, 19.1MB/s]
tokenizer.json: 100% 1.96M/1.96M [00:00<00:00, 10.5MB/s]
special_tokens_map.json: 100% 414/414 [00:00<00:00, 2.97MB/s]
config.json: 100% 601/601 [00:00<00:00, 4.68MB/s]
2025-03-2