### Utils

In [1]:
import base64
import os
import json
from datetime import datetime
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, AIMessage
import requests
import re
import json
from typing import Dict, Any

def load_markdown_to_str(file_path):
	with open(file_path, 'r', encoding='utf-8') as md_file:
		markdown_content = md_file.read()
	return markdown_content

def load_latest_sprint_status(base_path):
    """
    Find the latest sprint directory and load the project-sprint-status.md file.
    
    Args:
    base_path (str): Path to the directory containing sprint folders.
    
    Returns:
    str: Content of the project-sprint-status.md file from the latest sprint.
    """
    try:
        # List all directories in the base path
        directories = [d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))]
        
        # Filter and sort sprint directories
        sprint_dirs = sorted([d for d in directories if d.startswith('sprint') and d[6:].isdigit()],
                             key=lambda x: int(x[6:]),
                             reverse=True)
        
        if not sprint_dirs:
            return "No sprint directories found."
        
        # Get the latest sprint directory
        latest_sprint = sprint_dirs[0]
        sprint_path = os.path.join(base_path, latest_sprint)
        
        # Look for project-sprint-status.md in the latest sprint directory
        status_file = os.path.join(sprint_path, 'project-sprint-status.md')
        
        if os.path.exists(status_file):
            with open(status_file, 'r', encoding='utf-8') as f:
                return f.read()
        else:
            return f"project-sprint-status.md not found in {latest_sprint}."
    
    except Exception as e:
        return f"An error occurred: {str(e)}"
    
    
def load_latest_sprint_backlog(base_path):
    """
    Find the latest sprint directory and load the project-sprint-backlog.json file.
    
    Args:
    base_path (str): Path to the directory containing sprint folders.
    
    Returns:
    dict: Content of the project-sprint-backlog.json file from the latest sprint.
    """
    try:
        # List all directories in the base path
        directories = [d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))]
        
        # Filter and sort sprint directories
        sprint_dirs = sorted([d for d in directories if d.startswith('sprint') and d[6:].isdigit()],
                             key=lambda x: int(x[6:]),
                             reverse=True)
        
        if not sprint_dirs:
            return {"error": "No sprint directories found."}
        
        # Get the latest sprint directory
        latest_sprint = sprint_dirs[0]
        sprint_path = os.path.join(base_path, latest_sprint)
        
        # Look for project-sprint-backlog.json in the latest sprint directory
        backlog_file = os.path.join(sprint_path, 'project-sprint-backlog.json')
        
        if os.path.exists(backlog_file):
            with open(backlog_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        else:
            return {"error": f"project-sprint-backlog.json not found in {latest_sprint}."}
    
    except Exception as e:
        return {"error": f"An error occurred: {str(e)}"}
    
def export_transcript(state, folder_path):
    # Create the folder if it doesn't exist
    files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
    id = len(files) + 1
    transcript = state["transcript"]
    
    filename = state["meeting_type"].replace(" ","_") + str(id) + ".txt"
    
    os.makedirs(os.path.join(folder_path), exist_ok=True)
    
    
    # Construct the full file path
    file_path = os.path.join(folder_path, filename)
    
    # Write the string to a text file
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(transcript)

def export_state(state, folder_path, filename):
    # Create the folder if it doesn't exist
    files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
    id = len(files) + 1
    
    filename = filename+str(id)+".json"
    
    os.makedirs(folder_path, exist_ok=True)
    
    # Construct the full file path
    file_path = os.path.join(folder_path, filename)
    
    # Write the state dict to a JSON file
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(state, f, indent=4)

def load_txt_to_str(file_path):
    with open(file_path, 'r', encoding='utf-8') as txt_file:
        text_content = txt_file.read()
    return text_content

def load_from_json(file_path):
    """
    Load data from a JSON file.
    
    Args:
    file_path (str): Path to the JSON file.
    
    Returns:
    dict: A dictionary containing the loaded JSON data.
    """
    try:
        with open(file_path, 'r') as file:
            return json.load(file)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return {}
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format in file {file_path}")
        return {}
    
def render_mermaid_diagram(diagram_code: str) -> str:
    # Encode the Mermaid code
    encoded_diagram = base64.b64encode(diagram_code.encode('utf-8')).decode('utf-8')
    
    # Make a request to the Mermaid rendering service
    url = f"https://mermaid.ink/img/{encoded_diagram}"
    response = requests.get(url)
    
    if response.status_code == 200:
        # Return the URL of the rendered image
        return url
    else:
        # If rendering failed, return the original Mermaid code
        return f"```mermaid\n{diagram_code}\n```"
    
def format_mermaid(input_string):
    # Step 1: Remove redundant "```mermaid" at the start and end
    cleaned_string = input_string.replace('```mermaid', '').replace('```', '')
    
    # Step 2: Replace escaped newlines with actual newlines
    formatted_string = cleaned_string.replace(r'\n', '\n')
    
    # Step 3: Strip any leading/trailing whitespace
    formatted_string = formatted_string.strip()

    return formatted_string	


def export_meeting_history(state, output_file='meeting_history.json'):
    """
    Export the meeting history to a JSON file.
    
    Args:
    state (dict): The state dictionary containing the meeting history.
    output_file (str): The name of the output file. Defaults to 'meeting_history.json'.
    
    Returns:
    None
    """
    meeting_history = state.get("meeting_history", [])
    
    # Ensure meeting_history is a list
    if not isinstance(meeting_history, list):
        meeting_history = [meeting_history]
    
    try:
        with open(output_file, 'w') as file:
            json.dump(meeting_history, file, indent=2)
        print(f"Meeting history exported successfully to {output_file}")
    except IOError:
        print(f"Error: Unable to write to file {output_file}")

    return state  # Return the state to maintain consistency with your workflow


def manage_sprint_folders(state, project_folder):
    """
    Manages sprint folders based on the meeting type.
    Creates a new sprint folder if necessary and adds required files.
    
    :param state: The current state dictionary
    :param project_folder: Path to the project folder
    :return: Updated state with new sprint information
    """
    if "planning" in state.get("meeting_type", "").lower() or "plan" in state.get("meeting_type", "").lower():
        # List all directories in the project folder
        directories = [d for d in os.listdir(project_folder) if os.path.isdir(os.path.join(project_folder, d))]
        
        # Filter and find the highest sprint number
        sprint_numbers = [int(re.findall(r'\d+', d)[0]) for d in directories if d.startswith("sprint") and re.findall(r'\d+', d)]
        
        if sprint_numbers:
            new_sprint_number = max(sprint_numbers) + 1
        else:
            new_sprint_number = 1
        
        # Create new sprint folder
        new_sprint_folder = os.path.join(project_folder, f"sprint{new_sprint_number}")
        os.makedirs(new_sprint_folder, exist_ok=True)
        
        # Create project_sprint_status.md
        status_file_path = os.path.join(new_sprint_folder, "project_sprint_status.md")
        with open(status_file_path, 'w') as status_file:
            status_file.write(f"# Sprint {new_sprint_number} Status\n\nStatus details will be updated here.")
        
        # Create project_sprint_backlog.json
        backlog_file_path = os.path.join(new_sprint_folder, "project_sprint_backlog.json")
        initial_backlog = {}
        with open(backlog_file_path, 'w') as backlog_file:
            json.dump(initial_backlog, backlog_file, indent=2)
        
        # Update state with new sprint information
        state["current_sprint_number"] = new_sprint_number
        state["current_sprint_folder"] = new_sprint_folder
        state["sprint_status_file"] = status_file_path
        state["sprint_backlog_file"] = backlog_file_path
        
        print(f"Created new sprint folder: {new_sprint_folder}")
    else:
        print("Meeting type does not indicate a planning session. No new sprint folder created.")
    
    return state


def load_json(file_path: str) -> Dict[str, Any]:
    """
    Reads a JSON file and returns its contents as a dictionary.

    :param file_path: The path to the JSON file to be read
    :return: A dictionary containing the data from the JSON file
    :raises FileNotFoundError: If the specified file does not exist
    :raises json.JSONDecodeError: If the file is not valid JSON
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        print(f"Successfully loaded JSON from {file_path}")
        return data
    except FileNotFoundError:
        print(f"Error: The file {file_path} was not found.")
        raise
    except json.JSONDecodeError as e:
        print(f"Error: The file {file_path} is not valid JSON. Error: {str(e)}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred while reading {file_path}: {str(e)}")
        raise


def get_latest_sprint_folder(project_folder: str) -> str:
    """
    Scans the project folder for sprint folders and returns the name of the latest sprint folder.

    :param project_folder: Path to the project folder (e.g., 'project1/')
    :return: Name of the latest sprint folder (e.g., 'sprint5'), or None if no sprint folders are found
    """
    # List all items in the project folder
    items = os.listdir(project_folder)

    # Filter for sprint folders and extract their numbers
    sprint_folders = []
    for item in items:
        if os.path.isdir(os.path.join(project_folder, item)):
            match = re.match(r'sprint(\d+)', item, re.IGNORECASE)
            if match:
                sprint_number = int(match.group(1))
                sprint_folders.append((item, sprint_number))

    # Sort sprint folders by number (descending) and return the latest
    if sprint_folders:
        latest_sprint = max(sprint_folders, key=lambda x: x[1])
        print(f"Latest sprint folder found: {latest_sprint[0]}")
        return latest_sprint[0]
    else:
        print("No sprint folders found.")
        return None


def export_markdown(content, file_path):
    """
    Export content to a Markdown file.
    
    Args:
    content (str): The content to be written to the file.
    file_path (str): The path where the file should be saved.
    """
    try:
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"Successfully exported Markdown to {file_path}")
    except Exception as e:
        print(f"Error exporting Markdown: {str(e)}")

def export_json(content, file_path):
    """
    Export content to a JSON file.
    
    Args:
    content (dict): The content to be written to the file.
    file_path (str): The path where the file should be saved.
    """
    try:
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(content, f, indent=2)
        print(f"Successfully exported JSON to {file_path}")
    except Exception as e:
        print(f"Error exporting JSON: {str(e)}")
        
def load_latest_json(folder_path: str) -> Dict[str, Any]:
    """
    Reads the latest JSON file from a given folder and returns its contents as a dictionary.

    :param folder_path: The path to the folder containing JSON files
    :return: A dictionary containing the data from the latest JSON file
    :raises FileNotFoundError: If no JSON files are found in the specified folder
    :raises json.JSONDecodeError: If the latest file is not valid JSON
    """
    try:
        # Get all JSON files in the folder
        json_files = [f for f in os.listdir(folder_path) if f.endswith('.json')]
        
        if not json_files:
            raise FileNotFoundError(f"No JSON files found in {folder_path}")
        
        # Find the latest JSON file
        latest_file = max(json_files, key=lambda f: os.path.getmtime(os.path.join(folder_path, f)))
        latest_file_path = os.path.join(folder_path, latest_file)
        
        # Read and parse the latest JSON file
        with open(latest_file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        
        print(f"Successfully loaded latest JSON from {latest_file_path}")
        return data
    
    except FileNotFoundError:
        print(f"Error: No JSON files found in {folder_path}")
        raise
    except json.JSONDecodeError as e:
        print(f"Error: The file {latest_file_path} is not valid JSON. Error: {str(e)}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred while reading from {folder_path}: {str(e)}")
        raise
    
def load_second_latest_json(folder_path: str) -> Dict[str, Any]:
    """
    Reads the second latest JSON file from a given folder and returns its contents as a dictionary.

    :param folder_path: The path to the folder containing JSON files
    :return: A dictionary containing the data from the second latest JSON file
    :raises FileNotFoundError: If fewer than two JSON files are found in the specified folder
    :raises json.JSONDecodeError: If the second latest file is not valid JSON
    """
    try:
        # Get all JSON files in the folder
        json_files = [f for f in os.listdir(folder_path) if f.endswith('.json')]
        
        if len(json_files) < 2:
            raise FileNotFoundError(f"Fewer than two JSON files found in {folder_path}")
        
        # Sort JSON files by modification time, newest first
        sorted_files = sorted(json_files, key=lambda f: os.path.getmtime(os.path.join(folder_path, f)), reverse=True)
        
        # Get the second latest file
        second_latest_file = sorted_files[1]
        second_latest_file_path = os.path.join(folder_path, second_latest_file)
        
        # Read and parse the second latest JSON file
        with open(second_latest_file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        
        print(f"Successfully loaded second latest JSON from {second_latest_file_path}")
        return data
    
    except FileNotFoundError:
        print(f"Error: Fewer than two JSON files found in {folder_path}")
        raise
    except json.JSONDecodeError as e:
        print(f"Error: The file {second_latest_file_path} is not valid JSON. Error: {str(e)}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred while reading from {folder_path}: {str(e)}")
        raise 
def load_employee_profiles(file_path):
    """
    Load employee profiles from a JSON file.
    
    Args:
    file_path (str): Path to the JSON file containing employee profiles.
    
    Returns:
    dict: A dictionary of employee profiles.
    """
    try:
        with open(file_path, 'r') as file:
            return json.load(file)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return {}
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format in file {file_path}")
        return {}

### Prompts

In [6]:
BACKLOG_ITEM_SELECTOR_PROMPT = """
You are an AI assistant helping with Sprint Planning.
Your task is to select appropriate backlog items for the upcoming sprint and generate new items based on the sprint planning note.

Sprint Planning Note:
####
{sprint_planning_note}
####

Current Backlog:
####
{current_backlog}
####

Please perform the following tasks:
1. Analyze the sprint planning note and the current backlog.
2. Select appropriate items from the current backlog for the upcoming sprint.
3. Generate new user stories, epics, or tasks based on the discussion in the sprint planning note.
4. Prioritize the selected and newly generated items.
5. Provide a brief justification for each selected or generated item.

Output the results in the following JSON format:
{{
  "selected_items": [
    {{
      "id": "item_id",
      "type": "epic/story/task",
      "title": "Item title",
      "description": "Item description",
      "priority": "high/medium/low",
      "justification": "Reason for selection"
    }}
  ],
  "new_items": [
    {{
      "type": "epic/story/task",
      "title": "New item title",
      "description": "New item description",
      "priority": "high/medium/low",
      "justification": "Reason for generation"
    }}
  ]
}}

Ensure that the selected items align with the team's capacity and the sprint's goals.
"""

CAPACITY_CALCULATOR_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to calculate the team's capacity for the upcoming sprint.

Sprint Details:
- Sprint Duration: 14 days
- Working Days: 10 days

Team Member Profiles:
####
{employee_profiles}
####

Historical Team Velocity:
- Average velocity: 40 story points per sprint

Please calculate the team's capacity for this sprint:
1. Consider each team member's availability (accounting for time off, other commitments).
2. Use the historical team velocity as a baseline.
3. Adjust the capacity based on any factors mentioned in the employee profiles or sprint details.

Provide the results in the following JSON format:
{{
  "team_capacity": {{
    "total_story_points": number,
    "total_available_hours": number
  }},
  "individual_capacity": {{
    "employee_name": {{
      "available_days": number,
      "estimated_story_points": number,
      "estimated_hours": number
    }}
  }},
  "explanation": "Brief explanation of the capacity calculation and any adjustments made"
}}
"""

TASK_BREAKDOWN_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to break down the selected backlog items into smaller, manageable tasks and provide effort estimates.

Selected Backlog Items:
####
{selected_backlog_items}
####

Team Capacity:
####
{team_capacity}
####

Please perform the following tasks:
1. Break down each backlog item into smaller, specific tasks.
2. Provide an effort estimate for each task in story points or hours.
3. Ensure the total effort aligns with the team's capacity.

Output the results in the following JSON format:
{{
  "tasks": [
    {{
      "parent_item_id": "id of the parent backlog item",
      "task_id": "unique task id",
      "title": "Task title",
      "description": "Task description",
      "estimate": {{
        "unit": "story_points/hours",
        "value": number
      }},
      "dependencies": ["task_id of dependent tasks, if any"]
    }}
  ],
  "total_effort": {{
    "story_points": number,
    "hours": number
  }}
}}

Ensure that the tasks are specific, measurable, and aligned with the Definition of Done.
"""
TASK_ASSIGNER_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to suggest initial task assignments for team members based on their skills and capacity.

Team Member Profiles:
####
{employee_profiles}
####

Tasks:
####
{tasks}
####

Individual Capacity:
####
{individual_capacity}
####

Please perform the following tasks:
1. Analyze each team member's skills and capacity.
2. Suggest task assignments that best match each team member's abilities and available capacity.
3. Ensure a balanced workload across the team.

Output the results in the following JSON format:
{{
  "assignments": {{
    "employee_name": [
      {{
        "task_id": "assigned task id",
        "rationale": "Brief explanation for this assignment"
      }}
    ]
  }},
  "unassigned_tasks": [
    {{
      "task_id": "unassigned task id",
      "reason": "Reason for not assigning"
    }}
  ]
}}

Provide a brief explanation for any tasks left unassigned or any potential overallocation.
"""

SPRINT_VISUALIZER_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to create Gantt chart representations of the sprint plan.

Sprint Details:
- Start Date: {sprint_start_date}
- End Date: {sprint_end_date}

Tasks and Assignments:
####
{tasks_and_assignments}
####

Please perform the following tasks:
1. Create a Mermaid Gantt chart representation for the entire sprint, showing all tasks, their durations, and dependencies.
2. Create individual Mermaid Gantt charts for each team member, showing their assigned tasks.

Output the results in the following format:
{{
  "sprint_gantt_chart": "Mermaid Gantt chart code for the entire sprint",
  "individual_gantt_charts": {{
    "employee_name": "Mermaid Gantt chart code for the individual"
  }}
}}

Ensure that the Gantt charts clearly visualize the sprint timeline, task dependencies, and workload distribution.
"""

GLOBAL_BACKLOG_UPDATER_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to update the global backlog with the results of the sprint planning session.

Current Global Backlog:
####
{current_backlog}
####

Sprint Planning Results:
####
{sprint_planning_results}
####

Please perform the following tasks:
1. Integrate the newly generated items into the global backlog.
2. Update the status of items selected for the sprint.
3. Update effort estimates and priorities based on the sprint planning discussion.
4. Ensure the backlog remains well-organized and prioritized.

Output the updated global backlog in a structured format that can be easily parsed and modified in future sprints.

Provide a brief summary of the changes made to the global backlog.
"""

PERSONAL_REPORT_GENERATOR_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to generate personalized sprint planning reports for each team member.

Team Member: {employee_name}

Employee Profile:
####
{employee_profile}
####

Assigned Tasks:
####
{assigned_tasks}
####

Sprint Gantt Chart:
####
{sprint_gantt_chart}
####

Please create a personalized sprint planning report for this team member. The report should include:
1. A brief overview of the sprint goals and objectives.
2. A list of tasks assigned to the team member, including descriptions and effort estimates.
3. Any specific challenges or areas of focus for the team member this sprint.
4. The team member's personal Gantt chart.
5. Any relevant dependencies or collaborations with other team members.
6. A motivational message encouraging the team member for the upcoming sprint.

Format the report in Markdown, making it easy to read and understand.
"""

SPRINT_VISUALIZER_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to create Gantt chart representations of the sprint plan using Mermaid syntax.

Sprint Details:
- Start Date: {sprint_start_date}
- End Date: {sprint_end_date}

Tasks and Assignments:
{tasks_and_assignments}

Please perform the following tasks:
1. Create a Mermaid Gantt chart representation for the entire sprint, showing all tasks, their durations, and dependencies.
2. Create individual Mermaid Gantt charts for each team member, showing their assigned tasks.

Output the results in the following format:
{{
  "sprint_gantt_chart": "Mermaid Gantt chart code for the entire sprint",
  "individual_gantt_charts": {{
    "employee_name": "Mermaid Gantt chart code for the individual"
  }}
}}

Ensure that the Gantt charts clearly visualize the sprint timeline, task dependencies, and workload distribution.
"""

PERSONAL_REPORT_GENERATOR_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to generate a personalized sprint planning report for a team member.

Team Member: {employee_name}

Employee Profile:
{employee_profile}

Assigned Tasks:
{assigned_tasks}

Personal Gantt Chart:
{sprint_gantt_chart}

Please create a personalized sprint planning report for this team member. The report should include:
1. A brief overview of the sprint goals and objectives.
2. A list of tasks assigned to the team member, including descriptions and effort estimates.
3. Any specific challenges or areas of focus for the team member this sprint.
4. The team member's personal Gantt chart.
5. Any relevant dependencies or collaborations with other team members.
6. A motivational message encouraging the team member for the upcoming sprint.

Format the report in Markdown, making it easy to read and understand.
"""

SPRINT_SUMMARY_GENERATOR_PROMPT = """
You are an AI assistant helping with Sprint Planning. Your task is to generate a comprehensive sprint summary based on the planning session outcomes.

Sprint Planning Note:
{sprint_planning_note}

Selected Backlog Items:
{selected_backlog_items}

Team Capacity:
{team_capacity}

Tasks:
{tasks}

Task Assignments:
{task_assignments}

Sprint Gantt Chart:
{sprint_gantt_chart}

Please generate a comprehensive sprint summary that includes:
1. Sprint goals and objectives
2. Selected backlog items and their priorities
3. Team capacity and workload distribution
4. Key tasks and their assignments
5. Sprint timeline and important milestones
6. Potential risks or challenges identified during planning
7. Any important decisions made during the planning session
8. Next steps or action items for the team

Format the summary in Markdown, ensuring it's well-structured and easy to read. This summary will be used as a reference throughout the sprint and in sprint review meetings.
"""

In [13]:
import re
from typing import List, Dict, Any, Tuple

def parse_backlog_items(content: str) -> List[Dict[str, Any]]:
    items = []
    pattern = re.compile(r'BACKLOG ITEM\n(.*?)(?=\nBACKLOG ITEM|\Z)', re.DOTALL)
    for match in pattern.finditer(content):
        item_content = match.group(1)
        item = {}
        for line in item_content.split('\n'):
            if ':' in line:
                key, value = line.split(':', 1)
                item[key.strip().lower()] = value.strip()
        items.append(item)
    return items

def parse_capacity(content: str) -> Tuple[Dict[str, int], Dict[str, Dict[str, int]]]:
    team_capacity = {}
    individual_capacity = {}
    
    team_pattern = re.compile(r'TEAM CAPACITY\n(.*?)(?=\nINDIVIDUAL CAPACITY|\Z)', re.DOTALL)
    individual_pattern = re.compile(r'INDIVIDUAL CAPACITY\n(.*)', re.DOTALL)
    
    team_match = team_pattern.search(content)
    if team_match:
        for line in team_match.group(1).split('\n'):
            if ':' in line:
                key, value = line.split(':', 1)
                team_capacity[key.strip()] = int(value.strip())
    
    individual_match = individual_pattern.search(content)
    if individual_match:
        for line in individual_match.group(1).split('\n'):
            if ':' in line:
                name, capacity = line.split(':', 1)
                individual_capacity[name.strip()] = {}
                for item in capacity.split(','):
                    if '=' in item:
                        key, value = item.split('=')
                        individual_capacity[name.strip()][key.strip()] = int(value.strip())
    
    return team_capacity, individual_capacity

def parse_tasks(content: str) -> List[Dict[str, Any]]:
    tasks = []
    pattern = re.compile(r'TASK\n(.*?)(?=\nTASK|\Z)', re.DOTALL)
    for match in pattern.finditer(content):
        task_content = match.group(1)
        task = {}
        for line in task_content.split('\n'):
            if ':' in line:
                key, value = line.split(':', 1)
                task[key.strip().lower()] = value.strip()
        if 'estimated_effort' in task:
            task['estimated_effort'] = int(task['estimated_effort'])
        tasks.append(task)
    return tasks

def parse_task_assignments(content: str) -> Dict[str, List[str]]:
    assignments = {}
    pattern = re.compile(r'(.*?):\s*(.*)')
    for line in content.split('\n'):
        match = pattern.match(line)
        if match:
            employee, tasks = match.groups()
            assignments[employee.strip()] = [task.strip() for task in tasks.split(',')]
    return assignments

def parse_gantt_charts(content: str) -> Tuple[str, Dict[str, str]]:
    sprint_chart = ""
    personal_charts = {}
    
    sprint_pattern = re.compile(r'SPRINT GANTT CHART\n```mermaid\n(.*?)```', re.DOTALL)
    personal_pattern = re.compile(r'PERSONAL GANTT CHART: (.*?)\n```mermaid\n(.*?)```', re.DOTALL)
    
    sprint_match = sprint_pattern.search(content)
    if sprint_match:
        sprint_chart = sprint_match.group(1)
    
    for match in personal_pattern.finditer(content):
        name, chart = match.groups()
        personal_charts[name.strip()] = chart
    
    return sprint_chart, personal_charts

def format_backlog_items(items: List[Dict[str, Any]]) -> str:
    formatted = ""
    for item in items:
        formatted += "BACKLOG ITEM\n"
        for key, value in item.items():
            formatted += f"{key.capitalize()}: {value}\n"
        formatted += "---\n"
    return formatted

def format_team_capacity(capacity: Dict[str, int]) -> str:
    return "TEAM CAPACITY\n" + "\n".join(f"{key}: {value}" for key, value in capacity.items())

def format_employee_profiles(profiles: Dict[str, str]) -> str:
    return "\n\n".join(f"Employee: {name}\n{profile}" for name, profile in profiles.items())

def format_tasks(tasks: List[Dict[str, Any]]) -> str:
    formatted = ""
    for task in tasks:
        formatted += "TASK\n"
        for key, value in task.items():
            formatted += f"{key.capitalize()}: {value}\n"
        formatted += "---\n"
    return formatted

def format_individual_capacity(capacity: Dict[str, Dict[str, int]]) -> str:
    formatted = "INDIVIDUAL CAPACITY\n"
    for name, cap in capacity.items():
        formatted += f"{name}: " + ", ".join(f"{key}={value}" for key, value in cap.items()) + "\n"
    return formatted

def format_tasks_and_assignments(tasks: List[Dict[str, Any]], assignments: Dict[str, List[str]]) -> str:
    formatted = "TASKS\n" + format_tasks(tasks) + "\nASSIGNMENTS\n"
    for employee, assigned_tasks in assignments.items():
        formatted += f"{employee}: {', '.join(assigned_tasks)}\n"
    return formatted

def format_assigned_tasks(tasks: List[str]) -> str:
    return "Assigned Tasks:\n" + "\n".join(f"- {task}" for task in tasks)
def format_task_assignments(assignments: Dict[str, List[str]]) -> str:
    formatted = "TASK ASSIGNMENTS\n"
    for employee, tasks in assignments.items():
        formatted += f"{employee}: {', '.join(tasks)}\n"
    return formatted

### State

In [15]:
from abc import ABC, abstractmethod
from typing import Dict, List, TypedDict

class SprintPlanningState(TypedDict):
    sprint_planning_note: str
    current_backlog: str
    employee_profiles: Dict[str, str]
    selected_backlog_items: List[Dict]
    team_capacity: Dict[str, int]
    tasks: List[Dict]
    task_assignments: Dict[str, List[str]]
    sprint_gantt_chart: str
    personal_gantt_charts: Dict[str, str]
    personal_reports: Dict[str, str]
    sprint_summary: str
    sprint_start_date: str
    sprint_end_date: str
    
class Graph(ABC):
    @abstractmethod
    def create_graph(self):
        pass
    
    @abstractmethod
    def run_graph(self, project_folder):
        pass

### Node functions

In [25]:
import json
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, AIMessage
from langchain_anthropic import ChatAnthropic
from  dotenv import load_dotenv
import os

load_dotenv()
anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
#model = ChatAnthropic(model="claude-3-haiku-20240307", anthropic_api_key=anthropic_api_key)
model = ChatAnthropic(model="claude-3-5-sonnet-20240620", anthropic_api_key=anthropic_api_key, max_tokens= 8192)

def backlog_item_selector_node(state: SprintPlanningState) -> SprintPlanningState:
    formatted_prompt = BACKLOG_ITEM_SELECTOR_PROMPT.format(
        sprint_planning_note=state.get("sprint_planning_note"),
        current_backlog=state.get("current_backlog"),
    )
    messages = [
        SystemMessage(content=formatted_prompt),
        HumanMessage(content="Select and generate backlog items for the sprint.")
    ]
    
    response = model.invoke(messages)
    
    # Parse the response using regex or string manipulation
    selected_items = parse_backlog_items(response.content)
    print(response.content)
    print(selected_items)
    state['selected_backlog_items'] = selected_items
    return state

def capacity_calculator_node(state: SprintPlanningState) -> SprintPlanningState:
    formatted_prompt = CAPACITY_CALCULATOR_PROMPT.format(
        employee_profiles=state.get("employee_profiles"),
    )
    messages = [
        SystemMessage(content=formatted_prompt),
        HumanMessage(content="Calculate the team's capacity for this sprint.")
    ]
    
    response = model.invoke(messages)
    
    # Parse the response using regex or string manipulation
    team_capacity, individual_capacity = parse_capacity(response.content)
    state['team_capacity'] = team_capacity
    state['individual_capacity'] = individual_capacity
    return state

def task_breakdown_node(state: SprintPlanningState) -> SprintPlanningState:
    formatted_prompt = TASK_BREAKDOWN_PROMPT.format(
        selected_backlog_items=format_backlog_items(state['selected_backlog_items']),
        team_capacity=format_team_capacity(state['team_capacity'])
    )
    messages = [
        SystemMessage(content=formatted_prompt),
        HumanMessage(content="Break down the selected backlog items into tasks.")
    ]
    
    response = model.invoke(messages)
    
    # Parse the response using regex or string manipulation
    tasks = parse_tasks(response.content)
    state['tasks'] = tasks
    return state

def task_assigner_node(state: SprintPlanningState) -> SprintPlanningState:
    formatted_prompt = TASK_ASSIGNER_PROMPT.format(
        employee_profiles=format_employee_profiles(state['employee_profiles']),
        tasks=format_tasks(state['tasks']),
        individual_capacity=format_individual_capacity(state['individual_capacity'])
    )
    messages = [
        SystemMessage(content=formatted_prompt),
        HumanMessage(content="Suggest task assignments for team members.")
    ]
    
    response = model.invoke(messages)
    
    # Parse the response using regex or string manipulation
    task_assignments = parse_task_assignments(response.content)
    state['task_assignments'] = task_assignments
    return state

def sprint_visualizer_node(state: SprintPlanningState) -> SprintPlanningState:
    formatted_prompt = SPRINT_VISUALIZER_PROMPT.format(
        sprint_start_date=state['sprint_start_date'],
        sprint_end_date=state['sprint_end_date'],
        tasks_and_assignments=format_tasks_and_assignments(state['tasks'], state['task_assignments'])
    )
    messages = [
        SystemMessage(content=formatted_prompt),
        HumanMessage(content="Create Gantt chart representations of the sprint plan.")
    ]
    
    response = model.invoke(messages)
    
    # Parse the response to extract Mermaid diagrams
    sprint_gantt_chart, personal_gantt_charts = parse_gantt_charts(response.content)
    state['sprint_gantt_chart'] = sprint_gantt_chart
    state['personal_gantt_charts'] = personal_gantt_charts
    return state

def personal_report_generator_node(state: SprintPlanningState) -> SprintPlanningState:
    state['personal_reports'] = {}
    
    for employee, profile in state['employee_profiles'].items():
        formatted_prompt = PERSONAL_REPORT_GENERATOR_PROMPT.format(
            employee_name=employee,
            employee_profile=profile,
            assigned_tasks=format_assigned_tasks(state['task_assignments'].get(employee, [])),
            sprint_gantt_chart=state['personal_gantt_charts'].get(employee, '')
        )
        messages = [
            SystemMessage(content=formatted_prompt),
            HumanMessage(content=f"Generate a personal sprint planning report for {employee}.")
        ]
        
        response = model.invoke(messages)
        
        state['personal_reports'][employee] = response.content
    
    return state

def sprint_summary_generator_node(state: SprintPlanningState) -> SprintPlanningState:
    formatted_prompt = SPRINT_SUMMARY_GENERATOR_PROMPT.format(
        sprint_planning_note=state['sprint_planning_note'],
        selected_backlog_items=format_backlog_items(state['selected_backlog_items']),
        team_capacity=format_team_capacity(state['team_capacity']),
        tasks=format_tasks(state['tasks']),
        task_assignments=format_task_assignments(state['task_assignments']),
        sprint_gantt_chart=state['sprint_gantt_chart']
    )
    messages = [
        SystemMessage(content=formatted_prompt),
        HumanMessage(content="Generate a comprehensive sprint summary.")
    ]
    
    response = model.invoke(messages)
    
    state['sprint_summary'] = response.content
    return state

### Graph

In [8]:
from langgraph.graph import StateGraph, END
class SprintPlanning(Graph):
    def __init__(self):
        self.workflow = None
        
    def create_graph(self):
        workflow = StateGraph(SprintPlanningState)
        
        # Add nodes
        workflow.add_node("backlog_item_selector", backlog_item_selector_node)
        workflow.add_node("capacity_calculator", capacity_calculator_node)
        workflow.add_node("task_breakdown", task_breakdown_node)
        workflow.add_node("task_assigner", task_assigner_node)
        workflow.add_node("sprint_visualizer", sprint_visualizer_node)
        workflow.add_node("personal_report_generator", personal_report_generator_node)
        workflow.add_node("sprint_summary_generator", sprint_summary_generator_node)
        
        # Define edges
        workflow.add_edge("backlog_item_selector", "capacity_calculator")
        workflow.add_edge("capacity_calculator", "task_breakdown")
        workflow.add_edge("task_breakdown", "task_assigner")
        workflow.add_edge("task_assigner", "sprint_visualizer")
        workflow.add_edge("sprint_visualizer", "personal_report_generator")
        workflow.add_edge("personal_report_generator", "sprint_summary_generator")
        workflow.add_edge("sprint_summary_generator", END)
        
        workflow.set_entry_point("backlog_item_selector")
        
        self.workflow = workflow.compile()
    
    def run_graph(self, project_folder) -> SprintPlanningState:
        if self.workflow is None:
            raise ValueError("Graph has not been created. Call create_graph() first.")
        update_state =  load_latest_json(project_folder)
        note_taker_state = load_second_latest_json(project_folder)
        state = SprintPlanningState(
            sprint_planning_note=note_taker_state["note_final"],
            current_backlog=update_state["updated_sprint_backlog"],
            employee_profiles=update_state["employee_profiles"],
            sprint_start_date=update_state["sprint_start_date"],
            sprint_end_date=update_state["sprint_end_date"],
		)
        
        return self.workflow.invoke(state)

In [16]:
project_folder="../../../data_/project1/"
update_state = load_json(os.path.join(project_folder,"state-logs" , "update_state3.json"))
note_taker_state = load_json(os.path.join(project_folder,"state-logs" , "note_taker2.json"))

Successfully loaded JSON from ../../../data_/project1/state-logs\update_state3.json
Successfully loaded JSON from ../../../data_/project1/state-logs\note_taker2.json


In [19]:
update_state


{'meeting_note': 'Meeting Note: Project Kickoff and Initial Sprint Planning for HealthTrack Pro\n\nDate: June 17, 2024\nAttendees: Sarah Chen (Project Manager/Scrum Master), Alex Rodriguez (Senior Full-Stack Developer), Emily Watson (Frontend Developer), Michael Kim (Backend Developer), Olivia Martinez (QA Engineer/DevOps Specialist), Liam Foster (UI/UX Designer)\n\n1. Project Overview\n   - HealthTrack Pro: Comprehensive web application for personal health management\n   - Started on June 10, 2024\n   - Key components:\n     a. User Authentication and Profile Management\n     b. Activity Tracking\n     c. Nutrition Logging and Analysis\n     d. Health Metrics Dashboard\n     e. Goal Setting and Progress Tracking\n     f. Recommendation Engine\n     g. Social Features\n     h. Integration with popular fitness devices and apps\n\n2. Technology Stack\n   - Frontend: React.js, TypeScript, Tailwind CSS\n   - Backend: Node.js, Express.js, PostgreSQL\n   - DevOps: Docker, AWS, Jenkins\n   - 

In [20]:
note_taker_state


{'company_data': '# TechNova Solutions\n\n## Company Overview\nTechNova Solutions is a small, dynamic IT company specializing in web application development. With a team of 6 skilled professionals, they focus on creating innovative, user-friendly web solutions for small to medium-sized businesses.\n\n## Current Project: HealthTrack Pro\nTechNova is developing HealthTrack Pro, a comprehensive web application for personal health management. This application allows users to track their daily activities, nutrition, and health metrics, and provides insights and recommendations for a healthier lifestyle.\n\n## Team Structure\n1. ** Sarah Chen - Project Manager / Scrum Master**\n   - Oversees project progress, manages timelines, and facilitates communication\n   - Has a background in both frontend and backend development\n\n2. ** Alex Rodriguez - Senior Full-Stack Developer**\n   - Leads technical decisions and architecture design\n   - Proficient in both frontend and backend technologies\n\n

In [22]:
state  = SprintPlanningState(
    sprint_planning_note=note_taker_state["note_final"],
    current_backlog=update_state["updated_sprint_backlog"],
    sprint_state = "",
    employee_profiles=load_employee_profiles(os.path.join(project_folder,"employee-profiles.json")),
    selected_backlog_items=[],
    team_capacity={},
    tasks=[],
    task_assignments={},
    sprint_gantt_chart="",
    personal_gantt_charts={},
    personal_reports={},
    sprint_summary="",
    sprint_start_date="",
    sprint_end_date="",
)

In [26]:
backlog_item_selector_node(state)
state["selected_backlog_items"]

Based on the sprint planning note and the current backlog, I've selected appropriate items and generated new ones for the upcoming sprint. Here's the result in the requested JSON format:

{
  "selected_items": [
    {
      "id": "1",
      "type": "task",
      "title": "Set up initial project structure",
      "description": "Create and configure repositories for frontend and backend",
      "priority": "high",
      "justification": "This is a foundational task that needs to be completed first to enable other development work."
    },
    {
      "id": "2",
      "type": "task",
      "title": "Configure development environments",
      "description": "Set up ESLint, Prettier, TypeScript, and Tailwind CSS",
      "priority": "high",
      "justification": "Essential for maintaining code quality and consistency across the team."
    },
    {
      "id": "5",
      "type": "task",
      "title": "Create initial database schema",
      "description": "Design and implement the initial d

[]