# Process mining with NLP

This workbook follows an example of construction equipment being delivered. Documents, emails, and voicemails are generated regarding the equipment, and NLP must be performed to extract process-mining features. The features are then run through the inductive mining algorithm to derive a process.

In [21]:
import json
import pathlib
import pm4py
import pandas as pd
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_anthropic import ChatAnthropic
from pydantic import BaseModel
from dotenv import load_dotenv
import getpass
import os
from IPython.display import HTML, display

In [22]:
# Set up the pydantic model describing the JSON object the LLM must return.
# NOTE: pydantic publishes the class docstring as the JSON-schema
# "description", and the output parser embeds that schema in the prompt's
# format instructions, so the docstring below is text the LLM reads. It
# therefore describes only the two fields this model declares. The third
# PM4Py column, time:timestamp, is copied from each event record when the
# events are processed; it is not extracted by the LLM.
class output_format(BaseModel):
    """
    Fields extracted from one event text, mapped onto PM4Py columns:
    - case_concept_name -> case:concept:name (string): identifies the process instance
    - concept_name -> concept:name (string): the activity name
    """

    # Normalized order number / case identifier
    case_concept_name: str
    # Concise name of the activity the event describes
    concept_name: str


# Parser for the LLM reply; the pydantic class supplies the schema that its
# format instructions are generated from
output_parser = JsonOutputParser(pydantic_object=output_format)

# Instruction text: what to extract, then the placeholder for the event
# text, then the placeholder for the parser's format instructions
extraction_template = """Extract the following information from this event text:
1. case_concept_name: The order number or case identifier (normalize variations like "#234123", "234-123", "two-three-four-one-two-three" to just "234123")
2. concept_name: A concise activity name describing what happened (e.g., "Purchase Order Submitted", "Production Started", "Delivered")

Event text:
{event_text}

{format_instructions}"""

# Only event_text is supplied per call; format_instructions is filled in
# once, here, from the parser
prompt = PromptTemplate(
    template=extraction_template,
    partial_variables={"format_instructions": output_parser.get_format_instructions()},
    input_variables=["event_text"],
)

# Configure LLM
# Pull variables from a local .env file (if one exists) into the environment
load_dotenv()
# Prompt when the key is missing *or* blank. A .env template line such as
# `ANTHROPIC_API_KEY=` defines the variable with an empty value, which a
# plain membership test would accept, leaving the client with an unusable key.
if not os.environ.get("ANTHROPIC_API_KEY"):
    os.environ["ANTHROPIC_API_KEY"] = getpass.getpass("Enter your Anthropic API key: ")
# temperature=0 is passed to reduce sampling randomness in the extraction
llm = ChatAnthropic(model="claude-haiku-4-5", temperature=0)

# Create the chain: prompt -> LLM -> JSON output parser
chain = prompt | llm | output_parser

# Smoke-test the chain on one hand-written event before processing the file
test_event = {
    "content": "Purchase order submitted by Sarah Chen - Springfield Mega Mall Construction Project. Need 12 precast concrete footers, spec F-2400x800x600mm. Order ref: #234123. Budget approved by Frank in accounting. Rush delivery requested for Phase 2 foundation work.",
    "timestamp": "2011-01-01T09:15:23.000+01:00",
}

# Echo only the first 100 characters of the event text
content_preview = test_event["content"][:100]
print("Testing with first event:")
print(f"Content: {content_preview}...")
print()

# Only the event text is sent through the chain; the timestamp is not
chain_input = {"event_text": test_event["content"]}
result = chain.invoke(chain_input)

print("Extracted:")
print(json.dumps(result, indent=2))

Testing with first event:
Content: Purchase order submitted by Sarah Chen - Springfield Mega Mall Construction Project. Need 12 precast...

Extracted:
{
  "case_concept_name": "234123",
  "concept_name": "Purchase Order Submitted"
}


# Classify the Events

In [23]:
# Load the sample events from example_2.json in the current working directory
# (Path().resolve() is the process's working directory, made absolute)
working_directory = pathlib.Path().resolve()
# Join with pathlib's "/" operator rather than string formatting, and read as
# UTF-8 explicitly: open()'s default encoding is platform-dependent, whereas
# JSON text is UTF-8.
with open(working_directory / "example_2.json", encoding="utf-8") as file:
    data = json.load(file)

# Process all events and collect one PM4Py-style row per event
parsed_events = []

for event in data["events"]:
    try:
        # Extract the case id and activity name using the LLM chain
        extracted = chain.invoke({"event_text": event["content"]})

        # The timestamp comes straight from the event record, not the LLM
        row = {
            "case:concept:name": extracted.get("case_concept_name"),
            "concept:name": extracted.get("concept_name"),
            "time:timestamp": event["timestamp"],
        }
    except Exception as e:
        # Deliberately broad: any failure for one event (LLM call, JSON
        # parsing, unexpected reply shape) is reported and replaced by a
        # placeholder row so the remaining events are still processed.
        print(f"Error processing event: {e}")
        row = {
            "case:concept:name": "UNKNOWN",
            "concept:name": "PARSE_ERROR",
            "time:timestamp": event["timestamp"],
        }
    else:
        # Progress output lives outside the try block and tolerates a missing,
        # null, or non-string activity. Previously a failure while printing
        # ran the error handler *after* the row had already been appended,
        # producing two rows for one event.
        activity = row["concept:name"]
        label = "Unknown" if activity is None else str(activity)
        print(f"Processed: {label[:40]}")

    # Exactly one row per event, whether extraction succeeded or not.
    # NOTE(review): extracted values are stored as returned (possibly None);
    # confirm whether incomplete extractions should become PARSE_ERROR rows.
    parsed_events.append(row)

# Build the event log: one row per parsed event, with the timestamp strings
# converted to pandas datetimes
parsed_events_df = pd.DataFrame(parsed_events).assign(
    **{"time:timestamp": lambda frame: pd.to_datetime(frame["time:timestamp"])}
)

# Report how many events were processed and preview the first rows
event_total = len(parsed_events_df)
print(f"\nProcessed {event_total} events")
print(parsed_events_df.head())

Processed: Purchase Order Submitted
Processed: Quote Confirmation
Processed: Order Approved for Production
Processed: Production Started
Processed: QC Inspection Report
Processed: Shipment Scheduled
Processed: Loaded onto Truck
Processed: Delivery Delayed
Processed: Order Delayed
Processed: Delivery Delay Confirmed
Processed: Delivery Confirmed
Processed: Foundation Footers Installed
Processed: Foundation Inspection Approved
Processed: Invoice Issued

Processed 14 events
  case:concept:name                   concept:name            time:timestamp
0            234123       Purchase Order Submitted 2011-01-01 09:15:23+01:00
1            234123             Quote Confirmation 2011-01-01 14:32:00+01:00
2            234123  Order Approved for Production 2011-01-02 08:45:00+01:00
3            234123             Production Started 2011-01-03 06:30:00+01:00
4            234123           QC Inspection Report 2011-01-04 15:20:00+01:00


# Run the Process Mining Algorithm
Visualize the workflow as a Directly-Follows Graph (DFG) with performance metrics. It shows the average time between activities.

In [24]:
# Discover performance DFG - returns tuple (dfg_dict, start_activities, end_activities)
performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(
    log=parsed_events_df
)

# Visualize with performance (time) annotations, laid out left-to-right.
# The target path is built with pathlib's "/" operator and converted to a
# string only where pm4py receives it.
output_file = working_directory / "example_2_dfg_performance.svg"
pm4py.save_vis_performance_dfg(
    performance_dfg,
    start_activities,
    end_activities,
    str(output_file),
    rankdir="LR",
)
print(f"Performance DFG saved to: {output_file}")

# Display inline. Read the SVG as UTF-8 explicitly instead of relying on the
# platform default encoding, so non-ASCII activity names render correctly on
# any OS; the file is closed again before it is displayed.
svg_content = output_file.read_text(encoding="utf-8")
display(HTML(svg_content))

Performance DFG saved to: /home/garre/development/process-mining-demo/examples/example_2_dfg_performance.svg


## BPMN Process

Visualize the process as a BPMN diagram. There should be a way to derive flowcharts using a resource_id field, but I have not found documentation in PM4Py regarding this.

In [25]:
# Discover BPMN model using inductive miner
bpmn_model = pm4py.discover_bpmn_inductive(log=parsed_events_df)

# Save the BPMN visualization, laid out left-to-right. No resource column is
# passed here, so the diagram is a plain BPMN flow without swimlanes.
# NOTE(review): confirm whether save_vis_bpmn honours `parameters`; the
# ".svg" file extension presumably already selects the output format.
output_file = working_directory / "example_2_bpmn.svg"
pm4py.save_vis_bpmn(
    bpmn_model, str(output_file), parameters={"format": "svg"}, rankdir="LR"
)
print(f"BPMN diagram saved to: {output_file}")

# Read and display diagram as HTML. Read as UTF-8 explicitly instead of
# relying on the platform default encoding, so non-ASCII activity names
# render correctly on any OS; the file is closed again before display.
svg_content = output_file.read_text(encoding="utf-8")
display(HTML(svg_content))

BPMN diagram saved to: /home/garre/development/process-mining-demo/examples/example_2_bpmn.svg
