In [1]:
import os
import email
import re
import phonenumbers
import csv

def _decode_payload(part):
    """Return the decoded text of one message part, or '' if it has no payload.

    The bytes are decoded with the charset declared on the part (UTF-8 when
    none is declared). Undecodable bytes are replaced rather than falling back
    to ``str(bytes)``, which would put a ``b'...'`` repr into the result.
    """
    payload = part.get_payload(decode=True)
    # Multipart container parts yield None here; they carry no text themselves.
    if payload is None:
        return ''
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except LookupError:
        # The header names a charset Python does not know; fall back to UTF-8.
        return payload.decode('utf-8', errors='replace')


def extract_email_data(eml_file):
    """Parse one .eml file into a dict of header fields, body text and phone numbers.

    Args:
        eml_file: Path of the message file; it is opened in binary mode.

    Returns:
        dict with the keys 'Subject', 'From', 'Sender Name', 'Sender Email',
        'To', 'Date', 'Body' and 'Phone Numbers'. 'Subject', 'To' and 'Date'
        are whatever the parsed message returns for those headers (None when
        absent). 'From', 'Sender Name' and 'Sender Email' default to ''.
        'Phone Numbers' is a list of the matched substrings that
        ``phonenumbers`` reports as valid.

    Raises:
        OSError: If ``eml_file`` cannot be opened.
    """
    from email.utils import parseaddr

    # Only the parse needs the file handle; close it before further processing.
    with open(eml_file, 'rb') as f:
        msg = email.message_from_binary_file(f)

    email_data = {
        'Subject': msg['Subject'],
        'From': '',
        'Sender Name': '',
        'Sender Email': '',
        'To': msg['To'],
        'Date': msg['Date'],
        'Body': '',
        'Phone Numbers': []
    }

    # Extract sender information. parseaddr handles quoted display names and
    # a bare "<addr>" form, which a simple "name <addr>" regex gets wrong.
    if 'From' in msg:
        from_header = msg['From']
        email_data['From'] = from_header
        sender_name, sender_address = parseaddr(str(from_header))
        email_data['Sender Name'] = sender_name
        # Keep the raw header when no address could be parsed out of it.
        email_data['Sender Email'] = sender_address or from_header

    # Extract body content: every non-attachment part of a multipart message,
    # or the single payload of a simple message.
    if msg.is_multipart():
        for part in msg.walk():
            content_disposition = str(part.get("Content-Disposition"))
            if "attachment" not in content_disposition:
                email_data['Body'] += _decode_payload(part)
    else:
        email_data['Body'] = _decode_payload(msg)

    # Extract phone numbers: the regex proposes candidates, phonenumbers validates.
    phone_numbers = re.findall(r'(\+?\d{1,2}[\s.-]?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4})', email_data['Body'])
    for phone_number_str in phone_numbers:
        try:
            # No default region is supplied, so candidates that cannot be
            # parsed without one raise NumberParseException and are skipped.
            phone_number_obj = phonenumbers.parse(phone_number_str, None)
            if phonenumbers.is_valid_number(phone_number_obj):
                email_data['Phone Numbers'].append(phone_number_str)
        except phonenumbers.NumberParseException:
            # Best-effort extraction: an unparseable candidate is not an error.
            pass

    return email_data

def extract_data_from_eml_files(directory):
    """Run extract_email_data() on every .eml file directly inside ``directory``.

    Args:
        directory: Path of the folder to scan (not recursive).

    Returns:
        list of the values returned by extract_email_data(), ordered by file
        name so that repeated runs produce the same row order.

    Raises:
        OSError: If ``directory`` cannot be listed.
    """
    email_data_list = []

    # os.listdir() returns entries in arbitrary order; sort for reproducibility.
    for entry in sorted(os.listdir(directory)):
        eml_path = os.path.join(directory, entry)
        # Match the extension case-insensitively (".EML" is common on Windows)
        # and skip directories that merely happen to end in ".eml".
        if not entry.lower().endswith('.eml') or not os.path.isfile(eml_path):
            continue
        email_data_list.append(extract_email_data(eml_path))

    return email_data_list

def save_to_csv(email_data_list, csv_file):
    """Write the extracted e-mail records to ``csv_file`` as UTF-8 CSV.

    Args:
        email_data_list: Iterable of dicts keyed by the column names below.
        csv_file: Output path; an existing file is overwritten.

    A header row is written first, followed by one row per record.
    """
    columns = [
        'Subject',
        'From',
        'Sender Name',
        'Sender Email',
        'To',
        'Date',
        'Body',
        'Phone Numbers',
    ]
    with open(csv_file, 'w', newline='', encoding='utf-8') as out_handle:
        dict_writer = csv.DictWriter(out_handle, fieldnames=columns)
        dict_writer.writeheader()
        dict_writer.writerows(email_data_list)

# Example usage: parse every .eml file in one folder and export the result.
# NOTE(review): directory_path is an absolute, machine-specific Windows path;
# point it at your own folder before running this cell.
directory_path = 'C:\\Users\\hrish\\Downloads\\eml'
# Relative output path; save_to_csv() opens it in 'w' mode, so it is overwritten.
output_csv_file = 'email_data.csv'

# Both calls run when the cell executes: read the folder, then write the CSV.
email_data_list = extract_data_from_eml_files(directory_path)
save_to_csv(email_data_list, output_csv_file)


In [2]:
import os
import email
import re
import phonenumbers
import csv
import spacy

# Load the spaCy pipeline named "en_core_web_sm" once, when this cell runs.
# extract_locations() reads this module-level `nlp` object on every call.
nlp = spacy.load("en_core_web_sm")

def extract_email_data(eml_file):
    """Parse one .eml file into a dict of header fields and derived data.

    Args:
        eml_file: Path of the message file; it is opened in binary mode.

    Returns:
        dict with the keys 'Subject', 'From', 'Sender Name', 'Sender Email',
        'To', 'Date', 'Body', 'Phone Numbers' and 'Locations'. 'Subject', 'To'
        and 'Date' are whatever the parsed message returns for those headers
        (None when absent). 'From', 'Sender Name' and 'Sender Email' default
        to ''. 'Body', 'Phone Numbers' and 'Locations' come from
        extract_body_text(), extract_phone_numbers() and extract_locations().

    Raises:
        OSError: If ``eml_file`` cannot be opened.
    """
    from email.utils import parseaddr

    # Only the parse needs the file handle; close it before the slower
    # body / phone / location extraction runs.
    with open(eml_file, 'rb') as f:
        msg = email.message_from_binary_file(f)

    email_data = {
        'Subject': msg['Subject'],
        'From': '',
        'Sender Name': '',
        'Sender Email': '',
        'To': msg['To'],
        'Date': msg['Date'],
        'Body': extract_body_text(msg),
        'Phone Numbers': extract_phone_numbers(msg),
        'Locations': extract_locations(msg)
    }

    # Extract sender information. parseaddr handles quoted display names and
    # a bare "<addr>" form, which a simple "name <addr>" regex gets wrong.
    if 'From' in msg:
        from_header = msg['From']
        email_data['From'] = from_header
        sender_name, sender_address = parseaddr(str(from_header))
        email_data['Sender Name'] = sender_name
        # Keep the raw header when no address could be parsed out of it.
        email_data['Sender Email'] = sender_address or from_header

    return email_data

def extract_body_text(msg):
    """Return the plain-text body of a parsed e-mail message.

    For a multipart message, the decoded payloads of all ``text/plain`` parts
    that are not attachments are concatenated in walk order. For a
    non-multipart message, the single payload is decoded whatever its type.

    Each payload is decoded with the charset declared on its part (UTF-8 when
    none is declared or the declared name is unknown). Bytes that cannot be
    decoded are dropped.

    Args:
        msg: A parsed message object from the ``email`` package.

    Returns:
        str: The body text, or '' when there is no decodable payload.
    """
    def _decode(part):
        payload = part.get_payload(decode=True)
        if payload is None:
            return ''
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='ignore')
        except LookupError:
            # The header names a charset Python does not know.
            return payload.decode('utf-8', errors='ignore')

    if not msg.is_multipart():
        return _decode(msg)

    return ''.join(
        _decode(part)
        for part in msg.walk()
        if part.get_content_type() == 'text/plain'
        # An attached .txt file is not part of the message body.
        and part.get_content_disposition() != 'attachment'
    )

def extract_phone_numbers(msg, default_region=None):
    """Return the phone-number-like substrings of the body that validate.

    A regex proposes candidates in the message body (from extract_body_text());
    each candidate is kept only if ``phonenumbers`` can parse it and reports
    it as a valid number.

    Args:
        msg: A parsed message object from the ``email`` package.
        default_region: Optional region code (for example 'US') passed to
            ``phonenumbers.parse`` for candidates written without a leading
            '+'. With the default of None such candidates cannot be parsed
            and are skipped, which is the previous behaviour.

    Returns:
        list[str]: The matched substrings, in order of appearance.
    """
    body_text = extract_body_text(msg)
    candidates = re.findall(r'(\+?\d{1,2}[\s.-]?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4})', body_text)
    valid_phone_numbers = []
    for candidate in candidates:
        try:
            parsed = phonenumbers.parse(candidate, default_region)
        except phonenumbers.NumberParseException:
            # Best-effort extraction: an unparseable candidate is not an error.
            continue
        if phonenumbers.is_valid_number(parsed):
            valid_phone_numbers.append(candidate)
    return valid_phone_numbers

def extract_locations(msg):
    """Collect the text of every 'GPE' entity found in the message body.

    The body comes from extract_body_text() and is run through the
    module-level ``nlp`` pipeline; entities are returned in document order,
    duplicates included.
    """
    parsed_doc = nlp(extract_body_text(msg))
    # 'GPE' is the label for geopolitical entities (e.g., countries, cities).
    return [entity.text for entity in parsed_doc.ents if entity.label_ == 'GPE']

def extract_data_from_eml_files(directory):
    """Run extract_email_data() on every .eml file directly inside ``directory``.

    Args:
        directory: Path of the folder to scan (not recursive).

    Returns:
        list of the values returned by extract_email_data(), ordered by file
        name so that repeated runs produce the same row order.

    Raises:
        OSError: If ``directory`` cannot be listed.
    """
    email_data_list = []

    # os.listdir() returns entries in arbitrary order; sort for reproducibility.
    for entry in sorted(os.listdir(directory)):
        eml_path = os.path.join(directory, entry)
        # Match the extension case-insensitively (".EML" is common on Windows)
        # and skip directories that merely happen to end in ".eml".
        if not entry.lower().endswith('.eml') or not os.path.isfile(eml_path):
            continue
        email_data_list.append(extract_email_data(eml_path))

    return email_data_list

def write_to_csv(email_data_list, output_file):
    """Write the extracted e-mail records to ``output_file`` as UTF-8 CSV.

    Args:
        email_data_list: Iterable of dicts keyed by the column names below.
        output_file: Output path; an existing file is overwritten.

    A header row is written first, followed by one row per record.
    """
    columns = [
        'Subject',
        'From',
        'Sender Name',
        'Sender Email',
        'To',
        'Date',
        'Body',
        'Phone Numbers',
        'Locations',
    ]
    with open(output_file, 'w', newline='', encoding='utf-8') as out_handle:
        dict_writer = csv.DictWriter(out_handle, fieldnames=columns)
        dict_writer.writeheader()
        dict_writer.writerows(email_data_list)

# Example usage: parse every .eml file in one folder and export the result.
# NOTE(review): directory_path is an absolute, machine-specific Windows path;
# point it at your own folder before running this cell.
directory_path = 'C:\\Users\\hrish\\Downloads\\eml'
# Relative output path; write_to_csv() opens it in 'w' mode, so it is overwritten.
output_file_path = 'output.csv'

# These calls run when the cell executes: read the folder, write the CSV,
# then print a confirmation line.
email_data_list = extract_data_from_eml_files(directory_path)
write_to_csv(email_data_list, output_file_path)
print("Data exported to", output_file_path)


Data exported to output.csv


In [6]:
import os
import email
import re
import phonenumbers
import csv
import spacy

# Load the spaCy pipeline named "en_core_web_sm" once, when this cell runs.
# extract_locations() reads this module-level `nlp` object on every call.
nlp = spacy.load("en_core_web_sm")

def extract_email_data(eml_file):
    """Parse one .eml file into a dict of header fields and derived data.

    Args:
        eml_file: Path of the message file; it is opened in binary mode.

    Returns:
        dict with the keys 'Subject', 'From', 'Sender Name', 'Sender Email',
        'To', 'Date', 'Body', 'Phone Numbers' and 'Locations'. 'Subject', 'To'
        and 'Date' are whatever the parsed message returns for those headers
        (None when absent). 'From', 'Sender Name' and 'Sender Email' default
        to ''. 'Body', 'Phone Numbers' and 'Locations' come from
        extract_body_text(), extract_phone_numbers() and extract_locations().

    Raises:
        OSError: If ``eml_file`` cannot be opened.
    """
    from email.utils import parseaddr

    # Only the parse needs the file handle; close it before the slower
    # body / phone / location extraction runs.
    with open(eml_file, 'rb') as f:
        msg = email.message_from_binary_file(f)

    email_data = {
        'Subject': msg['Subject'],
        'From': '',
        'Sender Name': '',
        'Sender Email': '',
        'To': msg['To'],
        'Date': msg['Date'],
        'Body': extract_body_text(msg),
        'Phone Numbers': extract_phone_numbers(msg),
        'Locations': extract_locations(msg)
    }

    # Extract sender information. parseaddr handles quoted display names and
    # a bare "<addr>" form, which a simple "name <addr>" regex gets wrong.
    if 'From' in msg:
        from_header = msg['From']
        email_data['From'] = from_header
        sender_name, sender_address = parseaddr(str(from_header))
        email_data['Sender Name'] = sender_name
        # Keep the raw header when no address could be parsed out of it.
        email_data['Sender Email'] = sender_address or from_header

    return email_data

def extract_body_text(msg):
    """Return the plain-text body of a parsed e-mail message.

    For a multipart message, the decoded payloads of all ``text/plain`` parts
    that are not attachments are concatenated in walk order. For a
    non-multipart message, the single payload is decoded whatever its type.

    Each payload is decoded with the charset declared on its part (UTF-8 when
    none is declared or the declared name is unknown). Bytes that cannot be
    decoded are dropped.

    Args:
        msg: A parsed message object from the ``email`` package.

    Returns:
        str: The body text, or '' when there is no decodable payload.
    """
    def _decode(part):
        payload = part.get_payload(decode=True)
        if payload is None:
            return ''
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='ignore')
        except LookupError:
            # The header names a charset Python does not know.
            return payload.decode('utf-8', errors='ignore')

    if not msg.is_multipart():
        return _decode(msg)

    return ''.join(
        _decode(part)
        for part in msg.walk()
        if part.get_content_type() == 'text/plain'
        # An attached .txt file is not part of the message body.
        and part.get_content_disposition() != 'attachment'
    )

def extract_phone_numbers(msg, default_region=None):
    """Return the phone-number-like substrings of the body that validate.

    A regex proposes candidates in the message body (from extract_body_text());
    each candidate is kept only if ``phonenumbers`` can parse it and reports
    it as a valid number.

    Args:
        msg: A parsed message object from the ``email`` package.
        default_region: Optional region code (for example 'US') passed to
            ``phonenumbers.parse`` for candidates written without a leading
            '+'. With the default of None such candidates cannot be parsed
            and are skipped, which is the previous behaviour.

    Returns:
        list[str]: The matched substrings, in order of appearance.
    """
    body_text = extract_body_text(msg)
    candidates = re.findall(r'(\+?\d{1,2}[\s.-]?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4})', body_text)
    valid_phone_numbers = []
    for candidate in candidates:
        try:
            parsed = phonenumbers.parse(candidate, default_region)
        except phonenumbers.NumberParseException:
            # Best-effort extraction: an unparseable candidate is not an error.
            continue
        if phonenumbers.is_valid_number(parsed):
            valid_phone_numbers.append(candidate)
    return valid_phone_numbers

def extract_locations(msg):
    """Collect the text of every 'GPE' entity found in the message body.

    The body comes from extract_body_text() and is run through the
    module-level ``nlp`` pipeline; entities are returned in document order,
    duplicates included.
    """
    parsed_doc = nlp(extract_body_text(msg))
    # 'GPE' is the label for geopolitical entities (e.g., countries, cities).
    return [entity.text for entity in parsed_doc.ents if entity.label_ == 'GPE']

def extract_data_from_eml_files(directory):
    """Run extract_email_data() on every .eml file directly inside ``directory``.

    Args:
        directory: Path of the folder to scan (not recursive).

    Returns:
        list of the values returned by extract_email_data(), ordered by file
        name so that repeated runs produce the same row order.

    Raises:
        OSError: If ``directory`` cannot be listed.
    """
    email_data_list = []

    # os.listdir() returns entries in arbitrary order; sort for reproducibility.
    for entry in sorted(os.listdir(directory)):
        eml_path = os.path.join(directory, entry)
        # Match the extension case-insensitively (".EML" is common on Windows)
        # and skip directories that merely happen to end in ".eml".
        if not entry.lower().endswith('.eml') or not os.path.isfile(eml_path):
            continue
        email_data_list.append(extract_email_data(eml_path))

    return email_data_list

def write_to_csv(email_data_list, output_file):
    """Write the extracted e-mail records to ``output_file`` as UTF-8 CSV.

    Args:
        email_data_list: Iterable of dicts keyed by the column names below.
        output_file: Output path; an existing file is overwritten.

    A header row is written first, followed by one row per record.
    """
    columns = [
        'Subject',
        'From',
        'Sender Name',
        'Sender Email',
        'To',
        'Date',
        'Body',
        'Phone Numbers',
        'Locations',
    ]
    with open(output_file, 'w', newline='', encoding='utf-8') as out_handle:
        dict_writer = csv.DictWriter(out_handle, fieldnames=columns)
        dict_writer.writeheader()
        dict_writer.writerows(email_data_list)

# Example usage: parse every .eml file in one folder and export the result.
# NOTE(review): directory_path is an absolute, machine-specific Windows path;
# point it at your own folder before running this cell.
directory_path = 'C:\\Users\\hrish\\Downloads\\novdec18'
# Relative output path; write_to_csv() opens it in 'w' mode, so it is overwritten.
output_file_path = 'output6.csv'

# These calls run when the cell executes: read the folder, write the CSV,
# then print a confirmation line.
email_data_list = extract_data_from_eml_files(directory_path)
write_to_csv(email_data_list, output_file_path)
print("Data exported to", output_file_path)


Data exported to output6.csv
