In [None]:
# ! pip install "unsloth==2025.3.19" vllm wandb
# ! pip uninstall -y typing_extensions &&  pip install typing_extensions==4.11.0

In [None]:
import regex as re

In [None]:
import sys
sys.path.append("/srv/chawak/planning-with-llms/src")

from shared import llm_utils
from shared import unifiedplanning_blocksworld as bw
from shared import prompts
from shared import planbench as pb

import random, os
from datetime import datetime

In [None]:
#from unsloth import FastLanguageModel

In [None]:
# Locations of the SFT results, the fine-tuned checkpoint and the base model.
# NOTE(review): these are absolute, machine-specific paths; neither `model_path`
# nor `base_model` is read again in the visible part of this notebook.
base_dir='/srv/chawak/planning-with-llms/results/SFT'
# Training step number used to pick the checkpoint directory below.
cpt=20430
model_path=base_dir+f'/training/training_22-05/checkpoint-{cpt}'
base_model='/home/chawak/huggingface/models--google--gemma-3-12b-it'

In [None]:
# Load the Gemma-3 12B instruct model through Unsloth, unquantised
# (load_in_4bit=False), with fast inference enabled and LoRA ranks up to 64.
# NOTE(review): the only import of `FastLanguageModel` visible in this notebook
# is commented out, so this cell raises NameError on a fresh kernel unless that
# import is restored.
# NOTE(review): presumably `from_pretrained` returns a (model, tokenizer) pair,
# in which case `model` is a tuple rather than the model itself -- confirm.
model=FastLanguageModel.from_pretrained(
    model_name="google/gemma-3-12b-it",
    load_in_4bit=False,
    fast_inference=True,
    max_lora_rank=64
    )

In [None]:
# Wrap a model with rank-64 LoRA adapters on the attention and MLP projections.
# NOTE(review): `peft_model` is not defined anywhere in the visible notebook, so
# this cell depends on hidden kernel state. It may have been meant to be
# `model[0]` from the loading cell, or a checkpoint loaded in a deleted cell --
# confirm before relying on Restart & Run All.
lora_model = FastLanguageModel.get_peft_model(
    peft_model[0],
    r=64,  # Choose any number > 0 ! Suggested 8, 16, 32, 64, 128
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],  # Remove QKVO if out of memory
    lora_alpha=64,
    use_gradient_checkpointing="unsloth",  # Enable long context finetuning
    random_state=3407,  # fixed seed passed to get_peft_model
)

In [None]:
def make_action_tuples(response):
    """Parse a model response into action tuples.

    Delegates to ``llm_utils.parse_action_tuples``.

    Args:
        response: Text of the model response (or gold plan) to parse.

    Returns:
        The parsed action tuples when the parser yields a truthy result;
        otherwise a notice is printed and ``False`` is returned.
    """
    parsed = llm_utils.parse_action_tuples(response)
    if parsed:
        return parsed

    # Nothing usable came back from the parser.
    print('\n\n Imparsable response')
    return False

In [None]:
def response2plan(problem, init, goal, response):
    """Set up the Blocksworld task on ``problem`` and build the model's plan.

    Args:
        problem: Problem object handed to the PlanBench parsers (presumably
            populated in place, since their return values are not used); must
            expose ``initial_values``, ``goals`` and
            ``GRPOcreate_plan_from_tuples``.
        init: Raw initial-condition description, passed to ``prompts.parse_init``.
        goal: Raw goal description, passed to ``prompts.parse_goal``.
        response: Model output text containing the proposed plan.

    Returns:
        ``(action_tuples, model_plan)``. ``action_tuples`` is ``False`` when
        ``make_action_tuples`` could not parse the response; it is still
        forwarded to ``GRPOcreate_plan_from_tuples`` in that case.
    """
    # Register the initial state and the goal on the problem.
    parsed_init = prompts.parse_init(init)
    parsed_goal = prompts.parse_goal(goal)
    pb.parse_planbench_initial_condition(problem, parsed_init)
    pb.parse_planbench_goal_state(problem, parsed_goal)
    print(f'\n\nBlocksworld Problem Initial Values:{problem.initial_values}')
    print(f'\nBlocksworld Problem Goal State:{problem.goals}')

    # Convert the response text into a plan object.
    parsed_actions = make_action_tuples(response)
    plan = problem.GRPOcreate_plan_from_tuples(parsed_actions)
    print(f"Model responded with this plan: {plan}")

    return parsed_actions, plan

In [None]:
def apply_plan(problem, model_plan):
    """Run ``model_plan`` on a fresh sequential simulation of ``problem``.

    Args:
        problem: Object exposing ``create_seq_simulation``,
            ``GRPO_check_and_apply`` and ``actions_to_goal``.
        model_plan: Plan object to validate and apply.

    Returns:
        ``(valid_state, va_counter, distance2goal)`` as produced by
        ``GRPO_check_and_apply``, with the distance-to-goal of the last valid
        state appended (in place) to the distance list.
    """
    simulator = problem.create_seq_simulation()
    last_state, valid_actions, distances = problem.GRPO_check_and_apply(simulator, model_plan)

    # Record how far the last valid state still is from the goal.
    distances.append(problem.actions_to_goal(last_state))

    return last_state, valid_actions, distances

In [None]:
def goal_proximity(distance2goal:list) -> list:
    """Score each step by how much it reduced the distance to the goal.

    Consecutive pairs ``(d_old, d_new)`` are compared; a step earns
    ``5 * (d_old - d_new)`` points, floored at 0 so moving away is not
    penalised. Scoring stops as soon as a distance of 0 (goal reached) is
    encountered, so anything after the first goal hit is ignored.

    Args:
        distance2goal: Distances to the goal, one per visited state.

    Returns:
        One score per scored transition (possibly an empty list).
    """
    gains = []

    transitions = zip(distance2goal, distance2goal[1:])
    for step, (current, following) in enumerate(transitions):
        if current == 0:  # goal state reached
            break

        print("-"*20,f"IDX:{step}")
        gains.append(max(0, (current - following) * 5))

    return gains

In [None]:
def get_plan_len(plan):
    """Count the action lines of a ``[PLAN] ... [PLAN END]`` block.

    The count ignores blank lines and the ``[PLAN]`` / ``[PLAN END]`` marker
    lines themselves, so a trailing newline or stray empty line no longer
    inflates the length (the previous ``len(lines) - 2`` arithmetic did).

    Args:
        plan: Plan text, one action per line, wrapped in the two markers.

    Returns:
        Number of non-empty, non-marker lines in ``plan``.
    """
    markers = {"[PLAN]", "[PLAN END]"}
    stripped_lines = (line.strip() for line in plan.splitlines())

    return sum(1 for line in stripped_lines if line and line not in markers)

In [None]:
# Manual check: scoring stops at the first 0, so the expected result is [5, 5, 5, 5].
goal_proximity([4, 3, 2, 1, 0, 1, 0])

In [None]:
#bonus rewards: correct termination, optimality
def bonus_reward(problem, valid_state, plan_len, goldplanlen):
    """Bonus for reaching the goal, with an extra for short plans.

    Args:
        problem: Object whose ``terminate(valid_state)`` reports whether the
            state satisfies the goal.
        valid_state: Last valid state reached by the model plan.
        plan_len: Length of the model plan.
        goldplanlen: Length of the gold plan.

    Returns:
        0 if the goal is not reached; otherwise 20, plus 10 when the plan is
        as long as the gold plan or 15 when it is shorter.
    """
    if not problem.terminate(valid_state):
        return 0

    bonus = 20  # reached the goal

    if plan_len == goldplanlen:
        # Matches the gold plan length.
        bonus += 10
    elif plan_len < goldplanlen:
        # Shorter than the gold plan.
        bonus += 15

    return bonus

In [None]:
def gold_plan_reward(problem, gold_plan):
    """Compute the un-normalised reward the gold plan itself would earn.

    The gold plan is parsed, applied to ``problem`` and scored with the same
    components as a model plan (2 points per valid action plus the proximity
    scores), without any bonus reward.

    Args:
        problem: Problem object exposing ``GRPOcreate_plan_from_tuples`` and
            the methods required by ``apply_plan``.
        gold_plan: Gold plan text.

    Returns:
        The gold plan's score.
    """
    # Turn the gold plan text into a plan object and simulate it.
    gold_tuples = make_action_tuples(gold_plan)
    gold_plan_obj = problem.GRPOcreate_plan_from_tuples(gold_tuples)
    _, valid_actions, distances = apply_plan(problem, gold_plan_obj)

    # Points for progress towards the goal state.
    print(f"GOLD-PLAN distance metric is: {distances}")
    proximity = goal_proximity(distances)
    print(f"GOLD-PLAN proximity scores are: {proximity}")

    # 2 points per valid action, plus the proximity points.
    total = valid_actions * 2 + sum(proximity)
    print(f"GOLD-PLAN scores without bonus reward{total}")

    print("GOLD PLAN SCORE IS: ", total)
    return total

In [None]:
# Expected response layout: a <think>...</think> section followed, after any
# text, by a [PLAN] ... [PLAN END] block. DOTALL lets `.` span newlines.
# NOTE(review): `format_pattern` is reassigned with different regexes in later
# cells, so which pattern a reward function sees depends on execution order.
format_pattern=r"<think>(.*?)<\/think>.*?\[PLAN\](.*?)\[PLAN END\]"
format_pattern=re.compile(format_pattern, re.DOTALL)

In [None]:
#scores structural adherence of response
def format_reward(prompts, completions, **kwargs) -> list[float]:
    """Reward completions whose text matches ``format_pattern`` from the start.

    Args:
        prompts: Unused here.
        completions: Sequence of completions; the text is read from
            ``completion[0]["content"]``.
        **kwargs: Ignored.

    Returns:
        ``10.0`` for each completion matching the module-level
        ``format_pattern`` (via ``match``, i.e. anchored at the start) and
        ``0.0`` otherwise.

    NOTE(review): a later cell defines another ``format_reward`` with a
    different signature; once that cell runs it shadows this definition.
    """
    contents = [completion[0]["content"] for completion in completions]

    rewards = []
    for text in contents:
        rewards.append(10.0 if format_pattern.match(text) else 0.0)
    return rewards

In [None]:
#scores plan correctness
def response_score(response,init,goal,gold_plan):
    """Score one model response against the gold plan for a Blocksworld task.

    The score is built from:
      * 2 points per valid action, capped at twice the gold plan length to
        avoid reward hacking;
      * proximity points for every step that moves closer to the goal;
      * normalisation of that subtotal by the gold plan's own score onto a
        0-60 scale;
      * bonus points for reaching the goal and for plan optimality.

    Args:
        response: Model output text containing the proposed plan.
        init: Raw initial-condition description of the task.
        goal: Raw goal description of the task.
        gold_plan: Gold plan text for the task.

    Returns:
        The total score for ``response``.
    """
    print('-'*20,"Entering response score",'-'*20)
    #define blocksworld problem
    problem=bw.BlocksworldProblem()
    score = 0

    #VALID ACTION REWARD:
    #extract valid actions from response
    action_tuples,model_plan=response2plan(problem=problem,init=init,goal=goal,response=response)
    current_state,valid_action_count,distance2goal=apply_plan(problem=problem,model_plan=model_plan)
    print(f'Distance 2 goal metric:{distance2goal}')
    gold_plan_len=get_plan_len(gold_plan)
    #limited to total number of actions possible, to avoid reward hacking
    score+=min(gold_plan_len*2,valid_action_count*2)
    print(f"Valid actions score is: {score}")

    #PROXIMITY REWARD:
    distance_scores=goal_proximity(distance2goal)
    print(f"Proximity scores are: {distance_scores}")
    score+=sum(distance_scores)

    print(f"Scores without bonus reward: {score}")

    #normalize model-plan's reward by the gold plan reward to 0-60 range
    gold_plan_score=gold_plan_reward(problem=problem,gold_plan=gold_plan)
    if gold_plan_score > 0:
        score=(score/gold_plan_score)*60
    else:
        #a gold plan that earns nothing gives no scale to normalise against;
        #fall back to 0 instead of raising ZeroDivisionError
        score=0.0

    #bonus rewards for correct termination and optimality
    #make_action_tuples yields False for an unparsable response; treat that as
    #an empty plan rather than calling len() on a bool
    model_plan_len=len(action_tuples) if action_tuples else 0
    score+=bonus_reward(problem,current_state,model_plan_len,gold_plan_len)
    print(f"Scores with bonus reward: {score}")

    return score

In [None]:
# Hand-written fixture for trying out response_score.
# `response` has a non-action line ("blah unstack blah") between two actions;
# `gold_plan` lists four actions between the [PLAN] / [PLAN END] markers.
init="(('violet', 'teal', 'brown'),)"
goal="(('violet',), ('teal',), ('brown',))"
response='''[PLAN]
unstack the brown block from on top of the teal block
blah unstack blah
put down the brown block
[PLAN END]'''
gold_plan='''[PLAN]
unstack the brown block from on top of the teal block
put down the brown block
unstack the teal block from on top of the violet block
put down the teal block
[PLAN END]'''

In [None]:
# Score the toy response against the toy gold plan defined in the fixture cell.
response_score(response=response,init=init,goal=goal,gold_plan=gold_plan)

In [None]:
def plan_reward(prompts, completions, init, goal, gold_plan):
    """Score the plan in every completion with ``response_score``.

    Args:
        prompts: Unused here.
        completions: Sequence of completions; the text is read from
            ``completion[0]["content"]``.
        init: Initial-condition description, passed unchanged for every
            completion.
        goal: Goal description, passed unchanged for every completion.
        gold_plan: Gold plan text, passed unchanged for every completion.

    Returns:
        One score per completion, in input order.

    NOTE(review): ``init``, ``goal`` and ``gold_plan`` are forwarded as-is to
    each call. If the trainer supplies them as per-sample lists they would
    need to be indexed per completion -- confirm against the caller.
    """
    # Extract every response text first, then score them in order.
    contents = [completion[0]["content"] for completion in completions]

    return [
        response_score(response=text, init=init, goal=goal, gold_plan=gold_plan)
        for text in contents
    ]


In [None]:
# Looser layout check: any leading text (lazy `.*?`, DOTALL) followed by a
# [PLAN] ... [PLAN END] block. This rebinds the module-level `format_pattern`.
format_pattern=r".*?\[PLAN\](.*?)\[PLAN END\]"
format_pattern=re.compile(format_pattern, re.DOTALL)

In [None]:
# Sample text with a line of noise before the [PLAN] block.
# NOTE(review): the name `input` shadows the Python builtin for the rest of the
# kernel session.
input='''i have gibrish here
[PLAN]
unstack the brown block from on top of the teal block
put down the brown block
unstack the teal block from on top of the violet block
put down the teal block
[PLAN END]'''

In [None]:
# Prints 10.0 when the currently bound `format_pattern` matches the sample text
# from its first character, otherwise 0.0.
print(0.0 if not format_pattern.match(input) else 10.0)

In [None]:
import math 

def compute_mean_over_k_steps(metric_list,k_steps):
    """Average a metric over consecutive windows of ``k_steps`` values.

    ``metric_list`` is cut into consecutive chunks of ``k_steps`` items. When
    the length is not a multiple of ``k_steps`` the final chunk is shorter and
    is averaged over its own length (dividing it by ``k_steps`` would
    understate its mean).

    Args:
        metric_list: Sequence of numeric metric values.
        k_steps: Chunk size; must be a positive integer.

    Returns:
        List with the mean of each chunk (empty for an empty input).

    Raises:
        ValueError: If ``k_steps`` is not positive.
    """
    if k_steps <= 0:
        raise ValueError("k_steps must be a positive integer")

    mean_values=[]

    #split metrics list by step-size and average each chunk
    for start in range(0, len(metric_list), k_steps):
        metric_chunk=metric_list[start:start+k_steps]
        mean_values.append(sum(metric_chunk)/len(metric_chunk))

    print(mean_values)

    return mean_values


In [None]:
# Manual check with a list whose length (10) is not a multiple of the chunk size (3).
compute_mean_over_k_steps(list(range(10)),3)

In [None]:
import regex as re

In [None]:
# A [PLAN] ... [PLAN END] block; DOTALL lets the plan body span several lines.
format_pattern = re.compile(r"\[PLAN\](.*?)\[PLAN END\]", re.DOTALL)


def format_reward(completions, **kwargs) -> list[float]:
    """Grade how well each completion follows the plan format.

    Args:
        completions: Iterable of completion strings.
        **kwargs: Ignored.

    Returns:
        One score per completion: 10 when the text starts with a
        ``[PLAN] ... [PLAN END]`` block (hard format), 2 when such a block only
        appears later in the text (soft format), 0 otherwise. Each score is
        also printed.
    """
    rewards = []

    for text in list(completions):
        if format_pattern.match(text) is not None:
            # Hard format: the plan block opens the response.
            reward = 10
        elif format_pattern.search(text) is not None:
            # Soft format: a plan block exists somewhere in the response.
            reward = 2
        else:
            reward = 0

        rewards.append(reward)
        print(f"Format reward for this response is: {reward}")

    return rewards

In [4]:
# Single well-formed completion (plain string) for trying out format_reward.
completions=['''[PLAN]
unstack the red block from on top of the green block
put down the red block
[PLAN END]''']

In [5]:
# The recorded output for this sample is [10] (hard-format match).
format_reward(completions=completions)

Format reward for this response is: 10


[10]