In [None]:
import os
import requests
import pandas as pd
from bs4 import BeautifulSoup

In [None]:
# Configuration
# Credentials and mailbox settings come from the environment so that no secret
# is stored in the notebook; os.getenv returns None for an unset variable.
tenant_id = os.getenv("TENANT_ID")
client_id = os.getenv("CLIENT_ID")
secret = os.getenv("SECRET")
shared_mailbox = os.getenv("SHARED_MAILBOX")
mail_domain = os.getenv("MAIL_DOMAIN")

# A list of mailboxes to be used for sending answers (comma-separated in the
# environment). An unset variable yields an empty list instead of raising
# AttributeError on None, and surrounding whitespace / empty entries are dropped
# so "a@x.com, b@x.com" does not produce an address with a leading space.
response_mailboxes = [
    mailbox.strip()
    for mailbox in os.getenv("RESPONSE_MAILBOXES", "").split(",")
    if mailbox.strip()
]

# Keyword interpolated into the message filter (subject and body) and into the
# output file names; spaces become underscores in the file names.
project_keyword = ""
# Bounds of the receivedDateTime range used by the message filter.
start_date = "2022-04-29"
end_date = "2024-11-24"
file_prefix = project_keyword.replace(' ', '_')
filtered_emails_path = f"/home/jovyan/work/notebook/evaluate_prototype/{file_prefix}_filtered_emails.csv"
cleaned_emails_path = f"/home/jovyan/work/notebook/evaluate_prototype/{file_prefix}_cleaned_emails.csv"
# Everything from this marker onwards is cut off when extracting question/answer text.
signature_separator = "ENGEL&VÖLKERS DIGITAL INVEST"


Get access token

In [None]:
# Authenticate with Microsoft Graph API (OAuth 2.0 client_credentials grant)
body = {
    "client_id": client_id,
    "scope": "https://graph.microsoft.com/.default",
    "client_secret": secret,
    "grant_type": "client_credentials"
}

# The timeout keeps the cell from hanging forever on an unresponsive endpoint.
response = requests.post(
    f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
    data=body,
    timeout=30,
)
token_payload = response.json()
access_token = token_payload.get("access_token")

# Fail here rather than sending "Bearer None" with every later request.
# Only the error fields of the payload are reported, never the request body.
if not access_token:
    raise RuntimeError(
        f"Token request failed (HTTP {response.status_code}): "
        f"{token_payload.get('error')}: {token_payload.get('error_description')}"
    )

Get filtered messages

In [None]:
# Query emails from the shared mailbox
headers = {
    "Authorization": f"Bearer {access_token}"
}
# Messages in the date range that mention the keyword in subject or body and
# whose sender address does not contain the configured mail domain.
query = f"receivedDateTime ge {start_date} and receivedDateTime le {end_date} and (contains(subject, '{project_keyword}') or contains(body/content, '{project_keyword}')) and not(contains(from/emailAddress/address, '{mail_domain}'))"

messages_endpoint = f"https://graph.microsoft.com/v1.0/users/{shared_mailbox}/messages?$filter={query}"
email_messages = []

# Follow "@odata.nextLink" page by page until the service stops returning one.
while True:
    response = requests.get(messages_endpoint, headers=headers, timeout=30).json()

    # An error payload carries neither "value" nor "@odata.nextLink"; without
    # this check the loop would end silently with an empty result.
    if "error" in response:
        raise RuntimeError(f"Microsoft Graph request failed: {response['error']}")

    email_messages.extend(response.get("value") or [])

    next_link = response.get("@odata.nextLink")
    if not next_link:
        break
    messages_endpoint = next_link

In [None]:
# Number of messages collected from the shared mailbox by the paging loop.
len(email_messages)

In [None]:
from typing import Any, List


def get_answer(message, answers: List[Any]) -> str:
    """Pick the reply text that answers the customer's message.

    The candidates are ordered by their ``receivedDateTime`` value and the
    first one that is not older than ``message`` and carries ``body.content``
    is used. Its HTML is reduced to plain text (newlines become spaces,
    carriage returns are dropped, double quotes become single quotes) and
    everything from ``signature_separator`` onwards is cut off.

    Args:
        message: The customer's message; ``receivedDateTime`` is read from it.
        answers: Candidate reply messages, each providing ``receivedDateTime``.

    Returns:
        The cleaned reply text, or an empty string when no candidate qualifies.
    """
    if len(answers) == 0:
        return ""

    chronological = sorted(answers, key=lambda candidate: candidate["receivedDateTime"])

    for candidate in chronological:
        # Replies received before the question cannot be its answer.
        if candidate["receivedDateTime"] < message["receivedDateTime"]:
            continue

        # Candidates without body content are skipped; the search goes on.
        if "body" not in candidate or "content" not in candidate["body"]:
            continue

        plain_text = BeautifulSoup(candidate['body']['content'], 'html.parser').get_text()
        flattened = plain_text.replace("\n", " ").replace("\r", "").replace('"', "'").strip()
        return flattened.split(signature_separator)[0]

    return ""
       

In [None]:
from typing import Tuple


def get_message_data(response_mailboxes, headers, message) -> Tuple[str, str, str]:
    """Extract the question text of a message and look up its answer.

    The message body is converted from HTML to plain text. Every mailbox in
    ``response_mailboxes`` is then queried for messages with the same
    ``conversationId``; the replies of all mailboxes are collected and handed
    to ``get_answer``.

    Args:
        response_mailboxes: Mailbox identifiers to search for replies. May be
            empty, in which case the answer is an empty string.
        headers: HTTP headers forwarded to every ``requests.get`` call.
        message: Message dict providing ``body.content`` and ``conversationId``.

    Returns:
        A tuple ``(body, question_without_history, answer)``: the full plain
        text, the text before ``signature_separator``, and the answer text.
    """
    body_html = message['body']['content']
    body_soup = BeautifulSoup(body_html, 'html.parser')
    body = body_soup.get_text().replace("\n", " ").replace("\r", "").replace('"', "'").strip()
    question_without_history = body.split(signature_separator)[0]

    # Built outside the f-string: reusing double quotes inside a double-quoted
    # f-string is a SyntaxError before Python 3.12.
    conversation_id = message["conversationId"]
    response_query = f"conversationId eq '{conversation_id}'"

    # Accumulated across ALL mailboxes. Initialising the list inside the loop
    # would keep only the last mailbox's replies and leave the name undefined
    # when no mailbox is configured.
    answers = []

    # Retrieve the matching responses
    for response_mailbox in response_mailboxes:
        response_endpoint = f"https://graph.microsoft.com/v1.0/users/{response_mailbox}/messages?$filter={response_query}"
        result_response = requests.get(response_endpoint, headers=headers, timeout=30).json()

        if result_response is not None and 'value' in result_response and result_response['value']:
            answers.extend(result_response['value'])

    answer = get_answer(message, answers)

    return body, question_without_history, answer

In [None]:
from typing import Any, Dict, List, Sequence

from tqdm import tqdm


def get_email_data(messages: List[Any], response_mailboxes: Sequence[str], headers: Dict[str, str]) -> List[Dict[str, str]]:
    """Turn the fetched messages into flat question/answer records.

    Args:
        messages: Message dicts; ``receivedDateTime``, ``from.emailAddress.address``,
            ``conversationId`` and ``subject`` are read from each.
        response_mailboxes: Passed through to ``get_message_data``.
        headers: Passed through to ``get_message_data``.

    Returns:
        One dict per successfully processed message. A message whose
        processing raises is reported with ``print`` and left out.
    """
    records = []

    for message in tqdm(messages):
        # Read outside the try block: a message lacking one of these keys
        # raises to the caller instead of being skipped.
        received_date = message["receivedDateTime"]
        sender_address = message["from"]["emailAddress"]["address"]
        conversation_id: str = message["conversationId"]

        try:
            # question details
            question, question_without_history, answer = get_message_data(response_mailboxes, headers, message)
            record = {
                "received_date": received_date,
                "email_from": sender_address,
                "subject": message['subject'],
                "conversation_id": conversation_id,
                "question": question,
                "question_without_history": question_without_history,
                "answer": answer,
            }
        except Exception as ex:
            print(ex)
        else:
            records.append(record)

    return records

In [None]:
# Build the question/answer records for all fetched messages; this issues one
# request per response mailbox for every message, with a tqdm progress bar.
email_data = get_email_data(email_messages, response_mailboxes, headers)


In [None]:
import csv

# Define the header: column order of the CSV file. The names are the keys of
# the record dicts in email_data (DictWriter looks values up by these names).
header = ["received_date", "conversation_id", "email_from", "subject", "question", "question_without_history", "answer"]

# Write the data to the CSV file: ';'-delimited, every field quoted, UTF-8.
# newline='' is the csv-module convention that lets the writer emit its own line endings.
with open(filtered_emails_path, mode='w', newline='', encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=header, quoting=csv.QUOTE_ALL, delimiter=";")
    writer.writeheader()
    writer.writerows(email_data)

# Clean up text

In [None]:
import pandas as pd

# Import from CSV; sep=";" matches the delimiter the file was written with.
df = pd.read_csv(filtered_emails_path, sep=";", encoding="utf-8")

In [None]:
# Sample email text fed to the cleaning functions in the cells below.
# NOTE(review): left empty here — presumably a real email is pasted in by hand; confirm.
text = ""

In [None]:
import re

def remove_warning(email_text):
    """Remove the "You don't often get email from ..." banner from a text.

    English and German variants are handled, each with and without the
    trailing aka.ms link. Matching ignores case and may span line breaks.

    Args:
        email_text: The email text to clean.

    Returns:
        The text without the banner, stripped of leading/trailing whitespace.
    """
    # Comprehensive list of patterns. The variant WITH the link precedes the
    # one without, so the link is removed together with the banner instead of
    # being left behind by the shorter pattern.
    # Every entry needs its trailing comma: two adjacent string literals are
    # silently concatenated into a single pattern that never matches.
    patterns = [
        r'You don\'t often get email from .*?Learn why this is important at https://aka\.ms/LearnAboutSenderIdentification\s*',
        r'You don\'t often get email from .*?Learn why this is important\s*',
        r'Sie erhalten nicht oft E-Mails von .*?Erfahre mehr unter https://aka\.ms/LearnAboutSenderIdentification\s*',
        r'Sie erhalten nicht oft E-Mails von .*?Erfahre mehr unter\s*',
    ]

    # Remove each pattern
    for pattern in patterns:
        email_text = re.sub(pattern, '', email_text, flags=re.IGNORECASE | re.DOTALL)

    return email_text.strip()

In [None]:
# Smoke test: an empty string passes through remove_warning unchanged.
print(remove_warning(""))

In [None]:
# First cleaning step: run the banner removal on the sample text and show the result.
text_without_warning = remove_warning(text)
print(text_without_warning)

In [None]:
import re

def remove_device_signatures(text):
    """Remove "Sent from my iPhone"-style device footers from a text.

    English and German phrasings are covered; matching ignores case. Only the
    footer phrase itself is removed — whitespace around it stays, except at
    the very start and end of the result, which is stripped.

    Args:
        text: The email text to clean.

    Returns:
        The text without device footers, stripped of leading/trailing whitespace.
    """
    # Patterns for different languages and variations
    device_patterns = [
        # English patterns
        r'[Ss]ent from my (?:iPhone|iPad|Android|Mobile|Phone|Tablet)',
        r'[Ss]ent from a mobile device',
        r'[Ss]ent from (?:my )?mobile',

        # German patterns
        r'[Vv]on meinem (?:iPhone|iPad|Android|Handy|Mobilgerät) gesendet',
        r'[Vv]on meinem Mobil(?:telefon)? gesendet',
        # The trailing " gesendet" is optional: the natural phrase is
        # "Gesendet von meinem iPhone"; requiring the word a second time
        # meant that phrase was never matched.
        r'[Gg]esendet von meinem (?:iPhone|iPad|Android|Handy)(?: gesendet)?',
        r'[Vv]on meinem Smartphone gesendet',

        # Additional variations
        r'[Ss]ent using (?:mobile )?(?:app|device)',
        r'[Gg]esendet mit (?:mobile )?(?:App|Gerät)'
    ]

    # Combine patterns into a single regex
    combined_pattern = '|'.join(device_patterns)

    # Remove every occurrence, then trim the ends of the result
    cleaned_text = re.sub(combined_pattern, '', text, flags=re.IGNORECASE).strip()

    return cleaned_text

In [None]:
# Second cleaning step: strip device footers from the banner-free text and show the result.
text_without_device_signatures = remove_device_signatures(text_without_warning)
print(text_without_device_signatures)

In [None]:
# Second sample input for remove_device_signatures.
# NOTE(review): left empty here — presumably filled in by hand when experimenting; confirm.
text2 = ""
print(remove_device_signatures(text2))

In [None]:
import re

def remove_engel_voelkers_signature(text):
    """Cut the company signature block out of a text.

    A block starts at "ENGEL&VÖLKERS DIGITAL INVEST" and ends with the
    sentence "We cannot accept any liability for virus contamination.".
    Every such block is removed (case-insensitively, across line breaks);
    a start marker without the closing sentence is left untouched.
    Afterwards runs of three or more newlines are collapsed to two.

    Args:
        text: The email text to clean.

    Returns:
        The text without signature blocks, stripped of leading/trailing whitespace.
    """
    # Non-greedy body so that each signature block is matched on its own.
    signature_block = re.compile(
        r'ENGEL&VÖLKERS DIGITAL INVEST.*?We cannot accept any liability for virus contamination\.',
        re.DOTALL | re.IGNORECASE,
    )

    without_signature = signature_block.sub('', text)

    # Tidy up the blank lines the removed block may leave behind.
    collapsed = re.sub(r'\n{3,}', '\n\n', without_signature)

    return collapsed.strip()

In [None]:
# Third cleaning step: drop the company signature block and show the result.
# Despite its name, ev_signature holds the text WITHOUT the signature.
ev_signature = remove_engel_voelkers_signature(text_without_device_signatures)
print(ev_signature)