In [19]:
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

In [20]:
class BatsmanState(TypedDict):
    runs:int
    balls:int
    fours:int
    sixes:int

    sr:float
    ballsPerBoundary:float
    boundaryPercentage:float
    summary:str
    

In [21]:
def calculate_sr(state:BatsmanState):
    sr=(state['runs']/state["balls"])*100

    return {"sr":sr}

def calculate_bpb(state:BatsmanState):
    bpb=state["balls"]/(state["fours"]+state["sixes"])

    return {"ballsPerBoundary":bpb} # partial state as parallel workflow

def calculate_bp(state:BatsmanState):
    bp=(((state["fours"]*4)+(state["sixes"]*6))/state["balls"])*100

    return {'boundaryPercentage':bp}

def generate_summary(state:BatsmanState):
    summary=f"""
Strike Rate - {state['sr']} 
Ball Per Boundary - {state['ballsPerBoundary']} 
Boundary Percentage - {state['boundaryPercentage']}
"""
    
    return {"summary":summary}

In [22]:
graph=StateGraph(BatsmanState)

graph.add_node("calculate_sr",calculate_sr)
graph.add_node("calculate_bpb",calculate_bpb)
graph.add_node("calculate_bp",calculate_bp)
graph.add_node("generate_summary",generate_summary)

graph.add_edge(START,"calculate_sr")
graph.add_edge(START,"calculate_bpb")
graph.add_edge(START,"calculate_bp")

graph.add_edge("calculate_sr","generate_summary")
graph.add_edge("calculate_bpb","generate_summary")
graph.add_edge("calculate_bp","generate_summary")

graph.add_edge("generate_summary",END)

workflow=graph.compile()

In [23]:
initial_state={
    "runs":600,
    "balls":300,
    "fours":6,
    "sixes":60
}

final_state=workflow.invoke(initial_state)

In [24]:
print(final_state)

{'runs': 600, 'balls': 300, 'fours': 6, 'sixes': 60, 'sr': 200.0, 'ballsPerBoundary': 4.545454545454546, 'boundaryPercentage': 128.0, 'summary': '\nStrike Rate - 200.0 \nBall Per Boundary - 4.545454545454546 \nBoundary Percentage - 128.0\n'}
