# Reinforcement Fine-tuning of o4-mini for Log Classification

Code authored by: Shaw Talebi

### imports

In [1]:
from datasets import load_dataset, concatenate_datasets
import json

from functions import run_inference, calculate_metrics, confusion_matrix

from openai import OpenAI
from dotenv import load_dotenv
import os

In [2]:
# load the secret key (and any other variables) from the .env file
load_dotenv()

# create the OpenAI API client using the key from the environment
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)

## 1) Data Prep

### load dataset

In [3]:
# dataset repository on the Hugging Face hub
HF_DATASET_ID = "shawhin/HDFS_v1_blocks"

# download (or reuse the cached copy of) all splits
ds = load_dataset(HF_DATASET_ID)

# show the available splits, their columns and row counts
ds

DatasetDict({
    train: Dataset({
        features: ['block_id', 'text', 'label'],
        num_rows: 460048
    })
    dev: Dataset({
        features: ['block_id', 'text', 'label'],
        num_rows: 57506
    })
    test: Dataset({
        features: ['block_id', 'text', 'label'],
        num_rows: 57507
    })
})

### sample train and dev sets

In [4]:
num_train = 100
num_val = 50
val_anomalous_frac = 0.2  # share of anomalous blocks in the validation sample
sample_seed = 42  # single seed reused for every shuffle below

# Per-class sample sizes, computed once as integers.
# The normal-class validation count is derived by subtraction so the two
# counts always add up to num_val (independent int() truncation of
# num_val * 0.2 and num_val * 0.8 could silently drop a row).
n_train_per_class = num_train // 2
n_val_anomalous = round(num_val * val_anomalous_frac)
n_val_normal = num_val - n_val_anomalous

# Split training data by class
train_anomalous = ds["train"].filter(lambda x: x["label"] == 1).shuffle(seed=sample_seed)
train_normal = ds["train"].filter(lambda x: x["label"] == 0).shuffle(seed=sample_seed)

# Split dev data by class
val_anomalous = ds["dev"].filter(lambda x: x["label"] == 1).shuffle(seed=sample_seed)
val_normal = ds["dev"].filter(lambda x: x["label"] == 0).shuffle(seed=sample_seed)

# Balanced 50-50 split for training
train_sample = concatenate_datasets([
    train_anomalous.select(range(n_train_per_class)),
    train_normal.select(range(n_train_per_class))
]).shuffle(seed=sample_seed)

# Imbalanced 20-80 (anomalous-normal) split for validation
val_sample = concatenate_datasets([
    val_anomalous.select(range(n_val_anomalous)),
    val_normal.select(range(n_val_normal))
]).shuffle(seed=sample_seed)

# sanity checks: the samples have exactly the sizes computed above
assert len(train_sample) == 2 * n_train_per_class
assert len(val_sample) == num_val

print(f"Training samples: {len(train_sample)}")
print(f"Validation samples: {len(val_sample)}")

Training samples: 100
Validation samples: 50


In [5]:
# count anomalous examples (label == 1) in each sample
train_positive = sum(train_sample["label"])
val_positive = sum(val_sample["label"])

# everything that is not anomalous is normal
train_negative = len(train_sample) - train_positive
val_negative = len(val_sample) - val_positive

print(f"Train: {train_positive} anomalous, {train_negative} normal")
print(f"Val: {val_positive} anomalous, {val_negative} normal")

Train: 50 anomalous, 50 normal
Val: 10 anomalous, 40 normal


### format training data

In [6]:
def format_example(example):
    """Build one OpenAI RFT JSONL record from a dataset row.

    The row's "text" becomes the user message, preceded by a fixed developer
    instruction, and the row's "label" is stored as a boolean under "label".

    Args:
        example: mapping with at least the keys "text" and "label".

    Returns:
        dict with a two-element "messages" list and a boolean "label".
    """
    developer_msg = {
        "role": "developer",
        "content": "Classify this HDFS log block as anomalous or normal.",
    }
    user_msg = {"role": "user", "content": example["text"]}
    is_anomalous = bool(example["label"])  # True = anomalous, False = normal

    return {"messages": [developer_msg, user_msg], "label": is_anomalous}

In [7]:
# convert every sampled row into an RFT record
train_data = list(map(format_example, train_sample))
val_data = list(map(format_example, val_sample))

# pretty-print the first training record as a sanity check
first_record = train_data[0]
print(json.dumps(first_record, indent=2))

{
  "messages": [
    {
      "role": "developer",
      "content": "Classify this HDFS log block as anomalous or normal."
    },
    {
      "role": "user",
      "content": "INFO dfs.DataNode$DataXceiver: Receiving block blk_-3662429616261189633 src: /10.251.91.84:35214 dest: /10.251.91.84:50010\nINFO dfs.DataNode$DataXceiver: Receiving block blk_-3662429616261189633 src: /10.250.6.191:44011 dest: /10.250.6.191:50010\nINFO dfs.DataNode$DataXceiver: Receiving block blk_-3662429616261189633 src: /10.250.6.191:44448 dest: /10.250.6.191:50010\nINFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /user/root/randtxt5/_temporary/_task_200811101024_0012_m_000434_0/part-00434. blk_-3662429616261189633\nINFO dfs.DataNode$PacketResponder: PacketResponder 0 for block blk_-3662429616261189633 terminating\nINFO dfs.DataNode$PacketResponder: Received block blk_-3662429616261189633 of size 67108864 from /10.251.91.84\nINFO dfs.DataNode$PacketResponder: PacketResponder 1 for block blk_-36624296162

In [8]:
# write to JSONL files
train_file_path = "data/train_rft.jsonl"
val_file_path = "data/val_rft.jsonl"


def _write_jsonl(records, path):
    """Write `records` to `path` as JSON Lines (one JSON object per line).

    The parent directory is created if it does not exist yet, so the cell
    also works on a fresh checkout. An existing file is overwritten.
    """
    parent_dir = os.path.dirname(path)
    if parent_dir:  # empty when `path` has no directory component
        os.makedirs(parent_dir, exist_ok=True)

    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


_write_jsonl(train_data, train_file_path)
_write_jsonl(val_data, val_file_path)

print(f"Wrote {len(train_data)} training examples to {train_file_path}")
print(f"Wrote {len(val_data)} validation examples to {val_file_path}")

Wrote 100 training examples to data/train_rft.jsonl
Wrote 50 validation examples to data/val_rft.jsonl


### upload files to OpenAI

In [9]:
def _upload_for_fine_tuning(path):
    """Upload the file at `path` with purpose "fine-tune" and return the created file object."""
    with open(path, "rb") as fh:
        return client.files.create(file=fh, purpose="fine-tune")


# upload training file
train_file = _upload_for_fine_tuning(train_file_path)
print(f"Training file ID: {train_file.id}")

# upload validation file
val_file = _upload_for_fine_tuning(val_file_path)
print(f"Validation file ID: {val_file.id}")

Training file ID: file-37hAAqyD6RUbyjhmRynvYw
Validation file ID: file-NMJmXjvcbiCE9sqKHyxgKP


## 2) Grader

### create grader

In [10]:
# Python grader with an asymmetric reward structure:
# a missed anomaly (false negative) scores lower than a false alarm (false positive).
# The grading function is kept as source text because it is passed as the
# "source" field of the grader definition below.
grader_source = """
def grade(sample, item):
    try:
        pred = sample["output_json"]["Anomalous"]  # boolean from structured output
        actual = item["label"]  # boolean label from training data
        
        if pred == actual:
            return 1.0  # Correct prediction
        elif actual:  # False negative: missed anomaly
            return 0.0  # Worst outcome
        else:  # False positive: false alarm
            return 0.3  # Bad but not as bad
    except (KeyError, TypeError):
        return 0.0  # Malformed output treated as worst case
"""

# grader definition handed to the fine-tuning job
grader = {
    "type": "python",
    "name": "custom_reward",
    "image_tag": "2025-05-08",
    "source": grader_source,
}

In [11]:
# structured output schema, assembled from its parts

# boolean verdict field
anomalous_field = {
    "type": "boolean",
    "description": "True if the log block is anomalous, false otherwise.",
}

# free-text justification field
reasoning_field = {
    "type": "string",
    "description": "Explanation of the decision regarding whether the block is anomalous.",
}

# object schema: both fields are required and no extra keys are allowed
output_schema = {
    "type": "object",
    "properties": {
        "Anomalous": anomalous_field,
        "reasoning": reasoning_field,
    },
    "required": ["Anomalous", "reasoning"],
    "additionalProperties": False,
}

response_format = {
    "type": "json_schema",
    "json_schema": {
        "name": "anomaly_flag",
        "strict": True,
        "schema": output_schema,
    },
}

## 3) Train Model

### model training

In [12]:
# base model
base_model = "o4-mini-2025-04-16"

# reinforcement method configuration: the grader plus the structured output schema
rft_method = {
    "type": "reinforcement",
    "reinforcement": {
        "grader": grader,
        "response_format": response_format,
    },
}

# create fine-tuning job with reinforcement learning
job = client.fine_tuning.jobs.create(
    model=base_model,
    suffix="hdfs_classification",
    training_file=train_file.id,
    validation_file=val_file.id,
    method=rft_method,
)

print(f"Fine-tuning job created: {job.id}")

Fine-tuning job created: ftjob-FlGliAbBOzj7w1L0C7O3C7DG
