## Load & Prepare Data

In [3]:
import pandas as pd

train_df = pd.read_parquet("datasets/pm100/job_data_train.parquet")
test_df  = pd.read_parquet("datasets/pm100/job_data_test.parquet")

print("Train size:", len(train_df))
print("Test  size:", len(test_df))

Train size: 155764
Test  size: 66756


In [4]:
# Metadata columns included in retrieval context + prompt
META_COLS = [
    'User ID',
    'Requested Number of Nodes',
    'Requested Number of CPU',
    'Requested Number of GPU',
    'Total Requested Memory',
    'Requested Time',
    'Submit Time'
]

TARGET_COL = "Run Time"

In [5]:
def row_to_ora_text(row):
    """Convert a row into ORA metadata/prediction formatted text."""
    meta = ""
    for key in META_COLS:
        meta += f"metadata:{key}:{row[key]}\n"
    meta = meta.strip()

    # If prediction exists (train), include it
    if TARGET_COL in row and not pd.isna(row[TARGET_COL]):
        return meta + f"\nprediction:{row[TARGET_COL]}"
    else:
        return meta

In [6]:
train_texts = train_df.apply(row_to_ora_text, axis=1).tolist()
test_texts  = test_df.apply(row_to_ora_text,  axis=1).tolist()

## Build or Load the Vectorstore

In [7]:
from langchain_ollama import OllamaEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from tqdm import tqdm

import os

In [8]:
embedding = OllamaEmbeddings(model="nomic-embed-text")

In [9]:
vectorstore_path = "vectorstore_pm100"

def vectorstore_exists(path):
    return os.path.exists(os.path.join(path, "chroma.sqlite3"))

if vectorstore_exists(vectorstore_path):
    print("✓ Found existing vectorstore — loading from disk.")
    vectorstore = Chroma(
        embedding_function=embedding,
        persist_directory=vectorstore_path
    )

else:
    print("⏳ No vectorstore found — building from documents (slow).")

    train_documents = [
        Document(
            page_content=train_texts[i],
            metadata={"userid": int(train_df.iloc[i]["User ID"])}
        )
        for i in range(len(train_df))
    ]

    vectorstore = Chroma.from_documents(
        documents=train_documents,
        embedding=embedding,
        persist_directory=vectorstore_path
    )

    print("✓ Vectorstore built and automatically persisted to disk.")

✓ Found existing vectorstore — loading from disk.


## RAG Prompt Template

In [10]:
from langchain_core.prompts import ChatPromptTemplate

ORA_PROMPT =  """
You are an expert in job runtime duration prediction. 
There are some retrieved history jobs that similar to the job waitting to predict.
They are displaied through a diff format. 
{context}

Please predict the job runtime duration based on its matedata, script, and retrieved jobs.
The matedata and script of the job waitting to predict is:
{question}

Your output should only include the runtime, e.g. 10 s. 
It means that the script is likely to run for 10 seconds. 
Note: DO NOT OUTPUT ANYTHING OTHER THAN THE RUNTIME.
"""

prompt = ChatPromptTemplate.from_template(ORA_PROMPT)

## Load local model

In [11]:
from langchain_ollama import ChatOllama

llm = ChatOllama(
    model="qwen2.5:7b-instruct",
    temperature=0.1
)

## Numeric extraction from model output

In [12]:
import re

def extract_number(s):
    nums = re.findall(r"[0-9]+\.?[0-9]*", s)
    if len(nums) == 0:
        return None
    return float(nums[0])


## Prepare test list structure

In [13]:
test_list = []

for i in range(len(test_df)):
    r = test_df.iloc[i]
    test_list.append({
        "userid": int(r["User ID"]),
        "time_submit": float(r["Submit Time"]),
        "data": test_texts[i]
    })

## Run Inference Over the Test Set

In [14]:
output_path = "predictions/pm100"
os.makedirs(output_path, exist_ok=True)

output_file = os.path.join(output_path, "predictions.csv")

In [15]:
N = len(test_list)
B = 10  # number of batches
batch_size = N // B

In [16]:
if os.path.exists(output_file):
    print("✓ Found previous results, loading...")
    existing_results = pd.read_csv(output_file)
    processed_count = len(existing_results)
    print(f"✓ Resuming — {processed_count} samples already completed.")
else:
    print("No previous results found — starting fresh.")
    existing_results = pd.DataFrame()
    processed_count = 0

No previous results found — starting fresh.


In [17]:
import numpy as np
from tqdm import tqdm

K = 5  # same as your original top_k_generic / top_k_user

for batch_id in range(B):
    start = batch_id * batch_size
    end   = (batch_id + 1) * batch_size if batch_id < B-1 else N

    # Skip batches already completed
    if processed_count >= end:
        print(f"Batch {batch_id+1}/{B} already completed — skipping.")
        continue

    print(f"\n--- Processing batch {batch_id+1}/{B} ({start}:{end}) ---")

    batch_preds = []
    batch_gts   = []
    batch_rows  = []

    for i in tqdm(range(start, end)):
        if i < processed_count:
            continue  # already done from previous run

        item = test_list[i]
        question = item["data"]
        userid   = item["userid"]
        gt       = float(test_df.iloc[i][TARGET_COL])

        # -------- RETRIEVAL --------
        retrieved_docs = vectorstore.similarity_search(
            question, k=K, filter={"userid": userid}
        )

        context = "\n\n".join([doc.page_content for doc in retrieved_docs])

        # -------- LLM --------
        msg = prompt.invoke({"context": context, "question": question})
        response = llm.invoke(msg.messages)
        pred = extract_number(response.content)

        # fallback
        if pred is None or np.isnan(pred):
            pred = np.mean([
                float(d.page_content.split("prediction:")[1])
                for d in retrieved_docs if "prediction:" in d.page_content
            ])

        # store
        batch_preds.append(pred)
        batch_gts.append(gt)
        batch_rows.append({
            "userid":       userid,
            "time_submit":  item["time_submit"],
            "gt_runtime":   gt,
            "pred_runtime_ora": float(pred)
        })

        # -------- ONLINE UPDATE --------
        new_doc = Document(
            page_content=question + f"\nprediction:{gt}",
            metadata={"userid": userid}
        )
        vectorstore.add_documents([new_doc])

    # -------- Save batch results --------
    df_batch = pd.DataFrame(batch_rows)

    if os.path.exists(output_file):
        df_batch.to_csv(output_file, mode="a", index=False, header=False)
    else:
        df_batch.to_csv(output_file, index=False)

    print(f"✓ Saved batch {batch_id+1}/{B} to {output_file}")

    # update progress
    processed_count += len(batch_rows)



--- Processing batch 1/10 (0:6675) ---


100%|████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:58:30<00:00,  1.07s/it]


✓ Saved batch 1/10 to results/pm100\predictions.csv

--- Processing batch 2/10 (6675:13350) ---


100%|████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:56:48<00:00,  1.05s/it]


✓ Saved batch 2/10 to results/pm100\predictions.csv

--- Processing batch 3/10 (13350:20025) ---


100%|████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:59:22<00:00,  1.07s/it]


✓ Saved batch 3/10 to results/pm100\predictions.csv

--- Processing batch 4/10 (20025:26700) ---


100%|████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:55:35<00:00,  1.04s/it]


✓ Saved batch 4/10 to results/pm100\predictions.csv

--- Processing batch 5/10 (26700:33375) ---


100%|████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:46:02<00:00,  1.05it/s]


✓ Saved batch 5/10 to results/pm100\predictions.csv

--- Processing batch 6/10 (33375:40050) ---


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:47:25<00:00,  1.04it/s]


✓ Saved batch 6/10 to results/pm100\predictions.csv

--- Processing batch 7/10 (40050:46725) ---


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:50:07<00:00,  1.01it/s]


✓ Saved batch 7/10 to results/pm100\predictions.csv

--- Processing batch 8/10 (46725:53400) ---


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:55:04<00:00,  1.03s/it]


✓ Saved batch 8/10 to results/pm100\predictions.csv

--- Processing batch 9/10 (53400:60075) ---


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6675/6675 [1:59:28<00:00,  1.07s/it]


✓ Saved batch 9/10 to results/pm100\predictions.csv

--- Processing batch 10/10 (60075:66756) ---


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6681/6681 [2:12:32<00:00,  1.19s/it]

✓ Saved batch 10/10 to results/pm100\predictions.csv





## Save Predictions With User ID + Submit Time

In [None]:
# Save the results in the file with all other predictions

df = pd.read_csv("predictions/pm100/predictions.csv")

df["pred_runtime_llm"] = y_pred

df.to_csv("predictions/pm100/predictions.csv", index=False)

## Load Predictions

In [18]:
print("\n✓ Loading full saved results...")
res = pd.read_csv(output_file)

gts = res["gt_runtime"].values
preds = res["pred_runtime_ora"].values


✓ Loading full saved results...


## Compute Metrics

In [19]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import MinMaxScaler

In [20]:
scaler = MinMaxScaler()
scaler.fit(train_df[TARGET_COL].values.reshape(-1, 1))

gt_scaled   = scaler.transform(np.array(gts).reshape(-1,1)).reshape(-1)
pred_scaled = scaler.transform(np.array(preds).reshape(-1,1)).reshape(-1)

In [21]:
mae_raw = mean_absolute_error(gts, preds)
mse_raw = mean_squared_error(gts, preds)
r2_raw  = r2_score(gts, preds)

print("Raw MAE:", mae_raw)
print("Raw MSE:", mse_raw)
print("Raw R2:",  r2_raw)

Raw MAE: 215.2998082569357
Raw MSE: 7173488.371082749
Raw R2: 0.9586862104268463


In [22]:
mae = mean_absolute_error(gt_scaled, pred_scaled)
mse = mean_squared_error(gt_scaled, pred_scaled)
r2  = r2_score(gt_scaled, pred_scaled)

print("MAE:", mae)
print("MSE:", mse)
print("R2:",  r2)

MAE: 0.002492472890216899
MSE: 0.0009613997374420564
R2: 0.9586862104268462


In [23]:
def convert_to_hms(time_value, unit="seconds"):
    if unit == "minutes":
        time_value *= 60  # Convert minutes to seconds
    hours, remainder = divmod(time_value, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}"

In [24]:
print(f"Average prediction error (hh:mm:ss)  :  {convert_to_hms(int(mae_raw))}")

Average prediction error (hh:mm:ss)  :  00:03:35
