# Install missing packages

In [None]:
!pip install openai 
!pip install llama-index 
!pip install qdrant-client

# Create collection

Create a local Qdrant collection to save email embeddings. Use llama-index for splitting the documents.

In [2]:
import os
import uuid
import numpy as np
from openai import AzureOpenAI
from tqdm import tqdm
from qdrant_client import QdrantClient, models
from llama_index.core.schema import Document
from llama_index.core.node_parser import SentenceSplitter

SQL_QUERY = "SELECT * FROM Main_LH.Enron_emails_full_list_v1 LIMIT 5000"
COLLECTION_NAME = "enron_5000"
COLLECTION_PATH = "/lakehouse/default/Files/enron_5000"
EMBEDDING_MODEL = "text-embedding-ada-002"
EMBEDDING_SIZE = 1536

azure_client = AzureOpenAI(
  azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT"), 
  api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
  api_version="2023-09-01-preview"
)

class DocumentChunk:
    def __init__(self, text, doc_id, metadata):
        self.text = text
        self.doc_id = doc_id
        self.metadata = metadata

def load_documents_from_sql():
    psdf = spark.sql(SQL_QUERY)
    df = psdf.toPandas()
    documents = []
    metadata = []

    for _, row in df.iterrows():
        email_text = f"Subject: {row['Subject']}\nFrom: {row['From']}\nTo: {row['To']}\nDate: {row['Date']}\n\n{row['content']}"
        doc_metadata = {
            "message_id": row["Message-ID"],
            "date": row["Date"],
            "sender": row["From"],
            "recipient": row["To"],
            "subject": row["Subject"]
        }
        documents.append(email_text)
        metadata.append(doc_metadata)

    return documents, metadata


def create_document_chunks(documents, metadata):
    doc_chunks = []
    sentence_splitter = SentenceSplitter(chunk_size=2048)

    for doc_id, (text, meta) in enumerate(zip(documents, metadata)):
        doc = Document(text=text, metadata=meta)
        nodes = sentence_splitter.get_nodes_from_documents([doc])
        
        for node in nodes:
            chunk_text = node.get_content()
            doc_chunks.append(DocumentChunk(chunk_text, doc_id, meta))

    return doc_chunks

def generate_embeddings(doc_chunks, batch_size=64, model='text-embedding-ada-002'):
    chunk_texts = [chunk.text for chunk in doc_chunks]
    embeddings = []

    for i in tqdm(range(0, len(chunk_texts), batch_size), desc="Generating embeddings"):
        batch_texts = chunk_texts[i:i + batch_size]
        print(f"Processing batch {i // batch_size + 1}: texts {i} to {i + len(batch_texts) - 1}")

        try:
            response = azure_client.embeddings.create(input=batch_texts, model=model)
            if hasattr(response, 'data'):
                batch_embeddings = [data_item.embedding for data_item in response.data]
                embeddings.extend(batch_embeddings)
            else:
                print(f"Unexpected response structure: {response}")
        except Exception as e:
            print(f"Failed to generate embeddings for batch {i // batch_size + 1}: {e}")

    print(f"Total texts processed for embeddings: {len(embeddings)}")
    print(f"Total embeddings generated: {len(embeddings)}")

    return np.array(embeddings)


def upload_embeddings_in_batches(doc_chunks, embeddings, batch_size=100):
    qdrant_client = QdrantClient(path=COLLECTION_PATH)
    # Recreate collection
    qdrant_client.recreate_collection(
        collection_name=COLLECTION_NAME,
        vectors_config=models.VectorParams(size=EMBEDDING_SIZE, distance=models.Distance.COSINE),
        optimizers_config=models.OptimizersConfigDiff(indexing_threshold=0)
        )

    for i in tqdm(range(0, len(embeddings), batch_size), desc="Uploading embeddings"):
        batch_embeddings = embeddings[i:i + batch_size]
        batch_chunks = doc_chunks[i:i + batch_size]

        records = [
            models.Record(
                id=str(uuid.uuid4()),
                vector=embedding.tolist(),
                payload={"text": chunk.text, "metadata": chunk.metadata}
            ) for chunk, embedding in zip(batch_chunks, batch_embeddings)
        ]

        qdrant_client.upsert(
            collection_name=COLLECTION_NAME,
            points=records
        )

    qdrant_client.update_collection(
        collection_name=COLLECTION_NAME,
        optimizers_config=models.OptimizersConfigDiff(indexing_threshold=20000)
    )

StatementMeta(, 6de0f622-d4cc-40d5-b792-4d90f703b1fd, 5, Finished, Available)

In [None]:
documents, metadata = load_documents_from_sql()

doc_chunks = create_document_chunks(documents, metadata)

embeddings = generate_embeddings(doc_chunks)

upload_embeddings_in_batches(doc_chunks, embeddings)

# RAG

In [1]:
import numpy as np
from openai import AzureOpenAI
from qdrant_client import QdrantClient

COLLECTION_NAME = "enron_5000"
COLLECTION_PATH = "/lakehouse/default/Files/enron_5000"

azure_client = AzureOpenAI(
  azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT"), 
  api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
  api_version="2023-09-01-preview"
)
    
qdrant_client = QdrantClient(path=COLLECTION_PATH)

StatementMeta(, a4b80e4e-0b7d-4b54-8637-467dad348a63, 5, Finished, Available)

### Define functions for document retrieval and generating the answer

In [2]:
class Document:
    def __init__(self, text, metadata):
        self.text = text
        self.metadata = metadata

def retrieve_documents(collection_name, query_emb, top_n=100, batch_size=20):
    all_retrieved_docs = []
    for offset in range(0, top_n, batch_size):
        hits = qdrant_client.search(
            collection_name=collection_name,
            query_vector=np.array(query_emb).tolist(),
            limit=batch_size,
            offset=offset
        )

        batch_docs = [
            {"text": hit.payload["text"], 
             "metadata": hit.payload["metadata"],
             "id": hit.id}
            for hit in hits
        ]
        all_retrieved_docs.extend(batch_docs)
    return all_retrieved_docs

def generate_answer(prompt):
    response = azure_client.chat.completions.create(
        model="gpt-35-turbo-16k",
        messages=[
            {"role": "system", "content": "You are a fraud detection expert."},
            {"role": "user", "content": prompt}
        ]
    )
    
    response_text = response.choices[0].message.content.strip()
    return response_text

def answer_question(question, collection_name, promt_template=PROMPT_TEMPLATE, top_n=60, batch_size=15):
    embedded_query = azure_client.embeddings.create(
        input=[question],
        model="text-embedding-ada-002"
    ).data[0].embedding

    top_chunks = retrieve_documents(collection_name, embedded_query, top_n, batch_size)

    context = ' '.join([chunk["text"] for chunk in top_chunks])

    prompt = promt_template.format(question=question, context=context)
    answer = generate_answer(prompt)

    return answer

StatementMeta(, a4b80e4e-0b7d-4b54-8637-467dad348a63, 6, Finished, Available)

### Create a list of queries for semantic search

In [None]:
queries = [
    "Identify emails with potential fraud indicators such as unusual patterns, confidentiality breaches, or financial irregularities.",
    "Highlight emails that mention unusual financial transactions, large sums, or unexpected financial activities.",
    "Search for emails indicating potential confidentiality breaches, including unauthorized sharing of sensitive information.",
    "Analyze sudden spikes or drops in email volume from specific senders that may indicate fraudulent activities.",
    "Identify outlier subjects or phrases in emails that deviate from the dataset's common patterns. Highlight the most rare or unexpected subjects.",
    "Analyze any significant changes in email tone, or vocabulary from specific senders, compared to their usual communication patterns within the dataset.",
    "Detect emails sent to uncommon recipient combinations that stand out as anomalies in the dataset's overall communication network.",
    "Highlight emails containing links that appear infrequently or are unusual, potentially indicating phishing attempts or malicious content."
]

### Create a prompt for detailed email analysis

First, perform semantic search using the list of queries. Then, analyze the retrieved emails by using LLM.

In [3]:
PROMPT_TEMPLATE = """
Analyze the following email for potential fraud indicators:
Email ID: {email_id}
Subject: {subject}
Content: {context}

Please assess the email for any signs of the following:
1. Unusual patterns, confidentiality breaches, or financial irregularities.
2. Mention of unusual financial transactions, large sums, or unexpected financial activities.
3. Indications of potential confidentiality breaches, including unauthorized sharing of sensitive information.
4. Sudden spikes or drops in email volume from specific senders that may suggest fraudulent activities.
5. Outlier subjects or phrases that deviate significantly from common patterns observed in the dataset. Highlight any rare or unexpected topics.
6. Significant changes in the tone or vocabulary used by specific senders, compared to their typical communication patterns within the dataset.
7. Emails sent to uncommon combinations of recipients that could indicate anomalies in the communication network of the dataset.
8. Presence of links that are infrequent or unusual within the dataset, which may hint at phishing attempts or malicious content.

If the email has any signs of the above, provide a brief explanation and mark this email as suspicious.
"""

def answer_questions_with_ids(queries, collection_name, total_docs=60, batch_size=15):
    suspicious_emails = {}
    for query in queries:
        embedded_query = azure_client.embeddings.create(
            input=[query],  # Using the original query directly
            model="text-embedding-ada-002"
        ).data[0].embedding

        top_chunks = retrieve_documents(collection_name, embedded_query, top_n=total_docs, batch_size=batch_size)

        for chunk in top_chunks:
            context = chunk["text"]
            metadata = chunk["metadata"]
            email_id = metadata.get("message_id", "Unknown ID")
            subject = metadata.get("subject")
            sender = metadata.get("sender")
            recipient = metadata.get("recipient")

            prompt = PROMPT_TEMPLATE.format(email_id=email_id, subject=subject, context=context)
            answer = generate_answer(prompt)
            
            if "suspicious" in answer.lower():
                if query not in suspicious_emails:
                    suspicious_emails[query] = []
                suspicious_emails[query].append((email_id, subject, sender, recipient, answer))

    return suspicious_emails


StatementMeta(, a4b80e4e-0b7d-4b54-8637-467dad348a63, 7, Finished, Available)

In [None]:
suspicious_emails = answer_questions_with_ids(queries, COLLECTION_NAME, total_docs=100, batch_size=20)
print(suspicious_emails)

Optionally, save the retrieved and analyzed emails as a json file.

In [17]:
import json

def save_dict_to_json(data_dict, filename):
    with open(filename, "w") as json_file:
        json.dump(data_dict, json_file, indent=4)

save_dict_to_json(suspicious_emails, filename="/lakehouse/default/Files/suspicious_emails_5000.json")

StatementMeta(, 6de0f622-d4cc-40d5-b792-4d90f703b1fd, 20, Finished, Available)

In [None]:
for question, emails in suspicious_emails.items():
    print(f"Question: {question}")
    for email_info in emails:
        email_id, subject, sender, recipient, reason = email_info
        print(f"Email ID: {email_id}, Sender: {sender}, Recipient: {recipient}, Reason: {reason}")
    print("\n")

### Instruct LLM to categorize emails into themes

In [30]:
import pandas as pd

PROMPT_TEMPLATE = """
Analyze the following email and categorize its content based on the themes or topics it covers:
Email ID: {email_id}
Subject: {subject}
Sender: {sender}
Recipient: {recipient}
Content: {content}

Please assess the email and categorize it into relevant themes such as 'Project Updates', 
'Meeting Schedules', 'Financial Reports', 'Client Communications', 'Internal Announcements', etc. 
You can create other categories when suitable.
For each category identified, provide a brief explanation. Format your response as a list 
of entries, where each entry contains the category and its explanation. 
Use the format "Email ID: [ID], Subject: [Subject], Category: [Category], Reason: [Explanation]" for each entry.
"""

def analyze_suspicious_emails_and_extract_categories(suspicious_emails):
    all_structured_data = []

    for _, emails_info in suspicious_emails.items():
        for email_info in emails_info:
            email_id, subject, sender, recipient, content = email_info
            prompt = PROMPT_TEMPLATE.format(
                email_id=email_id,
                subject=subject,
                sender=sender, 
                recipient=recipient, 
                content=content)
            
            response = generate_answer(prompt)

            lines = response.split('\n')
            for line in lines:
                if "Email ID:" in line and "Subject:" in line and "Category:" in line and "Reason:" in line:
                    parts = line.split(',')
                    email_id_part = parts[0]
                    subject_part = parts[1] if len(parts) > 1 else ""
                    category_part = parts[2] if len(parts) > 2 else ""
                    reason_part = parts[3] if len(parts) > 3 else ""

                    email_id = email_id_part.replace("Email ID: ", "").strip()
                    subject = subject_part.replace("Subject: ", "").strip()
                    category = category_part.replace("Category: ", "").strip()
                    reason = reason_part.replace("Reason: ", "").strip()

                    all_structured_data.append({"email_id": email_id, "subject": subject, "category": category, "reason": reason})

    df_suspicious = pd.DataFrame(all_structured_data)

    return df_suspicious

StatementMeta(, a4b80e4e-0b7d-4b54-8637-467dad348a63, 34, Finished, Available)

In [22]:
df_suspicious_categories = analyze_suspicious_emails_and_extract_categories(suspicious_emails)

StatementMeta(, 6de0f622-d4cc-40d5-b792-4d90f703b1fd, 25, Finished, Available)

In [23]:
df_suspicious_categories.head()

StatementMeta(, 6de0f622-d4cc-40d5-b792-4d90f703b1fd, 26, Finished, Available)

Unnamed: 0,email_id,subject,category,reason
0,- <31927023.1075853082806.JavaMail.evans@thyme>,Confidential - Mid Year Overview,Internal Announcements,The email addresses a confidential mid-year ov...
1,<25447472.1075856582182.JavaMail.evans@thyme>,Security question,Security Concerns,The email discusses a security question relate...
2,<2654330.1075846153519.JavaMail.evans@thyme>,Curve Validation,Financial Operations,The email content is related to the validation...
3,<2654330.1075846153519.JavaMail.evans@thyme>,Curve Validation,Internal Communication,This email serves as an internal communication...
4,<2654330.1075846153519.JavaMail.evans@thyme>,Curve Validation,Risk Management,The validation of curve processes is crucial f...


In [24]:
df_suspicious_categories.shape

StatementMeta(, 6de0f622-d4cc-40d5-b792-4d90f703b1fd, 27, Finished, Available)

(291, 4)

Optionally, save the table to the lakehouse.

In [25]:
spark_session_results = spark.createDataFrame(df_suspicious_categories)
spark_session_results.write.mode("overwrite").format("delta").save("Tables/suspicious_emails_analysis_categories")

StatementMeta(, 6de0f622-d4cc-40d5-b792-4d90f703b1fd, 28, Finished, Available)

### Instruct the model to asses the risk level of each email as high, medium, low or N/A

In [None]:
def assess_risk_and_update_df(df):
    RISK_ASSESSMENT_PROMPT = """
    Given the following email details:
    Email ID: {email_id}
    Subject: {subject}
    Category: {category}
    Summary: {reason}

    Please assess the potential risk level of this email as 'Low', 'Medium', or 'High', 
    considering factors such as fraud, data leaks, information security issues, financial risks, 
    security breaches, confidentiality breaches, phishing, etc. Write only "Low", "Medium", "High" or "N/A"
    """

    risk_assessments = []

    for _, row in df.iterrows():
        prompt = RISK_ASSESSMENT_PROMPT.format(
            email_id=row['email_id'],
            subject=row['subject'],
            category=row['category'],
            reason=row['reason']
        )

        response = generate_answer(prompt)
        risk_assessment = response.strip()
        risk_assessments.append(risk_assessment)

    df['risk_assessment'] = risk_assessments

    return df


In [56]:
df_updated = assess_risk_and_update_df(df_suspicious_categories)
df_updated.head()

StatementMeta(, a4b80e4e-0b7d-4b54-8637-467dad348a63, 60, Finished, Available)

Unnamed: 0,email_id,subject,category,reason,risk_assessment
0,- <31927023.1075853082806.JavaMail.evans@thyme>,Confidential - Mid Year Overview,Internal Announcements,The email addresses a confidential mid-year ov...,"Based on the given email details, the potentia..."
1,<31927023.1075853082806.JavaMail.evans@thyme>,Confidential - Mid Year Overview,Internal Announcements,The email is marked as confidential and provid...,"Based on the provided email details, the poten..."
2,<31927023.1075853082806.JavaMail.evans@thyme>,Confidential - Mid Year Overview,Financial Reports,"The subject ""Mid Year Overview"" implies the em...",Medium
3,<31927023.1075853082806.JavaMail.evans@thyme>,Confidential - Mid Year Overview,Communication Updates,The email likely includes updates or summaries...,Medium
4,<31927023.1075853082806.JavaMail.evans@thyme>,Confidential - Mid Year Overview,Strategic Planning,The email provides a mid-year overview,"Based on the provided information, it is not p..."


In [None]:
spark_session_results = spark.createDataFrame(df_updated)
spark_session_results.write.mode("overwrite").format("delta").save("Tables/suspicious_emails_analysis_risk")