# Email Pipeline

This notebook defines the pipeline for extracting the different components (header, body, attachments, etc.) of an email (`.eml` file). This notebook contains both exploration code and the code for defining the API. Code cells marked with `#pipeline-api` are included in the API definition.

To demonstrate how off-the-shelf Unstructured Bricks extract meaningful data from complex source documents, we will apply a series of Bricks with explanations before defining the API.

#### Table of Contents

1. [Take a Look at a Raw EML File](#explore)
1. [Custom Partitioning Bricks](#custom)
1. [Cleaning Bricks](#cleaning)
1. [Staging Bricks](#staging)
1. [Define the Pipeline API](#pipeline)

## Section 1: Take a Look at a Raw EML File <a id="explore"></a>

Let's take a look at an email with an attachment. As you will see below, there is metadata about the email at the top (sender, recipient, subject, etc.), and if you scroll down, you will see the different sections of the email and its metadata.


In [None]:
import json
import os

def get_filename(directory, filename):
    cwd = os.getcwd()
    local_directory = os.path.join(os.path.split(cwd)[0], directory)
    ci_directory = os.path.join(cwd, directory)

    if os.path.exists(local_directory) and filename in os.listdir(local_directory):
        return os.path.join(local_directory, filename)
    elif os.path.exists(ci_directory) and filename in os.listdir(ci_directory):
        return os.path.join(ci_directory, filename)
    else:
        raise FileNotFoundError

In [None]:
filename = get_filename("sample-docs", "family-day.eml")

In [None]:
import email

# Parse the sample file (family-day.eml) into a message object
with open(filename) as f:
    msg = email.message_from_file(f)

In [None]:
# Take a look at the eml file with all the metadata and content
for part in msg.walk():
    print(part)

MIME-Version: 1.0
Date: Wed, 21 Dec 2022 10:28:53 -0600
Message-ID: <CAPgNNXQKR=o6AsOTr74VMrsDNhUJW0Keou9n3vLa2UO_Nv+tZw@mail.gmail.com>
Subject: Family Day
From: Mallori Harrell <mallori@unstructured.io>
To: Mallori Harrell <mallori@unstructured.io>
Content-Type: multipart/alternative; boundary="0000000000005c115405f0590ce4"

--0000000000005c115405f0590ce4
Content-Type: text/plain; charset="UTF-8"

Hi All,

Get excited for our first annual family day!

There will be face painting, a petting zoo, funnel cake and more.

Make sure to RSVP!

Best.

-- 
Mallori Harrell
Unstructured Technologies
Data Scientist

--0000000000005c115405f0590ce4
Content-Type: text/html; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable

<div dir=3D"ltr">Hi All,<div><br></div><div>Get excited for our first annua=
l family day!=C2=A0</div><div><br></div><div>There will be face painting, =
a petting zoo, funnel cake and more.</div><div><br></div><div>Make sure to =
RSVP!</div><div><br></div><div>Best.<br

In [None]:
# Take a closer look at the header section of the eml file
for part in msg.raw_items():
    print(part)

('MIME-Version', '1.0')
('Date', 'Wed, 21 Dec 2022 10:28:53 -0600')
('Message-ID', '<CAPgNNXQKR=o6AsOTr74VMrsDNhUJW0Keou9n3vLa2UO_Nv+tZw@mail.gmail.com>')
('Subject', 'Family Day')
('From', 'Mallori Harrell <mallori@unstructured.io>')
('To', 'Mallori Harrell <mallori@unstructured.io>')
('Content-Type', 'multipart/alternative; boundary="0000000000005c115405f0590ce4"')
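
The structure printed above can be reproduced in miniature with Python's standard `email` package. The following is a minimal sketch, assuming a small made-up `multipart/alternative` message (the addresses, subject, and boundary are illustrative and are not taken from the sample file). It lists the content type of each part and picks out the plain-text body.

```python
import email

# A tiny, made-up multipart/alternative message (not the sample file).
RAW = """\
MIME-Version: 1.0
Subject: Demo
From: Sender <sender@example.com>
To: Recipient <recipient@example.com>
Content-Type: multipart/alternative; boundary="BOUNDARY"

--BOUNDARY
Content-Type: text/plain; charset="UTF-8"

Hello in plain text.

--BOUNDARY
Content-Type: text/html; charset="UTF-8"

<div>Hello in <b>HTML</b>.</div>

--BOUNDARY--
"""

msg = email.message_from_string(RAW)

# walk() yields the multipart container first, then each nested part.
content_types = [part.get_content_type() for part in msg.walk()]

# Keep the payload of the first text/plain part, if there is one.
plain_body = None
for part in msg.walk():
    if part.get_content_type() == "text/plain":
        plain_body = part.get_payload()
        break

print(content_types)
print(msg["Subject"])
print(plain_body)
```

Header values are available with dictionary-style access (`msg["Subject"]`), while the body has to be found by walking the parts, which is why the header and the body are inspected separately in the cells above.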


## Section 2: Custom Partitioning Bricks <a id="custom"></a>

Let's take a look at the body text of the eml file.

In [None]:
from unstructured.partition.email import partition_email

elements = partition_email(filename=filename)

In [None]:
elements

[<unstructured.documents.html.HTMLText>,
 <unstructured.documents.html.HTMLNarrativeText>,
 <unstructured.documents.html.HTMLNarrativeText>,
 <unstructured.documents.html.HTMLNarrativeText>,
 <unstructured.documents.html.HTMLTitle>,
 <unstructured.documents.html.HTMLTitle>,
 <unstructured.documents.html.HTMLTitle>,
 <unstructured.documents.html.HTMLTitle>]

In [None]:
print(elements[0].text)

Hi All,


In [None]:
for element in elements:
    print(element)

Hi All,
Get excited for our first annual family day! 
There will be face painting, a petting zoo, funnel cake and more.
Make sure to RSVP!
Best.
Mallori Harrell
Unstructured Technologies
Data Scientist


We can use the same function with the extra parameter `include_headers=True` to also extract the header of the eml file.

In [None]:
elements_with_header = partition_email(filename=filename, include_headers=True)
elements_with_header

[<unstructured.documents.email_elements.MetaData>,
 <unstructured.documents.email_elements.MetaData>,
 <unstructured.documents.email_elements.MetaData>,
 <unstructured.documents.email_elements.Subject>,
 <unstructured.documents.email_elements.Sender>,
 <unstructured.documents.email_elements.Recipient>,
 <unstructured.documents.email_elements.MetaData>,
 <unstructured.documents.html.HTMLText>,
 <unstructured.documents.html.HTMLNarrativeText>,
 <unstructured.documents.html.HTMLNarrativeText>,
 <unstructured.documents.html.HTMLNarrativeText>,
 <unstructured.documents.html.HTMLTitle>,
 <unstructured.documents.html.HTMLTitle>,
 <unstructured.documents.html.HTMLTitle>,
 <unstructured.documents.html.HTMLTitle>]

## Section 3: Cleaning Bricks <a id="cleaning"></a>

In addition to partitioning bricks, the Unstructured library has
***cleaning*** bricks for removing unwanted content from text. In this
case, we'll remove extra whitespace with the
`clean_extra_whitespace` brick. Other uses for cleaning bricks include
removing boilerplate, sentence fragments, and other segments
of text that could affect labeling tasks or the accuracy of
machine learning models. As with partitioning bricks, users can
include custom cleaning bricks in a pipeline.
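
As a rough illustration of what a custom cleaning brick could look like, here is a minimal sketch using only the standard library. The names `collapse_whitespace` and `apply_cleaners` are hypothetical and are not part of the Unstructured library; the sketch only assumes that a cleaning brick is a function that takes a string and returns a cleaned string.

```python
import re


def collapse_whitespace(text: str) -> str:
    """Hypothetical custom cleaner: replace each run of whitespace
    (spaces, tabs, newlines, non-breaking spaces) with one space and trim."""
    return re.sub(r"\s+", " ", text).strip()


def apply_cleaners(text: str, cleaners) -> str:
    """Hypothetical helper: apply each cleaner to the text in order."""
    for cleaner in cleaners:
        text = cleaner(text)
    return text


messy = "  Hi \n\n All,\xa0 "
cleaned = apply_cleaners(messy, [collapse_whitespace])
print(repr(cleaned))
```

Because each cleaner is a plain string-to-string function, library cleaners and custom ones can be mixed in the same list.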

In [None]:
# Inspect the raw text of the first element, which may contain extra whitespace such as new line characters
elements[0].text

'Hi All,'

In [None]:
from unstructured.cleaners.core import clean_extra_whitespace

clean_extra_whitespace(elements[0].text)

'Hi All,'

In [None]:
# Or let's split the text into paragraphs at new line characters
from unstructured.cleaners.extract import extract_text_before, extract_text_after
from unstructured.partition.text import split_by_paragraph

print(split_by_paragraph(elements[0].text))

[' Hi All,']


## Section 4: Staging Bricks<a id="staging"></a>

In [None]:
elements[2].text

'There will be face painting, a petting zoo, funnel cake and more.'

In [None]:
from unstructured.staging.label_studio import stage_for_label_studio

label_studio_data = stage_for_label_studio(elements)
label_studio_data

[{'data': {'text': 'Hi All,', 'ref_id': 'db1ca22813f01feda8759ff04a844e56'}},
 {'data': {'text': 'Get excited for our first annual family day!\xa0',
   'ref_id': '9ec31559e889d2fd004f1911524143ba'}},
 {'data': {'text': 'There will be face painting, a petting zoo, funnel cake and more.',
   'ref_id': '1ed755f351e19ae96f0dae15b26fc9e3'}},
 {'data': {'text': 'Make sure to RSVP!',
   'ref_id': 'e945c67e6bca859e2d39c4ed33a02346'}},
 {'data': {'text': 'Best.', 'ref_id': '5550577db69c2c8aabcd90979698120a'}},
 {'data': {'text': 'Mallori Harrell',
   'ref_id': 'ca1c571d993b6c1ed8ef56a06c16ba22'}},
 {'data': {'text': 'Unstructured Technologies',
   'ref_id': 'd5b612de8cd918addd9569b0255b65b2'}},
 {'data': {'text': 'Data Scientist',
   'ref_id': 'd69b468e295fa01cdb3b7c3f0bd34114'}}]
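
The staged output is a plain list of dictionaries, so it can be serialized with the standard `json` module before being handed to a labeling tool. The sketch below assumes a two-item list copied from the output above and named `sample_tasks` for illustration; in the notebook, the full `label_studio_data` list would take its place.

```python
import json

# Two entries copied from the output above; `sample_tasks` is an
# illustrative stand-in for the full `label_studio_data` list.
sample_tasks = [
    {"data": {"text": "Hi All,", "ref_id": "db1ca22813f01feda8759ff04a844e56"}},
    {"data": {"text": "Make sure to RSVP!", "ref_id": "e945c67e6bca859e2d39c4ed33a02346"}},
]

# Serialize to a JSON string; json.dump(sample_tasks, f) would write it to a file.
serialized = json.dumps(sample_tasks, indent=4)

# Round-trip to confirm that nothing is lost in serialization.
restored = json.loads(serialized)

print(serialized)
```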

## Section 5: Defining the Pipeline API<a id="pipeline"></a>

This API will be able to handle `.txt`, `.docx`, `.pptx`, `.jpg`, `.png`, `.eml`, `.html`, and `.pdf` documents. The following lines of code demonstrate this for a couple of file types. To learn how to use the specific partition functions (e.g. `partition_email`, `partition_html`, etc.), see the notebooks in the `exploration-notebooks` directory.

In [None]:
# pipeline-api

import io
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from fastapi import HTTPException
from pypdf import PdfReader, PdfWriter
from unstructured.partition.auto import partition
from unstructured.staging.base import convert_to_isd, convert_to_dataframe, elements_from_json
import tempfile
import pdfminer
import requests
import time

In [None]:
# pipeline-api

DEFAULT_MIMETYPES = "application/pdf,application/msword,image/jpeg,image/png,text/markdown," \
                    "text/x-markdown,text/html," \
                    "application/vnd.openxmlformats-officedocument.wordprocessingml.document," \
                    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet," \
                    "application/vnd.ms-excel,application/vnd.openxmlformats-officedocument." \
                    "presentationml.presentation," \
                    "application/json," \
                    "application/vnd.ms-powerpoint," \
                    "text/html,message/rfc822,text/plain,image/png," \
                    "application/epub,application/epub+zip," \
                    "application/rtf,text/rtf," \
                    "application/vnd.oasis.opendocument.text," \
                    "text/csv,text/x-csv,application/csv,application/x-csv," \
                    "text/comma-separated-values,text/x-comma-separated-values," \
                    "application/xml,text/xml,text/x-rst,text/prs.fallenstein.rst," \
                    "text/tsv,text/tab-separated-values," \
                    "application/x-ole-storage,application/vnd.ms-outlook,"

if not os.environ.get("UNSTRUCTURED_ALLOWED_MIMETYPES", None):
    os.environ["UNSTRUCTURED_ALLOWED_MIMETYPES"] =  DEFAULT_MIMETYPES
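
The allowed MIME types are stored as one comma-separated string. As an illustration of how such a string could be consumed, here is a minimal sketch; `parse_mimetypes` and `is_allowed` are hypothetical helpers, and this is not necessarily how the API itself validates uploads. Note that the default string above contains repeated entries and a trailing comma, so the sketch de-duplicates and skips empty items.

```python
def parse_mimetypes(raw: str) -> set:
    """Hypothetical helper: split a comma-separated string into a set,
    dropping empty items (e.g. from a trailing comma) and duplicates."""
    return {item.strip() for item in raw.split(",") if item.strip()}


def is_allowed(content_type: str, raw: str) -> bool:
    """Hypothetical check: is content_type listed in the raw string?"""
    return content_type in parse_mimetypes(raw)


# A shortened example string with a duplicate and a trailing comma.
example = "application/pdf,text/html,text/html,message/rfc822,"

print(sorted(parse_mimetypes(example)))
print(is_allowed("message/rfc822", example))
print(is_allowed("image/gif", example))
```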

In [None]:
# pipeline-api

def get_pdf_splits(pdf, split_size=1):
    '''
    Given a pdf (PdfReader) with n pages, split it into pdfs each with split_size # of pages
    Return the files with their page offset in the form [( BytesIO, int)]
    '''
    split_pdfs = []

    offset = 0

    while offset < len(pdf.pages):
        new_pdf = PdfWriter()
        pdf_buffer = io.BytesIO()

        end = offset+split_size
        for page in pdf.pages[offset : end]:
            new_pdf.add_page(page)

        new_pdf.write(pdf_buffer)
        pdf_buffer.seek(0)

        split_pdfs.append((pdf_buffer, offset))
        offset += split_size

    return split_pdfs


def partition_file_via_api(file_tuple, request, filename, content_type, **partition_kwargs):
    '''
    Send the given file to be partitioned remotely with retry logic,
    where the remote url is set by env var.
    
    Args:
    file_tuple is in the form (file, page_offset)
    request is used to forward the api key header
    filename and content_type are passed in the file form data
    partition_kwargs holds any form parameters to be sent on
    '''
    request_url = os.environ.get("UNSTRUCTURED_PARALLEL_MODE_URL")
    
    if not request_url:
        raise HTTPException(status_code=500, detail="Parallel mode enabled but no url set!")
    
    file, page_offset = file_tuple
    
    headers = {
        "unstructured-api-key": request.headers.get("unstructured-api-key")
    }
    
    # Retry parameters
    try_attempts = int(os.environ.get("UNSTRUCTURED_PARALLEL_RETRY_ATTEMPTS", 1)) + 1
    retry_backoff_time = float(os.environ.get("UNSTRUCTURED_PARALLEL_RETRY_BACKOFF_TIME", 1.0))
    
    while try_attempts >= 0:
        response = requests.post(
            request_url,
            files={"files": (filename, file, content_type)},
            data=partition_kwargs,
            headers=headers,
        )
        try_attempts -= 1
        non_retryable_error_codes = [400, 401, 402, 403]
        status_code = response.status_code
        if status_code != 200:
            if try_attempts == 0 or status_code in non_retryable_error_codes:
                detail = response.json().get("detail") or response.text
                raise HTTPException(status_code=response.status_code, detail=detail)
            else:
                # Retry after backoff
                time.sleep(retry_backoff_time)
        else:
            break

    elements = elements_from_json(text=response.text)

    # We need to account for the original page numbers
    for element in elements:
        element.metadata.page_number += page_offset

    return elements

def partition_pdf_splits(request, file, file_filename, content_type, coordinates, **partition_kwargs):
    '''
    Split a pdf into chunks and process in parallel with more api calls, or partition
    locally if the chunk is small enough. As soon as any remote call fails, bubble up
    the error.
    
    Arguments:
    request is used to forward relevant headers to the api calls
    file, file_filename and content_type are passed on in the file argument to requests.post
    coordinates is passed on to the api calls, but cannot be used in the local partition case
    partition_kwargs holds any other parameters that will be forwarded, or passed to partition
    '''    
    pages_per_pdf = int(os.environ.get("UNSTRUCTURED_PARALLEL_MODE_SPLIT_SIZE", 1))
    pdf = PdfReader(file)

    # If it's small enough, just process locally
    if len(pdf.pages) <= pages_per_pdf:
        return partition(
            file=file,
            file_filename=file_filename,
            content_type=content_type,
            **partition_kwargs
       )

    results = []
    page_tuples = get_pdf_splits(pdf, split_size=pages_per_pdf)
    
    partition_func = partial(
        partition_file_via_api,
        request=request,
        filename=file_filename,
        content_type=content_type,
        coordinates=coordinates,
         **partition_kwargs
    )

    thread_count = int(os.environ.get("UNSTRUCTURED_PARALLEL_MODE_THREADS", 3))
    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        for result in executor.map(partition_func, page_tuples):
            results.extend(result)

    return results
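
The control flow in the cell above (split into page ranges, send each range with retries, collect results in order) can be tried out without PDFs or a remote server. The following is a simplified sketch using only the standard library. `split_offsets`, `call_with_retries`, `make_flaky_sender`, and `process_chunk` are hypothetical names, and the "server" is a fake function that fails once with a 503 before succeeding.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def split_offsets(n_pages, split_size=1):
    """Return (offset, end) page ranges in chunks of split_size."""
    return [
        (offset, min(offset + split_size, n_pages))
        for offset in range(0, n_pages, split_size)
    ]


def call_with_retries(send, retries=1, backoff=0.0):
    """Call send() until it returns status 200 or the retries run out.

    `send` is a hypothetical callable returning (status_code, payload).
    """
    non_retryable = {400, 401, 402, 403}
    for attempt in range(retries + 1):
        status, payload = send()
        if status == 200:
            return payload
        if attempt == retries or status in non_retryable:
            raise RuntimeError(f"request failed with status {status}")
        time.sleep(backoff)  # wait before the next attempt


def make_flaky_sender(chunk):
    """Fake remote call: fails once with 503, then returns page numbers."""
    calls = {"count": 0}

    def send():
        calls["count"] += 1
        if calls["count"] == 1:
            return 503, None
        start, end = chunk
        return 200, list(range(start + 1, end + 1))  # 1-based page numbers

    return send


def process_chunk(chunk):
    return call_with_retries(make_flaky_sender(chunk), retries=1)


chunks = split_offsets(5, split_size=2)
pages = []
with ThreadPoolExecutor(max_workers=3) as executor:
    # executor.map yields results in input order, so pages stay sorted.
    for result in executor.map(process_chunk, chunks):
        pages.extend(result)

print(chunks)
print(pages)
```

Since `executor.map` returns results in the order of its inputs, the combined list keeps the original page order even when the chunks finish at different times.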

In [None]:
# pipeline-api

def pipeline_api(
    file, 
    request=None,
    filename='',
    m_strategy=[],
    m_coordinates=[],
    m_ocr_languages=[],
    m_encoding=[],
    m_xml_keep_tags=[],
    m_pdf_infer_table_structure = [],
    file_content_type=None,
    response_type="application/json"
):
    if filename.endswith(".msg"):
        # Note(yuming): convert file type for msg files
        # since FastAPI might send the wrong one.
        file_content_type = "application/x-ole-storage"
        
    if filename.endswith(".pdf") and PdfReader(file).is_encrypted:
        raise HTTPException(
            status_code=400,
            detail=f"File: {filename} is encrypted. Please decrypt it with password."
        )
    
    strategy = (m_strategy[0] if len(m_strategy) else 'fast').lower()
    strategies = ['fast', 'hi_res', 'auto', 'ocr_only']
    if strategy not in strategies:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid strategy: {strategy}. Must be one of {strategies}"
        )
    
    show_coordinates_str = (m_coordinates[0] if len(m_coordinates) else "false").lower()
    show_coordinates = show_coordinates_str == "true"
    
    # Parallel mode is set by env variable
    enable_parallel_mode = os.environ.get("UNSTRUCTURED_PARALLEL_MODE_ENABLED", "false")
    pdf_parallel_mode_enabled = enable_parallel_mode == "true"
    
    ocr_languages= ('+'.join(m_ocr_languages) if len(m_ocr_languages) else 'eng').lower()

    encoding = m_encoding[0] if len(m_encoding) else None
    
    xml_keep_tags_str = (m_xml_keep_tags[0] if len(m_xml_keep_tags) else "false").lower()
    xml_keep_tags = xml_keep_tags_str == "true"
    
    pdf_infer_table_structure = (
        m_pdf_infer_table_structure[0] if len(m_pdf_infer_table_structure) else "false"
    ).lower()
    if strategy == "hi_res" and pdf_infer_table_structure == "true":
        pdf_infer_table_structure = True
    else:
        pdf_infer_table_structure = False
    
    try:
        if file_content_type == "application/pdf" and pdf_parallel_mode_enabled:
            elements = partition_pdf_splits(
                request,
                file=file,
                file_filename=filename,
                content_type=file_content_type,
                strategy=strategy,
                ocr_languages=ocr_languages,
                coordinates=show_coordinates,
                pdf_infer_table_structure=pdf_infer_table_structure,
                encoding=encoding,
            )
        else:
            elements = partition(
                file=file,
                file_filename=filename,
                content_type=file_content_type,
                strategy=strategy,
                ocr_languages=ocr_languages,
                pdf_infer_table_structure=pdf_infer_table_structure,
                encoding=encoding,
                xml_keep_tags=xml_keep_tags,
            )
    except ValueError as e:
        if 'Invalid file' in e.args[0]:
            raise HTTPException(status_code=400, detail=f"{file_content_type} not currently supported")
        raise e
    except pdfminer.pdfparser.PDFSyntaxError:
        raise HTTPException(status_code=400, detail=f"{filename} does not appear to be a valid PDF")

    if response_type == "text/csv":
        df = convert_to_dataframe(elements)
        df["filename"] = os.path.basename(filename)
        if not show_coordinates:
            df.drop(columns=["coordinates"], inplace=True)
        
        return df.to_csv(index=False)
    
    result = convert_to_isd(elements)
    for element in result:
        element['metadata']['filename'] = os.path.basename(filename)

        if not show_coordinates:
            del element['coordinates']
        
    return result
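
Most of the `m_*` arguments arrive as lists of strings, and the function above takes the first value or falls back to a default. The sketch below isolates that pattern in small helpers. The helper names are hypothetical, and `ValueError` stands in for the `HTTPException` raised in the real function.

```python
def first_or_default(values, default):
    """Return the first submitted value, or `default` for an empty list."""
    return values[0] if values else default


def parse_flag(values, default="false"):
    """Interpret a list of strings as a boolean flag ("true" or "false")."""
    return first_or_default(values, default).lower() == "true"


def parse_strategy(values):
    """Normalize the strategy and reject unknown values."""
    allowed = ["fast", "hi_res", "auto", "ocr_only"]
    strategy = first_or_default(values, "fast").lower()
    if strategy not in allowed:
        raise ValueError(f"Invalid strategy: {strategy}. Must be one of {allowed}")
    return strategy


def parse_ocr_languages(values):
    """Join several language codes with '+', defaulting to 'eng'."""
    return ("+".join(values) if values else "eng").lower()


print(parse_strategy([]))
print(parse_strategy(["HI_RES"]))
print(parse_flag(["True"]))
print(parse_ocr_languages(["eng", "deu"]))
```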

In [None]:
with open(filename, 'rb') as file:
    email_data = pipeline_api(file=file, filename=filename)

In [None]:
email_data

[{'type': 'UncategorizedText',
  'coordinate_system': None,
  'layout_width': None,
  'layout_height': None,
  'element_id': 'db1ca22813f01feda8759ff04a844e56',
  'metadata': {'date': '2022-12-21T10:28:53-06:00',
   'filetype': 'message/rfc822',
   'sent_from': ['Mallori Harrell <mallori@unstructured.io>'],
   'sent_to': ['Mallori Harrell <mallori@unstructured.io>'],
   'subject': 'Family Day',
   'filename': 'family-day.eml'},
  'text': 'Hi All,'},
 {'type': 'NarrativeText',
  'coordinate_system': None,
  'layout_width': None,
  'layout_height': None,
  'element_id': 'a663c393a5e143c01ef2bb5c98efa2c1',
  'metadata': {'date': '2022-12-21T10:28:53-06:00',
   'filetype': 'message/rfc822',
   'sent_from': ['Mallori Harrell <mallori@unstructured.io>'],
   'sent_to': ['Mallori Harrell <mallori@unstructured.io>'],
   'subject': 'Family Day',
   'filename': 'family-day.eml'},
  'text': 'Get excited for our first annual family day!\xa0'},
 {'type': 'NarrativeText',
  'coordinate_system': None,

In [None]:
# Setting response to be csv

with open(filename, 'rb') as file:
    email_data = pipeline_api(file=file, filename=filename, response_type="text/csv")

In [None]:
email_data

'type,text,element_id,coordinate_system,layout_width,layout_height,filename,page_number,url,sent_from,sent_to,subject,sender\nUncategorizedText,"Hi All,",db1ca22813f01feda8759ff04a844e56,,,,family-day.eml,,,[\'Mallori Harrell <mallori@unstructured.io>\'],[\'Mallori Harrell <mallori@unstructured.io>\'],Family Day,Mallori Harrell <mallori@unstructured.io>\nNarrativeText,Get excited for our first annual family day!\xa0,a663c393a5e143c01ef2bb5c98efa2c1,,,,family-day.eml,,,[\'Mallori Harrell <mallori@unstructured.io>\'],[\'Mallori Harrell <mallori@unstructured.io>\'],Family Day,Mallori Harrell <mallori@unstructured.io>\nNarrativeText,"There will be face painting, a petting zoo, funnel cake and more.",ce65ca3bef59957d3f1c2bab5725c82f,,,,family-day.eml,,,[\'Mallori Harrell <mallori@unstructured.io>\'],[\'Mallori Harrell <mallori@unstructured.io>\'],Family Day,Mallori Harrell <mallori@unstructured.io>\nNarrativeText,Make sure to RSVP!,d7bcf988af9f06042d83e25c531e5744,,,,family-day.eml,,,[\'Mal
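
The CSV response is a single string, so it can be read back into rows with the standard `csv` module. The sketch below assumes a shortened, hand-written CSV with only four of the columns shown above; the variable names are illustrative.

```python
import csv
import io

# A shortened stand-in for the CSV string returned by the pipeline.
csv_text = (
    "type,text,element_id,filename\n"
    'UncategorizedText,"Hi All,",db1ca22813f01feda8759ff04a844e56,family-day.eml\n'
    "NarrativeText,Make sure to RSVP!,d7bcf988af9f06042d83e25c531e5744,family-day.eml\n"
)

# DictReader uses the first line as column names and handles quoted commas.
rows = list(csv.DictReader(io.StringIO(csv_text)))
texts = [row["text"] for row in rows]

print(texts)
```

The quoted field `"Hi All,"` shows why a CSV parser is preferable to splitting on commas by hand.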

## Now let's use the API for a text file

In [None]:
filename_txt = get_filename("sample-docs", "fake-text.txt")

In [None]:
with open(filename_txt, 'rb') as file:
    text_elements = pipeline_api(file=file, filename=filename_txt)

In [None]:
text_elements

[{'type': 'NarrativeText',
  'coordinate_system': None,
  'layout_width': None,
  'layout_height': None,
  'element_id': '1df8eeb8be847c3a1a7411e3be3e0396',
  'metadata': {'filetype': 'text/plain', 'filename': 'fake-text.txt'},
  'text': 'This is a test document to use for unit tests.'},
 {'type': 'Title',
  'coordinate_system': None,
  'layout_width': None,
  'layout_height': None,
  'element_id': '9c218520320f238595f1fde74bdd137d',
  'metadata': {'filetype': 'text/plain', 'filename': 'fake-text.txt'},
  'text': 'Important points:'},
 {'type': 'ListItem',
  'coordinate_system': None,
  'layout_width': None,
  'layout_height': None,
  'element_id': '39a3ae572581d0f1fe7511fd7b3aa414',
  'metadata': {'filetype': 'text/plain', 'filename': 'fake-text.txt'},
  'text': 'Hamburgers are delicious'},
 {'type': 'ListItem',
  'coordinate_system': None,
  'layout_width': None,
  'layout_height': None,
  'element_id': 'fc1adcb8eaceac694e500a103f9f698f',
  'metadata': {'filetype': 'text/plain', 'fil