<a href="https://colab.research.google.com/github/graphlit/graphlit-samples/blob/main/python/Notebook%20Examples/Graphlit_2024_12_24_Auto_Filter_Microsoft_Emails_in_Conversation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Description**

This example shows how to ingest emails from a user's Microsoft email account and automatically filter them when prompting a conversation. It is a prototype of how you could auto-filter the knowledge base from the user's prompt, using an inferred time range and observed entities (labels).

**Requirements**

Prior to running this notebook, you will need to [sign up](https://docs.graphlit.dev/getting-started/signup) for Graphlit and [create a project](https://docs.graphlit.dev/getting-started/create-project).

You will need the Graphlit organization ID, preview environment ID and JWT secret from your created project.

Assign these properties as Colab secrets: GRAPHLIT_ORGANIZATION_ID, GRAPHLIT_ENVIRONMENT_ID and GRAPHLIT_JWT_SECRET.

To access your Microsoft email, assign these properties as Colab secrets: MICROSOFT_CLIENT_ID, MICROSOFT_CLIENT_SECRET and MICROSOFT_REFRESH_TOKEN.

---

Install the Graphlit Python client SDK

In [None]:
# Install (or upgrade to) the latest Graphlit Python client SDK.
!pip install --upgrade graphlit-client

Initialize Graphlit

In [None]:
import os
from google.colab import userdata
from graphlit import Graphlit
from graphlit_api import input_types, enums, exceptions

# Copy the Graphlit project credentials from Colab secrets into the environment;
# presumably Graphlit() below reads them from there (TODO confirm).
for secret_name in ('GRAPHLIT_ORGANIZATION_ID', 'GRAPHLIT_ENVIRONMENT_ID', 'GRAPHLIT_JWT_SECRET'):
    os.environ[secret_name] = userdata.get(secret_name)

graphlit = Graphlit()

In [None]:
# Expose the Microsoft OAuth credentials as environment variables; create_feed reads them.
for secret_name in ('MICROSOFT_CLIENT_ID', 'MICROSOFT_CLIENT_SECRET', 'MICROSOFT_REFRESH_TOKEN'):
    os.environ[secret_name] = userdata.get(secret_name)

Define Graphlit helper functions

In [None]:
from typing import List, Optional
import json

async def extract_text(text: str, model_schema: str, specification_id: Optional[str] = None, prompt: Optional[str] = None):
    """Extract structured data from ``text`` using a single tool definition.

    Args:
        text: The text to extract from.
        model_schema: JSON schema (as a string) describing the tool's arguments.
        specification_id: Optional specification to use; when None, no
            specification reference is sent.
        prompt: Optional extraction prompt; a generic default is used when None.

    Returns:
        The non-empty list of extraction results, or None when the client is
        missing, nothing was extracted, or the request failed.
    """
    if graphlit.client is None:
        return None

    default_name = "extract_pydantic_model"

    default_prompt = """
    Extract data using the tools provided.
    """

    try:
        response = await graphlit.client.extract_text(
            specification=input_types.EntityReferenceInput(id=specification_id) if specification_id is not None else None,
            tools=[input_types.ToolDefinitionInput(name=default_name, schema=model_schema)],
            prompt=default_prompt if prompt is None else prompt,
            text=text
        )
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None

    # Treat an empty result list like a missing one so callers can safely index [0].
    if not response.extract_text:
        print('Failed to extract text.')
        return None

    return response.extract_text

async def create_google_extraction_specification(model: enums.GoogleModels):
    """Create an EXTRACTION specification backed by a Google model.

    Args:
        model: The Google model to use.

    Returns:
        The new specification's ID, or None if the client is missing or the
        request failed.
    """
    if graphlit.client is None:
        return None

    # Named to avoid shadowing the builtin ``input``.
    specification_input = input_types.SpecificationInput(
        name=f"Google [{model}]",
        type=enums.SpecificationTypes.EXTRACTION,
        serviceType=enums.ModelServiceTypes.GOOGLE,
        google=input_types.GoogleModelPropertiesInput(
            model=model,
        )
    )

    try:
        response = await graphlit.client.create_specification(specification_input)
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None

    return response.create_specification.id if response.create_specification is not None else None

async def create_openai_extraction_specification(model: enums.OpenAIModels):
    """Create an EXTRACTION specification backed by an OpenAI model.

    Args:
        model: The OpenAI model to use.

    Returns:
        The new specification's ID, or None if the client is missing or the
        request failed.
    """
    if graphlit.client is None:
        return None

    # Named to avoid shadowing the builtin ``input``.
    specification_input = input_types.SpecificationInput(
        name=f"OpenAI [{model}]",
        type=enums.SpecificationTypes.EXTRACTION,
        serviceType=enums.ModelServiceTypes.OPEN_AI,
        openAI=input_types.OpenAIModelPropertiesInput(
            model=model,
        )
    )

    try:
        response = await graphlit.client.create_specification(specification_input)
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None

    return response.create_specification.id if response.create_specification is not None else None

# Create entity extraction workflow using LLM specification
async def create_extraction_workflow(specification_id: str):
    """Create a workflow whose extraction stage extracts LABEL observables.

    Args:
        specification_id: ID of the LLM specification used for MODEL_TEXT extraction.

    Returns:
        The new workflow's ID, or None if the client is missing or the request
        failed.
    """
    if graphlit.client is None:
        return None

    # Named to avoid shadowing the builtin ``input``.
    workflow_input = input_types.WorkflowInput(
        name="Entity Extraction",
        extraction=input_types.ExtractionWorkflowStageInput(
            jobs=[
                input_types.ExtractionWorkflowJobInput(
                    connector=input_types.EntityExtractionConnectorInput(
                        type=enums.EntityExtractionServiceTypes.MODEL_TEXT,
                        modelText=input_types.ModelTextExtractionPropertiesInput(
                            specification=input_types.EntityReferenceInput(id=specification_id)
                        ),
                        # Only labels are extracted; the conversation filter later matches on label observations.
                        extractedTypes=[enums.ObservableTypes.LABEL]
                    )
                )
            ]
        )
    )

    try:
        response = await graphlit.client.create_workflow(workflow_input)
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None

    return response.create_workflow.id if response.create_workflow is not None else None

async def create_feed(workflow_id: Optional[str], read_limit: int = 25):
    """Create a Microsoft email feed that reads past emails.

    The OAuth credentials come from the MICROSOFT_REFRESH_TOKEN,
    MICROSOFT_CLIENT_ID and MICROSOFT_CLIENT_SECRET environment variables;
    a KeyError is raised if any of them is unset.

    Args:
        workflow_id: Workflow applied to ingested emails, or None for none.
        read_limit: Maximum number of emails to read (25 by default).

    Returns:
        The new feed's ID, or None if the client is missing or the request
        failed.
    """
    if graphlit.client is None:
        return None

    # Named to avoid shadowing the builtin ``input``.
    feed_input = input_types.FeedInput(
        name="Microsoft Email",
        type=enums.FeedTypes.EMAIL,
        email=input_types.EmailFeedPropertiesInput(
            type=enums.FeedServiceTypes.MICROSOFT_EMAIL,
            microsoft=input_types.MicrosoftEmailFeedPropertiesInput(
                type=enums.EmailListingTypes.PAST,
                refreshToken=os.environ['MICROSOFT_REFRESH_TOKEN'],
                clientId=os.environ['MICROSOFT_CLIENT_ID'],
                clientSecret=os.environ['MICROSOFT_CLIENT_SECRET']
            ),
            readLimit=read_limit
        ),
        workflow=input_types.EntityReferenceInput(id=workflow_id) if workflow_id is not None else None,
    )

    try:
        response = await graphlit.client.create_feed(feed_input)
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None

    return response.create_feed.id if response.create_feed is not None else None

async def is_feed_done(feed_id: str):
    """Report whether the given feed has finished ingesting.

    Returns the status ``result`` flag, or None when there is no client or the
    response carries no status. GraphQL client errors are not caught here.
    """
    client = graphlit.client
    if client is None:
        return None

    status = (await client.is_feed_done(feed_id)).is_feed_done
    if status is None:
        return None
    return status.result

async def query_contents(search_text: str):
    """Run a HYBRID search over ingested contents.

    Returns the matching contents, or None when there is no client, the
    response has no ``contents`` payload, or the query fails.
    """
    client = graphlit.client
    if client is None:
        return None

    content_filter = input_types.ContentFilter(
        search=search_text,
        searchType=enums.SearchTypes.HYBRID
    )

    try:
        response = await client.query_contents(filter=content_filter)
    except exceptions.GraphQLClientError as err:
        print(str(err))
        return None

    contents = response.contents
    return None if contents is None else contents.results

async def query_labels(search_text: Optional[str] = None):
    """Look up labels, optionally narrowed by ``search_text``.

    Returns the matching labels, or None when there is no client, the
    response has no ``labels`` payload, or the query fails.
    """
    client = graphlit.client
    if client is None:
        return None

    label_filter = input_types.LabelFilter(search=search_text)

    try:
        response = await client.query_labels(filter=label_filter)
    except exceptions.GraphQLClientError as err:
        print(str(err))
        return None

    labels = response.labels
    return None if labels is None else labels.results

async def create_openai_specification(model: enums.OpenAIModels):
    """Create a COMPLETION specification backed by an OpenAI model.

    The specification uses SECTION retrieval, Cohere reranking and a
    conversation search type of NONE.

    Args:
        model: The OpenAI model to use for completions.

    Returns:
        The new specification's ID, or None if the client is missing or the
        request failed.
    """
    if graphlit.client is None:
        return None

    # Named to avoid shadowing the builtin ``input``.
    specification_input = input_types.SpecificationInput(
        name=f"OpenAI [{str(model)}]",
        type=enums.SpecificationTypes.COMPLETION,
        serviceType=enums.ModelServiceTypes.OPEN_AI,
        openAI=input_types.OpenAIModelPropertiesInput(
            model=model,
        ),
        retrievalStrategy=input_types.RetrievalStrategyInput(
            type=enums.RetrievalStrategyTypes.SECTION
        ),
        rerankingStrategy=input_types.RerankingStrategyInput(
            serviceType=enums.RerankingModelServiceTypes.COHERE
        ),
        searchType=enums.ConversationSearchTypes.NONE
    )

    try:
        response = await graphlit.client.create_specification(specification_input)
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None

    return response.create_specification.id if response.create_specification is not None else None

async def create_conversation(specification_id: str, in_last: Optional[str] = None, observations: Optional[List[input_types.ObservationCriteriaInput]] = None):
    """Create a conversation whose content is filtered by time range and observations.

    Args:
        specification_id: Completion specification the conversation uses.
        in_last: Optional time-range filter (the notebook passes an ISO 8601
            duration string); None leaves it unset.
        observations: Optional observation criteria (e.g. label references);
            None leaves them unset.

    Returns:
        The new conversation's ID, or None if the client is missing or the
        request failed.
    """
    if graphlit.client is None:
        return None

    # Named to avoid shadowing the builtin ``input``.
    conversation_input = input_types.ConversationInput(
        name="Conversation",
        specification=input_types.EntityReferenceInput(
            id=specification_id
        ),
        filter=input_types.ContentCriteriaInput(
            inLast=in_last,
            observations=observations
        ),
    )

    try:
        response = await graphlit.client.create_conversation(conversation_input)
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None

    return response.create_conversation.id if response.create_conversation is not None else None

async def delete_conversation(conversation_id: str):
    """Delete one conversation; does nothing without a client or a conversation ID."""
    if graphlit.client is None or conversation_id is None:
        return
    await graphlit.client.delete_conversation(conversation_id)

async def prompt_conversation(conversation_id: str, prompt: str):
    """Send ``prompt`` to a conversation and return ``(message, details)``.

    Both elements are None when there is no client, the request fails, or the
    response has no payload; ``message`` alone is None when the reply has no
    message.
    """
    client = graphlit.client
    if client is None:
        return None, None

    try:
        response = await client.prompt_conversation(prompt, conversation_id, include_details=True)
    except exceptions.GraphQLClientError as err:
        print(str(err))
        return None, None

    result = response.prompt_conversation
    if result is None:
        return None, None

    reply_text = result.message.message if result.message is not None else None
    return reply_text, result.details

async def delete_all_specifications():
    """Delete every specification (with is_synchronous=True); no-op without a client."""
    client = graphlit.client
    if client is not None:
        await client.delete_all_specifications(is_synchronous=True)

async def delete_all_workflows():
    """Delete every workflow (with is_synchronous=True); no-op without a client."""
    client = graphlit.client
    if client is not None:
        await client.delete_all_workflows(is_synchronous=True)

async def delete_all_contents():
    """Delete every content item (with is_synchronous=True); no-op without a client."""
    client = graphlit.client
    if client is not None:
        await client.delete_all_contents(is_synchronous=True)

async def delete_all_feeds():
    """Delete every feed (with is_synchronous=True); no-op without a client."""
    client = graphlit.client
    if client is not None:
        await client.delete_all_feeds(is_synchronous=True)

async def delete_all_conversations():
    """Delete every conversation (with is_synchronous=True); no-op without a client."""
    client = graphlit.client
    if client is not None:
        await client.delete_all_conversations(is_synchronous=True)

async def delete_all_observables():
    """Delete all persons, organizations, places, events, products, softwares,
    repos, labels and categories, in that order; no-op without a client."""
    if graphlit.client is None:
        return

    for deleter_name in (
        'delete_all_persons',
        'delete_all_organizations',
        'delete_all_places',
        'delete_all_events',
        'delete_all_products',
        'delete_all_softwares',
        'delete_all_repos',
        'delete_all_labels',
        'delete_all_categories',
    ):
        # Resolve each method just before calling it, one at a time.
        await getattr(graphlit.client, deleter_name)()

Execute Graphlit example

In [None]:
from IPython.display import display, Markdown
import time

# Remove any existing specifications, feeds, contents and workflows; only needed for notebook example
await delete_all_workflows()
await delete_all_feeds()
await delete_all_contents()
await delete_all_specifications()
await delete_all_conversations()
await delete_all_observables()

print('Deleted all specifications, feeds, contents, and workflows.')

#specification_id = await create_openai_extraction_specification(enums.OpenAIModels.GPT4O_128K) # NOTE: Mini doesn't generate semantic labels well enough
specification_id = await create_google_extraction_specification(enums.GoogleModels.GEMINI_2_0_FLASH_EXPERIMENTAL)

if specification_id is not None:
    print(f'Created specification [{specification_id}].')

    workflow_id = await create_extraction_workflow(specification_id)

    if workflow_id is not None:
        print(f'Created workflow [{workflow_id}].')


Ingest Microsoft emails

In [None]:
        feed_id = await create_feed(workflow_id)

        if feed_id is not None:
            print(f'Created feed [{feed_id}].')

            # Wait for feed to complete, since ingestion happens asychronously
            done = False
            time.sleep(5)
            while not done:
                done = await is_feed_done(feed_id)

                if not done:
                    time.sleep(10)

            print(f'Completed feed [{feed_id}].')

Search within ingested emails

In [None]:
# NOTE: specify the text to search for in the filtered emails
search_text = "Azure subscription"

# Query contents by search text
contents = await query_contents(search_text)

if contents is not None and len(contents) > 0:
    for content in contents:
        if content is not None:
            display(Markdown(f'### Found Microsoft email [{content.id}] that referenced search text [{search_text}].'))

            if content.original_date is not None:
                print("Date: " + content.original_date)

            metadata = content.email

            if metadata is not None:
                if metadata.subject is not None:
                    print("Subject: " + metadata.subject)
                if metadata.to is not None and len(metadata.to) > 0:
                    print("To: " + ", ".join([f'"{item.name}" <{item.email}>' for item in metadata.to if item is not None]))
                if metadata.from_ is not None and len(metadata.from_) > 0:
                    print("From: " + ", ".join([f'"{item.name}" <{item.email}>' for item in metadata.from_ if item is not None]))

            if content.observations is not None:
                for observation in content.observations:
                    if observation is not None and observation.observable is not None:
                        print(f'{observation.type}: {observation.observable.name}')

            # NOTE: uncomment to see email markdown
            #display(Markdown(content.markdown))
            print()
        else:
            print('No content found')
else:
    print(f'No contents found by search [{search_text}].')


Query for labels by name

In [None]:
ll = await query_labels("billing")

if ll is not None and len(ll) > 0:
    for l in ll:
        if l is not None:
            print(f'Found Label [{l.id}]: {l.name}')

Define your user prompt

In [None]:
# User prompt sent to the conversation. Exactly one assignment should be active;
# the commented-out lines are alternative example prompts to try.
#prompt = "Can you summarize my emails from today and highlight any urgent ones?"
#prompt = "can you show me emails in last 3 days where I need to follow up or that may require my response?"
prompt = "can you show me emails in last 3 days which contain an invoice which may need to be paid?"
#prompt = "what emails did I get today?"
#prompt = "can you summarize my emails from the past week?"

In [None]:
import json
import pytz
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime

local_tz = pytz.timezone('America/Los_Angeles')

now = datetime.now(local_tz)

# NOTE: this time string is appended to the prompt so the LLM can resolve relative dates.
# The "%-I"/"%-d" strftime flags are platform-specific (not supported on Windows), so the
# unpadded hour and day are built explicitly; the output is identical.
hour = now.strftime('%I').lstrip('0')
formatted_time = f"It is now {hour}:{now:%M%p} on {now:%B} {now.day}, {now.year}"

print(formatted_time)

# Structured filter the LLM extracts from the user's prompt. A `#` comment is used
# instead of a docstring because pydantic would add a docstring to the JSON schema
# sent to the LLM. The ISO 8601 example is "P1D": day components precede the "T".
class ContentFilter(BaseModel):
    labels: List[str] = Field(description="List of email labels inferred from the provided user prompt, for filtering content. For optimal search, don't add extra spaces into label names, if possible. Don't include any labels that reference a relative date, like 'today', 'yesterday', etc.")
    in_last: Optional[str] = Field(description="Duration of time, starting from today and going into the past, for filtering content. Use ISO 8601 format, like P1D. This will be used to calculate a datetime range, like now - 'datetime range'. Treat today as from the previous midnight to now. Treat yesterday as from the previous day's midnight to now. Treat last week as previous 7 days from now.")

in_last = None
observations: List[input_types.ObservationCriteriaInput] = []

if specification_id is not None:
    schema = ContentFilter.model_json_schema()

    if schema is not None:
        json_schema = json.dumps(schema, indent=2)

        print('Schema:\n' + json_schema)
        print()

        extractions = await extract_text(f'{prompt} {formatted_time}', json_schema)

        if extractions is not None:
            extraction = extractions[0]

            if extraction is not None:
                json_str = extraction.value

                data = json.loads(json_str)

                content_filter = ContentFilter.model_validate(data)

                print(f'Time range:\n{content_filter.in_last}')

                in_last = content_filter.in_last

                for label in content_filter.labels:
                    print(f'Extracted Label: {label}')

                    ll = await query_labels(label)

                    if ll is not None and len(ll) > 0:
                        for l in ll:
                            if l is not None:
                                print(f'Found Label [{l.id}]: {l.name}')

                                observations.append(input_types.ObservationCriteriaInput(observable=input_types.EntityReferenceInput(id=l.id), type=enums.ObservableTypes.LABEL))
                    else:
                        print(f'No labels found with name [{label}].')

In [None]:
# Report how many label observations will filter the conversation.
observation_count = len(observations)
print(f'Found {observation_count} observations.')

In [None]:
completion_specification_id = await create_openai_specification(enums.OpenAIModels.GPT4O_128K_20241120)

if completion_specification_id is not None:
    print(f'Created specification [{completion_specification_id}].')

    conversation_id = await create_conversation(completion_specification_id, in_last, observations)

    if conversation_id is not None:
        print(f'Created conversation [{conversation_id}].')

        message, details = await prompt_conversation(conversation_id, prompt)

        if message is not None:
            display(Markdown('### Conversation:'))
            display(Markdown(f'**User:**\n{prompt}'))
            display(Markdown(f'**Assistant:**'))
            print(message)
            print()

        if details is not None:
            display(Markdown('### Details:'))
            display(Markdown(f'**Model**: {details.model_service} {details.model}'))
            display(Markdown(f'**Token Limit**: {details.token_limit}'))
            display(Markdown(f'**Completion Token Limit**: {details.completion_token_limit}'))

            display(Markdown(f'**# Sources**: {details.source_count}'))
            display(Markdown(f'**# Rendered Sources**: {details.rendered_source_count}'))
            display(Markdown(f'**# Ranked Sources**: {details.ranked_source_count}'))

            print()

            if details.sources is not None:
                display(Markdown(f'#### Sources:'))
                print(details.sources)
                print()

            if details.specification is not None:
                display(Markdown(f'#### Specification:'))
                print(details.specification)
                print()

            if details.messages is not None:
                display(Markdown(f'#### Messages:'))

                for message in details.messages:
                    if message is not None and message.message is not None:
                        display(Markdown(f'**{message.role}:**'))
                        print(message.message)

        await delete_conversation(conversation_id)