# Project titel: Langgraph "Automated Workflow Management System Using State Graphs"

Contributor: Rajeev singh sisodiya

#Project Overview:
This project involves developing an automated workflow management system utilizing state graphs, a method to represent and manage complex workflows in a structured and efficient manner. The system is designed to handle various tasks based on user input, process data, make decisions, assign tasks, and interact with APIs, all while maintaining a log of activities.

The core of the system is built around a WorkflowState class that captures the current state of the workflow, including user inputs, data, decisions, task assignments, and other key variables. The workflow is managed by defining nodes (functions) that perform specific tasks like collecting user input, retrieving data, making decisions, assigning tasks, coordinating processes, interacting with APIs, processing data, sending notifications, logging activities, generating output, and finally terminating the workflow.

The state transitions between these nodes are governed by conditional logic, ensuring that the workflow follows a path that adapts to the current state and decisions made during execution. The workflow is compiled into a state graph using the StateGraph class, enabling the automated execution of tasks in sequence or based on conditional branches.

This approach provides a robust solution for automating and managing workflows, especially in scenarios requiring conditional decision-making and interaction with external systems via APIs. The system is extensible, allowing for the addition of new nodes and logic as needed to accommodate more complex workflows.











In [None]:
!pip install langgraph

Collecting langgraph
  Downloading langgraph-0.2.14-py3-none-any.whl.metadata (13 kB)
Collecting langchain-core<0.3,>=0.2.27 (from langgraph)
  Downloading langchain_core-0.2.36-py3-none-any.whl.metadata (6.2 kB)
Collecting langgraph-checkpoint<2.0.0,>=1.0.2 (from langgraph)
  Downloading langgraph_checkpoint-1.0.6-py3-none-any.whl.metadata (4.5 kB)
Collecting jsonpatch<2.0,>=1.33 (from langchain-core<0.3,>=0.2.27->langgraph)
  Downloading jsonpatch-1.33-py2.py3-none-any.whl.metadata (3.0 kB)
Collecting langsmith<0.2.0,>=0.1.75 (from langchain-core<0.3,>=0.2.27->langgraph)
  Downloading langsmith-0.1.106-py3-none-any.whl.metadata (13 kB)
Collecting tenacity!=8.4.0,<9.0.0,>=8.1.0 (from langchain-core<0.3,>=0.2.27->langgraph)
  Downloading tenacity-8.5.0-py3-none-any.whl.metadata (1.2 kB)
Collecting jsonpointer>=1.9 (from jsonpatch<2.0,>=1.33->langchain-core<0.3,>=0.2.27->langgraph)
  Downloading jsonpointer-3.0.0-py2.py3-none-any.whl.metadata (2.3 kB)
Collecting httpx<1,>=0.23.0 (from l

In [15]:
from langgraph.graph import StateGraph, END
from typing import List
from langgraph.graph import MessagesState

# Define the state with necessary keys
class WorkflowState(MessagesState):
    user_input: str
    data: List[str]
    decision: str
    task_assigned: bool
    api_success: bool
    notification_sent: bool
    log: List[str]

# Define the nodes
def collect_user_input(state: WorkflowState) -> WorkflowState:
    # Simulate user input collection and validation
    if state.get("user_input") == "valid":
        state.update({"log": state.get("log", []) + ["User input valid"]})
    else:
        state.update({"log": state.get("log", []) + ["User input invalid"]})
    return state

def data_retrieval(state: WorkflowState) -> WorkflowState:
    # Simulate data retrieval
    if state.get("user_input") == "valid":
        state.update({"data": ["data1", "data2"], "log": state.get("log", []) + ["Data retrieved"]})
    else:
        state.update({"log": state.get("log", []) + ["Data retrieval failed"]})
    return state

def decision_making(state: WorkflowState) -> WorkflowState:
    # Simulate decision making
    if "data1" in state.get("data", []):
        state.update({"decision": "proceed", "log": state.get("log", []) + ["Decision made to proceed"]})
    else:
        state.update({"decision": "alternative", "log": state.get("log", []) + ["Decision made to take alternative action"]})
    return state

def task_assignment(state: WorkflowState) -> WorkflowState:
    # Simulate task assignment
    if state.get("decision") == "proceed":
        state.update({"task_assigned": True, "log": state.get("log", []) + ["Task assigned"]})
    else:
        state.update({"task_assigned": False, "log": state.get("log", []) + ["Task not assigned"]})
    return state

def process_coordination(state: WorkflowState) -> WorkflowState:
    # Simulate process coordination
    if state.get("task_assigned"):
        state.update({"log": state.get("log", []) + ["Process coordinated successfully"]})
    else:
        state.update({"log": state.get("log", []) + ["Process coordination failed"]})
    return state

def api_interaction(state: WorkflowState) -> WorkflowState:
    # Simulate API interaction
    if state.get("task_assigned"):
        state.update({"api_success": True, "log": state.get("log", []) + ["API interaction successful"]})
    else:
        state.update({"api_success": False, "log": state.get("log", []) + ["API interaction failed"]})
    return state

def data_processing(state: WorkflowState) -> WorkflowState:
    # Simulate data processing
    if state.get("api_success"):
        state.update({"log": state.get("log", []) + ["Data processed successfully"]})
    else:
        state.update({"log": state.get("log", []) + ["Data processing failed"]})
    return state

def notification(state: WorkflowState) -> WorkflowState:
    # Simulate notification sending
    if state.get("api_success"):
        state.update({"notification_sent": True, "log": state.get("log", []) + ["Notification sent"]})
    else:
        state.update({"notification_sent": False, "log": state.get("log", []) + ["Notification not sent"]})
    return state

def logging(state: WorkflowState) -> WorkflowState:
    # Simulate logging
    state.update({"log": state.get("log", []) + ["Logging complete"]})
    return state

def output_generation(state: WorkflowState) -> WorkflowState:
    # Simulate output generation
    if state.get("api_success"):
        state.update({"log": state.get("log", []) + ["Output generated successfully"]})
    else:
        state.update({"log": state.get("log", []) + ["Output generation failed"]})
    return state

def workflow_termination(state: WorkflowState) -> WorkflowState:
    # Simulate workflow termination
    state.update({"log": state.get("log", []) + ["Workflow terminated"]})
    return state

# Create the graph
workflow = StateGraph(WorkflowState)

# Add nodes to the graph
workflow.add_node("collect_user_input", collect_user_input)
workflow.add_node("data_retrieval", data_retrieval)
workflow.add_node("decision_making", decision_making)
workflow.add_node("task_assignment", task_assignment)
workflow.add_node("process_coordination", process_coordination)
workflow.add_node("api_interaction", api_interaction)
workflow.add_node("data_processing", data_processing)
workflow.add_node("notification", notification)
workflow.add_node("logging", logging)
workflow.add_node("output_generation", output_generation)
workflow.add_node("workflow_termination", workflow_termination)

# Define the edges with conditional logic
workflow.set_entry_point("collect_user_input")
workflow.add_conditional_edges("collect_user_input", lambda s: "data_retrieval" if s.get("user_input") == "valid" else END)
workflow.add_edge("data_retrieval", "decision_making")
workflow.add_conditional_edges("decision_making", lambda s: "task_assignment" if s.get("decision") == "proceed" else END)
workflow.add_edge("task_assignment", "process_coordination")
workflow.add_edge("process_coordination", "api_interaction")
workflow.add_edge("api_interaction", "data_processing")
workflow.add_edge("data_processing", "notification")
workflow.add_edge("notification", "logging")
workflow.add_edge("logging", "output_generation")
workflow.add_edge("output_generation", "workflow_termination")
workflow.add_edge("workflow_termination", END)

# Compile the graph
app = workflow.compile()

# Example invocation
initial_state = {
    "user_input": "valid",
    "messages": [],
    "log": []
}
result = app.invoke(initial_state)
print(result)



{'messages': [], 'user_input': 'valid', 'data': ['data1', 'data2'], 'decision': 'proceed', 'task_assigned': True, 'api_success': True, 'notification_sent': True, 'log': ['User input valid', 'Data retrieved', 'Decision made to proceed', 'Task assigned', 'Process coordinated successfully', 'API interaction successful', 'Data processed successfully', 'Notification sent', 'Logging complete', 'Output generated successfully', 'Workflow terminated']}


#Final Verification

In [16]:
{
  'messages': [],
  'user_input': 'valid',
  'data': ['data1', 'data2'],
  'decision': 'proceed',
  'task_assigned': True,
  'api_success': True,
  'notification_sent': True,
  'log': [
    'User input valid',
    'Data retrieved',
    'Decision made to proceed',
    'Task assigned',
    'Process coordinated successfully',
    'API interaction successful',
    'Data processed successfully',
    'Notification sent',
    'Logging complete',
    'Output generated successfully',
    'Workflow terminated'
  ]
}


{'messages': [],
 'user_input': 'valid',
 'data': ['data1', 'data2'],
 'decision': 'proceed',
 'task_assigned': True,
 'api_success': True,
 'notification_sent': True,
 'log': ['User input valid',
  'Data retrieved',
  'Decision made to proceed',
  'Task assigned',
  'Process coordinated successfully',
  'API interaction successful',
  'Data processed successfully',
  'Notification sent',
  'Logging complete',
  'Output generated successfully',
  'Workflow terminated']}

Accuracy:

The above results are accurate and consistent with the expected workflow behavior. Each step was completed successfully, and all log entries correctly reflect the progress through the workflow. The final state includes all expected data and log entries.







#Implementation of a workflow management system
Below code is a Python implementation of a workflow management system using a state graph model, which is designed to handle and automate various stages of a workflow process based on user input and predefined logic. The system is built using the langgraph library, which facilitates the creation and manipulation of state graphs, making it easier to manage complex workflows.



In [14]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List, Literal, Union
from langgraph.graph import MessagesState


# Define the state with necessary keys
class WorkflowState(MessagesState):
    user_input: str
    data: List[str]
    decision: str
    task_assigned: bool
    api_success: bool
    notification_sent: bool
    log: List[str]

# Define the nodes
def collect_user_input(state: WorkflowState) -> WorkflowState:
    # Simulate user input collection and validation
    if state.get("user_input") == "valid":
        return {"log": ["User input valid"]}
    else:
        return {"log": ["User input invalid"]}

# Create the graph
workflow = StateGraph(WorkflowState)

# Add nodes to the graph
# Ensure that the 'data_retrieval' node is added BEFORE the conditional edge is defined
workflow.add_node("collect_user_input", collect_user_input)
workflow.add_node("data_retrieval", data_retrieval) # Add this line if it was missing
# ... (rest of your node additions remain unchanged)

# Define the edges with conditional logic
workflow.set_entry_point("collect_user_input")
# Double-check the node name in the conditional edge definition for typos
workflow.add_conditional_edges("collect_user_input", lambda s: "data_retrieval" if s.get("user_input") == "valid" else END)
# ... (rest of your edge definitions remain unchanged)

# Compile the graph
app = workflow.compile()

# Example invocation
initial_state = {
    "user_input": "valid",
    "messages": [],
    "log": []
}
result = app.invoke(initial_state)
print(result)



{'messages': [], 'user_input': 'valid', 'data': ['data1', 'data2'], 'log': ['Data retrieved']}


Summary:
The result indicates that the workflow has progressed successfully. The user input was valid, leading to the successful retrieval of data ('data1' and 'data2'), and this action was logged. There are no error messages or issues reported, suggesting that the workflow is functioning as intended up to this point.