### **overview:**

本示例演示了如何创建一个基于大语言模型的项目管理应用程序，其目标是根据项目目标和团队成员信息，自动生成项目的日程安排与人员分配。

### **motivation**

项目管理是一项复杂的工程，需要同时处理任务拆解、依赖梳理、排期、人员分配和风险评估等环节，这大大增加了项目管理的难度。“项目经理助理代理”的创建旨在通过在流程中引入自动化、智能化和精确性来改变项目的管理方式：它基于大语言模型，通过结构化的信息传递来提高项目管理的效率。

### 输出结果说明

运行程序后，将会输出以下信息：

- Current node: <当前节点>
- iteration_number: <迭代次数>
- project_risk_score_iterations: <项目风险评分迭代>
- insights: <见解>

此外，程序还会生成项目时间线的可视化图表。

### **Key Components**

- 自定义项目描述和成员：支持从本地读取项目描述和成员相关信息
- 流程化协作：不同的agent负责不同的功能，定义工作流，流程化管理项目
- 结构化信息：结构化大语言模型输出，提高沟通效率
- 自我反省并更新项目管理：提示大模型进行风险分析并调整项目管理进程

### **Method Details**

结构化信息

```python

class AgentState(TypedDict):
    """The project manager agent state."""
    project_description: str
    team: Team
    tasks: TaskList
    dependencies: DependencyList
    schedule: ScheduleList
    task_allocations: TaskAllocationList
    risks: RiskList
    iteration_number: int
    max_iteration: int
    insights: List[str]
    schedule_iteration: List[ScheduleList]
    task_allocations_iteration: List[TaskAllocationList]
    risks_iteration: List[RiskListIteration]
    project_risk_score_iterations: List[int]
```

node:

`task_generation_node`：根据项目描述，生成具体的项目管理事件，并估计每个事件的持续时间；

`task_dependency_node`：生成项目事件的依赖关系；

`task_scheduler_node`：为每个项目事件规划日程；

`task_allocation_node`：为项目事件分配人员；

`risk_assessment_node`：评估项目计划的风险；

`insight_generation_node`：根据项目日程、人员分配以及项目风险评估结果提供改进建议；

### Conclusion

In [None]:
from datetime import datetime, timedelta
import os
from langgraph.checkpoint.memory import MemorySaver

from langchain_community.chat_models import ChatZhipuAI
from dotenv import load_dotenv
from langgraph.graph import END
import numpy as np

import logging
import pandas as pd
from typing import Annotated, Any, Dict, TypedDict, List, Union
from langchain_core.pydantic_v1 import BaseModel, Field
from langgraph.graph import StateGraph, START,END

# Load environment variables from a local .env file (presumably including the
# credentials ChatZhipuAI needs below — confirm); override=True lets values in
# .env replace variables already set in the process environment.
load_dotenv(override=True)

In [None]:
logger = logging.getLogger(__name__)

# Chat model shared by every LLM-backed graph node in this notebook
# (ZhipuAI glm-4-flash, sampling temperature 0.5).
llm = ChatZhipuAI(model="glm-4-flash", temperature=0.5)

In [None]:

# data classes
# Pydantic (v1) models used as structured-output schemas for the LLM calls and
# as the values stored in AgentState. Class docstrings and Field descriptions are
# presumably forwarded to the model as part of the schema by
# with_structured_output, so treat their text as behavior, not just docs.

# A single actionable unit of work extracted from the project description.
class Task(BaseModel):
    # id: uuid.UUID = Field(default_factory=uuid.uuid4, description="Unique identifier for the task")
    task_name: str = Field(description="Name of the task")
    task_description: str = Field(description="Description of the task")
    # Duration in whole days; get_default_schedule adds it to the start day.
    estimated_day: int = Field(description="Estimated number of days to complete the task")

# Wrapper so the whole task list can be requested as one structured output
# (used by task_generation_node).
class TaskList(BaseModel):
    tasks: List[Task] = Field(description="List of tasks")

class TaskDependency(BaseModel):
    """Task dependency model"""
    task: Task = Field(description="Task")
    # task: str = Field(description="Task")
    # NOTE(review): the direction of this list is ambiguous. get_default_dependencies
    # stores the *preceding* task here (i.e. a prerequisite), while the dependency
    # prompt also asks for "tasks that depend on its completion". Confirm which
    # meaning is intended before relying on it for scheduling.
    dependent_tasks: List[Task] = Field(description="List of dependent tasks")

# One team member; get_team builds these from the "Name" and
# "Profile Description" columns of the team CSV.
class TeamMember(BaseModel):
    name: str = Field(description="Name of the team member")
    # Free-text skills/role description the allocation prompt can match tasks against.
    profile: str = Field(description="Profile of the team member")

# The whole team; interpolated into the task-allocation prompt.
class Team(BaseModel):
    team_members: List[TeamMember] = Field(description="List of team members")

# Per-task planning records; a fresh set is produced on every scheduling /
# allocation pass of the graph.
class TaskAllocation(BaseModel):
    """Task allocation class"""
    task: Task = Field(description="Task")
    # task: str = Field(description="Task")
    # Exactly one member per task (the field description says "members", but the type is a single TeamMember).
    team_member: TeamMember = Field(description="Team members assigned to the task")

class TaskSchedule(BaseModel):
    """Schedule schedule class"""
    task: Task = Field(description="Task")
    # task: str = Field(description="Task")
    # Day offsets, not dates: visalize_project_timeline adds them to today's date.
    start_day: int = Field(description="Start day of the task")
    end_day: int = Field(description="End day of the task")


# Containers holding the results of a single planning pass. DependencyList and
# TaskAllocationList are also used directly as structured-output schemas;
# ScheduleList is what get_default_schedule returns and what the timeline plot reads.
class DependencyList(BaseModel):
    """List of task dependencies"""
    dependencies: List[TaskDependency] = Field(description="List of task dependencies")

class ScheduleList(BaseModel):
    """List of task schedules"""
    schedule: List[TaskSchedule] = Field(description="List of task schedules")

class TaskAllocationList(BaseModel):
    """List of task allocations"""
    task_allocations: List[TaskAllocation] = Field(description="List of task allocations")

# Iteration-history wrappers.
# NOTE(review): neither class below is referenced in the visible code —
# AgentState keeps plain lists (schedule_iteration, task_allocations_iteration)
# instead. Candidates for removal.
class TaskAllocationListIteration(BaseModel):
    """List of task allocations for each iteration"""
    task_allocations_iteration: List[TaskAllocationList] = Field(description="List of task allocations for each iteration")

class ScheduleIteration(BaseModel):
    """List of task schedules for each iteration"""
    schedule: List[ScheduleList] = Field(description="List of task schedules for each iteration")

class Risk(BaseModel):
    """Risk of a task"""
    task: Task = Field(description="Task")
    # Typed as int because the score is summed with int() into the project risk
    # score. With the previous ``str`` type a non-numeric answer such as "high"
    # passed validation and only crashed later; now pydantic rejects it at parse
    # time, where risk_assessment_node already falls back to default risks.
    score: int = Field(description="Risk score of the task, from 0 (no risk) to 10 (high risk)")

# One risk-assessment pass (the docstring says "for each iteration", but it
# holds the risks of a single pass; RiskListIteration below is the history type).
class RiskList(BaseModel):
    """List of risks for each iteration"""
    risks: List[Risk] = Field(description="List of risks")

class RiskListIteration(BaseModel):
    """List of risks for each iteration"""
    risks_iteration: List[RiskList] = Field(description="List of risks for each iteration")

class AgentState(TypedDict):
    """The project manager agent state.

    Shared LangGraph state passed between the planning nodes. The ``*_iteration``
    lists accumulate one entry per scheduling / allocation / risk pass so the
    history can be plotted after the run.
    """
    project_description: str
    team: Team
    tasks: TaskList
    dependencies: DependencyList
    schedule: ScheduleList
    task_allocations: TaskAllocationList
    risks: RiskList
    # Incremented once per risk-assessment pass; compared against max_iteration by the router.
    iteration_number: int
    max_iteration: int
    # Latest insight text: the initial state uses "" and insight_generation_node
    # stores the model's reply string, so this is a str, not a list.
    insights: str
    schedule_iteration: List[ScheduleList]
    task_allocations_iteration: List[TaskAllocationList]
    # risk_assessment_node appends one RiskList per pass.
    risks_iteration: List[RiskList]
    # Sum of task risk scores of the latest pass. risk_assessment_node writes
    # this key; declaring it is presumably required for LangGraph to keep it
    # in the state (undeclared keys are not tracked — confirm for your version).
    project_risk_score: int
    project_risk_score_iterations: List[int]

In [None]:
def get_default_dependencies(task):
    """Build a fallback dependency list that chains the tasks linearly.

    Used when the LLM does not produce dependencies. The first task gets an
    empty ``dependent_tasks`` list; every later task lists exactly the task
    that precedes it in ``task.tasks``.

    Args:
        task: A ``TaskList`` (despite the singular parameter name).

    Returns:
        DependencyList with one TaskDependency per task, in input order.
    """
    ordered = task.tasks
    chain = [
        TaskDependency(task=current, dependent_tasks=[ordered[idx - 1]] if idx else [])
        for idx, current in enumerate(ordered)
    ]
    return DependencyList(dependencies=chain)


def get_default_schedule(tasks, dependencies):
    """Generate a fallback schedule that runs the tasks back to back.

    Tasks are laid out sequentially in list order starting on day 1, each
    spanning ``estimated_day`` days. ``end_day`` is exclusive (a task starting
    on day 1 with 3 estimated days ends on day 4), which matches how the
    timeline chart draws a bar from ``start`` to ``end``. The next task
    therefore starts on the day the previous one ends. The previous
    ``end_day + 1`` left an idle day between every pair of tasks.

    Args:
        tasks: TaskList whose tasks are scheduled in order.
        dependencies: Accepted for interface compatibility; currently ignored —
            tasks simply run in list order.

    Returns:
        ScheduleList with one TaskSchedule per task.
    """
    schedule = []
    start_day = 1
    for task in tasks.tasks:
        end_day = start_day + task.estimated_day
        schedule.append(TaskSchedule(task=task, start_day=start_day, end_day=end_day))
        # end_day is exclusive, so the following task can begin on that same day.
        start_day = end_day
    return ScheduleList(schedule=schedule)


def get_default_task_allocations(tasks, schedule, team):
    """Assign tasks to team members round-robin as a fallback allocation.

    Task ``i`` goes to ``team.team_members[i % len(team.team_members)]``.
    ``schedule`` is accepted for interface compatibility but is not used.
    An empty team combined with a non-empty task list raises ZeroDivisionError.

    Returns:
        TaskAllocationList with one TaskAllocation per task, in task order.
    """
    members = team.team_members
    member_count = len(members)
    allocations = [
        TaskAllocation(task=current, team_member=members[idx % member_count])
        for idx, current in enumerate(tasks.tasks)
    ]
    return TaskAllocationList(task_allocations=allocations)

def get_default_risks(task_allocations, schedule):
    """Return a fallback RiskList that gives every allocated task a risk score of 1.

    ``schedule`` is accepted for interface compatibility but is not used.
    """
    default_score = 1
    return RiskList(
        risks=[
            Risk(task=allocation.task, score=default_score)
            for allocation in task_allocations.task_allocations
        ]
    )

In [None]:
def task_generation_node(state: AgentState):
    """LangGraph node that extracts tasks from the given project description.

    Args:
        state: Graph state; only ``project_description`` is read.

    Returns:
        Partial state update ``{"tasks": TaskList}``.

    Raises:
        Exception: whatever the LLM call raises is logged and re-raised, because
            every later node needs the task list and there is no sensible default.
        ValueError: if the structured-output call returns ``None`` (the other
            nodes in this file guard against a ``None`` result as well).
    """
    description = state["project_description"]
    prompt = f"""
        You are an expert project manager tasked with analyzing the following project description: {description}
        Your objectives are to: 
        1. **Extract Actionable Tasks:**
            - Identify and list all actionable and realistic tasks necessary to complete the project.
            - Provide an estimated number of days required to complete each task.
        2. **Refine Long-Term Tasks:**
            - For any task estimated to take longer than 5 days, break it down into smaller, independent sub-tasks.
        **Requirements:** - Ensure each task is clearly defined and achievable.
            - Maintain logical sequencing of tasks to facilitate smooth project execution."""

    structure_llm = llm.with_structured_output(TaskList)
    try:
        tasks: TaskList = structure_llm.invoke(prompt)
    except Exception as e:
        # No fallback exists for the task list: log once (with traceback) and propagate.
        logger.exception("API 调用失败: %s", e)
        raise
    if tasks is None:
        # Fail here with a clear message instead of an AttributeError in a later node.
        raise ValueError("LLM returned no task list for the project description")
    return {"tasks": tasks}

def task_dependency_node(state: AgentState):
    """Evaluate the dependencies between the tasks.

    Asks the LLM for a ``DependencyList``. If the call fails or returns
    nothing, falls back to ``get_default_dependencies`` (a linear chain) and
    logs why, rather than swallowing the error silently.

    Args:
        state: Graph state; only ``tasks`` is read.

    Returns:
        Partial state update ``{"dependencies": DependencyList}``.
    """
    tasks = state["tasks"]
    prompt = f"""
        You are a skilled project scheduler responsible for mapping out task dependencies.
        Given the following list of tasks: {tasks}
        Your objectives are to:
            1. **Identify Dependencies:**
                - For each task, determine which other tasks must be completed before it can begin (blocking tasks).
            2. **Map Dependent Tasks:** 
                - For every task, list all tasks that depend on its completion.
            3. **you should strictly only output the dict("dependencies": dependencies_list) dependencies_list: list of TaskDependency each item is a dict contains tasks (follow the inputs tasks format) and their dependent_tasks list (a list of task， do not contain other symbol) respectively, and do not output other information.**
            4. ** Do not output the code, you need output the results**
        """
    structure_llm = llm.with_structured_output(DependencyList)
    try:
        dependencies: DependencyList = structure_llm.invoke(prompt)
    except Exception as e:
        # Dependencies are recoverable: keep the run going with the default chain.
        logger.warning("Dependency generation failed, using default dependencies: %s", e)
        dependencies = None
    if dependencies is None:
        dependencies = get_default_dependencies(tasks)

    return {"dependencies": dependencies}


def task_scheduler_node(state: AgentState):
    """LangGraph node that schedules tasks based on dependencies and previous insights.

    Requests a ``ScheduleList`` from the LLM via structured output, the same
    mechanism the other planning nodes use. Falls back to
    ``get_default_schedule`` when the call fails or returns nothing.

    The earlier hand-rolled text parser built ``TaskSchedule(task=<str>)``,
    which the ``Task``-typed field always rejects, so the LLM schedule was
    never used. Its success path also produced a bare list instead of a
    ``ScheduleList``, which the timeline plot cannot read.

    Side effects:
        Sets ``state["schedule"]`` and appends the schedule to
        ``state["schedule_iteration"]`` in place.

    Returns:
        The updated state dict.
    """
    dependencies = state["dependencies"]
    tasks = state["tasks"]
    insights = state["insights"]
    prompt = f"""
        You are an experienced project scheduler tasked with creating an optimized project timeline.
        **Given the Structured Information:**
            - Tasks: {tasks.tasks}
            - Dependencies: {dependencies.dependencies}
            - Previous Insights: {insights}
            - Previous Schedule Iterations (if any): {state["schedule_iteration"]}
        **Your objectives are to:**
            you should strictly only output the schedule: list of TaskSchedule that each item is a dict that contains task , start_day (int) and end_day(int) respectively, and do not output other information.
        ** Do not output the code, you need output the results**
        """

    structure_llm = llm.with_structured_output(ScheduleList)
    try:
        schedule: ScheduleList = structure_llm.invoke(prompt)
    except Exception as e:
        logger.warning("Schedule generation failed, using default schedule: %s", e)
        schedule = None
    if schedule is None:
        schedule = get_default_schedule(tasks, dependencies)

    state["schedule"] = schedule
    state["schedule_iteration"].append(schedule)
    return state

def task_allocation_node(state: AgentState):
    """LangGraph node that allocates tasks to team members.

    Requests a ``TaskAllocationList`` from the LLM. As in the other planning
    nodes, a failed or empty response falls back to the round-robin
    ``get_default_task_allocations`` instead of aborting the whole run.

    Side effects:
        Sets ``state["task_allocations"]`` and appends it to
        ``state["task_allocations_iteration"]`` in place.

    Returns:
        The updated state dict.
    """
    tasks = state["tasks"]
    schedule = state["schedule"]
    team = state["team"]
    insights = state["insights"]
    prompt = f"""
        You are a proficient project manager responsible for allocating tasks to team members efficiently.
        **Given:** 
            - **Tasks:** {tasks} 
            - **Schedule:** {schedule} 
            - **Team Members:** {team} 
            - **Previous Insights:** {insights} 
            - **Previous Task Allocations (if any):** {state["task_allocations_iteration"]} 
        **Your objectives are to:** 
            1. **Allocate Tasks:** 
                - Assign each task to a team member based on their expertise and current availability. 
            2. **Optimize Assignments:** 
                - Utilize insights from previous iterations to improve task allocations. 
            3. **you should strictly only output the task_allocations: list of TaskAllocation that each item is a dict that contains task and  team_member (dict contain name and profile), and do not output other information.**
                
        """
    structure_llm = llm.with_structured_output(TaskAllocationList)
    try:
        task_allocations: TaskAllocationList = structure_llm.invoke(prompt)
    except Exception as e:
        # Previously uncaught: a single API/parse failure here crashed the graph.
        logger.warning("Task allocation failed, using default allocations: %s", e)
        task_allocations = None
    if task_allocations is None:
        task_allocations = get_default_task_allocations(tasks, schedule, team)
    state["task_allocations"] = task_allocations
    state["task_allocations_iteration"].append(task_allocations)
    return state

def _numeric_risk_scores(risks):
    """Return each risk's score as an int, or None if ``risks`` is missing or any score is non-numeric."""
    if risks is None:
        return None
    try:
        return [int(risk.score) for risk in risks.risks]
    except (TypeError, ValueError):
        return None


def risk_assessment_node(state: AgentState):
    """LangGraph node that analyses the risk of the current schedule and task allocation.

    Requests a ``RiskList`` from the LLM and sums the per-task scores into a
    project risk score. A failed call, an empty result, or a non-numeric score
    such as "high" falls back to ``get_default_risks``. Previously a
    non-numeric score crashed the run at ``int()``.

    Side effects:
        Updates ``risks`` and ``project_risk_score`` in ``state``, increments
        ``iteration_number``, and appends to ``project_risk_score_iterations``
        and ``risks_iteration`` in place.

    Returns:
        The updated state dict.
    """
    schedule = state["schedule"]
    task_allocations = state["task_allocations"]
    prompt = f"""
        You are a seasoned project risk analyst tasked with evaluating the risks associated with the current project plan.
        **Given:**
            - **Task Allocations:** {task_allocations}
            - **Schedule:** {schedule}
            - **Previous Risk Assessments (if any):** {state['risks_iteration']}
        **Your objectives are to:**
            1. **Assess Risks:**
                - Analyze each allocated task and its scheduled timeline to identify potential risks.
            2. **Assign Risk Scores:**
                - Assign a risk score to each task on a scale from 0 (no risk) to 10 (high risk).
            3. **you should strictly only output the risks: list of Risk that each item is a dict that contains task and score, and do not output other information.**
        """

    structure_llm = llm.with_structured_output(RiskList)
    try:
        risks: RiskList = structure_llm.invoke(prompt)
    except Exception as e:
        logger.warning("Risk assessment failed, using default risks: %s", e)
        risks = None
    project_task_risk_scores = _numeric_risk_scores(risks)
    if project_task_risk_scores is None:
        risks = get_default_risks(task_allocations, schedule)
        project_task_risk_scores = [int(risk.score) for risk in risks.risks]
    project_risk_score = sum(project_task_risk_scores)
    state["risks"] = risks
    state["project_risk_score"] = project_risk_score
    state["iteration_number"] += 1
    state["project_risk_score_iterations"].append(project_risk_score)
    state["risks_iteration"].append(risks)
    return state

def insight_generation_node(state: AgentState):
    """LangGraph node that generates insights from the schedule, task allocation and risks.

    Reads ``schedule``, ``task_allocations``, ``risks`` and the previous
    ``insights`` text from ``state``. It then asks the LLM for a short list of
    improvement suggestions that the next scheduling / allocation pass sees.

    Returns:
        Partial state update ``{"insights": str}`` holding the model's reply text.

    Raises:
        Exception: whatever the LLM call raises is logged and re-raised.
    """
    schedule = state["schedule"]
    task_allocations = state["task_allocations"]
    risks = state["risks"]
    # Read the previous insights up front. The prompt used to interpolate the
    # local ``insights`` before it was assigned, raising UnboundLocalError on
    # every call.
    previous_insights = state.get("insights", "")
    prompt = f"""
        You are an expert project manager responsible for generating actionable insights to enhance the project plan.
        **Given:**
            - **Task Allocations:** {task_allocations}
            - **Schedule:** {schedule}
            - **Risk Analysis:** {risks}
        **Your objectives are to:**
            1. **Generate Critical Insights:**
            - Analyze the current task allocations, schedule, and risk assessments to identify areas for improvement.
            - Highlight any potential bottlenecks, resource conflicts, or high-risk tasks that may jeopardize project success.
            2. **Recommend Enhancements:**
            - Suggest adjustments to task assignments or scheduling to mitigate identified risks.
            3. **Origanize the result based on **{previous_insights}:**
                - List of insights, keep the result short and to the point
            
        """
    try:
        insights = llm.invoke(prompt).content
    except Exception as e:
        # No fallback for insights: log once (with traceback) and propagate.
        logger.exception("API 调用失败: %s", e)
        raise
    return {"insights": insights}

def router(state: AgentState):
    """Choose the next step after a risk-assessment pass.

    Returns ``END`` when ``iteration_number`` has reached ``max_iteration``.
    It also returns ``END`` when at least two project risk scores exist and the
    latest is not higher than the first one. Otherwise it returns
    ``"insight_generator"`` to start another improvement round.
    """
    limit = state["max_iteration"]
    done = state["iteration_number"]
    if done >= limit:
        return END

    scores = state["project_risk_score_iterations"]
    if len(scores) > 1 and scores[-1] <= scores[0]:
        return END
    return "insight_generator"

In [None]:
def get_project_description(file_path: str, encoding: str = "utf-8"):
    """Read the project description text from a file.

    Args:
        file_path: Path to the description file.
        encoding: Text encoding of the file. Defaults to UTF-8 so non-ASCII
            (e.g. Chinese) descriptions decode identically on every platform,
            instead of depending on the locale's default encoding.

    Returns:
        The full file contents as a string.
    """
    with open(file_path, "r", encoding=encoding) as file:
        return file.read()

def get_team(file_path:str):
    """Load the team roster from a CSV file.

    Every CSV row becomes one TeamMember built from the ``Name`` and
    ``Profile Description`` columns, in file order.

    Returns:
        Team containing all members.
    """
    frame = pd.read_csv(file_path)
    members = []
    for _row_index, record in frame.iterrows():
        members.append(TeamMember(name=record['Name'], profile=record['Profile Description']))
    return Team(team_members=members)


def visalize_project_timeline(final_state: AgentState, output_dir: str = "logs"):
    """Render one Gantt chart per planning iteration and save each as a PNG.

    For each iteration, the recorded schedule and task allocation are joined on
    task name. Day offsets are converted to dates relative to today, and the
    chart is written to ``<output_dir>/gantt_chart_iteration_<n>.png``
    (``n`` is 1-based).

    NOTE(review): ``px`` (plotly.express) is not imported anywhere in this file.
    Add ``import plotly.express as px`` to the imports cell; otherwise this
    raises NameError on the first iteration.

    Args:
        final_state: Final graph state; reads ``iteration_number``,
            ``schedule_iteration`` and ``task_allocations_iteration``.
        output_dir: Directory for the PNG files; created if missing.
    """
    number_of_iterations = final_state['iteration_number']
    if number_of_iterations <= 0:
        # Nothing was planned; don't create an empty output directory.
        return

    # The PNG writes below fail if the directory does not exist yet.
    os.makedirs(output_dir, exist_ok=True)
    # One anchor date for all charts so iterations share the same time axis.
    current_date = datetime.today()

    for i in range(number_of_iterations):
        # Schedule of this iteration: task name with its start/end day offsets.
        schedule_rows = [
            [task_schedule.task.task_name, task_schedule.start_day, task_schedule.end_day]
            for task_schedule in final_state['schedule_iteration'][i].schedule
        ]
        df_schedule = pd.DataFrame(schedule_rows, columns=['task_name', 'start', 'end'])

        # Allocation of this iteration: task name with the assigned member.
        allocation_rows = [
            [task_allocation.task.task_name, task_allocation.team_member.name]
            for task_allocation in final_state['task_allocations_iteration'][i].task_allocations
        ]
        df_allocation = pd.DataFrame(allocation_rows, columns=['task_name', 'team_member'])

        # Inner join on task name: tasks named differently in the two lists are dropped from the chart.
        df = df_allocation.merge(df_schedule, on='task_name')

        # Convert day offsets into calendar dates.
        df['start'] = df['start'].apply(lambda x: current_date + timedelta(days=x))
        df['end'] = df['end'].apply(lambda x: current_date + timedelta(days=x))

        df.rename(columns={'team_member': 'Team Member'}, inplace=True)
        df.sort_values(by='Team Member', inplace=True)
        fig = px.timeline(df, x_start="start", x_end="end", y="task_name", color="Team Member", title=f"Gantt Chart - Iteration:{i+1} ")

        fig.update_layout(
            xaxis_title="Timeline",
            yaxis_title="Tasks",
            yaxis=dict(autorange="reversed"),  # Reverse the y-axis to have tasks in the vertical side
            title_x=0.5
        )
        fig.write_image(os.path.join(output_dir, f'gantt_chart_iteration_{i+1}.png'))

# Build the planning graph: a linear pipeline from task generation to risk
# assessment, then a loop back through insight generation and rescheduling
# that continues until the router returns END.
workflow = StateGraph(AgentState)

_graph_nodes = {
    "task_generation": task_generation_node,
    "task_dependencies": task_dependency_node,
    "task_scheduler": task_scheduler_node,
    "task_allocator": task_allocation_node,
    "risk_assessor": risk_assessment_node,
    "insight_generator": insight_generation_node,
}
for _node_name, _node_fn in _graph_nodes.items():
    workflow.add_node(_node_name, _node_fn)

# Entry edge from START, then the linear part of the pipeline.
workflow.add_edge(START, "task_generation")
_pipeline = ["task_generation", "task_dependencies", "task_scheduler", "task_allocator", "risk_assessor"]
for _upstream, _downstream in zip(_pipeline, _pipeline[1:]):
    workflow.add_edge(_upstream, _downstream)

# After each risk pass the router either stops or asks for new insights,
# which feed back into rescheduling.
workflow.add_conditional_edges("risk_assessor", router, ["insight_generator", END])
workflow.add_edge("insight_generator", "task_scheduler")

# In-memory checkpointer so the final state can be read back with get_state().
memory = MemorySaver()

logger.info("Compiling the workflow")
graph_plan = workflow.compile(checkpointer=memory)

In [None]:
# Inputs: free-text project description and the team roster CSV.
project_description = get_project_description("src/data/task_data/project_description.txt")
team = get_team("src/data/task_data/team.csv")
logger.info("Project description and team members read successfully")
logger.info("Project description: {}".format(project_description))
logger.info("Team members: {}".format(team))

# Initial graph state; the *_iteration lists start empty and grow each pass.
# TODO: support args input
state_input = dict(
    project_description=project_description,
    team=team,
    insights="",
    iteration_number=0,
    max_iteration=1,
    schedule_iteration=[],
    task_allocations_iteration=[],
    risks_iteration=[],
    project_risk_score_iterations=[],
)

config = {"configurable": {"thread_id": "1"}}
logger.info("Starting the workflow")
# Print the different nodes as the agent progresses; element 1 of each event
# is a dict keyed by the node that just ran.
for event in graph_plan.stream(state_input, config, stream_mode=["updates"]):
    node_update = event[1]
    print(f"Current node: {next(iter(node_update))}")

final_state = graph_plan.get_state(config).values
for summary_key in ("iteration_number", "project_risk_score_iterations", "insights"):
    print(summary_key, final_state[summary_key])
visalize_project_timeline(final_state)
